| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- import datetime
- import hashlib
- import json
- import platform
- # import time
- import requests
- from requests_futures.sessions import FuturesSession
- # from scrapy import Selector
- # from selenium import webdriver
- from hgg070_spider.conf.uid import UID as u_id, UID
- # from selenium.webdriver import FirefoxOptions
- fs_session = FuturesSession()
- import hashlib
- import json
- from requests_futures.sessions import FuturesSession
- from .langconv import *
- from .LocalToken import token
- class Helper(object):
- @staticmethod
- def changetime(params):
- if params.endswith('p'):
- p_time = params[-6:-1]
- if p_time.startswith('12'):
- us_time = params[:-1] + ":00"
- # print(us_time)
- # start_time = datetime.datetime.strptime(us_time, "%Y-%m-%d %H:%M:%S") + datetime.timedelta(
- # hours=12)
- # match_date = start_time.strftime("%Y-%m-%d %H:%M:%S").split(' ')[0]
- # match_time = start_time.strftime("%Y-%m-%d %H:%M:%S").split(' ')[1]
- else:
- params = params[:-1] + ':00'
- us_time = datetime.datetime.strptime(params, "%Y-%m-%d %H:%M:%S") + datetime.timedelta(
- hours=12)
- # start_time = us_time + datetime.timedelta(hours=12)
- # match_date = start_time.strftime("%Y-%m-%d %H:%M:%S").split(' ')[0]
- # match_time = start_time.strftime("%Y-%m-%d %H:%M:%S").split(' ')[1]
- us_time = us_time.strftime("%Y-%m-%d %H:%M:%S")
- # pass
- else:
- us_time = params[:-1] + ':00'
- # start_time = datetime.datetime.strptime(params, "%Y-%m-%d %H:%M:%S") + datetime.timedelta(
- # hours=12)
- # match_date = start_time.strftime("%Y-%m-%d %H:%M:%S").split(' ')[0]
- # match_time = start_time.strftime("%Y-%m-%d %H:%M:%S").split(' ')[1]
- return us_time
- @staticmethod
- def get(url, params):
- return requests.get(url, data={"data": json.dumps(params), "token": u_id['token']}, timeout=30)
- @staticmethod
- def post(url, params):
- return requests.post(url, data={"data": json.dumps(params), "token": u_id['token']}, timeout=30)
- @staticmethod
- def async_post(url, params):
- try:
- # print(u_id['token'])
- data = fs_session.post(url, data={"data": json.dumps(params), "token": u_id['token']}).result()
- # data = fs_session.post(url, data={"data": json.dumps(params), "token": "agxrI115617094865d15cbaecca9f"}, timeout=30).result()
- if data:
- response_data = json.loads(data.content.decode('utf-8'))
- # print(response_data)
- # token异常重新获取
- if response_data.get('status') == 10032:
- token = Helper.get_token()
- sys = platform.system()
- if sys == 'Windows':
- file_path = os.path.abspath(os.path.join(os.getcwd(), "..")) + "\\conf\\uid.py"
- else:
- file_path = os.path.abspath(os.path.join(os.getcwd(), "..")) + "/hgg070_spider/conf/uid.py"
- if os.path.exists(file_path):
- fs = open(file_path, 'w+')
- UID['token'] = token
- fs.write('UID={}'.format(UID))
- fs.close()
- data = fs_session.post(url, data={"data": json.dumps(params), "token": u_id['token']}).result()
- # data = fs_session.post(url, data={"data": json.dumps(params), "token": u_id['token']})
- return data.content.decode('utf-8')
- # return data
- except requests.exceptions.RequestException as e:
- print(e)
- @staticmethod
- def get_zip_data(list1, list2):
- if list1 and list2:
- if isinstance(list1, list) and isinstance(list2, list):
- return dict(zip(list1, list2))
- @staticmethod
- def get_token():
- cpath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- with open(cpath + '/conf/settings.json', 'r', encoding='utf8') as f:
- data = json.load(f)
- token_url = data['token_url']
- username1 = data['username1']
- password1 = data['password1']
- r = requests.post(token_url, data={'account': username1, 'password': password1})
- token = r.json()['data']['token']
- return token
- @staticmethod
- def genearte_MD5(params):
- # 创建md5对象
- hl = hashlib.md5()
- hl.update(params.encode(encoding='utf-8'))
- # print('MD5加密前为 :' + params)
- # print('MD5加密后为 :' + hl.hexdigest())
- return hl.hexdigest()
- if __name__ == '__main__':
- import os
- print(os.path.abspath(os.path.join(os.getcwd(), "..")) + "\\conf\\uid.py")
- print(os.path.abspath(__file__))
- def async_post(url, params):
- fs_session = FuturesSession()
- t_url, t_user, t_password, t_token = token['token_url'], token['username'], token['password'], token['token']
- data = fs_session.post(url, data={"data": json.dumps(params), "token": t_token}, timeout=180).result()
- try:
- new_data = data.json()
- if new_data.get('status') == 6:
- t_data = fs_session.post(url=t_url, data={'account': t_user, 'password': t_password}).result()
- if t_data.json().get('status') == 1:
- g_token = t_data.json()['data']['token']
- token['token'] = g_token
- with open('./utils/LocalToken.py', 'w+', encoding='utf8') as f:
- f.write('token = {}'.format(token))
- except Exception as e:
- print(e)
- new_data = {"status": 0, "msg": "接口返回异常", "data": []}
- return new_data
- @staticmethod
- def genearte_MD5(params, pt):
- # 创建md5对象
- hl = hashlib.md5()
- pn = int(pt)
- if pn == 3:
- param = params + str(pt)
- else:
- param = params
- hl.update(param.encode(encoding='utf-8'))
- return hl.hexdigest()
- @staticmethod
- def genearte_uuid(params):
- # 简体
- line = Converter("zh-hans").convert(params).replace(' ', '')
- # 繁体
- # line = Converter("zh-hant").convert(params).replace(' ', '')
- hl = hashlib.md5()
- hl.update(line.encode(encoding='utf-8'))
- return hl.hexdigest()
|