This commit is contained in:
ivamp 2025-01-23 03:34:16 +08:00
parent 1e7b08c571
commit 218679b67b
16 changed files with 523 additions and 317 deletions

6
.dockerignore Normal file
View File

@ -0,0 +1,6 @@
/.venv
/.idea
/.vscode
/.git
/samples
/__pycache__

161
.gitignore vendored Normal file
View File

@ -0,0 +1,161 @@
### Python template
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/

16
Dockerfile Normal file
View File

@ -0,0 +1,16 @@
# docker build . --platform linux/amd64 --push -t leafdev.top/models/nailong-detector:v0.0.2
#FROM python:3.12.7
FROM docker.io/pytorch/pytorch:2.5.1-cuda12.4-cudnn9-runtime
#
WORKDIR /app
COPY requirements.txt /app
RUN pip install --no-cache-dir --upgrade -r requirements.txt
RUN apt update && apt-get install ffmpeg libsm6 libxext6 -y
RUN rm -rf /var/cache/apt/archives /var/lib/apt/lists/*.
#
COPY . /app
#
CMD ["uvicorn", "main:app", "--proxy-headers", "--host", "0.0.0.0", "--port", "80"]

View File

@ -1,31 +1,3 @@
# 使用方法
先安装 文件所需要的依赖模块
pip install -r requirements.txt
再运行mainapi.py文件即可
# ICP 备案查询API
# 环境要求
Python >= 3
# 请求示例
如图箭头显示
![url链接](https://raw.githubusercontent.com/Suxiaoqinx/icp_api/refs/heads/main/7a87c3a5-037f-4bd4-81a4-72b1afc71cde.png)
## 参数列表
请求链接选择 http://ip:port/query
请求方式 GET
| 参数列表 | 参数说明 |
| ---- | ---- |
| domain | 域名url地址|
| type | cache(则缓存) 为空则不缓存 |
# 感谢
[Ravizhan](https://github.com/ravizhan)
[原项目地址](https://github.com/ravizhan/ICP-spider)
# 反馈方法
请在Github的lssues反馈 或者到我[博客](https://www.toubiec.cn)反馈
感谢: https://github.com/Suxiaoqinx/icp_api

26
cache.py Normal file
View File

@ -0,0 +1,26 @@
import config
import redis
from models import QueryResponse, ICPRecord
from typing import List, Optional
# test redis
r = redis.Redis(host=config.REDIS_HOST, port=config.REDIS_PORT, password=config.REDIS_PASSWORD, db=config.REDIS_DB)
# if ping failed, raise exception
if not r.ping():
raise Exception("Redis ping failed")
MAX_EXPIRE_TIME = 6 * 60 * 60
def save_to_cache(domain: str, data: List[ICPRecord]):
# 将data转换为json字符串
# 直接转换
data_json = QueryResponse(cached=False, count=len(data), data=data).model_dump_json()
r.set(f"{config.REDIS_PREFIX}:{domain}", data_json, ex=MAX_EXPIRE_TIME)
def load_from_cache(domain: str) -> Optional[List[ICPRecord]]:
data_json = r.get(f"{config.REDIS_PREFIX}:{domain}")
if data_json:
return QueryResponse.model_validate_json(data_json).data
return None

14
config.py Normal file
View File

@ -0,0 +1,14 @@
import os
REDIS_HOST = os.environ.get("REDIS_HOST", "localhost")
REDIS_PORT = os.environ.get("REDIS_PORT", "6379")
REDIS_PASSWORD = os.environ.get("REDIS_PASSWORD", "")
REDIS_DB = os.environ.get("REDIS_DB", "0")
# prefix
REDIS_PREFIX = os.environ.get("REDIS_PREFIX", "icp:")

View File

@ -4,7 +4,7 @@ import cv2
import numpy as np
class Crack:
class Detect:
def __init__(self):
pass
@ -22,7 +22,7 @@ class Crack:
def detect(self, big_img):
confidence_thres = 0.7
iou_thres = 0.7
session = onnxruntime.InferenceSession("yolov8.onnx")
session = onnxruntime.InferenceSession("./models/yolov8.onnx")
model_inputs = session.get_inputs()
self.big_img = self.read_base64_image(big_img)
@ -58,7 +58,7 @@ class Crack:
return new_boxes
def siamese(self, small_img, boxes):
session = onnxruntime.InferenceSession("siamese.onnx")
session = onnxruntime.InferenceSession("./models/siamese.onnx")
positions = [165, 200, 231, 265]
result_list = []
for x in positions:
@ -93,8 +93,3 @@ class Crack:
break
return result_list
if __name__ == '__main__':
crack = Crack()
boxes = crack.detect("./1.png")
print(crack.siamese("./2.png", boxes))

302
main.py
View File

@ -3,13 +3,57 @@ import json
import requests
import hashlib
import time
import os
from urllib import parse
from crack import Crack
import uuid
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from typing import List, Optional
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from detect import Detect
import functools
import logging
from models import QueryResponse
from cache import save_to_cache, load_from_cache
import random
import re
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def retry_with_backoff(retries=3, backoff_in_seconds=1):
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
x = 0
while True:
try:
return func(*args, **kwargs)
except Exception as e:
if x == retries:
logger.error(f"Failed after {retries} retries. Error: {str(e)}")
raise
# wait = (backoff_in_seconds * (2 ** x) +
# random.uniform(0, 1))
# logger.warning(f"Attempt {x + 1} failed: {str(e)}. Retrying in {wait:.2f} seconds...")
# time.sleep(wait)
logger.warning(f"Attempt {x + 1} failed: {str(e)}. Retrying...")
x += 1
return wrapper
return decorator
app = FastAPI(
title="ICP查询API",
description="提供ICP备案信息查询服务",
version="1.0.0"
)
@retry_with_backoff(retries=3)
def auth():
t = str(round(time.time()))
data = {
@ -26,18 +70,71 @@ def auth():
"Accept-Language": "zh-CN,zh;q=0.9",
"Origin": "https://beian.miit.gov.cn"
}
try:
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/auth", headers=headers,
data=parse.urlencode(data)).text
return json.loads(resp)["params"]["bussiness"]
except Exception:
time.sleep(5)
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/auth", headers=headers,
data=parse.urlencode(data)).text
return json.loads(resp)["params"]["bussiness"]
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/auth",
headers=headers,
data=parse.urlencode(data))
resp.raise_for_status()
resp_data = resp.json()
# 验证响应格式和状态
if not isinstance(resp_data, dict):
raise ValueError("Response is not a valid JSON object")
if not all(key in resp_data for key in ['code', 'success']):
raise ValueError("Missing required fields in response")
if resp_data['code'] != 200 or not resp_data['success']:
raise ValueError(f"API error: {resp_data.get('msg', 'Unknown error')}")
if 'params' not in resp_data or 'bussiness' not in resp_data['params']:
raise ValueError("Missing params.bussiness in response")
return resp_data["params"]["bussiness"]
def getImage():
@retry_with_backoff(retries=3)
def query(sign, uuid_token, domain, token):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Referer": "https://beian.miit.gov.cn/",
"Token": token,
"Sign": sign,
"Uuid": uuid_token,
"Connection": "keep-alive",
"Accept": "application/json, text/plain, */*",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9",
"Origin": "https://beian.miit.gov.cn",
"Content-Type": "application/json",
"Cookie": "__jsluid_s=" + str(uuid.uuid4().hex[:32])
}
data = {"pageNum": "", "pageSize": "", "unitName": domain, "serviceType": 1}
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/icpAbbreviateInfo/queryByCondition",
headers=headers,
data=json.dumps(data).replace(" ", ""))
resp.raise_for_status()
resp_data = resp.json()
# 验证响应格式和状态
if not isinstance(resp_data, dict):
raise ValueError("Response is not a valid JSON object")
if not all(key in resp_data for key in ['code', 'success']):
raise ValueError("Missing required fields in response")
if resp_data['code'] != 200 or not resp_data['success']:
raise ValueError(f"API error: {resp_data.get('msg', 'Unknown error')}")
return json.dumps(resp_data)
@retry_with_backoff(retries=3)
def getImage(token):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Referer": "https://beian.miit.gov.cn/",
@ -51,15 +148,26 @@ def getImage():
payload = {
"clientUid": "point-" + str(uuid.uuid4())
}
try:
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/getCheckImagePoint",
headers=headers, json=payload).json()
return resp["params"], payload["clientUid"]
except Exception:
time.sleep(5)
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/getCheckImagePoint",
headers=headers, json=payload).json()
return resp["params"], payload["clientUid"]
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/getCheckImagePoint",
headers=headers, json=payload)
resp.raise_for_status()
resp_data = resp.json()
# 验证响应格式和状态
if not isinstance(resp_data, dict):
raise ValueError("Response is not a valid JSON object")
if not all(key in resp_data for key in ['code', 'success']):
raise ValueError("Missing required fields in response")
if resp_data['code'] != 200 or not resp_data['success']:
raise ValueError(f"API error: {resp_data.get('msg', 'Unknown error')}")
if 'params' not in resp_data:
raise ValueError("Missing params in response")
return resp_data["params"], payload["clientUid"]
def aes_ecb_encrypt(plaintext: bytes, key: bytes, block_size=16):
@ -76,22 +184,20 @@ def aes_ecb_encrypt(plaintext: bytes, key: bytes, block_size=16):
def generate_pointjson(big_img, small_img, secretKey):
boxes = crack.detect(big_img)
if boxes:
print("文字检测成功")
else:
print("文字检测失败,请重试")
raise Exception("文字检测失败,请重试")
points = crack.siamese(small_img, boxes)
print("文字匹配成功")
d = Detect()
boxes = d.detect(big_img)
if not boxes:
logger.error("文字检测失败。")
raise Exception("文字检测失败")
points = d.siamese(small_img, boxes)
new_points = [[p[0] + 20, p[1] + 20] for p in points]
pointJson = [{"x": p[0], "y": p[1]} for p in new_points]
# print(json.dumps(pointJson))
enc_pointJson = aes_ecb_encrypt(json.dumps(pointJson).replace(" ", "").encode(), secretKey.encode())
return enc_pointJson
def checkImage(uuid_token, secretKey, clientUid, pointJson):
@retry_with_backoff(retries=3)
def checkImage(uuid_token, secretKey, clientUid, pointJson, token):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Referer": "https://beian.miit.gov.cn/",
@ -108,45 +214,105 @@ def checkImage(uuid_token, secretKey, clientUid, pointJson):
"clientUid": clientUid,
"pointJson": pointJson
}
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/checkImage", headers=headers,
json=data).json()
if resp["code"] == 200:
# print(resp["params"])
return resp["params"]["sign"]
return False
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/checkImage",
headers=headers,
json=data)
resp.raise_for_status()
resp_data = resp.json()
def query(sign, uuid_token, domain):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Referer": "https://beian.miit.gov.cn/",
"Token": token,
"Sign": sign,
"Uuid": uuid_token,
"Connection": "keep-alive",
"Accept": "application/json, text/plain, */*",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9",
"Origin": "https://beian.miit.gov.cn",
"Content-Type": "application/json",
"Cookie": "__jsluid_s="+str(uuid.uuid4().hex[:32])
}
data = {"pageNum": "", "pageSize": "", "unitName": domain, "serviceType": 1}
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/icpAbbreviateInfo/queryByCondition",
headers=headers, data=json.dumps(data).replace(" ","")).text
return resp
# 验证响应格式和状态
if not isinstance(resp_data, dict):
raise ValueError("Response is not a valid JSON object")
if not all(key in resp_data for key in ['code', 'success']):
raise ValueError("Missing required fields in response")
crack = Crack()
token = auth()
time.sleep(0.1)
print("正在获取验证码")
params, clientUid = getImage()
pointjson = generate_pointjson(params["bigImage"], params["smallImage"], params["secretKey"])
time.sleep(0.5)
sign = checkImage(params["uuid"], params["secretKey"], clientUid, pointjson)
time.sleep(0.5)
if sign:
print(query(sign, params["uuid"],"baidu.com"))
else:
print("failed")
if resp_data['code'] != 200 or not resp_data['success']:
return False
if 'params' not in resp_data or 'sign' not in resp_data['params']:
raise ValueError("Missing params.sign in response")
return resp_data["params"]["sign"]
@retry_with_backoff(retries=3)
def verify_process(domain):
"""整个验证流程的函数包含获取token、图片验证等所有步骤"""
token = auth()
params, clientUid = getImage(token)
pointjson = generate_pointjson(params["bigImage"], params["smallImage"], params["secretKey"])
sign = checkImage(params["uuid"], params["secretKey"], clientUid, pointjson, token)
if not sign:
raise ValueError("验证码校验失败")
result = query(sign, params["uuid"], domain, token)
response = json.loads(result)
return response['params']['list']
@app.get("/query", response_model=QueryResponse, tags=["查询"])
async def query_api(
domain: str = Query(..., description="要查询的域名"),
):
"""
查询域名的ICP备案信息
- **domain**: 要查询的域名纯域名不能包含 https:// http://也不能包含子域名端口号和路径
返回
- 成功时返回ICP备案信息列表
- 失败时返回错误信息
"""
try:
if not domain:
raise HTTPException(status_code=400, detail="Missing 'domain' parameter")
# 添加域名判断
# 域名格式正则:只允许字母、数字、连字符和点,必须有一个点,不能以点或连字符开始或结束
domain_pattern = r'^(?!-)[A-Za-z0-9-]{1,63}(?<!-)\.(?!-)[A-Za-z0-9-]{1,63}(?<!-)$'
# 检查是否包含协议
if '://' in domain:
raise HTTPException(status_code=400, detail="域名不能包含协议(如 http:// 或 https://")
# 检查是否包含端口号
if ':' in domain:
raise HTTPException(status_code=400, detail="域名不能包含端口号")
# 检查是否包含路径
if '/' in domain:
raise HTTPException(status_code=400, detail="域名不能包含路径")
# 检查是否为子域名
if domain.count('.') > 1:
raise HTTPException(status_code=400, detail="不支持子域名,请使用主域名")
# 检查域名格式
if not re.match(domain_pattern, domain):
raise HTTPException(status_code=400, detail="域名格式不正确,请使用正确的域名格式(如 example.com")
# 从缓存中获取数据
cached_data = load_from_cache(domain)
if cached_data:
return QueryResponse(cached=True, count=len(cached_data), data=cached_data)
# 执行验证流程(包含重试机制)
result_list = verify_process(domain)
# 将结果保存到缓存
save_to_cache(domain, result_list)
return QueryResponse(cached=False, count=len(result_list), data=result_list)
except ValueError as e:
# 验证码校验失败等特定错误
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
# 其他未预期的错误
logger.error(f"Unexpected error: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

View File

@ -1,211 +0,0 @@
import base64
import json
import requests
import hashlib
import time
import os
from urllib import parse
import uuid
from flask import Flask, request, jsonify ,Response
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from crack import Crack # 假设 Crack 是一个你自己实现的类
app = Flask(__name__)
CACHE_DIR = './cache' # 缓存文件目录
# 创建缓存目录(如果不存在)
if not os.path.exists(CACHE_DIR):
os.makedirs(CACHE_DIR)
def auth():
t = str(round(time.time()))
data = {
"authKey": hashlib.md5(("testtest" + t).encode()).hexdigest(),
"timeStamp": t
}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Referer": "https://beian.miit.gov.cn/",
"Content-Type": "application/x-www-form-urlencoded",
"Connection": "keep-alive",
"Accept": "application/json, text/plain, */*",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9",
"Origin": "https://beian.miit.gov.cn"
}
try:
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/auth", headers=headers,
data=parse.urlencode(data)).text
return json.loads(resp)["params"]["bussiness"]
except Exception:
time.sleep(5)
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/auth", headers=headers,
data=parse.urlencode(data)).text
return json.loads(resp)["params"]["bussiness"]
def getImage(token):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Referer": "https://beian.miit.gov.cn/",
"Token": token,
"Connection": "keep-alive",
"Accept": "application/json, text/plain, */*",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9",
"Origin": "https://beian.miit.gov.cn"
}
payload = {
"clientUid": "point-" + str(uuid.uuid4())
}
try:
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/getCheckImagePoint",
headers=headers, json=payload).json()
return resp["params"], payload["clientUid"]
except Exception:
time.sleep(5)
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/getCheckImagePoint",
headers=headers, json=payload).json()
return resp["params"], payload["clientUid"]
def aes_ecb_encrypt(plaintext: bytes, key: bytes, block_size=16):
backend = default_backend()
cipher = Cipher(algorithms.AES(key), modes.ECB(), backend=backend)
padding_length = block_size - (len(plaintext) % block_size)
plaintext_padded = plaintext + bytes([padding_length]) * padding_length
encryptor = cipher.encryptor()
ciphertext = encryptor.update(plaintext_padded) + encryptor.finalize()
return base64.b64encode(ciphertext).decode('utf-8')
def generate_pointjson(big_img, small_img, secretKey):
crack = Crack() # 假设 Crack 是一个你实现的类
boxes = crack.detect(big_img)
if boxes:
print("文字检测成功")
else:
print("文字检测失败,请重试")
raise Exception("文字检测失败,请重试")
points = crack.siamese(small_img, boxes)
print("文字匹配成功")
new_points = [[p[0] + 20, p[1] + 20] for p in points]
pointJson = [{"x": p[0], "y": p[1]} for p in new_points]
enc_pointJson = aes_ecb_encrypt(json.dumps(pointJson).replace(" ", "").encode(), secretKey.encode())
return enc_pointJson
def checkImage(uuid_token, secretKey, clientUid, pointJson, token):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Referer": "https://beian.miit.gov.cn/",
"Token": token,
"Connection": "keep-alive",
"Accept": "application/json, text/plain, */*",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9",
"Origin": "https://beian.miit.gov.cn"
}
data = {
"token": uuid_token,
"secretKey": secretKey,
"clientUid": clientUid,
"pointJson": pointJson
}
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/checkImage", headers=headers,
json=data).json()
if resp["code"] == 200:
return resp["params"]["sign"]
return False
def query(sign, uuid_token, domain, token):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Referer": "https://beian.miit.gov.cn/",
"Token": token,
"Sign": sign,
"Uuid": uuid_token,
"Connection": "keep-alive",
"Accept": "application/json, text/plain, */*",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9",
"Origin": "https://beian.miit.gov.cn",
"Content-Type": "application/json",
"Cookie": "__jsluid_s=" + str(uuid.uuid4().hex[:32])
}
data = {"pageNum": "", "pageSize": "", "unitName": domain, "serviceType": 1}
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/icpAbbreviateInfo/queryByCondition",
headers=headers, data=json.dumps(data).replace(" ", "")).text
return resp
def save_to_cache(domain, data):
"""保存数据到缓存文件"""
cache_path = os.path.join(CACHE_DIR, f"{domain}.json")
with open(cache_path, 'w', encoding='utf-8') as f:
json.dump(data, f)
def load_from_cache(domain):
"""从缓存文件中加载数据"""
cache_path = os.path.join(CACHE_DIR, f"{domain}.json")
if os.path.exists(cache_path):
with open(cache_path, 'r', encoding='utf-8') as f:
return json.load(f)
return None
@app.route('/query', methods=['GET'])
def query_api():
try:
# 从 URL 查询参数中获取 domain 和 type
domain = request.args.get('domain')
query_type = request.args.get('type', '') # 默认为空字符串
if not domain:
return jsonify({"status": "failed", "message": "Missing 'domain' parameter"}), 400
# 如果 type=cache则检查缓存
if query_type == 'cache':
cached_data = load_from_cache(domain)
if cached_data:
json_data = json.dumps({"status": "successful", "data": cached_data})
return Response(json_data, content_type='application/json')
# 如果没有缓存,或未指定 type=cache则继续进行查询
crack = Crack()
token = auth()
time.sleep(0.1)
params, clientUid = getImage(token)
pointjson = generate_pointjson(params["bigImage"], params["smallImage"], params["secretKey"])
time.sleep(0.5)
sign = checkImage(params["uuid"], params["secretKey"], clientUid, pointjson, token)
time.sleep(0.5)
if sign:
result = query(sign, params["uuid"], domain, token)
response = json.loads(result)
json_data = json.dumps({"status": "successful", "data": response['params']['list']})
# 如果 type=cache则将查询结果缓存
if query_type == 'cache':
save_to_cache(domain, response['params']['list'])
return Response(json_data, content_type='application/json')
else:
return jsonify({"status": "failed", "message": "Captcha verification failed"}), 400
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 500
if __name__ == '__main__':
app.run(debug=True, port=5000)

30
models.py Normal file
View File

@ -0,0 +1,30 @@
from pydantic import BaseModel, Field
from typing import List, Optional
# Pydantic 模型
class ICPRecord(BaseModel):
"""ICP备案记录模型"""
contentTypeName: str = Field(default="", description="内容类型名称")
domain: str = Field(default="", description="域名")
domainId: str | int = Field(default="", description="域名ID")
leaderName: str = Field(default="", description="负责人姓名")
limitAccess: str = Field(default="", description="限制访问")
mainId: str | int = Field(default="", description="主体ID")
mainLicence: str = Field(default="", description="主体许可证")
natureName: str = Field(default="", description="性质名称")
serviceId: str | int = Field(default="", description="服务ID")
serviceLicence: str = Field(default="", description="服务许可证")
unitName: str = Field(default="", description="单位名称")
updateRecordTime: str = Field(default="", description="更新记录时间")
class Config:
"""配置模型"""
json_encoders = {
int: str # 将整数类型自动转换为字符串
}
class QueryResponse(BaseModel):
"""查询响应模型"""
cached: bool = Field(default=False, description="是否从缓存中获取")
count: int = Field(default=0, description="数量")
data: List[ICPRecord] = Field(default_factory=list, description="ICP 记录列表")

View File

Before

Width:  |  Height:  |  Size: 46 KiB

After

Width:  |  Height:  |  Size: 46 KiB

31
old/README.md Normal file
View File

@ -0,0 +1,31 @@
# 使用方法
先安装 文件所需要的依赖模块
pip install -r requirements.txt
再运行mainapi.py文件即可
# 环境要求
Python >= 3
# 请求示例
如图箭头显示
![url链接](https://raw.githubusercontent.com/Suxiaoqinx/icp_api/refs/heads/main/7a87c3a5-037f-4bd4-81a4-72b1afc71cde.png)
## 参数列表
请求链接选择 http://ip:port/query
请求方式 GET
| 参数列表 | 参数说明 |
| ---- | ---- |
| domain | 域名url地址|
| type | cache(则缓存) 为空则不缓存 |
# 感谢
[Ravizhan](https://github.com/ravizhan)
[原项目地址](https://github.com/ravizhan/ICP-spider)
# 反馈方法
请在Github的lssues反馈 或者到我[博客](https://www.toubiec.cn)反馈

Binary file not shown.