| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233 |
- # -*- coding: utf-8 -*-
- import json
- import jsonpath
- import scrapy
- import time
- from scrapy.http import Request
- import psycopg2
- import time
- from functools import wraps
- from contextlib import contextmanager
- import psycopg2.extras
- from ..items import Zuqiustatus
- import json
- from datetime import datetime
- from datetime import date
- import itertools
# Measure the run time of a function.
# Usage: put this decorator directly on the function under test.
def timethis(func):
    """Decorate ``func`` so that every call prints how long it took.

    The duration is measured with ``time.perf_counter`` and printed between
    two separator lines as ``<module>.<name> : <seconds>``.

    Args:
        func: The callable to time.

    Returns:
        A wrapper with the same metadata as ``func`` (via ``functools.wraps``)
        that forwards all arguments and returns ``func``'s result unchanged.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            # Report from ``finally`` so a call that raises is timed as well,
            # the same way ``timeblock`` reports; the exception still
            # propagates to the caller.
            elapsed = time.perf_counter() - start
            print('\n============================================================')
            print('{}.{} : {}'.format(func.__module__, func.__name__, elapsed))
            print('============================================================\n')
    return wrapper
# Measure the run time of a block of code.
# Usage, as a ``with`` context manager:
#     with timeblock('block_name'):
#         your_code_block...
@contextmanager
def timeblock(label='Code'):
    """Context manager that prints how long the enclosed block ran.

    Args:
        label: Text printed in front of ``run time`` to identify the block.

    The report is printed from ``finally``, so it appears even when the
    enclosed block raises; the exception is not suppressed.
    """
    rule = '=============================================================='
    began = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - began
        print(rule)
        print('{} run time: {}'.format(label, elapsed))
        print(rule)
class SqlConn:
    """Connect to a database and wrap a few cursor/connection operations.

    Connection settings come from keyword arguments passed to the
    constructor; any setting that is not passed (or is falsy) falls back to
    the class attribute of the same name, which is empty by default.
    """
    sql_name = ''
    database = ''
    user = ''
    password = ''
    port = 0
    host = ''

    # Keyword arguments that may override the class-level settings above.
    _SETTING_NAMES = ('sql_name', 'database', 'user', 'password', 'port', 'host')

    # Create the connection and the cursor.
    def __init__(self, *args, **kwargs):
        """Open the connection and create a ``DictCursor`` on it.

        Keyword Args:
            sql_name: Backend key; only ``'postgresql'`` is registered.
            database, user, password, port, host: Connection settings.

        Raises:
            Warning: If host, port, user, password or database is missing,
                or if no cursor could be created.
            KeyError: If ``sql_name`` is not a registered backend.
        """
        # Only truthy values override the class-level defaults.
        for setting in self._SETTING_NAMES:
            value = kwargs.get(setting)
            if value:
                setattr(self, setting, value)
        if not (self.host and self.port and self.user and
                self.password and self.database):
            raise Warning("conn_error, missing some params!")
        sql_conn = {
            'postgresql': psycopg2,
        }
        self.conn = sql_conn[self.sql_name].connect(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            database=self.database,
        )
        self.cursor = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
        if not self.cursor:
            raise Warning("conn_error!")

    # Test the connection.
    def test_conn(self):
        """Print whether a cursor is available."""
        if self.cursor:
            print("conn success!")
        else:
            print('conn error!')

    # Execute a single statement and commit.
    def execute(self, sql_code):
        """Run ``sql_code`` on the cursor, then commit the connection."""
        self.cursor.execute(sql_code)
        self.conn.commit()

    # Execute a single statement without committing.
    def execute_no_conmmit(self, sql_code):
        """Run ``sql_code`` on the cursor and leave the commit to the caller."""
        self.cursor.execute(sql_code)

    # Build several statements from one %s-parameterized template; each entry
    # of ``param_list`` is substituted into ``sql_base`` in turn.
    def excute_many(self, sql_base, param_list):
        """Forward ``sql_base`` and ``param_list`` to ``cursor.executemany``."""
        self.cursor.executemany(sql_base, param_list)

    # Batch execution (not implemented yet).
    def batch_execute(self, sql_code):
        """Placeholder; currently does nothing."""
        pass

    # Fetch data.
    def get_data(self, sql_code, count=0):
        """Run ``sql_code`` and return its rows.

        Args:
            sql_code: The query to execute.
            count: When ``int(count)`` is non-zero, the value is passed to
                ``fetchmany``; otherwise all rows are fetched.

        Returns:
            Whatever ``cursor.fetchmany`` / ``cursor.fetchall`` returns.
        """
        self.cursor.execute(sql_code)
        if int(count):
            return self.cursor.fetchmany(count)
        return self.cursor.fetchall()

    # Update data.
    def updata_data(self, sql_code):
        """Run an update statement on the cursor without committing."""
        # The cursor object itself is not callable; the statement has to go
        # through its ``execute`` method.
        self.cursor.execute(sql_code)

    # Insert data.
    def insert_data(self, sql_code):
        """Run an insert statement on the cursor without committing."""
        # See ``updata_data``: use ``execute`` rather than calling the cursor.
        self.cursor.execute(sql_code)

    # Scroll the cursor.
    def cursor_scroll(self, count, mode='relative'):
        """Forward ``count`` and ``mode`` to ``cursor.scroll``."""
        self.cursor.scroll(count, mode=mode)

    # Commit.
    def commit(self):
        """Commit the connection."""
        self.conn.commit()

    # Roll back.
    def rollback(self):
        """Roll back the connection."""
        self.conn.rollback()

    # Close the connection.
    def close_conn(self):
        """Close the cursor first, then the connection."""
        self.cursor.close()
        self.conn.close()
class ComplexEncoder(json.JSONEncoder):
    """JSON encoder that serializes ``datetime`` and ``date`` values as text.

    ``datetime`` becomes ``YYYY-MM-DD HH:MM:SS`` and ``date`` becomes
    ``YYYY-MM-DD``; every other type is left to the base encoder.
    """

    def default(self, obj):
        """Return a JSON-serializable stand-in for ``obj``."""
        # ``datetime`` is a subclass of ``date``, so it has to be tried first.
        layouts = (
            (datetime, '%Y-%m-%d %H:%M:%S'),
            (date, '%Y-%m-%d'),
        )
        for kind, layout in layouts:
            if isinstance(obj, kind):
                return obj.strftime(layout)
        return super().default(obj)
class LanqiuSpider(scrapy.Spider):
    """Spider that turns the ids found in one odds listing into a status item.

    It requests a single ``getodds`` URL, extracts a set of ids from the JSON
    response and yields one ``Zuqiustatus`` item carrying a status record per
    id. The item is routed to ``BallStatuspipeline`` through
    ``custom_settings``.
    """
    name = "ball_status"
    allowed_domains = ['hg3535z.com']
    # NOTE: sid should be 1 (football); it was switched to 4 while testing.
    # Original inline note on this URL: "in-play menu, basketball in-play
    # list URL" -- NOTE(review): that disagrees with sid=1, confirm which
    # sport this spider is meant to cover.
    start_urls = ['https://hg3535z.com/odds2/d/getodds?sid=1&pt=4&ubt=am&pn=0&sb=2&dc=null&pid=0']
    custom_settings = {
        "ITEM_PIPELINES": {
            'hg3535.pipelines.BallStatuspipeline': 200,
        }
    }
    # Other endpoint variants kept for reference (not requested by this spider):
    #   http://hg3535z.com/odds2/d/getodds?sid=2&pt=3&ubt=am&pn=0&sb=2&dc=null&pid=0
    #   http://hg3535z.com/odds2/d/getamodds?eid=3098030&iip=false&ubt=am&isp=false
    #   http://hg3535z.com/odds2/d/getodds?sid=2&pt=2&ubt=am&pn=0&sb=2&dc=null&pid=0

    def parse(self, response):
        """Yield one item listing a status record for every id in the response.

        Args:
            response: The response for the odds listing; its text is parsed
                as JSON.

        Yields:
            A single ``Zuqiustatus`` item whose ``zuqiu_total`` field is a
            list of dicts with ``match_id``, ``create_time``, ``status`` (always 1)
            and ``ball_type``. Nothing is yielded when the JSONPath
            query finds no ids.
        """
        payload = json.loads(response.text)
        # Presumably element 16 of each event's ``i`` array is the match id --
        # TODO confirm against the endpoint's payload.
        found = jsonpath.jsonpath(payload, '$..i-ot[0]..egs..es..i[16]')
        status_item = Zuqiustatus()
        stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        if not found:
            return

        # The same value is attached to every record, so read it once.
        ball_type = payload['i-ot'][0]['s']['n']
        # ``set`` drops duplicate ids before the records are built.
        status_item["zuqiu_total"] = [
            {
                'match_id': match_id,
                'create_time': stamp,
                'status': 1,
                'ball_type': ball_type,
            }
            for match_id in set(found)
        ]
        yield status_item
        # A disabled follow-up in the original re-requested the same endpoint
        # with sid=2, 3 and 4 (pt=4), using this method as the callback.
|