212 lines
7.9 KiB
Python
212 lines
7.9 KiB
Python
|
import base64
|
|||
|
import json
|
|||
|
import requests
|
|||
|
import hashlib
|
|||
|
import time
|
|||
|
import os
|
|||
|
from urllib import parse
|
|||
|
import uuid
|
|||
|
from flask import Flask, request, jsonify ,Response
|
|||
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|||
|
from cryptography.hazmat.backends import default_backend
|
|||
|
from crack import Crack # 假设 Crack 是一个你自己实现的类
|
|||
|
|
|||
|
app = Flask(__name__)
|
|||
|
|
|||
|
CACHE_DIR = './cache' # 缓存文件目录
|
|||
|
|
|||
|
# 创建缓存目录(如果不存在)
|
|||
|
if not os.path.exists(CACHE_DIR):
|
|||
|
os.makedirs(CACHE_DIR)
|
|||
|
|
|||
|
|
|||
|
def auth():
|
|||
|
t = str(round(time.time()))
|
|||
|
data = {
|
|||
|
"authKey": hashlib.md5(("testtest" + t).encode()).hexdigest(),
|
|||
|
"timeStamp": t
|
|||
|
}
|
|||
|
headers = {
|
|||
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
|
|||
|
"Referer": "https://beian.miit.gov.cn/",
|
|||
|
"Content-Type": "application/x-www-form-urlencoded",
|
|||
|
"Connection": "keep-alive",
|
|||
|
"Accept": "application/json, text/plain, */*",
|
|||
|
"Accept-Encoding": "gzip, deflate, br",
|
|||
|
"Accept-Language": "zh-CN,zh;q=0.9",
|
|||
|
"Origin": "https://beian.miit.gov.cn"
|
|||
|
}
|
|||
|
try:
|
|||
|
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/auth", headers=headers,
|
|||
|
data=parse.urlencode(data)).text
|
|||
|
return json.loads(resp)["params"]["bussiness"]
|
|||
|
except Exception:
|
|||
|
time.sleep(5)
|
|||
|
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/auth", headers=headers,
|
|||
|
data=parse.urlencode(data)).text
|
|||
|
return json.loads(resp)["params"]["bussiness"]
|
|||
|
|
|||
|
|
|||
|
def getImage(token):
|
|||
|
headers = {
|
|||
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
|
|||
|
"Referer": "https://beian.miit.gov.cn/",
|
|||
|
"Token": token,
|
|||
|
"Connection": "keep-alive",
|
|||
|
"Accept": "application/json, text/plain, */*",
|
|||
|
"Accept-Encoding": "gzip, deflate, br",
|
|||
|
"Accept-Language": "zh-CN,zh;q=0.9",
|
|||
|
"Origin": "https://beian.miit.gov.cn"
|
|||
|
}
|
|||
|
payload = {
|
|||
|
"clientUid": "point-" + str(uuid.uuid4())
|
|||
|
}
|
|||
|
try:
|
|||
|
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/getCheckImagePoint",
|
|||
|
headers=headers, json=payload).json()
|
|||
|
return resp["params"], payload["clientUid"]
|
|||
|
except Exception:
|
|||
|
time.sleep(5)
|
|||
|
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/getCheckImagePoint",
|
|||
|
headers=headers, json=payload).json()
|
|||
|
return resp["params"], payload["clientUid"]
|
|||
|
|
|||
|
|
|||
|
def aes_ecb_encrypt(plaintext: bytes, key: bytes, block_size=16):
|
|||
|
backend = default_backend()
|
|||
|
cipher = Cipher(algorithms.AES(key), modes.ECB(), backend=backend)
|
|||
|
|
|||
|
padding_length = block_size - (len(plaintext) % block_size)
|
|||
|
plaintext_padded = plaintext + bytes([padding_length]) * padding_length
|
|||
|
|
|||
|
encryptor = cipher.encryptor()
|
|||
|
ciphertext = encryptor.update(plaintext_padded) + encryptor.finalize()
|
|||
|
|
|||
|
return base64.b64encode(ciphertext).decode('utf-8')
|
|||
|
|
|||
|
|
|||
|
def generate_pointjson(big_img, small_img, secretKey):
|
|||
|
crack = Crack() # 假设 Crack 是一个你实现的类
|
|||
|
boxes = crack.detect(big_img)
|
|||
|
if boxes:
|
|||
|
print("文字检测成功")
|
|||
|
else:
|
|||
|
print("文字检测失败,请重试")
|
|||
|
raise Exception("文字检测失败,请重试")
|
|||
|
points = crack.siamese(small_img, boxes)
|
|||
|
print("文字匹配成功")
|
|||
|
new_points = [[p[0] + 20, p[1] + 20] for p in points]
|
|||
|
pointJson = [{"x": p[0], "y": p[1]} for p in new_points]
|
|||
|
enc_pointJson = aes_ecb_encrypt(json.dumps(pointJson).replace(" ", "").encode(), secretKey.encode())
|
|||
|
return enc_pointJson
|
|||
|
|
|||
|
|
|||
|
def checkImage(uuid_token, secretKey, clientUid, pointJson, token):
|
|||
|
headers = {
|
|||
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
|
|||
|
"Referer": "https://beian.miit.gov.cn/",
|
|||
|
"Token": token,
|
|||
|
"Connection": "keep-alive",
|
|||
|
"Accept": "application/json, text/plain, */*",
|
|||
|
"Accept-Encoding": "gzip, deflate, br",
|
|||
|
"Accept-Language": "zh-CN,zh;q=0.9",
|
|||
|
"Origin": "https://beian.miit.gov.cn"
|
|||
|
}
|
|||
|
data = {
|
|||
|
"token": uuid_token,
|
|||
|
"secretKey": secretKey,
|
|||
|
"clientUid": clientUid,
|
|||
|
"pointJson": pointJson
|
|||
|
}
|
|||
|
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/image/checkImage", headers=headers,
|
|||
|
json=data).json()
|
|||
|
if resp["code"] == 200:
|
|||
|
return resp["params"]["sign"]
|
|||
|
return False
|
|||
|
|
|||
|
|
|||
|
def query(sign, uuid_token, domain, token):
|
|||
|
headers = {
|
|||
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
|
|||
|
"Referer": "https://beian.miit.gov.cn/",
|
|||
|
"Token": token,
|
|||
|
"Sign": sign,
|
|||
|
"Uuid": uuid_token,
|
|||
|
"Connection": "keep-alive",
|
|||
|
"Accept": "application/json, text/plain, */*",
|
|||
|
"Accept-Encoding": "gzip, deflate, br",
|
|||
|
"Accept-Language": "zh-CN,zh;q=0.9",
|
|||
|
"Origin": "https://beian.miit.gov.cn",
|
|||
|
"Content-Type": "application/json",
|
|||
|
"Cookie": "__jsluid_s=" + str(uuid.uuid4().hex[:32])
|
|||
|
}
|
|||
|
data = {"pageNum": "", "pageSize": "", "unitName": domain, "serviceType": 1}
|
|||
|
resp = requests.post("https://hlwicpfwc.miit.gov.cn/icpproject_query/api/icpAbbreviateInfo/queryByCondition",
|
|||
|
headers=headers, data=json.dumps(data).replace(" ", "")).text
|
|||
|
return resp
|
|||
|
|
|||
|
|
|||
|
def save_to_cache(domain, data):
|
|||
|
"""保存数据到缓存文件"""
|
|||
|
cache_path = os.path.join(CACHE_DIR, f"{domain}.json")
|
|||
|
with open(cache_path, 'w', encoding='utf-8') as f:
|
|||
|
json.dump(data, f)
|
|||
|
|
|||
|
|
|||
|
def load_from_cache(domain):
|
|||
|
"""从缓存文件中加载数据"""
|
|||
|
cache_path = os.path.join(CACHE_DIR, f"{domain}.json")
|
|||
|
if os.path.exists(cache_path):
|
|||
|
with open(cache_path, 'r', encoding='utf-8') as f:
|
|||
|
return json.load(f)
|
|||
|
return None
|
|||
|
|
|||
|
|
|||
|
@app.route('/query', methods=['GET'])
|
|||
|
def query_api():
|
|||
|
try:
|
|||
|
# 从 URL 查询参数中获取 domain 和 type
|
|||
|
domain = request.args.get('domain')
|
|||
|
query_type = request.args.get('type', '') # 默认为空字符串
|
|||
|
|
|||
|
if not domain:
|
|||
|
return jsonify({"status": "failed", "message": "Missing 'domain' parameter"}), 400
|
|||
|
|
|||
|
# 如果 type=cache,则检查缓存
|
|||
|
if query_type == 'cache':
|
|||
|
cached_data = load_from_cache(domain)
|
|||
|
if cached_data:
|
|||
|
json_data = json.dumps({"status": "successful", "data": cached_data})
|
|||
|
return Response(json_data, content_type='application/json')
|
|||
|
|
|||
|
# 如果没有缓存,或未指定 type=cache,则继续进行查询
|
|||
|
crack = Crack()
|
|||
|
token = auth()
|
|||
|
time.sleep(0.1)
|
|||
|
params, clientUid = getImage(token)
|
|||
|
pointjson = generate_pointjson(params["bigImage"], params["smallImage"], params["secretKey"])
|
|||
|
time.sleep(0.5)
|
|||
|
sign = checkImage(params["uuid"], params["secretKey"], clientUid, pointjson, token)
|
|||
|
time.sleep(0.5)
|
|||
|
|
|||
|
if sign:
|
|||
|
result = query(sign, params["uuid"], domain, token)
|
|||
|
response = json.loads(result)
|
|||
|
json_data = json.dumps({"status": "successful", "data": response['params']['list']})
|
|||
|
|
|||
|
# 如果 type=cache,则将查询结果缓存
|
|||
|
if query_type == 'cache':
|
|||
|
save_to_cache(domain, response['params']['list'])
|
|||
|
|
|||
|
return Response(json_data, content_type='application/json')
|
|||
|
|
|||
|
else:
|
|||
|
return jsonify({"status": "failed", "message": "Captcha verification failed"}), 400
|
|||
|
|
|||
|
except Exception as e:
|
|||
|
return jsonify({"status": "error", "message": str(e)}), 500
|
|||
|
|
|||
|
|
|||
|
if __name__ == '__main__':
|
|||
|
app.run(debug=True, port=5000)
|