# -*- coding: utf-8 -*- import json import jsonpath import scrapy import time from scrapy.http import Request import psycopg2 import time # import datetime from functools import wraps from contextlib import contextmanager import psycopg2.extras from ..items import Zuqiustatus import json from datetime import datetime from datetime import date import itertools # 测试一个函数的运行时间,使用方式:在待测函数直接添加此修饰器 def timethis(func): @wraps(func) def wrapper(*args, **kwargs): start = time.perf_counter() r = func(*args, **kwargs) end = time.perf_counter() print('\n============================================================') print('{}.{} : {}'.format(func.__module__, func.__name__, end - start)) print('============================================================\n') return r return wrapper # 测试一段代码运行的时间,使用方式:上下文管理器with # with timeblock('block_name'): # your_code_block... @contextmanager def timeblock(label='Code'): start = time.perf_counter() try: yield finally: end = time.perf_counter() print('==============================================================') print('{} run time: {}'.format(label, end - start)) print('==============================================================') class SqlConn(): ''' 连接数据库,以及进行一些操作的封装 ''' sql_name = '' database = '' user = '' password = '' port = 0 host = '' # 创建连接、游标 def __init__(self, *args, **kwargs): if kwargs.get("sql_name"): self.sql_name = kwargs.get("sql_name") if kwargs.get("database"): self.database = kwargs.get("database") if kwargs.get("user"): self.user = kwargs.get("user") if kwargs.get("password"): self.password = kwargs.get("password") if kwargs.get("port"): self.port = kwargs.get("port") if kwargs.get("host"): self.host = kwargs.get("host") if not (self.host and self.port and self.user and self.password and self.database): raise Warning("conn_error, missing some params!") sql_conn = { 'postgresql': psycopg2, } self.conn = sql_conn[self.sql_name].connect(host=self.host, port=self.port, user=self.user, password=self.password, database=self.database, ) self.cursor = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) if not self.cursor: raise Warning("conn_error!") # 测试连接 def test_conn(self): if self.cursor: print("conn success!") else: print('conn error!') # 单条语句的并提交 def execute(self, sql_code): self.cursor.execute(sql_code) self.conn.commit() # 单条语句的不提交 def execute_no_conmmit(self, sql_code): self.cursor.execute(sql_code) # 构造多条语句,使用%s参数化,对于每个list都进行替代构造 def excute_many(self, sql_base, param_list): self.cursor.executemany(sql_base, param_list) # 批量执行(待完善) def batch_execute(self, sql_code): pass # 获取数据 def get_data(self, sql_code, count=0): self.cursor.execute(sql_code) if int(count): return self.cursor.fetchmany(count) else: return self.cursor.fetchall() # 更新数据 def updata_data(self, sql_code): self.cursor(sql_code) # 插入数据 def insert_data(self, sql_code): self.cursor(sql_code) # 滚动游标 def cursor_scroll(self, count, mode='relative'): self.cursor.scroll(count, mode=mode) # 提交 def commit(self): self.conn.commit() # 回滚 def rollback(self): self.conn.rollback() # 关闭连接 def close_conn(self): self.cursor.close() self.conn.close() class ComplexEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, date): return obj.strftime('%Y-%m-%d') else: return json.JSONEncoder.default(self, obj) class LanqiuSpider(scrapy.Spider): name = "ball_status" allowed_domains = ['hg3535z.com'] to_day = datetime.now() #sid要改为1 足球 现在测试改为4 start_urls = ['https://hg3535z.com/odds2/d/getodds?sid=1&pt=4&ubt=am&pn=0&sb=2&dc=null&pid=0'] # 滚球菜单 篮球滚球列url custom_settings = { "ITEM_PIPELINES": { 'hg3535.pipelines.BallStatuspipeline':200, }, 'LOG_LEVEL': 'DEBUG', 'LOG_FILE': "../hg3535/log/ball_status_{}_{}_{}.log".format(to_day.year, to_day.month, to_day.day) } def parse(self, response): datas = json.loads(response.text) ids = jsonpath.jsonpath(datas, '$..i-ot[0]..egs..es..i[16]') # ids新列表 item = Zuqiustatus() utime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) # zuqiu_total = {} zuqiu_status_list = [] if ids: ids = set(ids) for i in ids: zuqiu = {} zuqiu['match_id'] = i zuqiu['create_time'] = utime zuqiu['status'] = 1 zuqiu['ball_type'] = datas['i-ot'][0]['s']['n'] zuqiu_status_list.append(zuqiu) item["zuqiu_total"] = zuqiu_status_list yield item