icp-api/main.py

318 lines
12 KiB
Python
Raw Normal View History

2024-10-04 07:39:12 +00:00
import base64
import json
import requests
import hashlib
import time
2025-01-22 19:34:16 +00:00
import os
2024-10-04 07:39:12 +00:00
from urllib import parse
import uuid
2025-01-22 19:34:16 +00:00
from fastapi import FastAPI, HTTPException, Query
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from typing import List, Optional
2024-10-04 07:39:12 +00:00
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
2025-01-22 19:34:16 +00:00
from detect import Detect
import functools
import logging
from models import QueryResponse
from cache import save_to_cache, load_from_cache
import random
import re
2024-10-04 07:39:12 +00:00
2025-01-22 19:34:16 +00:00
# Configure logging: root logger at INFO level, plus a module-level logger
# named after this module that the rest of the file uses.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
2024-10-04 07:39:12 +00:00
2025-01-22 19:34:16 +00:00
def retry_with_backoff(retries=3, backoff_in_seconds=1):
    """Build a decorator that re-invokes a function when it raises.

    The wrapped function is called up to ``retries + 1`` times in total (one
    initial attempt plus ``retries`` retries). Any ``Exception`` triggers a
    retry; once the retries are exhausted the last exception is logged and
    re-raised unchanged.

    Args:
        retries: Number of retries after the first failed attempt. Zero or a
            negative value means "no retries": the function is called once.
        backoff_in_seconds: Accepted for interface compatibility but currently
            unused; retries happen immediately, with no delay between them.

    Returns:
        A decorator that wraps a callable with the retry loop.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    # ">=" rather than "==": with "==" a negative ``retries``
                    # could never match and a persistently failing call would
                    # loop forever.
                    if attempt >= retries:
                        logger.error(f"Failed after {retries} retries. Error: {str(e)}")
                        raise
                    # NOTE: no sleep between attempts; the exponential backoff
                    # this decorator is named after is disabled.
                    logger.warning(f"Attempt {attempt + 1} failed: {str(e)}. Retrying...")
                    attempt += 1
        return wrapper
    return decorator
# FastAPI application instance; the route below registers itself on it via
# @app.get. The title, description and version strings are passed through to
# FastAPI as the API's metadata.
app = FastAPI(
    title="ICP查询API",
    description="提供ICP备案信息查询服务",
    version="1.0.0"
)
@retry_with_backoff(retries=3)
def auth():
    """Request an access token from the MIIT ICP query service.

    Sends a form-encoded POST containing the current Unix timestamp and an
    ``authKey`` that is the MD5 hex digest of ``"testtest"`` followed by that
    timestamp. MD5 is used only because the remote API expects this value.

    Returns:
        The value stored under ``params.bussiness`` in the JSON response;
        callers pass it on as the ``Token`` request header.

    Raises:
        requests.RequestException: On network failure, timeout, or a non-2xx
            HTTP status.
        ValueError: If the response body is not the expected JSON object, or
            the service reports a failure.
    """
    t = str(round(time.time()))
    data = {
        "authKey": hashlib.md5(("testtest" + t).encode()).hexdigest(),
        "timeStamp": t
    }
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        "Referer": "https://beian.miit.gov.cn/",
        "Content-Type": "application/x-www-form-urlencoded",
        "Connection": "keep-alive",
        "Accept": "application/json, text/plain, */*",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Origin": "https://beian.miit.gov.cn"
    }
    # requests never times out by default; without an explicit timeout a
    # stalled upstream connection would hang this call (and the retry loop
    # around it) indefinitely.
    resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/auth",
                         headers=headers,
                         data=parse.urlencode(data),
                         timeout=15)
    resp.raise_for_status()
    resp_data = resp.json()
    # Validate the response shape and status before trusting its contents.
    if not isinstance(resp_data, dict):
        raise ValueError("Response is not a valid JSON object")
    if not all(key in resp_data for key in ['code', 'success']):
        raise ValueError("Missing required fields in response")
    if resp_data['code'] != 200 or not resp_data['success']:
        raise ValueError(f"API error: {resp_data.get('msg', 'Unknown error')}")
    params = resp_data.get('params')
    # A null / non-object "params" must surface as the same ValueError rather
    # than a TypeError from the membership test.
    # NOTE(review): the key is spelled 'bussiness' here; presumably that is
    # how the upstream API spells it - confirm before "fixing".
    if not isinstance(params, dict) or 'bussiness' not in params:
        raise ValueError("Missing params.bussiness in response")
    return params["bussiness"]
2024-10-04 07:39:12 +00:00
2025-01-22 19:34:16 +00:00
@retry_with_backoff(retries=3)
def query(sign, uuid_token, domain, token):
    """Run the ICP record lookup for ``domain`` against the MIIT service.

    Args:
        sign: Sent as the ``Sign`` header (the value ``checkImage`` returns
            after a successful captcha check).
        uuid_token: Sent as the ``Uuid`` header.
        domain: Search term, sent as ``unitName`` in the JSON body.
        token: Sent as the ``Token`` header (the value ``auth`` returns).

    Returns:
        The validated response object re-serialised as a JSON string.

    Raises:
        requests.RequestException: On network failure, timeout, or a non-2xx
            HTTP status.
        ValueError: If the response is not the expected JSON object, or the
            service reports a failure.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        "Referer": "https://beian.miit.gov.cn/",
        "Token": token,
        "Sign": sign,
        "Uuid": uuid_token,
        "Connection": "keep-alive",
        "Accept": "application/json, text/plain, */*",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Origin": "https://beian.miit.gov.cn",
        "Content-Type": "application/json",
        # uuid4().hex is already a 32-character string, so no slicing or
        # str() conversion is needed.
        "Cookie": "__jsluid_s=" + uuid.uuid4().hex
    }
    data = {"pageNum": "", "pageSize": "", "unitName": domain, "serviceType": 1}
    # Compact separators yield the same whitespace-free JSON as stripping
    # spaces from the dumped string, but leave spaces *inside* values
    # (e.g. a unitName containing a space) intact.
    body = json.dumps(data, separators=(",", ":"))
    # Explicit timeout: requests waits forever by default.
    resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/icpAbbreviateInfo/queryByCondition",
                         headers=headers,
                         data=body,
                         timeout=15)
    resp.raise_for_status()
    resp_data = resp.json()
    # Validate the response shape and status.
    if not isinstance(resp_data, dict):
        raise ValueError("Response is not a valid JSON object")
    if not all(key in resp_data for key in ['code', 'success']):
        raise ValueError("Missing required fields in response")
    if resp_data['code'] != 200 or not resp_data['success']:
        raise ValueError(f"API error: {resp_data.get('msg', 'Unknown error')}")
    return json.dumps(resp_data)
@retry_with_backoff(retries=3)
def getImage(token):
    """Fetch a point-click captcha challenge from the MIIT service.

    A fresh ``clientUid`` of the form ``"point-<uuid4>"`` is generated and
    sent as the JSON body.

    Args:
        token: Sent as the ``Token`` header (the value ``auth`` returns).

    Returns:
        A ``(params, clientUid)`` tuple: the ``params`` member of the
        response and the client UID that was sent. ``verify_process`` reads
        the ``bigImage``, ``smallImage``, ``secretKey`` and ``uuid`` keys
        from ``params``.

    Raises:
        requests.RequestException: On network failure, timeout, or a non-2xx
            HTTP status.
        ValueError: If the response is not the expected JSON object, the
            service reports a failure, or ``params`` is absent.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        "Referer": "https://beian.miit.gov.cn/",
        "Token": token,
        "Connection": "keep-alive",
        "Accept": "application/json, text/plain, */*",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Origin": "https://beian.miit.gov.cn"
    }
    payload = {
        "clientUid": "point-" + str(uuid.uuid4())
    }
    # Explicit timeout: requests waits forever by default, which would stall
    # the whole verification flow on a hung connection.
    resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/getCheckImagePoint",
                         headers=headers, json=payload, timeout=15)
    resp.raise_for_status()
    resp_data = resp.json()
    # Validate the response shape and status.
    if not isinstance(resp_data, dict):
        raise ValueError("Response is not a valid JSON object")
    if not all(key in resp_data for key in ['code', 'success']):
        raise ValueError("Missing required fields in response")
    if resp_data['code'] != 200 or not resp_data['success']:
        raise ValueError(f"API error: {resp_data.get('msg', 'Unknown error')}")
    if 'params' not in resp_data:
        raise ValueError("Missing params in response")
    return resp_data["params"], payload["clientUid"]
2024-10-04 07:39:12 +00:00
def aes_ecb_encrypt(plaintext: bytes, key: bytes, block_size=16):
    """AES-ECB encrypt ``plaintext`` and return the ciphertext as Base64 text.

    The input is padded up to a multiple of ``block_size`` by appending N
    bytes each of value N (a full extra block is added when the input is
    already aligned), then encrypted with ``key`` in ECB mode.

    Args:
        plaintext: Raw bytes to encrypt.
        key: AES key bytes.
        block_size: Padding block size in bytes.

    Returns:
        The Base64-encoded ciphertext as a ``str``.
    """
    aes = Cipher(algorithms.AES(key), modes.ECB(), backend=default_backend())
    # Number of pad bytes; always in 1..block_size, never zero.
    pad_len = block_size - len(plaintext) % block_size
    padded = plaintext + bytes([pad_len]) * pad_len
    enc = aes.encryptor()
    raw = enc.update(padded) + enc.finalize()
    return base64.b64encode(raw).decode('utf-8')
def generate_pointjson(big_img, small_img, secretKey):
    """Solve the captcha and return the encrypted click-point payload.

    Runs ``Detect.detect`` on the large image to obtain candidate boxes, then
    ``Detect.siamese`` with the small image and those boxes to obtain points.
    Each point is shifted by 20 on both axes, serialised as a whitespace-free
    JSON list of ``{"x", "y"}`` objects, and AES-ECB encrypted with
    ``secretKey``.

    Args:
        big_img: Large captcha image, passed to ``Detect.detect``.
        small_img: Small captcha image, passed to ``Detect.siamese``.
        secretKey: Encryption key as text; encoded to bytes before use.

    Returns:
        The Base64 string produced by ``aes_ecb_encrypt``.

    Raises:
        Exception: If detection yields no boxes.
    """
    detector = Detect()
    boxes = detector.detect(big_img)
    if not boxes:
        logger.error("文字检测失败。")
        raise Exception("文字检测失败")
    matched = detector.siamese(small_img, boxes)
    # Apply the fixed +20 offset on both axes while building the payload.
    click_points = [{"x": pt[0] + 20, "y": pt[1] + 20} for pt in matched]
    serialized = json.dumps(click_points).replace(" ", "")
    return aes_ecb_encrypt(serialized.encode(), secretKey.encode())
2025-01-22 19:34:16 +00:00
@retry_with_backoff(retries=3)
def checkImage(uuid_token, secretKey, clientUid, pointJson, token):
    """Submit the captcha answer and obtain the signature for the query call.

    Args:
        uuid_token: Sent as the ``token`` field of the JSON body.
        secretKey: Sent as the ``secretKey`` field.
        clientUid: Sent as the ``clientUid`` field (the UID generated by
            ``getImage``).
        pointJson: Sent as the ``pointJson`` field (the encrypted click
            points from ``generate_pointjson``).
        token: Sent as the ``Token`` header (the value ``auth`` returns).

    Returns:
        ``params.sign`` from the response on success, or ``False`` when the
        service answers with a non-200 code or ``success`` false. Returning
        ``False`` instead of raising means a rejected answer is not retried
        by this function's own retry wrapper.

    Raises:
        requests.RequestException: On network failure, timeout, or a non-2xx
            HTTP status.
        ValueError: If the response is not the expected JSON object or a
            successful response lacks ``params.sign``.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
        "Referer": "https://beian.miit.gov.cn/",
        "Token": token,
        "Connection": "keep-alive",
        "Accept": "application/json, text/plain, */*",
        "Accept-Encoding": "gzip, deflate, br",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Origin": "https://beian.miit.gov.cn"
    }
    data = {
        "token": uuid_token,
        "secretKey": secretKey,
        "clientUid": clientUid,
        "pointJson": pointJson
    }
    # Explicit timeout: requests waits forever by default.
    resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/checkImage",
                         headers=headers,
                         json=data,
                         timeout=15)
    resp.raise_for_status()
    resp_data = resp.json()
    # Validate the response shape and status.
    if not isinstance(resp_data, dict):
        raise ValueError("Response is not a valid JSON object")
    if not all(key in resp_data for key in ['code', 'success']):
        raise ValueError("Missing required fields in response")
    if resp_data['code'] != 200 or not resp_data['success']:
        return False
    params = resp_data.get('params')
    # A null / non-object "params" must surface as the same ValueError rather
    # than a TypeError from the membership test.
    if not isinstance(params, dict) or 'sign' not in params:
        raise ValueError("Missing params.sign in response")
    return params["sign"]
2024-10-04 07:39:12 +00:00
2025-01-22 19:34:16 +00:00
@retry_with_backoff(retries=3)
def verify_process(domain):
    """Run the complete lookup flow for ``domain`` and return the record list.

    Steps: obtain a token, fetch a captcha challenge, compute the encrypted
    click points, submit them for verification, then run the query with the
    resulting signature. The whole flow is retried as a unit by the
    decorator, in addition to the retries of the individual steps.

    Args:
        domain: Domain to look up.

    Returns:
        The ``params.list`` member of the query response.

    Raises:
        ValueError: If the captcha answer is rejected.
    """
    token = auth()
    captcha, client_uid = getImage(token)
    encrypted_points = generate_pointjson(
        captcha["bigImage"], captcha["smallImage"], captcha["secretKey"]
    )
    sign = checkImage(
        captcha["uuid"], captcha["secretKey"], client_uid, encrypted_points, token
    )
    if not sign:
        raise ValueError("验证码校验失败")
    # query() hands back a JSON string, so decode it before extracting.
    payload = json.loads(query(sign, captcha["uuid"], domain, token))
    return payload['params']['list']
2024-10-04 07:39:12 +00:00
2025-01-22 19:34:16 +00:00
@app.get("/query", response_model=QueryResponse, tags=["查询"])
async def query_api(
    domain: str = Query(..., description="要查询的域名"),
):
    """
    查询域名的ICP备案信息
    - **domain**: 要查询的域名纯域名不能包含 https:// http://也不能包含子域名端口号和路径
    返回
    - 成功时返回ICP备案信息列表
    - 失败时返回错误信息
    """
    # The docstring above is kept verbatim because FastAPI publishes it as the
    # endpoint description. In English: look up the ICP filing records for a
    # bare registrable domain (no scheme, sub-domain, port or path); returns
    # the record list on success and an error message on failure.
    #
    # Responses:
    #   200 - QueryResponse(cached, count, data)
    #   400 - invalid domain, or a ValueError from the lookup flow
    #         (e.g. captcha verification failed)
    #   500 - any other unexpected error
    #
    # Input validation deliberately sits OUTSIDE the try block below.
    # HTTPException derives from Exception, so raising these 400s inside the
    # try let the generic ``except Exception`` handler catch them and turn
    # every validation error into a 500.
    if not domain:
        raise HTTPException(status_code=400, detail="Missing 'domain' parameter")
    # Domain format: exactly two labels of letters, digits and hyphens,
    # separated by one dot; no label may start or end with a hyphen.
    # NOTE(review): this rejects multi-label registrable domains such as
    # example.com.cn - confirm whether that is intended.
    domain_pattern = r'^(?!-)[A-Za-z0-9-]{1,63}(?<!-)\.(?!-)[A-Za-z0-9-]{1,63}(?<!-)$'
    # Reject a scheme prefix.
    if '://' in domain:
        raise HTTPException(status_code=400, detail="域名不能包含协议(如 http:// 或 https://")
    # Reject a port number.
    if ':' in domain:
        raise HTTPException(status_code=400, detail="域名不能包含端口号")
    # Reject a path.
    if '/' in domain:
        raise HTTPException(status_code=400, detail="域名不能包含路径")
    # Reject sub-domains (more than one dot).
    if domain.count('.') > 1:
        raise HTTPException(status_code=400, detail="不支持子域名,请使用主域名")
    # fullmatch rather than match: with match(), the trailing "$" also
    # accepts a value that ends in a newline.
    if not re.fullmatch(domain_pattern, domain):
        raise HTTPException(status_code=400, detail="域名格式不正确,请使用正确的域名格式(如 example.com")
    try:
        # Serve from the cache when possible.
        # NOTE(review): a falsy cached value (e.g. an empty list) is treated
        # as a miss and re-queried - confirm against load_from_cache.
        cached_data = load_from_cache(domain)
        if cached_data:
            return QueryResponse(cached=True, count=len(cached_data), data=cached_data)
        # Run the full verification flow (it carries its own retries).
        # NOTE(review): verify_process is synchronous, so it blocks the event
        # loop while it runs inside this async handler.
        result_list = verify_process(domain)
        # Store the fresh result in the cache.
        save_to_cache(domain, result_list)
        return QueryResponse(cached=False, count=len(result_list), data=result_list)
    except ValueError as e:
        # Expected failures such as a rejected captcha answer.
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        # Anything else is unexpected; log it with its traceback.
        logger.exception("Unexpected error: %s", e)
        raise HTTPException(status_code=500, detail=str(e)) from e
# When this file is executed directly, serve the app with uvicorn,
# listening on all interfaces (0.0.0.0) on port 8000.
if __name__ == "__main__":
    # Imported here, so uvicorn is only loaded when run as a script.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)