diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..c7597d5 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,6 @@ +/.venv +/.idea +/.vscode +/.git +/samples +/__pycache__ diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e5c29bf --- /dev/null +++ b/.gitignore @@ -0,0 +1,161 @@ +### Python template +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/#use-with-ide +.pdm.toml + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +.idea/ diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..4c9aa96 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,16 @@ +# docker build . --platform linux/amd64 --push -t leafdev.top/models/nailong-detector:v0.0.2 +#FROM python:3.12.7 +FROM docker.io/pytorch/pytorch:2.5.1-cuda12.4-cudnn9-runtime + +# +WORKDIR /app + +COPY requirements.txt /app +RUN pip install --no-cache-dir --upgrade -r requirements.txt +RUN apt update && apt-get install ffmpeg libsm6 libxext6 -y +RUN rm -rf /var/cache/apt/archives /var/lib/apt/lists/*. +# +COPY . /app + +# +CMD ["uvicorn", "main:app", "--proxy-headers", "--host", "0.0.0.0", "--port", "80"] \ No newline at end of file diff --git a/README.md b/README.md index a158741..bf50c60 100644 --- a/README.md +++ b/README.md @@ -1,31 +1,3 @@ -# 使用方法 -先安装 文件所需要的依赖模块 -pip install -r requirements.txt -再运行mainapi.py文件即可 +# ICP 备案查询API -# 环境要求 -Python >= 3 - -# 请求示例 - -如图箭头显示 - -![url链接](https://raw.githubusercontent.com/Suxiaoqinx/icp_api/refs/heads/main/7a87c3a5-037f-4bd4-81a4-72b1afc71cde.png) - -## 参数列表 - -请求链接选择 http://ip:port/query - -请求方式 GET - -| 参数列表 | 参数说明 | -| ---- | ---- | -| domain | 域名url地址| -| type | cache(则缓存) 为空则不缓存 | - -# 感谢 -[Ravizhan](https://github.com/ravizhan) -[原项目地址](https://github.com/ravizhan/ICP-spider) - -# 反馈方法 -请在Github的lssues反馈 或者到我[博客](https://www.toubiec.cn)反馈 +感谢: https://github.com/Suxiaoqinx/icp_api \ No newline at end of file diff --git a/cache.py b/cache.py new file mode 100644 index 0000000..fb7c9d7 --- /dev/null +++ b/cache.py @@ -0,0 +1,26 @@ +import config +import redis +from models import QueryResponse, ICPRecord +from typing import List, Optional + +# test redis +r = redis.Redis(host=config.REDIS_HOST, port=config.REDIS_PORT, password=config.REDIS_PASSWORD, db=config.REDIS_DB) + +# if ping failed, raise exception +if not r.ping(): + raise Exception("Redis ping failed") + + +MAX_EXPIRE_TIME = 6 * 60 * 60 + +def save_to_cache(domain: str, data: List[ICPRecord]): + # 将data转换为json字符串 + # 直接转换 + data_json = QueryResponse(cached=False, count=len(data), data=data).model_dump_json() + r.set(f"{config.REDIS_PREFIX}:{domain}", data_json, ex=MAX_EXPIRE_TIME) + +def load_from_cache(domain: str) -> Optional[List[ICPRecord]]: + data_json = r.get(f"{config.REDIS_PREFIX}:{domain}") + if data_json: + return QueryResponse.model_validate_json(data_json).data + return None diff --git a/config.py b/config.py new file mode 100644 index 0000000..e515c1c --- /dev/null +++ b/config.py @@ -0,0 +1,14 @@ +import os + + +REDIS_HOST = os.environ.get("REDIS_HOST", "localhost") + +REDIS_PORT = os.environ.get("REDIS_PORT", "6379") + +REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD", "") + +REDIS_DB = os.environ.get("REDIS_DB", "0") + +# prefix +REDIS_PREFIX = os.environ.get("REDIS_PREFIX", "icp:") + diff --git a/crack.py b/detect.py similarity index 93% rename from crack.py rename to detect.py index 7d876ac..282520b 100644 --- a/crack.py +++ b/detect.py @@ -4,7 +4,7 @@ import cv2 import numpy as np -class Crack: +class Detect: def __init__(self): pass @@ -22,7 +22,7 @@ class Crack: def detect(self, big_img): confidence_thres = 0.7 iou_thres = 0.7 - session = onnxruntime.InferenceSession("yolov8.onnx") + session = onnxruntime.InferenceSession("./models/yolov8.onnx") model_inputs = session.get_inputs() self.big_img = self.read_base64_image(big_img) @@ -58,7 +58,7 @@ class Crack: return new_boxes def siamese(self, small_img, boxes): - session = onnxruntime.InferenceSession("siamese.onnx") + session = onnxruntime.InferenceSession("./models/siamese.onnx") positions = [165, 200, 231, 265] result_list = [] for x in positions: @@ -93,8 +93,3 @@ class Crack: break return result_list - -if __name__ == '__main__': - crack = Crack() - boxes = crack.detect("./1.png") - print(crack.siamese("./2.png", boxes)) diff --git a/main.py b/main.py index 1503c3b..4eb63e4 100644 --- a/main.py +++ b/main.py @@ -3,13 +3,57 @@ import json import requests import hashlib import time +import os from urllib import parse -from crack import Crack import uuid +from fastapi import FastAPI, HTTPException, Query +from fastapi.responses import JSONResponse +from pydantic import BaseModel, Field +from typing import List, Optional from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend +from detect import Detect +import functools +import logging +from models import QueryResponse +from cache import save_to_cache, load_from_cache +import random +import re + +# 配置日志 +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) +def retry_with_backoff(retries=3, backoff_in_seconds=1): + def decorator(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + x = 0 + while True: + try: + return func(*args, **kwargs) + except Exception as e: + if x == retries: + logger.error(f"Failed after {retries} retries. Error: {str(e)}") + raise + # wait = (backoff_in_seconds * (2 ** x) + + # random.uniform(0, 1)) + # logger.warning(f"Attempt {x + 1} failed: {str(e)}. Retrying in {wait:.2f} seconds...") + # time.sleep(wait) + logger.warning(f"Attempt {x + 1} failed: {str(e)}. Retrying...") + x += 1 + return wrapper + return decorator + +app = FastAPI( + title="ICP查询API", + description="提供ICP备案信息查询服务", + version="1.0.0" +) + + +@retry_with_backoff(retries=3) def auth(): t = str(round(time.time())) data = { @@ -26,18 +70,71 @@ def auth(): "Accept-Language": "zh-CN,zh;q=0.9", "Origin": "https://beian.miit.gov.cn" } - try: - resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/auth", headers=headers, - data=parse.urlencode(data)).text - return json.loads(resp)["params"]["bussiness"] - except Exception: - time.sleep(5) - resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/auth", headers=headers, - data=parse.urlencode(data)).text - return json.loads(resp)["params"]["bussiness"] + + resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/auth", + headers=headers, + data=parse.urlencode(data)) + resp.raise_for_status() + resp_data = resp.json() + + # 验证响应格式和状态 + if not isinstance(resp_data, dict): + raise ValueError("Response is not a valid JSON object") + + if not all(key in resp_data for key in ['code', 'success']): + raise ValueError("Missing required fields in response") + + if resp_data['code'] != 200 or not resp_data['success']: + raise ValueError(f"API error: {resp_data.get('msg', 'Unknown error')}") + + if 'params' not in resp_data or 'bussiness' not in resp_data['params']: + raise ValueError("Missing params.bussiness in response") + + return resp_data["params"]["bussiness"] -def getImage(): + + + +@retry_with_backoff(retries=3) +def query(sign, uuid_token, domain, token): + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", + "Referer": "https://beian.miit.gov.cn/", + "Token": token, + "Sign": sign, + "Uuid": uuid_token, + "Connection": "keep-alive", + "Accept": "application/json, text/plain, */*", + "Accept-Encoding": "gzip, deflate, br", + "Accept-Language": "zh-CN,zh;q=0.9", + "Origin": "https://beian.miit.gov.cn", + "Content-Type": "application/json", + "Cookie": "__jsluid_s=" + str(uuid.uuid4().hex[:32]) + } + data = {"pageNum": "", "pageSize": "", "unitName": domain, "serviceType": 1} + + resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/icpAbbreviateInfo/queryByCondition", + headers=headers, + data=json.dumps(data).replace(" ", "")) + resp.raise_for_status() + resp_data = resp.json() + + # 验证响应格式和状态 + if not isinstance(resp_data, dict): + raise ValueError("Response is not a valid JSON object") + + if not all(key in resp_data for key in ['code', 'success']): + raise ValueError("Missing required fields in response") + + if resp_data['code'] != 200 or not resp_data['success']: + raise ValueError(f"API error: {resp_data.get('msg', 'Unknown error')}") + + return json.dumps(resp_data) + + +@retry_with_backoff(retries=3) +def getImage(token): headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", "Referer": "https://beian.miit.gov.cn/", @@ -51,15 +148,26 @@ def getImage(): payload = { "clientUid": "point-" + str(uuid.uuid4()) } - try: - resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/getCheckImagePoint", - headers=headers, json=payload).json() - return resp["params"], payload["clientUid"] - except Exception: - time.sleep(5) - resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/getCheckImagePoint", - headers=headers, json=payload).json() - return resp["params"], payload["clientUid"] + + resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/getCheckImagePoint", + headers=headers, json=payload) + resp.raise_for_status() + resp_data = resp.json() + + # 验证响应格式和状态 + if not isinstance(resp_data, dict): + raise ValueError("Response is not a valid JSON object") + + if not all(key in resp_data for key in ['code', 'success']): + raise ValueError("Missing required fields in response") + + if resp_data['code'] != 200 or not resp_data['success']: + raise ValueError(f"API error: {resp_data.get('msg', 'Unknown error')}") + + if 'params' not in resp_data: + raise ValueError("Missing params in response") + + return resp_data["params"], payload["clientUid"] def aes_ecb_encrypt(plaintext: bytes, key: bytes, block_size=16): @@ -76,22 +184,20 @@ def aes_ecb_encrypt(plaintext: bytes, key: bytes, block_size=16): def generate_pointjson(big_img, small_img, secretKey): - boxes = crack.detect(big_img) - if boxes: - print("文字检测成功") - else: - print("文字检测失败,请重试") - raise Exception("文字检测失败,请重试") - points = crack.siamese(small_img, boxes) - print("文字匹配成功") + d = Detect() + boxes = d.detect(big_img) + if not boxes: + logger.error("文字检测失败。") + raise Exception("文字检测失败") + points = d.siamese(small_img, boxes) new_points = [[p[0] + 20, p[1] + 20] for p in points] pointJson = [{"x": p[0], "y": p[1]} for p in new_points] - # print(json.dumps(pointJson)) enc_pointJson = aes_ecb_encrypt(json.dumps(pointJson).replace(" ", "").encode(), secretKey.encode()) return enc_pointJson -def checkImage(uuid_token, secretKey, clientUid, pointJson): +@retry_with_backoff(retries=3) +def checkImage(uuid_token, secretKey, clientUid, pointJson, token): headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", "Referer": "https://beian.miit.gov.cn/", @@ -108,45 +214,105 @@ def checkImage(uuid_token, secretKey, clientUid, pointJson): "clientUid": clientUid, "pointJson": pointJson } - resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/checkImage", headers=headers, - json=data).json() - if resp["code"] == 200: - # print(resp["params"]) - return resp["params"]["sign"] - return False + + resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/checkImage", + headers=headers, + json=data) + resp.raise_for_status() + resp_data = resp.json() + + # 验证响应格式和状态 + if not isinstance(resp_data, dict): + raise ValueError("Response is not a valid JSON object") + + if not all(key in resp_data for key in ['code', 'success']): + raise ValueError("Missing required fields in response") + + if resp_data['code'] != 200 or not resp_data['success']: + return False + + if 'params' not in resp_data or 'sign' not in resp_data['params']: + raise ValueError("Missing params.sign in response") + + return resp_data["params"]["sign"] +@retry_with_backoff(retries=3) +def verify_process(domain): + """整个验证流程的函数,包含获取token、图片验证等所有步骤""" + token = auth() + params, clientUid = getImage(token) + pointjson = generate_pointjson(params["bigImage"], params["smallImage"], params["secretKey"]) + sign = checkImage(params["uuid"], params["secretKey"], clientUid, pointjson, token) + + if not sign: + raise ValueError("验证码校验失败") + + result = query(sign, params["uuid"], domain, token) + response = json.loads(result) + return response['params']['list'] -def query(sign, uuid_token, domain): - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", - "Referer": "https://beian.miit.gov.cn/", - "Token": token, - "Sign": sign, - "Uuid": uuid_token, - "Connection": "keep-alive", - "Accept": "application/json, text/plain, */*", - "Accept-Encoding": "gzip, deflate, br", - "Accept-Language": "zh-CN,zh;q=0.9", - "Origin": "https://beian.miit.gov.cn", - "Content-Type": "application/json", - "Cookie": "__jsluid_s="+str(uuid.uuid4().hex[:32]) - } - data = {"pageNum": "", "pageSize": "", "unitName": domain, "serviceType": 1} - resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/icpAbbreviateInfo/queryByCondition", - headers=headers, data=json.dumps(data).replace(" ","")).text - return resp +@app.get("/query", response_model=QueryResponse, tags=["查询"]) +async def query_api( + domain: str = Query(..., description="要查询的域名"), +): + """ + 查询域名的ICP备案信息 + + - **domain**: 要查询的域名(纯域名,不能包含 https:// 或 http://,也不能包含子域名、端口号和路径) + + 返回: + - 成功时返回ICP备案信息列表 + - 失败时返回错误信息 + """ + try: + if not domain: + raise HTTPException(status_code=400, detail="Missing 'domain' parameter") + # 添加域名判断 + # 域名格式正则:只允许字母、数字、连字符和点,必须有一个点,不能以点或连字符开始或结束 + domain_pattern = r'^(?!-)[A-Za-z0-9-]{1,63}(? 1: + raise HTTPException(status_code=400, detail="不支持子域名,请使用主域名") + + # 检查域名格式 + if not re.match(domain_pattern, domain): + raise HTTPException(status_code=400, detail="域名格式不正确,请使用正确的域名格式(如 example.com)") -crack = Crack() -token = auth() -time.sleep(0.1) -print("正在获取验证码") -params, clientUid = getImage() -pointjson = generate_pointjson(params["bigImage"], params["smallImage"], params["secretKey"]) -time.sleep(0.5) -sign = checkImage(params["uuid"], params["secretKey"], clientUid, pointjson) -time.sleep(0.5) -if sign: - print(query(sign, params["uuid"],"baidu.com")) -else: - print("failed") + # 从缓存中获取数据 + cached_data = load_from_cache(domain) + if cached_data: + return QueryResponse(cached=True, count=len(cached_data), data=cached_data) + + # 执行验证流程(包含重试机制) + result_list = verify_process(domain) + + # 将结果保存到缓存 + save_to_cache(domain, result_list) + + return QueryResponse(cached=False, count=len(result_list), data=result_list) + + except ValueError as e: + # 验证码校验失败等特定错误 + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + # 其他未预期的错误 + logger.error(f"Unexpected error: {str(e)}") + raise HTTPException(status_code=500, detail=str(e)) + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file diff --git a/mainapi.py b/mainapi.py deleted file mode 100644 index 03ec1bb..0000000 --- a/mainapi.py +++ /dev/null @@ -1,211 +0,0 @@ -import base64 -import json -import requests -import hashlib -import time -import os -from urllib import parse -import uuid -from flask import Flask, request, jsonify ,Response -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes -from cryptography.hazmat.backends import default_backend -from crack import Crack # 假设 Crack 是一个你自己实现的类 - -app = Flask(__name__) - -CACHE_DIR = './cache' # 缓存文件目录 - -# 创建缓存目录(如果不存在) -if not os.path.exists(CACHE_DIR): - os.makedirs(CACHE_DIR) - - -def auth(): - t = str(round(time.time())) - data = { - "authKey": hashlib.md5(("testtest" + t).encode()).hexdigest(), - "timeStamp": t - } - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", - "Referer": "https://beian.miit.gov.cn/", - "Content-Type": "application/x-www-form-urlencoded", - "Connection": "keep-alive", - "Accept": "application/json, text/plain, */*", - "Accept-Encoding": "gzip, deflate, br", - "Accept-Language": "zh-CN,zh;q=0.9", - "Origin": "https://beian.miit.gov.cn" - } - try: - resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/auth", headers=headers, - data=parse.urlencode(data)).text - return json.loads(resp)["params"]["bussiness"] - except Exception: - time.sleep(5) - resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/auth", headers=headers, - data=parse.urlencode(data)).text - return json.loads(resp)["params"]["bussiness"] - - -def getImage(token): - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", - "Referer": "https://beian.miit.gov.cn/", - "Token": token, - "Connection": "keep-alive", - "Accept": "application/json, text/plain, */*", - "Accept-Encoding": "gzip, deflate, br", - "Accept-Language": "zh-CN,zh;q=0.9", - "Origin": "https://beian.miit.gov.cn" - } - payload = { - "clientUid": "point-" + str(uuid.uuid4()) - } - try: - resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/getCheckImagePoint", - headers=headers, json=payload).json() - return resp["params"], payload["clientUid"] - except Exception: - time.sleep(5) - resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/getCheckImagePoint", - headers=headers, json=payload).json() - return resp["params"], payload["clientUid"] - - -def aes_ecb_encrypt(plaintext: bytes, key: bytes, block_size=16): - backend = default_backend() - cipher = Cipher(algorithms.AES(key), modes.ECB(), backend=backend) - - padding_length = block_size - (len(plaintext) % block_size) - plaintext_padded = plaintext + bytes([padding_length]) * padding_length - - encryptor = cipher.encryptor() - ciphertext = encryptor.update(plaintext_padded) + encryptor.finalize() - - return base64.b64encode(ciphertext).decode('utf-8') - - -def generate_pointjson(big_img, small_img, secretKey): - crack = Crack() # 假设 Crack 是一个你实现的类 - boxes = crack.detect(big_img) - if boxes: - print("文字检测成功") - else: - print("文字检测失败,请重试") - raise Exception("文字检测失败,请重试") - points = crack.siamese(small_img, boxes) - print("文字匹配成功") - new_points = [[p[0] + 20, p[1] + 20] for p in points] - pointJson = [{"x": p[0], "y": p[1]} for p in new_points] - enc_pointJson = aes_ecb_encrypt(json.dumps(pointJson).replace(" ", "").encode(), secretKey.encode()) - return enc_pointJson - - -def checkImage(uuid_token, secretKey, clientUid, pointJson, token): - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", - "Referer": "https://beian.miit.gov.cn/", - "Token": token, - "Connection": "keep-alive", - "Accept": "application/json, text/plain, */*", - "Accept-Encoding": "gzip, deflate, br", - "Accept-Language": "zh-CN,zh;q=0.9", - "Origin": "https://beian.miit.gov.cn" - } - data = { - "token": uuid_token, - "secretKey": secretKey, - "clientUid": clientUid, - "pointJson": pointJson - } - resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/checkImage", headers=headers, - json=data).json() - if resp["code"] == 200: - return resp["params"]["sign"] - return False - - -def query(sign, uuid_token, domain, token): - headers = { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", - "Referer": "https://beian.miit.gov.cn/", - "Token": token, - "Sign": sign, - "Uuid": uuid_token, - "Connection": "keep-alive", - "Accept": "application/json, text/plain, */*", - "Accept-Encoding": "gzip, deflate, br", - "Accept-Language": "zh-CN,zh;q=0.9", - "Origin": "https://beian.miit.gov.cn", - "Content-Type": "application/json", - "Cookie": "__jsluid_s=" + str(uuid.uuid4().hex[:32]) - } - data = {"pageNum": "", "pageSize": "", "unitName": domain, "serviceType": 1} - resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/icpAbbreviateInfo/queryByCondition", - headers=headers, data=json.dumps(data).replace(" ", "")).text - return resp - - -def save_to_cache(domain, data): - """保存数据到缓存文件""" - cache_path = os.path.join(CACHE_DIR, f"{domain}.json") - with open(cache_path, 'w', encoding='utf-8') as f: - json.dump(data, f) - - -def load_from_cache(domain): - """从缓存文件中加载数据""" - cache_path = os.path.join(CACHE_DIR, f"{domain}.json") - if os.path.exists(cache_path): - with open(cache_path, 'r', encoding='utf-8') as f: - return json.load(f) - return None - - -@app.route('/query', methods=['GET']) -def query_api(): - try: - # 从 URL 查询参数中获取 domain 和 type - domain = request.args.get('domain') - query_type = request.args.get('type', '') # 默认为空字符串 - - if not domain: - return jsonify({"status": "failed", "message": "Missing 'domain' parameter"}), 400 - - # 如果 type=cache,则检查缓存 - if query_type == 'cache': - cached_data = load_from_cache(domain) - if cached_data: - json_data = json.dumps({"status": "successful", "data": cached_data}) - return Response(json_data, content_type='application/json') - - # 如果没有缓存,或未指定 type=cache,则继续进行查询 - crack = Crack() - token = auth() - time.sleep(0.1) - params, clientUid = getImage(token) - pointjson = generate_pointjson(params["bigImage"], params["smallImage"], params["secretKey"]) - time.sleep(0.5) - sign = checkImage(params["uuid"], params["secretKey"], clientUid, pointjson, token) - time.sleep(0.5) - - if sign: - result = query(sign, params["uuid"], domain, token) - response = json.loads(result) - json_data = json.dumps({"status": "successful", "data": response['params']['list']}) - - # 如果 type=cache,则将查询结果缓存 - if query_type == 'cache': - save_to_cache(domain, response['params']['list']) - - return Response(json_data, content_type='application/json') - - else: - return jsonify({"status": "failed", "message": "Captcha verification failed"}), 400 - - except Exception as e: - return jsonify({"status": "error", "message": str(e)}), 500 - - -if __name__ == '__main__': - app.run(debug=True, port=5000) diff --git a/models.py b/models.py new file mode 100644 index 0000000..2f9bd2c --- /dev/null +++ b/models.py @@ -0,0 +1,30 @@ +from pydantic import BaseModel, Field +from typing import List, Optional + +# Pydantic 模型 +class ICPRecord(BaseModel): + """ICP备案记录模型""" + contentTypeName: str = Field(default="", description="内容类型名称") + domain: str = Field(default="", description="域名") + domainId: str | int = Field(default="", description="域名ID") + leaderName: str = Field(default="", description="负责人姓名") + limitAccess: str = Field(default="", description="限制访问") + mainId: str | int = Field(default="", description="主体ID") + mainLicence: str = Field(default="", description="主体许可证") + natureName: str = Field(default="", description="性质名称") + serviceId: str | int = Field(default="", description="服务ID") + serviceLicence: str = Field(default="", description="服务许可证") + unitName: str = Field(default="", description="单位名称") + updateRecordTime: str = Field(default="", description="更新记录时间") + + class Config: + """配置模型""" + json_encoders = { + int: str # 将整数类型自动转换为字符串 + } + +class QueryResponse(BaseModel): + """查询响应模型""" + cached: bool = Field(default=False, description="是否从缓存中获取") + count: int = Field(default=0, description="数量") + data: List[ICPRecord] = Field(default_factory=list, description="ICP 记录列表") \ No newline at end of file diff --git a/siamese.onnx b/models/siamese.onnx similarity index 100% rename from siamese.onnx rename to models/siamese.onnx diff --git a/yolov8.onnx b/models/yolov8.onnx similarity index 100% rename from yolov8.onnx rename to models/yolov8.onnx diff --git a/7a87c3a5-037f-4bd4-81a4-72b1afc71cde.png b/old/7a87c3a5-037f-4bd4-81a4-72b1afc71cde.png similarity index 100% rename from 7a87c3a5-037f-4bd4-81a4-72b1afc71cde.png rename to old/7a87c3a5-037f-4bd4-81a4-72b1afc71cde.png diff --git a/old/README.md b/old/README.md new file mode 100644 index 0000000..a158741 --- /dev/null +++ b/old/README.md @@ -0,0 +1,31 @@ +# 使用方法 +先安装 文件所需要的依赖模块 +pip install -r requirements.txt +再运行mainapi.py文件即可 + +# 环境要求 +Python >= 3 + +# 请求示例 + +如图箭头显示 + +![url链接](https://raw.githubusercontent.com/Suxiaoqinx/icp_api/refs/heads/main/7a87c3a5-037f-4bd4-81a4-72b1afc71cde.png) + +## 参数列表 + +请求链接选择 http://ip:port/query + +请求方式 GET + +| 参数列表 | 参数说明 | +| ---- | ---- | +| domain | 域名url地址| +| type | cache(则缓存) 为空则不缓存 | + +# 感谢 +[Ravizhan](https://github.com/ravizhan) +[原项目地址](https://github.com/ravizhan/ICP-spider) + +# 反馈方法 +请在Github的lssues反馈 或者到我[博客](https://www.toubiec.cn)反馈 diff --git a/逆向分析.md b/old/逆向分析.md similarity index 100% rename from 逆向分析.md rename to old/逆向分析.md diff --git a/requirements.txt b/requirements.txt index 817c2b3..0a86692 100644 Binary files a/requirements.txt and b/requirements.txt differ