| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271 |
- # -*- coding: utf-8 -*-
- import jsonpath
- import scrapy
- from scrapy.http import Request
- import psycopg2
- import time
- from functools import wraps
- from contextlib import contextmanager
- import psycopg2.extras
- from scrapy.conf import settings
- import json
- from datetime import datetime
- from datetime import date
- import itertools
- import re
- from scrapy.xlib.pydispatch import dispatcher
- from scrapy import signals
# Measure how long a function takes to run. Usage: apply this decorator directly to the function under test.
def timethis(func):
    """Decorator that prints how long each call to ``func`` takes.

    The wrapped callable forwards all positional and keyword arguments to
    ``func`` and returns its result unchanged.  After a successful call it
    prints a banner containing ``func``'s module, name and the elapsed time
    in seconds as measured with ``time.perf_counter``.  If ``func`` raises,
    the exception propagates and nothing is printed.
    """
    @wraps(func)
    def timed(*args, **kwargs):
        began = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - began
        print('\n============================================================')
        print('{}.{} : {}'.format(func.__module__, func.__name__, elapsed))
        print('============================================================\n')
        return result

    return timed
# Measure how long a block of code takes to run. Usage: as a context manager in a `with` statement:
# with timeblock('block_name'):
#     your_code_block...
@contextmanager
def timeblock(label='Code'):
    """Context manager that prints the run time of the enclosed block.

    Args:
        label: Text placed in front of ``" run time: "`` in the report.

    The report is printed from a ``finally`` clause, so it is emitted even
    when the enclosed block raises; the exception then propagates.
    """
    began = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - began
        print('==============================================================')
        print('{} run time: {}'.format(label, elapsed))
        print('==============================================================')
class SqlConn():
    """Thin wrapper around one database connection and one cursor.

    Only PostgreSQL (via ``psycopg2``) is registered as a backend.  The
    cursor is created with ``psycopg2.extras.DictCursor``.

    Connection settings are read from keyword arguments; any setting that is
    missing or falsy falls back to the class-level default below.

    Several method names are misspelled (``execute_no_conmmit``,
    ``excute_many``, ``updata_data``).  They are kept as-is because callers
    depend on them.
    """

    # Class-level defaults, overridden per instance by __init__ kwargs.
    sql_name = ''
    database = ''
    user = ''
    password = ''
    port = 0
    host = ''

    def __init__(self, *args, **kwargs):
        """Open the connection and create the cursor.

        Keyword Args:
            sql_name: Backend key; only ``'postgresql'`` is registered.
            database, user, password, port, host: Connection settings.

        Raises:
            Warning: If host, port, user, password or database is missing,
                or if no cursor could be obtained.
            KeyError: If ``sql_name`` is not a registered backend.
        """
        for key in ("sql_name", "database", "user", "password", "port", "host"):
            value = kwargs.get(key)
            if value:
                setattr(self, key, value)

        if not (self.host and self.port and self.user and
                self.password and self.database):
            raise Warning("conn_error, missing some params!")

        sql_conn = {
            'postgresql': psycopg2,
        }
        self.conn = sql_conn[self.sql_name].connect(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            database=self.database,
        )
        self.cursor = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
        if not self.cursor:
            raise Warning("conn_error!")

    def test_conn(self):
        """Print whether a cursor is available."""
        if self.cursor:
            print("conn success!")
        else:
            print('conn error!')

    def execute(self, sql_code, params=None):
        """Execute a single statement and commit immediately.

        Args:
            sql_code: SQL text, optionally containing ``%s`` placeholders.
            params: Optional values for the placeholders.
        """
        self.cursor.execute(sql_code, params)
        self.conn.commit()

    def execute_no_conmmit(self, sql_code, params=None):
        """Execute a single statement without committing."""
        self.cursor.execute(sql_code, params)

    def excute_many(self, sql_base, param_list):
        """Execute ``sql_base`` once per parameter sequence in ``param_list``.

        ``sql_base`` uses ``%s`` placeholders.  Nothing is committed.
        """
        self.cursor.executemany(sql_base, param_list)

    def batch_execute(self, sql_code):
        """Batch execution placeholder; not implemented yet."""
        pass

    def get_data(self, sql_code, count=0, params=None):
        """Run a query and return its rows.

        Args:
            sql_code: SQL text, optionally containing ``%s`` placeholders.
            count: If non-zero, fetch at most this many rows with
                ``fetchmany``; otherwise fetch everything with ``fetchall``.
            params: Optional values for the placeholders.
        """
        self.cursor.execute(sql_code, params)
        if int(count):
            return self.cursor.fetchmany(count)
        return self.cursor.fetchall()

    def updata_data(self, sql_code, params=None):
        """Execute an UPDATE statement without committing."""
        self.cursor.execute(sql_code, params)

    def insert_data(self, sql_code, params=None):
        """Execute an INSERT statement without committing."""
        # The cursor object itself is not callable; the statement has to go
        # through cursor.execute().
        self.cursor.execute(sql_code, params)

    def cursor_scroll(self, count, mode='relative'):
        """Move the cursor by ``count`` rows using the given scroll ``mode``."""
        self.cursor.scroll(count, mode=mode)

    def commit(self):
        """Commit the current transaction."""
        self.conn.commit()

    def rollback(self):
        """Roll back the current transaction."""
        self.conn.rollback()

    def close_conn(self):
        """Close the cursor and then the connection."""
        try:
            self.cursor.close()
        finally:
            # Close the connection even if closing the cursor failed.
            self.conn.close()
class ComplexEncoder(json.JSONEncoder):
    """JSON encoder that also serialises ``datetime`` and ``date`` objects.

    ``datetime`` values become ``'YYYY-MM-DD HH:MM:SS'`` strings and ``date``
    values become ``'YYYY-MM-DD'`` strings.  Anything else is handed to the
    base encoder, which raises ``TypeError`` for unsupported types.
    """

    def default(self, obj):
        # datetime must be tested first because it is a subclass of date.
        if isinstance(obj, datetime):
            pattern = '%Y-%m-%d %H:%M:%S'
        elif isinstance(obj, date):
            pattern = '%Y-%m-%d'
        else:
            return json.JSONEncoder.default(self, obj)
        return obj.strftime(pattern)
class LanqiuSpider(scrapy.Spider):
    """Spider that closes out football matches whose odds feed has ended.

    ``parse`` reads every ``match_id`` with ``ball_type='足球'`` from
    ``st_ball_status`` and requests that match's odds.  ``parse_each`` then
    inspects the odds payload and, when it is ``null`` or its single odds
    dictionary is empty, updates the status tables for that match.
    """

    name = "ball_status_update"
    allowed_domains = ['hg3535z.com']
    # Original note: sid should be changed to 1 (football); it was changed
    # to 4 for testing.  NOTE(review): the URL below actually carries sid=3.
    # Original note: in-play menu, basketball in-play list URL.  The response
    # body is not read by parse(); the request only triggers it.
    start_urls = ['https://hg3535z.com/odds2/d/getodds?sid=3&pt=4&ubt=am&pn=0&sb=2&dc=null&pid=0']

    def __init__(self, *args, **kwargs):
        """Initialise the spider, hook the close signal and open the DB.

        Positional and keyword arguments are forwarded to ``scrapy.Spider``.
        """
        # The one-argument form super(LanqiuSpider) is unbound and never runs
        # the base-class initialiser, so the two-argument form is required.
        super(LanqiuSpider, self).__init__(*args, **kwargs)
        # Close the database connection when the spider shuts down.
        dispatcher.connect(self.spider_closed, signals.spider_closed)
        self.conn = SqlConn(
            sql_name='postgresql',
            host=settings["POST_HOST"],
            port=settings['POST_PORT'],
            user=settings["POST_USER"],
            password=settings["POST_PASSWORD"],
            database=settings["POST_DATABASE"],
        )

    def parse(self, response):
        """Yield one odds request per football match stored in the database."""
        rows = self.conn.get_data("select match_id from st_ball_status where ball_type='足球'")
        # Each row holds a single column; flatten the rows into plain ids.
        for match_id in itertools.chain.from_iterable(rows):
            url = 'https://hg3535z.com/odds2/d/getamodds?eid={}&iip=true&ubt=am&isp=false'.format(match_id)
            yield Request(url=url, callback=self.parse_each, dont_filter=True)

    def re_str(self, url_str):
        """Return the value of the ``eid`` query field in ``url_str``.

        Returns an empty string when no ``eid=...&`` segment is present.
        """
        matches = re.findall(r"eid=(.+?)&", url_str)
        return "".join(matches)

    def parse_each(self, response):
        """Inspect one match's odds payload and close the match if needed.

        * Body is the literal ``null``: close the match and record a row in
          ``comendnotice``.
        * More than one odds dictionary: nothing is written.
        * Exactly one odds dictionary and it is empty: close the match
          without a ``comendnotice`` row.
        """
        res_id = self.re_str(response.request.url)

        if response.text == "null":
            print("暂时没有数据")
            print(res_id)
            self._close_match(res_id, notify=True)
            return

        res = json.loads(response.text)
        res1 = jsonpath.jsonpath(res, '$..eg..es[:]..o')
        if not res1:
            # jsonpath returns False (not an empty list) when nothing
            # matches, so len()/indexing on the result would fail.
            self.logger.warning("no odds entries found for match %r", res_id)
            return

        if len(res1) > 1:
            print("这是有角球啊")
            print("我是角球id是")
            print(res_id)
            if res1[0] or res1[1]:
                print("这不是个空字典")
                print("我不做任何操作的啊")
            return

        if not res1[0]:
            print("这是空字典我要改状态")
            print(res_id)
            self._close_match(res_id, notify=False)

    def _close_match(self, res_id, notify):
        """Mark match ``res_id`` as finished in the status tables and commit.

        Args:
            res_id: Match id taken from the request URL.
            notify: When true, also upsert a row into ``comendnotice``.

        All statements run in one transaction; on any error the transaction
        is rolled back and the error re-raised, so the shared connection is
        not left in an aborted state for later responses.
        """
        if not res_id:
            # Without an id the statements below cannot target a match.
            self.logger.warning("could not extract a match id; skipping update")
            return

        utime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        cursor = self.conn.cursor
        try:
            # Values are passed as parameters instead of being formatted
            # into the SQL text.
            cursor.execute(
                "update st_ball_status set status=0, update_time=%s where match_id=%s",
                (utime, res_id),
            )
            if notify:
                sql1 = "insert into comendnotice(status, game_code, match_id,done_time) values (%s,%s, %s, %s) on conflict(match_id) do update set done_time = %s"
                cursor.execute(sql1, (0, 'zq', res_id, utime, utime))
            cursor.execute("update st_zq_result set status=2 where match_id=%s", (res_id,))
            cursor.execute("update st_zq_result_record set status=2 where match_id=%s", (res_id,))
            cursor.execute("update st_zq_competition set status=2 where match_id=%s", (res_id,))
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise

    def spider_closed(self, spider):
        """Signal handler: close the database connection on shutdown."""
        print("我要关闭了")
        self.conn.close_conn()
|