icp-api/main.py
2025-02-07 18:20:13 +08:00

338 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import json
import requests
import hashlib
import time
from urllib import parse
import uuid
from fastapi import FastAPI, HTTPException, Query
import uvicorn
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from pydantic import BaseModel
from starlette import status
from detect import Detect
import functools
import logging
from models import QueryResponse
from cache import save_to_cache, load_from_cache
import re
# Configure logging: set the root logger to INFO and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def retry_with_backoff(retries=3, backoff_in_seconds=1):
    """Build a decorator that re-invokes a function when it raises.

    The wrapped callable is attempted once and then retried immediately, up
    to ``retries`` additional times. When the retry budget is used up, the
    last exception is re-raised unchanged to the caller.

    Args:
        retries: Maximum number of retries after the first attempt. Zero or
            a negative value means the function is attempted exactly once.
        backoff_in_seconds: Accepted for interface compatibility only. No
            delay is applied between attempts, so this value is not read.

    Returns:
        A decorator that wraps a callable with the retry loop.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    # ">=" instead of "==": with a negative or non-integer
                    # ``retries`` an equality test is never true and a
                    # persistently failing call would loop forever.
                    if attempt >= retries:
                        logger.error("Failed after %s retries. Error: %s", attempt, e)
                        raise
                    logger.warning("Attempt %s failed: %s. Retrying...", attempt + 1, e)
                    attempt += 1

        return wrapper

    return decorator
# FastAPI application object; the title/description/version below are the
# metadata passed to FastAPI for this service.
app = FastAPI(
    title="ICP查询API",
    description="提供ICP备案信息查询服务",
    version="1.0.0"
)
class HealthCheck(BaseModel):
    """Response model to validate and return when performing a health check."""
    # Health indicator string; defaults to "OK".
    status: str = "OK"
@app.get(
    "/health",
    tags=["healthcheck"],
    summary="Perform a Health Check",
    response_description="Return HTTP Status Code 200 (OK)",
    status_code=status.HTTP_200_OK,
    response_model=HealthCheck,
)
def get_health() -> HealthCheck:
    """
    ## Perform a Health Check
    Endpoint to perform a health check on. This endpoint can primarily be used by
    Docker to ensure that robust container orchestration and management is in place.
    Other services which rely on the proper functioning of the API service will not
    deploy if this endpoint returns any HTTP status code other than 200 (OK).
    Returns:
        HealthCheck: Returns a JSON response with the health status
    """
    # Always reports "OK"; no downstream dependency is probed here.
    return HealthCheck(status="OK")
@retry_with_backoff(retries=3)
def auth():
    """Request an access token from the MIIT ICP query service.

    Posts the current Unix timestamp together with an MD5-derived ``authKey``
    to the ``auth`` endpoint and validates the JSON envelope of the reply.

    Returns:
        The value stored at ``params.bussiness`` in the response (the key is
        spelled this way by the upstream API). The other request helpers in
        this module send it as the ``Token`` header.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-2xx HTTP status.
        ValueError: If the body is not a JSON object, lacks the expected
            fields, or reports a failure.
    """
    t = str(round(time.time()))
    data = {
        # authKey is the MD5 hex digest of a fixed prefix plus the timestamp.
        "authKey": hashlib.md5(("testtest" + t).encode()).hexdigest(),
        "timeStamp": t
    }
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        "Referer": "https://beian.miit.gov.cn/",
        "Content-Type": "application/x-www-form-urlencoded",
        "Connection": "keep-alive",
        "Accept": "application/json, text/plain, */*",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Origin": "https://beian.miit.gov.cn"
    }
    # requests applies no timeout by default; without one a stalled upstream
    # connection would block this call (and the request handler) indefinitely.
    resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/auth",
                         headers=headers,
                         data=parse.urlencode(data),
                         timeout=10)
    resp.raise_for_status()
    resp_data = resp.json()
    # Validate the response format and status.
    if not isinstance(resp_data, dict):
        raise ValueError("Response is not a valid JSON object")
    if not all(key in resp_data for key in ['code', 'success']):
        raise ValueError("Missing required fields in response")
    if resp_data['code'] != 200 or not resp_data['success']:
        raise ValueError(f"API error: {resp_data.get('msg', 'Unknown error')}")
    params = resp_data.get('params')
    # A null / non-object "params" is reported as the same ValueError instead
    # of surfacing as a TypeError from the membership test.
    if not isinstance(params, dict) or 'bussiness' not in params:
        raise ValueError("Missing params.bussiness in response")
    return params["bussiness"]
@retry_with_backoff(retries=3)
def query(sign, uuid_token, domain, token):
    """Run the ICP filing query for ``domain`` against the upstream service.

    Args:
        sign: Value sent as the ``Sign`` header.
        uuid_token: Value sent as the ``Uuid`` header.
        domain: Search term, sent as ``unitName`` in the JSON body.
        token: Value sent as the ``Token`` header.

    Returns:
        The validated response object re-serialized as a JSON string.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-2xx HTTP status.
        ValueError: If the body is not a JSON object, lacks the expected
            fields, or reports a failure.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        "Referer": "https://beian.miit.gov.cn/",
        "Token": token,
        "Sign": sign,
        "Uuid": uuid_token,
        "Connection": "keep-alive",
        "Accept": "application/json, text/plain, */*",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Origin": "https://beian.miit.gov.cn",
        "Content-Type": "application/json",
        # A fresh random 32-hex-character cookie value is generated per call.
        "Cookie": "__jsluid_s=" + str(uuid.uuid4().hex[:32])
    }
    data = {"pageNum": "", "pageSize": "", "unitName": domain, "serviceType": 1}
    # Compact separators give the same space-free JSON as stripping spaces
    # from the default output, but leave spaces inside values untouched.
    body = json.dumps(data, separators=(",", ":"))
    # requests applies no timeout by default; bound the wait explicitly.
    resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/icpAbbreviateInfo/queryByCondition",
                         headers=headers,
                         data=body,
                         timeout=10)
    resp.raise_for_status()
    resp_data = resp.json()
    # Validate the response format and status.
    if not isinstance(resp_data, dict):
        raise ValueError("Response is not a valid JSON object")
    if not all(key in resp_data for key in ['code', 'success']):
        raise ValueError("Missing required fields in response")
    if resp_data['code'] != 200 or not resp_data['success']:
        raise ValueError(f"API error: {resp_data.get('msg', 'Unknown error')}")
    return json.dumps(resp_data)
@retry_with_backoff(retries=3)
def getImage(token):
    """Fetch a captcha challenge from the upstream service.

    Args:
        token: Value sent as the ``Token`` header.

    Returns:
        A ``(params, client_uid)`` tuple: the ``params`` member of the
        response and the ``clientUid`` string generated for this request.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-2xx HTTP status.
        ValueError: If the body is not a JSON object, lacks the expected
            fields, or reports a failure.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        "Referer": "https://beian.miit.gov.cn/",
        "Token": token,
        "Connection": "keep-alive",
        "Accept": "application/json, text/plain, */*",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Origin": "https://beian.miit.gov.cn"
    }
    payload = {
        # Random per-request client identifier; returned to the caller as well.
        "clientUid": "point-" + str(uuid.uuid4())
    }
    # requests applies no timeout by default; bound the wait explicitly.
    resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/getCheckImagePoint",
                         headers=headers, json=payload, timeout=10)
    resp.raise_for_status()
    resp_data = resp.json()
    # Validate the response format and status.
    if not isinstance(resp_data, dict):
        raise ValueError("Response is not a valid JSON object")
    if not all(key in resp_data for key in ['code', 'success']):
        raise ValueError("Missing required fields in response")
    if resp_data['code'] != 200 or not resp_data['success']:
        raise ValueError(f"API error: {resp_data.get('msg', 'Unknown error')}")
    if 'params' not in resp_data:
        raise ValueError("Missing params in response")
    return resp_data["params"], payload["clientUid"]
def aes_ecb_encrypt(plaintext: bytes, key: bytes, block_size=16):
    """Encrypt ``plaintext`` with AES in ECB mode and return Base64 text.

    Before encryption the input is extended to a multiple of ``block_size``
    by appending N copies of the byte value N; an already aligned input gets
    one full extra block.

    Args:
        plaintext: Raw bytes to encrypt.
        key: AES key bytes.
        block_size: Padding granularity in bytes.

    Returns:
        The ciphertext, Base64-encoded and decoded to a ``str``.
    """
    aes = Cipher(algorithms.AES(key), modes.ECB(), backend=default_backend())
    pad_len = block_size - len(plaintext) % block_size
    padded = plaintext + bytes((pad_len,)) * pad_len
    enc = aes.encryptor()
    raw = enc.update(padded) + enc.finalize()
    return base64.b64encode(raw).decode('utf-8')
def generate_pointjson(big_img, small_img, secretKey):
    """Build the encrypted point payload for a captcha challenge.

    Runs ``Detect.detect`` on ``big_img``, passes the resulting boxes with
    ``small_img`` to ``Detect.siamese``, shifts every returned point by +20
    on both axes, serializes the points as a space-free JSON list of
    ``{"x": ..., "y": ...}`` objects, and encrypts it with
    ``aes_ecb_encrypt`` using ``secretKey``.

    Args:
        big_img: First challenge image, handed to ``Detect.detect``.
        small_img: Second challenge image, handed to ``Detect.siamese``.
        secretKey: Key string; UTF-8 encoded before use as the AES key.

    Returns:
        The Base64 string produced by ``aes_ecb_encrypt``.

    Raises:
        Exception: If detection yields no boxes.
    """
    detector = Detect()
    boxes = detector.detect(big_img)
    if not boxes:
        logger.error("文字检测失败。")
        raise Exception("文字检测失败")
    matched = detector.siamese(small_img, boxes)
    # Each point is offset by 20 in x and y before being serialized.
    shifted = [[pt[0] + 20, pt[1] + 20] for pt in matched]
    serialized = json.dumps([{"x": sx, "y": sy} for sx, sy in shifted]).replace(" ", "")
    return aes_ecb_encrypt(serialized.encode(), secretKey.encode())
@retry_with_backoff(retries=3)
def checkImage(uuid_token, secretKey, clientUid, pointJson, token):
    """Submit the captcha answer and obtain the ``sign`` for the query call.

    Args:
        uuid_token: Sent as the ``token`` field of the JSON body.
        secretKey: Sent as the ``secretKey`` field of the JSON body.
        clientUid: Sent as the ``clientUid`` field of the JSON body.
        pointJson: Sent as the ``pointJson`` field of the JSON body.
        token: Value sent as the ``Token`` header.

    Returns:
        ``params.sign`` from the response, or ``False`` when the service
        reports a failure (``code`` is not 200 or ``success`` is falsy).

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-2xx HTTP status.
        ValueError: If the body is not a JSON object or lacks the expected
            fields.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        "Referer": "https://beian.miit.gov.cn/",
        "Token": token,
        "Connection": "keep-alive",
        "Accept": "application/json, text/plain, */*",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Origin": "https://beian.miit.gov.cn"
    }
    data = {
        "token": uuid_token,
        "secretKey": secretKey,
        "clientUid": clientUid,
        "pointJson": pointJson
    }
    # requests applies no timeout by default; bound the wait explicitly.
    resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/checkImage",
                         headers=headers,
                         json=data,
                         timeout=10)
    resp.raise_for_status()
    resp_data = resp.json()
    # Validate the response format and status.
    if not isinstance(resp_data, dict):
        raise ValueError("Response is not a valid JSON object")
    if not all(key in resp_data for key in ['code', 'success']):
        raise ValueError("Missing required fields in response")
    # A rejected answer is reported as False rather than raised, so the
    # caller decides how to handle it.
    if resp_data['code'] != 200 or not resp_data['success']:
        return False
    params = resp_data.get('params')
    # A null / non-object "params" is reported as the same ValueError instead
    # of surfacing as a TypeError from the membership test.
    if not isinstance(params, dict) or 'sign' not in params:
        raise ValueError("Missing params.sign in response")
    return params["sign"]
@retry_with_backoff(retries=3)
def verify_process(domain):
    """Run the complete lookup flow for ``domain`` and return its records.

    Steps, in order: obtain a token (``auth``), fetch a captcha challenge
    (``getImage``), compute the encrypted answer (``generate_pointjson``),
    submit it (``checkImage``) and, once a sign is obtained, run ``query``.

    Returns:
        The value at ``params.list`` of the decoded query response.

    Raises:
        ValueError: If the captcha check does not yield a sign.
    """
    access_token = auth()
    challenge, client_uid = getImage(access_token)
    answer = generate_pointjson(
        challenge["bigImage"],
        challenge["smallImage"],
        challenge["secretKey"],
    )
    sign = checkImage(challenge["uuid"], challenge["secretKey"], client_uid, answer, access_token)
    if not sign:
        raise ValueError("验证码校验失败")
    # query() returns a JSON string, so decode it before extracting the list.
    decoded = json.loads(query(sign, challenge["uuid"], domain, access_token))
    return decoded['params']['list']
@app.get("/query", response_model=QueryResponse, tags=["查询"])
async def query_api(
    domain: str = Query(..., description="要查询的域名"),
):
    """
    Look up the ICP filing information for a domain.

    - **domain**: the bare domain name to look up. It must not contain a
      scheme (`http://` / `https://`), a port, or a path. Subdomains are not
      expected either, although that is not checked here.

    Returns:
    - on success, a `QueryResponse` with the list of filing records and a
      flag telling whether it was served from the cache
    - on failure, an HTTP error: 400 for invalid input or a `ValueError`
      raised by the lookup flow, 500 for any other error
    """
    # Input validation runs outside the try block below: HTTPException is an
    # Exception subclass, so raising it inside that block would be caught by
    # the generic handler and turned into a 500 response.
    if not domain:
        raise HTTPException(status_code=400, detail="Missing 'domain' parameter")
    # Reject a scheme prefix.
    if '://' in domain:
        raise HTTPException(status_code=400, detail="域名不能包含协议(如 http:// 或 https://")
    # Reject a port number.
    if ':' in domain:
        raise HTTPException(status_code=400, detail="域名不能包含端口号")
    # Reject a path.
    if '/' in domain:
        raise HTTPException(status_code=400, detail="域名不能包含路径")
    # A domain needs at least one dot.
    if '.' not in domain:
        raise HTTPException(status_code=400, detail="域名格式错误")
    try:
        # Serve from the cache when it holds a non-empty entry.
        cached_data = load_from_cache(domain)
        if cached_data:
            return QueryResponse(cached=True, count=len(cached_data), data=cached_data)
        # Run the full verification flow (it carries its own retry logic).
        result_list = verify_process(domain)
        # Store the result for later requests.
        save_to_cache(domain, result_list)
        return QueryResponse(cached=False, count=len(result_list), data=result_list)
    except HTTPException:
        # Pass through any HTTP error raised deliberately further down.
        raise
    except ValueError as e:
        # Known failures such as a rejected captcha or an upstream API error.
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        # Anything unexpected; logged with its traceback.
        logger.exception("Unexpected error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
# When run as a script, start uvicorn on all interfaces (0.0.0.0), port 8000.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)