import base64
import functools
import hashlib
import json
import logging
import re
import time
import uuid
from urllib import parse

import requests
import uvicorn
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
from starlette import status
from starlette.concurrency import run_in_threadpool

from cache import save_to_cache, load_from_cache
from detect import Detect
from models import QueryResponse

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Base URL of the upstream ICP query API.
_API_BASE = "https://hlwicpfwc.miit.gov.cn/icpproject_query/api"

# Upper bound (seconds) for each upstream HTTP call. Without a timeout,
# ``requests`` waits indefinitely and a stalled upstream would hang the worker.
_REQUEST_TIMEOUT_SECONDS = 15

# Headers sent with every upstream request.
_BROWSER_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
    "Referer": "https://beian.miit.gov.cn/",
    "Connection": "keep-alive",
    "Accept": "application/json, text/plain, */*",
    "Accept-Encoding": "gzip, deflate, br",
    "Accept-Language": "zh-CN,zh;q=0.9",
    "Origin": "https://beian.miit.gov.cn",
}

# Domain format: letters, digits, hyphens and dots only; at least one dot;
# no label may start or end with a hyphen.
# NOTE(review): the original pattern was truncated after its first label in
# the reviewed source; the remainder is reconstructed from the accompanying
# comment and error messages -- confirm against version control.
_DOMAIN_PATTERN = re.compile(
    r'^(?!-)[A-Za-z0-9-]{1,63}(?<!-)(\.(?!-)[A-Za-z0-9-]{1,63}(?<!-))+$'
)

# User-facing endpoint description published in the OpenAPI schema. It is
# passed explicitly so the function docstring can stay in English.
_QUERY_DESCRIPTION = (
    "查询域名的ICP备案信息\n\n"
    "- **domain**: 要查询的域名(纯域名,不能包含 https:// 或 http://,也不能包含子域名、端口号和路径)\n\n"
    "返回:\n"
    "- 成功时返回ICP备案信息列表\n"
    "- 失败时返回错误信息"
)


def retry_with_backoff(retries=3, backoff_in_seconds=1):
    """Build a decorator that re-invokes a function when it raises.

    The wrapped function is called up to ``retries + 1`` times. Any
    ``Exception`` triggers another attempt; the exception from the final
    attempt is logged and re-raised unchanged.

    Args:
        retries: Number of additional attempts after the first failure.
        backoff_in_seconds: Accepted for interface compatibility. No delay is
            currently applied between attempts, so this value is unused.
    """
    max_retries = max(retries, 0)

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries:
                        logger.error("Failed after %s retries. Error: %s", retries, e)
                        raise
                    logger.warning("Attempt %s failed: %s. Retrying...", attempt + 1, e)

        return wrapper

    return decorator


app = FastAPI(
    title="ICP查询API",
    description="提供ICP备案信息查询服务",
    version="1.0.0"
)


class HealthCheck(BaseModel):
    """Response model to validate and return when performing a health check."""

    status: str = "OK"


@app.get(
    "/health",
    tags=["healthcheck"],
    summary="Perform a Health Check",
    response_description="Return HTTP Status Code 200 (OK)",
    status_code=status.HTTP_200_OK,
    response_model=HealthCheck,
)
def get_health() -> HealthCheck:
    """
    ## Perform a Health Check
    Endpoint to perform a healthcheck on. This endpoint can primarily be used
    by Docker to ensure a robust container orchestration and management is in
    place. Other services which rely on proper functioning of the API service
    will not deploy if this endpoint returns any other HTTP status code except
    200 (OK).

    Returns:
        HealthCheck: Returns a JSON response with the health status
    """
    return HealthCheck(status="OK")


def _load_api_payload(resp):
    """Return the decoded JSON object of *resp* after basic validation.

    Raises:
        requests.HTTPError: If the HTTP status indicates an error.
        ValueError: If the body is not a JSON object or lacks ``code`` or
            ``success``.
    """
    resp.raise_for_status()
    payload = resp.json()
    if not isinstance(payload, dict):
        raise ValueError("Response is not a valid JSON object")
    if not all(key in payload for key in ("code", "success")):
        raise ValueError("Missing required fields in response")
    return payload


def _is_success(payload):
    """Return True when the payload reports code 200 and a truthy success flag."""
    return payload["code"] == 200 and bool(payload["success"])


def _require_success(payload):
    """Raise ``ValueError`` carrying the upstream message unless *payload* succeeded."""
    if not _is_success(payload):
        raise ValueError(f"API error: {payload.get('msg', 'Unknown error')}")


@retry_with_backoff(retries=3)
def auth():
    """Request an access token from the upstream ``auth`` endpoint.

    The auth key is the MD5 hex digest of ``"testtest"`` followed by the
    current Unix timestamp (rounded to whole seconds).

    Returns:
        The value of ``params.bussiness`` from the response, which the other
        calls in this module send as the ``Token`` header.

    Raises:
        ValueError: If the response is malformed or reports a failure.
        requests.RequestException: On transport errors or timeouts.
    """
    t = str(round(time.time()))
    data = {
        "authKey": hashlib.md5(("testtest" + t).encode()).hexdigest(),
        "timeStamp": t
    }
    headers = {
        **_BROWSER_HEADERS,
        "Content-Type": "application/x-www-form-urlencoded",
    }

    resp = requests.post(
        f"{_API_BASE}/auth",
        headers=headers,
        data=parse.urlencode(data),
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    resp_data = _load_api_payload(resp)
    _require_success(resp_data)
    if 'params' not in resp_data or 'bussiness' not in resp_data['params']:
        raise ValueError("Missing params.bussiness in response")

    return resp_data["params"]["bussiness"]


@retry_with_backoff(retries=3)
def query(sign, uuid_token, domain, token):
    """Query the upstream API for records matching *domain*.

    Args:
        sign: Value sent as the ``Sign`` header (produced by ``checkImage``).
        uuid_token: Value sent as the ``Uuid`` header.
        domain: Value sent as ``unitName`` in the request body.
        token: Value sent as the ``Token`` header (produced by ``auth``).

    Returns:
        The validated response object re-serialized as a JSON string.

    Raises:
        ValueError: If the response is malformed or reports a failure.
        requests.RequestException: On transport errors or timeouts.
    """
    headers = {
        **_BROWSER_HEADERS,
        "Token": token,
        "Sign": sign,
        "Uuid": uuid_token,
        "Content-Type": "application/json",
        "Cookie": "__jsluid_s=" + str(uuid.uuid4().hex[:32])
    }
    data = {"pageNum": "", "pageSize": "", "unitName": domain, "serviceType": 1}

    # Compact separators produce the space-free JSON the original built with
    # str.replace, without also stripping spaces that belong to a value.
    resp = requests.post(
        f"{_API_BASE}/icpAbbreviateInfo/queryByCondition",
        headers=headers,
        data=json.dumps(data, separators=(",", ":")),
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    resp_data = _load_api_payload(resp)
    _require_success(resp_data)

    return json.dumps(resp_data)


@retry_with_backoff(retries=3)
def getImage(token):
    """Fetch a captcha challenge from the upstream API.

    Args:
        token: Value sent as the ``Token`` header (produced by ``auth``).

    Returns:
        A ``(params, client_uid)`` tuple: the ``params`` object of the
        response and the ``clientUid`` that was generated for the request.

    Raises:
        ValueError: If the response is malformed or reports a failure.
        requests.RequestException: On transport errors or timeouts.
    """
    headers = {**_BROWSER_HEADERS, "Token": token}
    payload = {
        "clientUid": "point-" + str(uuid.uuid4())
    }

    resp = requests.post(
        f"{_API_BASE}/image/getCheckImagePoint",
        headers=headers,
        json=payload,
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    resp_data = _load_api_payload(resp)
    _require_success(resp_data)
    if 'params' not in resp_data:
        raise ValueError("Missing params in response")

    return resp_data["params"], payload["clientUid"]


def aes_ecb_encrypt(plaintext: bytes, key: bytes, block_size=16):
    """Encrypt *plaintext* with AES in ECB mode and return it Base64-encoded.

    The plaintext is padded to a multiple of *block_size* by appending N bytes
    of value N (a full block is appended when it is already aligned).

    Args:
        plaintext: Bytes to encrypt.
        key: AES key bytes.
        block_size: Padding block size in bytes.

    Returns:
        The ciphertext as a Base64 ``str``.
    """
    backend = default_backend()
    cipher = Cipher(algorithms.AES(key), modes.ECB(), backend=backend)

    padding_length = block_size - (len(plaintext) % block_size)
    plaintext_padded = plaintext + bytes([padding_length]) * padding_length

    encryptor = cipher.encryptor()
    ciphertext = encryptor.update(plaintext_padded) + encryptor.finalize()

    return base64.b64encode(ciphertext).decode('utf-8')


def generate_pointjson(big_img, small_img, secretKey):
    """Solve the captcha and return the encrypted click coordinates.

    Args:
        big_img: ``bigImage`` value from the captcha challenge; passed to
            ``Detect.detect``.
        small_img: ``smallImage`` value from the captcha challenge; passed to
            ``Detect.siamese`` together with the detected boxes.
        secretKey: Key string used to AES-encrypt the coordinate list.

    Returns:
        Base64 string of the encrypted, compact JSON list of ``{"x", "y"}``
        points.

    Raises:
        RuntimeError: If no boxes are detected in the big image.
    """
    d = Detect()
    boxes = d.detect(big_img)
    if not boxes:
        logger.error("文字检测失败。")
        raise RuntimeError("文字检测失败")

    points = d.siamese(small_img, boxes)
    # Each coordinate is shifted by 20 before submission.
    pointJson = [{"x": p[0] + 20, "y": p[1] + 20} for p in points]
    enc_pointJson = aes_ecb_encrypt(
        json.dumps(pointJson, separators=(",", ":")).encode(),
        secretKey.encode(),
    )
    return enc_pointJson


@retry_with_backoff(retries=3)
def checkImage(uuid_token, secretKey, clientUid, pointJson, token):
    """Submit the captcha solution and return the resulting signature.

    Args:
        uuid_token: Challenge ``uuid``; sent as ``token`` in the body.
        secretKey: Challenge ``secretKey``.
        clientUid: Client id generated when the challenge was requested.
        pointJson: Encrypted coordinates from ``generate_pointjson``.
        token: Value sent as the ``Token`` header (produced by ``auth``).

    Returns:
        ``params.sign`` from the response, or ``False`` when the upstream
        reports that the check did not succeed.

    Raises:
        ValueError: If the response is malformed or lacks ``params.sign``.
        requests.RequestException: On transport errors or timeouts.
    """
    headers = {**_BROWSER_HEADERS, "Token": token}
    data = {
        "token": uuid_token,
        "secretKey": secretKey,
        "clientUid": clientUid,
        "pointJson": pointJson
    }

    resp = requests.post(
        f"{_API_BASE}/image/checkImage",
        headers=headers,
        json=data,
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    resp_data = _load_api_payload(resp)
    # A rejected solution is reported to the caller rather than raised, so it
    # is not retried here with the same (already wrong) coordinates.
    if not _is_success(resp_data):
        return False
    if 'params' not in resp_data or 'sign' not in resp_data['params']:
        raise ValueError("Missing params.sign in response")

    return resp_data["params"]["sign"]


@retry_with_backoff(retries=3)
def verify_process(domain):
    """Run the full lookup flow: auth, captcha fetch/solve/check, then query.

    The whole flow is retried on any failure, in addition to the retries each
    individual step already performs.

    Args:
        domain: Domain to look up.

    Returns:
        ``params.list`` from the query response.

    Raises:
        ValueError: If captcha verification is rejected or a step returns a
            malformed/failed response after all retries.
    """
    token = auth()
    params, clientUid = getImage(token)
    pointjson = generate_pointjson(params["bigImage"], params["smallImage"], params["secretKey"])
    sign = checkImage(params["uuid"], params["secretKey"], clientUid, pointjson, token)

    if not sign:
        raise ValueError("验证码校验失败")

    result = query(sign, params["uuid"], domain, token)
    response = json.loads(result)
    return response['params']['list']


@app.get(
    "/query",
    response_model=QueryResponse,
    tags=["查询"],
    description=_QUERY_DESCRIPTION,
)
async def query_api(
    domain: str = Query(..., description="要查询的域名"),
):
    """Look up the ICP filing records for a domain.

    Args:
        domain: Bare registrable domain (no scheme, sub-domain, port or path).

    Returns:
        A ``QueryResponse`` with the record list, its length, and whether it
        was served from the cache.

    Raises:
        HTTPException: 400 for invalid input or a ``ValueError`` from the
            lookup flow (e.g. captcha rejected); 500 for any other error.
    """
    try:
        if not domain:
            raise HTTPException(status_code=400, detail="Missing 'domain' parameter")

        # Reject sub-domains.
        # NOTE(review): this condition was garbled in the reviewed source
        # (only "> 1" survived); a dot count is the reconstruction -- confirm.
        if domain.count(".") > 1:
            raise HTTPException(status_code=400, detail="不支持子域名,请使用主域名")

        # Check the domain format.
        if not _DOMAIN_PATTERN.fullmatch(domain):
            raise HTTPException(status_code=400, detail="域名格式不正确,请使用正确的域名格式(如 example.com)")

        # Serve from the cache when possible.
        cached_data = load_from_cache(domain)
        if cached_data:
            return QueryResponse(cached=True, count=len(cached_data), data=cached_data)

        # The lookup flow is blocking (HTTP calls and captcha solving); run it
        # in the threadpool so the event loop, and with it /health, stays
        # responsive.
        result_list = await run_in_threadpool(verify_process, domain)

        # Store the result in the cache.
        save_to_cache(domain, result_list)

        return QueryResponse(cached=False, count=len(result_list), data=result_list)

    except HTTPException:
        # Validation errors raised above must keep their own status code
        # instead of being rewrapped as a 500 by the generic handler below.
        raise
    except ValueError as e:
        # Specific failures such as a rejected captcha.
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        # Any other unexpected error.
        logger.error("Unexpected error: %s", e)
        raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)