#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""Shared helper functions.

Covers an in-process cache, base64/md5 helpers, JSON and text file I/O,
shell command execution, per-hour file logging, time conversion, HTTP
POST submission and small asyncio conveniences.
"""
import json
import sys
import codecs
import os
import io
from urllib import parse, request
import subprocess
import asyncio
import time
import hashlib
import random
import logging
import logging.config
import base64
import datetime
import tempfile

# Process-wide key/value store used by setCache() / getCache().
_sysCache = {}


def setCache(key, value):
    """Store *value* under *key* in the module-level cache."""
    _sysCache[key] = value


def getCache(key):
    """Return the cached value for *key*, or None when the key is absent."""
    return _sysCache.get(key)


def _import(lib, target=-1):
    """Import module *lib* dynamically via ``__import__``.

    Args:
        lib: Dotted module name.
        target: Name (or sequence of names) passed as ``fromlist``. The
            default ``-1`` means "no fromlist", in which case
            ``__import__`` returns the top-level package.

    Returns:
        The module object returned by ``__import__``.
    """
    if target == -1:
        return __import__(lib)
    # ``(target)`` is not a tuple; wrap a non-empty string so that fromlist
    # is a real sequence of names rather than a sequence of characters.
    if isinstance(target, str) and target:
        fromlist = [target]
    else:
        fromlist = target
    return __import__(lib, fromlist=fromlist)


def getAbsPath():
    """Return the absolute path of the parent of this file's directory."""
    here = os.path.abspath(__file__)
    return os.path.dirname(os.path.dirname(here))


def b64encode(enstr):
    """Base64-encode the UTF-8 bytes of *enstr* and return the result as str."""
    return base64.b64encode(enstr.encode('utf-8')).decode('utf-8')


def b64decode(destr):
    """Decode base64 text *destr* and return the payload decoded as UTF-8."""
    return base64.b64decode(destr.encode('utf-8')).decode('utf-8')


def ldjson(filename):
    """Parse the JSON file *filename* (opened with the platform default encoding)."""
    with open(filename, "r") as fh:
        return json.load(fh)


def md5(md5str):
    """Return the hexadecimal MD5 digest of *md5str* encoded as UTF-8."""
    return hashlib.md5(md5str.encode(encoding='UTF-8')).hexdigest()


def uri(url):
    """Fetch *url*, decode the body as UTF-8 and parse it as JSON."""
    return json.loads(uri2(url))


def uri2(url):
    """Fetch *url* and return the response body decoded as UTF-8 text."""
    with request.urlopen(url) as resp:
        return resp.read().decode('utf-8')


def loadJson(filename, charset='utf-8'):
    """Load a JSON file.

    Args:
        filename: Path of the JSON file.
        charset: Text encoding used to read the file.

    Returns:
        The parsed JSON value.
    """
    # The charset must be passed by keyword: the third positional argument
    # of open() is ``buffering``, not ``encoding``.
    with open(filename, "r", encoding=charset) as fh:
        return json.load(fh)


def asyncExecCmd(cmd):
    """Run *cmd* through execApp() and return its output as a string.

    NOTE(review): despite the name, the call blocks until the command
    finishes; no thread or task is started here.
    """
    return execApp(cmd, 1)


def parseVar(data):
    """Expand the fixed placeholders in *data*.

    ``[RAND]`` becomes a random float, ``[MSTIME]`` the current epoch
    seconds multiplied by 1000, ``[TIME]`` the current epoch seconds and
    ``[DATE]`` the current local time as ``YYYY-mm-dd HH:MM:SS``.
    """
    data = data.replace('[RAND]', str(random.random()))
    data = data.replace('[MSTIME]', str(gmtime() * 1000))
    data = data.replace('[TIME]', str(gmtime()))
    data = data.replace('[DATE]', str(gmdate()))
    return data


def parseTag(data, name, value, tag='#'):
    """Replace the dynamic tag ``<tag><name><tag>`` in *data* with *value*."""
    return data.replace(tag + name + tag, value)


# --- Shell command execution -------------------------------------------

def exejs(cmd, charset='utf-8'):
    """Run *cmd* in a shell and return its output ('' when there is none).

    *charset* is accepted for interface compatibility but is not used.
    """
    data = __subprocess(cmd)
    return str(data) if data else ''


def execApp(cmd, isback=0, charset='utf-8'):
    """Run *cmd* in a shell.

    Returns the output as a string when *isback* is non-zero, otherwise
    None. *charset* is accepted for interface compatibility but is not used.
    """
    data = __subprocess(cmd)
    if isback == 0:
        return None
    return str(data)


def execCmd(cmd, charset='utf-8'):
    """Run *cmd* in a shell and return its output; *charset* is not used."""
    return __subprocess(cmd)


def __subprocess(cmd):
    """Run *cmd* with ``shell=True`` and return stdout+stderr as one string.

    The combined output is stripped and every newline is removed. Any
    exception is printed and results in an empty string.

    NOTE: *cmd* is handed to the shell verbatim; callers must not pass
    untrusted input.
    """
    try:
        # The temporary file is removed from disk as soon as it is closed;
        # the context manager also guarantees it is closed on errors.
        with tempfile.TemporaryFile(mode='w+', encoding='utf-8') as out_temp:
            fileno = out_temp.fileno()
            # Both streams are redirected into the same temporary file.
            proc = subprocess.Popen(cmd, shell=True,
                                    stdout=fileno, stderr=fileno)
            proc.wait()
            out_temp.seek(0)
            output = out_temp.read()
    except Exception as e:
        print(e)
        return ''
    # Equivalent to splitting on newlines and joining the pieces.
    return output.strip().replace('\n', '')


def toLog(message, ftype=0):
    """Log *message* to the console and to ``./log/<date>/<ftype.><hour>.err``.

    Args:
        message: Value stored under the ``info`` key of the logged dict.
        ftype: Optional file-name prefix; ``0`` means no prefix.

    The handlers are attached to the root logger only for the duration of
    the call. The root logger level is set to NOTSET as a side effect.
    """
    logger = logging.getLogger()
    logger.propagate = False
    now = time.localtime(time.time())
    day = time.strftime('%Y-%m-%d', now)
    hour = time.strftime('%H', now)
    log_dir = "./log/" + day + "/"
    try:
        os.makedirs(log_dir, exist_ok=True)
    except OSError:
        # Best effort: if the directory really is unusable, FileHandler
        # below raises with the concrete reason.
        pass
    prefix = '' if ftype == 0 else str(ftype) + "."
    handler = logging.StreamHandler()
    handler_file = logging.FileHandler(log_dir + prefix + hour + ".err")
    logger.addHandler(handler)
    logger.addHandler(handler_file)
    try:
        logger.setLevel(logging.NOTSET)
        nowtime = time.strftime('%Y-%m-%d %H:%M:%S', now)
        logger.info({'time': nowtime, 'info': message})
    finally:
        logger.removeHandler(handler)
        logger.removeHandler(handler_file)
        # Release the file descriptor opened for this call.
        handler_file.close()


def time2utc(strtime):
    """Convert ``YYYY-mm-dd HH:MM:SS`` (local time) to integer epoch seconds."""
    parsed = time.strptime(strtime, "%Y-%m-%d %H:%M:%S")
    return int(time.mktime(parsed))


def timeofHmd2utc(hmd):
    """Return the epoch seconds of today's date combined with time-of-day *hmd*.

    *hmd* must be formatted as ``HH:MM:SS``.
    """
    curDate = time.strftime("%Y-%m-%d", time.localtime())
    return time2utc(curDate + ' ' + hmd)


def gmtime():
    """Return the current time as integer epoch seconds."""
    return int(time.time())


def gmdate():
    """Return the current local time formatted as ``YYYY-mm-dd HH:MM:SS``."""
    return utc2timestamp(int(time.time()))


def saveFile(path, data):
    """Write text *data* to *path* (UTF-8), replacing existing content."""
    with open(path, "w", encoding='utf-8') as f:
        f.write(data)


def appendFile(path, data):
    """Append text *data* to *path* (UTF-8), creating the file if needed."""
    with open(path, "a+", encoding='utf-8') as f:
        f.write(data)


def readFile(path):
    """Return the whole content of *path* decoded as UTF-8."""
    with open(path, "r", encoding='utf8') as f:
        return f.read()


def utc2timestamp(nowtime):
    """Format epoch seconds *nowtime* as local ``YYYY-mm-dd HH:MM:SS``."""
    time_local = time.localtime(nowtime)
    # Target format example: 2016-05-05 20:28:54
    return time.strftime("%Y-%m-%d %H:%M:%S", time_local)


def parseListByXpath(data, path):
    """Descend into nested *data* following the ``/``-separated *path*.

    NOTE(review): a step whose value is falsy is not descended into (the
    enclosing container is kept), and a missing key raises KeyError.
    This mirrors the long-standing behaviour; confirm whether it is intended.
    """
    for item in path.split('/'):
        if data[item]:
            data = data[item]
    return data


def postData(url, data):
    """POST *data* url-encoded to *url* and log the request and the response.

    Args:
        url: Target URL.
        data: Mapping or sequence of pairs accepted by ``urlencode``.

    Errors raised while sending are logged (log prefix ``post``) and not
    propagated. Nothing is returned.
    """
    payload = parse.urlencode(data).encode(encoding='utf-8')
    # Logged payload looks like: b'user=admin&password=admin'
    toLog(payload, 'post')
    header_dict = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko'
    }
    try:
        req = request.Request(url=url, data=payload, headers=header_dict)
        with request.urlopen(req) as resp:
            body = resp.read()
        # ``body`` is raw bytes; decode it if readable text is required.
        toLog(body, 'post')
    except Exception as e:
        toLog("Unexpected Error: {}".format(e), 'post')


def getAsyncLoog():
    """Return an event loop that is neither running nor closed.

    The current loop is reused when possible; otherwise a new loop is
    created and installed as the current one.
    """
    try:
        loop = asyncio.get_event_loop()
        if loop.is_running() or loop.is_closed():
            loop = getNewLoop()
    except Exception:
        # get_event_loop() can fail when no loop is available for this
        # thread; fall back to a fresh loop.
        loop = getNewLoop()
    return loop


def asyncExec(tasks):
    """Run all awaitables in *tasks* to completion on a usable event loop.

    Returns:
        The loop that was used; it is already closed when returned.
    """
    loop = getAsyncLoog()

    async def _gather_all():
        # Calling gather() inside a coroutine binds the awaitables to the
        # loop that is actually running them.
        return await asyncio.gather(*tasks)

    try:
        loop.run_until_complete(_gather_all())
    finally:
        loop.close()
    return loop


def getNewLoop():
    """Create a new event loop, install it as the current loop and return it."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    return loop


def asyncPost(item):
    """Submit several POST requests through the event loop.

    Args:
        item: Sequence of dicts shaped like ``{'url': ..., 'data': ...}``.
            Nothing happens when it is empty or None.
    """
    if not item:
        return
    tasks = [asyncCommit(it['url'], it['data']) for it in item]
    asyncExec(tasks)


async def asyncCommit(url, data):
    """Coroutine wrapper around postData().

    NOTE(review): postData() is a blocking call, so the coroutines
    effectively run one after another.
    """
    postData(url, data)