"""HTTP, token and hashing helpers shared by the spiders."""
import datetime
import hashlib
import json
import os
import platform

import requests
from requests_futures.sessions import FuturesSession

from hgg070_spider.conf.uid import UID as u_id, UID

from .langconv import *
from .LocalToken import token

# Module-level futures session used by the UID-token based POST helper.
fs_session = FuturesSession()

# Timestamp layout produced by Helper.changetime.
_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


class Helper(object):
    """Namespace of static helpers: time conversion, API calls and hashing."""

    @staticmethod
    def changetime(params):
        """Convert a marker-suffixed time string to ``YYYY-MM-DD HH:MM:SS``.

        The last character of ``params`` is treated as a one-character marker
        and is always stripped, then ``":00"`` is appended as the seconds.
        When the marker is ``'p'`` and the ``HH:MM`` part does not start with
        ``12``, twelve hours are added.

        :param params: time string such as ``"2019-06-28 07:30p"``.
        :return: the converted time string.
        :raises ValueError: if a ``'p'`` value that needs shifting does not
            match ``%Y-%m-%d %H:%M`` once the marker is removed.
        """
        stamp = params[:-1] + ':00'
        # Non-'p' values and 12:xx 'p' values are already correct as written.
        # NOTE(review): a 12:xx value with a non-'p' marker is returned as
        # 12:xx, not 00:xx -- confirm whether the upstream feed can send that.
        if not params.endswith('p') or params[-6:-1].startswith('12'):
            return stamp
        shifted = datetime.datetime.strptime(stamp, _TIME_FORMAT) + datetime.timedelta(hours=12)
        return shifted.strftime(_TIME_FORMAT)

    @staticmethod
    def get(url, params):
        """Send a GET request carrying ``params`` as JSON plus the UID token.

        :param url: target URL.
        :param params: JSON-serialisable payload, sent in the ``data`` field.
        :return: the ``requests.Response``.
        """
        payload = {"data": json.dumps(params), "token": u_id['token']}
        return requests.get(url, data=payload, timeout=30)

    @staticmethod
    def post(url, params):
        """Send a POST request carrying ``params`` as JSON plus the UID token.

        :param url: target URL.
        :param params: JSON-serialisable payload, sent in the ``data`` field.
        :return: the ``requests.Response``.
        """
        payload = {"data": json.dumps(params), "token": u_id['token']}
        return requests.post(url, data=payload, timeout=30)

    @staticmethod
    def _async_post_legacy(url, params):
        """POST ``params`` using the UID token, refreshing it on status 10032.

        NOTE(review): the source defined ``async_post`` twice inside this
        class; the later definition is kept as the public ``async_post`` and
        this earlier one is preserved under a private name. Confirm which
        variant callers actually rely on.

        :param url: target URL.
        :param params: JSON-serialisable payload.
        :return: the response body decoded as UTF-8, or ``None`` when a
            ``requests`` exception was raised (the exception is printed).
        """
        try:
            data = fs_session.post(
                url, data={"data": json.dumps(params), "token": u_id['token']}).result()
            if data:
                response_data = json.loads(data.content.decode('utf-8'))
                # Status 10032: the token was rejected, so fetch a new one.
                if response_data.get('status') == 10032:
                    new_token = Helper.get_token()
                    parent_dir = os.path.abspath(os.path.join(os.getcwd(), ".."))
                    if platform.system() == 'Windows':
                        file_path = parent_dir + "\\conf\\uid.py"
                    else:
                        file_path = parent_dir + "/hgg070_spider/conf/uid.py"
                    # The token is only updated when the config file exists,
                    # so memory and disk stay in step.
                    if os.path.exists(file_path):
                        with open(file_path, 'w+') as fs:
                            UID['token'] = new_token
                            fs.write('UID={}'.format(UID))
                    data = fs_session.post(
                        url, data={"data": json.dumps(params), "token": u_id['token']}).result()
            return data.content.decode('utf-8')
        except requests.exceptions.RequestException as e:
            print(e)
            return None

    @staticmethod
    def get_zip_data(list1, list2):
        """Pair two non-empty lists into a dict.

        :param list1: keys.
        :param list2: values.
        :return: ``dict(zip(list1, list2))`` when both arguments are
            non-empty lists, otherwise ``None``.
        """
        if not (list1 and list2):
            return None
        if isinstance(list1, list) and isinstance(list2, list):
            return dict(zip(list1, list2))
        return None

    @staticmethod
    def get_token():
        """Log in with the credentials in ``conf/settings.json``.

        The settings file is looked up one directory above this module's
        directory and must provide ``token_url``, ``username1`` and
        ``password1``.

        :return: the token found at ``data.token`` in the JSON response.
        """
        cpath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        with open(cpath + '/conf/settings.json', 'r', encoding='utf8') as f:
            settings = json.load(f)
        # A timeout keeps a stalled login endpoint from blocking forever.
        r = requests.post(
            settings['token_url'],
            data={'account': settings['username1'], 'password': settings['password1']},
            timeout=30,
        )
        return r.json()['data']['token']

    @staticmethod
    def async_post(url, params):
        """POST ``params`` using the LocalToken token and return parsed JSON.

        When the response status is ``6`` a new token is requested from
        ``token['token_url']``; on login status ``1`` it is stored in the
        in-memory ``token`` dict and written to ``./utils/LocalToken.py``.
        The original request is not retried, so the caller still receives
        the status-6 payload for that call.

        :param url: target URL.
        :param params: JSON-serialisable payload.
        :return: the decoded JSON response, or a fallback dict with
            ``status`` 0 if parsing or the token refresh raised.
        """
        session = FuturesSession()
        t_url, t_user, t_password, t_token = (
            token['token_url'], token['username'], token['password'], token['token'])
        data = session.post(
            url, data={"data": json.dumps(params), "token": t_token}, timeout=180).result()
        try:
            new_data = data.json()
            if new_data.get('status') == 6:
                t_data = session.post(
                    url=t_url, data={'account': t_user, 'password': t_password}).result()
                refreshed = t_data.json()
                if refreshed.get('status') == 1:
                    token['token'] = refreshed['data']['token']
                    with open('./utils/LocalToken.py', 'w+', encoding='utf8') as f:
                        f.write('token = {}'.format(token))
        except Exception as e:
            # Best effort: report the problem and hand back a neutral payload.
            print(e)
            new_data = {"status": 0, "msg": "接口返回异常", "data": []}
        return new_data

    @staticmethod
    def genearte_MD5(params, pt=None):
        """Return the hex MD5 digest of ``params``.

        The source defined this method twice, as ``(params)`` and as
        ``(params, pt)``; ``pt`` now defaults to ``None`` so both call
        shapes work.

        :param params: text to hash.
        :param pt: optional value; when ``int(pt) == 3``, ``str(pt)`` is
            appended to ``params`` before hashing.
        :return: 32-character hexadecimal digest.
        """
        text = params
        if pt is not None and int(pt) == 3:
            text = params + str(pt)
        # Create the md5 object and feed it the UTF-8 bytes.
        hl = hashlib.md5()
        hl.update(text.encode(encoding='utf-8'))
        return hl.hexdigest()

    @staticmethod
    def genearte_uuid(params):
        """Return the hex MD5 of ``params`` after conversion and space removal.

        ``params`` is passed through ``Converter("zh-hans")`` (Simplified
        Chinese) and ASCII spaces are removed before hashing.

        :param params: text to normalise and hash.
        :return: 32-character hexadecimal digest.
        """
        # Simplified Chinese ("zh-hant" would give Traditional).
        line = Converter("zh-hans").convert(params).replace(' ', '')
        hl = hashlib.md5()
        hl.update(line.encode(encoding='utf-8'))
        return hl.hexdigest()


if __name__ == '__main__':
    print(os.path.abspath(os.path.join(os.getcwd(), "..")) + "\\conf\\uid.py")
    print(os.path.abspath(__file__))