| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325 |
- # -*- coding: utf-8 -*-
- import jsonpath
- import scrapy
- from scrapy.http import Request
- import psycopg2
- import time
- from functools import wraps
- from contextlib import contextmanager
- import psycopg2.extras
- from scrapy.conf import settings
- import json
- from datetime import datetime
- from datetime import date
- import itertools
- import re
- from scrapy.xlib.pydispatch import dispatcher
- from scrapy import signals
# Measure the run time of a function.
# Usage: put this decorator directly on the function to be timed.
def timethis(func):
    '''
    Decorator that prints how long each call to ``func`` takes.

    The elapsed time is taken from ``time.perf_counter()`` and printed
    together with the function's module and name. The wrapped function's
    return value is passed through unchanged; nothing is printed when the
    call raises.
    '''
    banner = '============================================================'

    @wraps(func)
    def wrapper(*args, **kwargs):
        began = time.perf_counter()
        outcome = func(*args, **kwargs)
        elapsed = time.perf_counter() - began
        print('\n' + banner)
        print('{}.{} : {}'.format(func.__module__, func.__name__, elapsed))
        print(banner + '\n')
        return outcome

    return wrapper
# Measure the run time of a block of code.
# Usage (as a context manager):
#     with timeblock('block_name'):
#         your_code_block...
@contextmanager
def timeblock(label='Code'):
    '''
    Context manager that prints how long the enclosed block took.

    ``label`` is printed in front of the elapsed time. The report is
    emitted from a ``finally`` clause, so it is printed even when the
    block raises; the exception then propagates as usual.
    '''
    began = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - began
        rule = '=============================================================='
        print(rule)
        print('{} run time: {}'.format(label, elapsed))
        print(rule)
class SqlConn():
    '''
    Thin wrapper around one database connection and one cursor.

    Only the 'postgresql' backend (psycopg2) is registered. The cursor is
    created with ``psycopg2.extras.DictCursor``.

    Every statement-running method accepts an optional ``params`` argument
    that is forwarded to ``cursor.execute`` so callers can use ``%s``
    placeholders instead of formatting values into the SQL text.
    '''
    # Connection settings. These class-level defaults are overridden per
    # instance by the keyword arguments given to __init__.
    sql_name = ''
    database = ''
    user = ''
    password = ''
    port = 0
    host = ''

    # Names of the keyword arguments copied onto the instance.
    _PARAM_NAMES = ('sql_name', 'database', 'user', 'password', 'port', 'host')

    def __init__(self, *args, **kwargs):
        '''
        Open the connection and create the cursor.

        Keyword arguments: sql_name, database, user, password, port, host.
        Falsy values are ignored and the class-level default is kept.
        Positional arguments are accepted but unused.

        Raises:
            Warning: if a required setting is missing, if ``sql_name`` is
                not a registered backend, or if no cursor could be created.
        '''
        for key in self._PARAM_NAMES:
            value = kwargs.get(key)
            if value:
                setattr(self, key, value)

        if not (self.host and self.port and self.user and
                self.password and self.database):
            raise Warning("conn_error, missing some params!")

        sql_conn = {
            'postgresql': psycopg2,
        }
        # Report an unknown backend the same way as the other connection
        # errors instead of leaking a bare KeyError.
        if self.sql_name not in sql_conn:
            raise Warning(
                "conn_error, unsupported sql_name: {!r}".format(self.sql_name))

        self.conn = sql_conn[self.sql_name].connect(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            database=self.database,
        )
        self.cursor = self.conn.cursor(
            cursor_factory=psycopg2.extras.DictCursor)
        if not self.cursor:
            raise Warning("conn_error!")

    def test_conn(self):
        '''Print whether a cursor is available.'''
        if self.cursor:
            print("conn success!")
        else:
            print('conn error!')

    def execute(self, sql_code, params=None):
        '''
        Run a single statement and commit.

        If the statement or the commit fails, the transaction is rolled
        back before the exception is re-raised, so the connection remains
        usable for later statements.
        '''
        try:
            self.cursor.execute(sql_code, params)
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise

    def execute_no_conmmit(self, sql_code, params=None):
        '''Run a single statement without committing.'''
        self.cursor.execute(sql_code, params)

    def excute_many(self, sql_base, param_list):
        '''
        Run ``sql_base`` once per entry of ``param_list`` via
        ``cursor.executemany`` (``%s`` placeholders). Does not commit.
        '''
        self.cursor.executemany(sql_base, param_list)

    def batch_execute(self, sql_code):
        '''Batch execution; not implemented yet.'''
        pass

    def get_data(self, sql_code, count=0, params=None):
        '''
        Run a query and return its rows.

        Returns ``fetchmany(count)`` when ``int(count)`` is non-zero,
        otherwise ``fetchall()``.
        '''
        self.cursor.execute(sql_code, params)
        if int(count):
            return self.cursor.fetchmany(count)
        return self.cursor.fetchall()

    def updata_data(self, sql_code, params=None):
        '''Run an update statement without committing.'''
        self.cursor.execute(sql_code, params)

    def insert_data(self, sql_code, params=None):
        '''
        Run an insert statement without committing.

        The cursor object itself is not callable, so the statement has to
        go through ``cursor.execute``.
        '''
        self.cursor.execute(sql_code, params)

    def cursor_scroll(self, count, mode='relative'):
        '''Move the cursor position by delegating to ``cursor.scroll``.'''
        self.cursor.scroll(count, mode=mode)

    def commit(self):
        '''Commit the current transaction.'''
        self.conn.commit()

    def rollback(self):
        '''Roll back the current transaction.'''
        self.conn.rollback()

    def close_conn(self):
        '''Close the cursor, then the connection.'''
        self.cursor.close()
        self.conn.close()
class ComplexEncoder(json.JSONEncoder):
    '''
    JSON encoder that serializes ``datetime`` and ``date`` values as strings.
    '''

    def default(self, obj):
        '''
        Return a string for ``datetime`` ('%Y-%m-%d %H:%M:%S') and ``date``
        ('%Y-%m-%d') objects; defer to ``json.JSONEncoder.default`` for
        everything else.
        '''
        # datetime is listed first because it is a subclass of date and
        # would otherwise be rendered without its time part.
        renderers = (
            (datetime, '%Y-%m-%d %H:%M:%S'),
            (date, '%Y-%m-%d'),
        )
        for kind, pattern in renderers:
            if isinstance(obj, kind):
                return obj.strftime(pattern)
        return json.JSONEncoder.default(self, obj)
class LanqiuSpider(scrapy.Spider):
    '''
    Spider that re-checks every match stored in ``st_ball_status`` with
    ball_type '足球' and marks matches as finished in the database once
    the odds endpoint no longer returns odds for them.
    '''
    name = "ball_status_update"
    allowed_domains = ['hg3535z.com']
    # Original author's note: sid should be changed to 1 (football); it was
    # changed to another value for testing.
    # NOTE(review): parse() ignores the body of this response and only uses
    # it as the trigger to start querying the database.
    start_urls = ['https://hg3535z.com/odds2/d/getodds?sid=3&pt=4&ubt=am&pn=0&sb=2&dc=null&pid=0']

    # Tables whose rows for a finished match are set to status=2.
    _RESULT_TABLES = (
        'st_zq_result',
        'st_zq_result_record',
        'st_zq_competition',
    )

    def __init__(self, *args, **kwargs):
        '''
        Initialise the base spider, register the close handler and open
        the database connection from the POST_* settings.

        Arguments are forwarded to ``scrapy.Spider.__init__``.
        '''
        # The two-argument form is required: ``super(LanqiuSpider)`` alone
        # builds an unbound super object and never runs Spider.__init__.
        super(LanqiuSpider, self).__init__(*args, **kwargs)
        # Close the database connection when the spider is closed.
        dispatcher.connect(self.spider_closed, signals.spider_closed)
        self.conn = SqlConn(
            sql_name='postgresql',
            host=settings["POST_HOST"],
            port=settings['POST_PORT'],
            user=settings["POST_USER"],
            password=settings["POST_PASSWORD"],
            database=settings["POST_DATABASE"],
        )

    def parse(self, response):
        '''
        Yield one odds request per match id stored in ``st_ball_status``
        for ball_type '足球'; each response is handled by parse_each().
        '''
        rows = self.conn.get_data(
            "select match_id from st_ball_status where ball_type='足球'")
        # Each row holds a single column, so flattening yields the ids.
        for match_id in itertools.chain.from_iterable(rows):
            url = 'https://hg3535z.com/odds2/d/getamodds?eid={}&iip=true&ubt=am&isp=false'.format(match_id)
            yield Request(url=url, callback=self.parse_each)

    def re_str(self, url_str):
        '''
        Return the value of the ``eid`` query field of ``url_str``.

        Only an ``eid`` that is followed by ``&`` is recognised; an empty
        string is returned when there is none.
        '''
        return "".join(re.findall(r"eid=(.+?)&", url_str))

    def parse_each(self, response):
        '''
        Inspect the odds payload of one match and update its status.

        * Body is the literal ``null``: mark the match finished and record
          a row in ``comendnotice``.
        * Exactly one ``o`` dict is found and it is empty: mark the match
          finished (no ``comendnotice`` row).
        * Anything else: leave the database untouched.
        '''
        res_id = self.re_str(response.request.url)

        if response.text == "null":
            print("暂时没有数据")
            print(res_id)
            self._mark_match_finished(res_id, notify=True)
            return

        res = json.loads(response.text)
        odds = jsonpath.jsonpath(res, '$..eg..es[:]..o')
        if not odds:
            # jsonpath returns False (not an empty list) when nothing
            # matches; there is nothing to evaluate for this match.
            return

        # NOTE(review): the source's indentation was lost; the branch
        # layout below assumes the original ``else`` belonged to the
        # ``len(...) > 1`` test -- confirm against the original file.
        if len(odds) > 1:
            # More than one entry: the payload also carries a corner-kick
            # market. The original code only logs in this case.
            print("这是有角球啊")
            print("我是角球id是")
            print(res_id)
            if odds[0] or odds[1]:
                print("这不是个空字典")
                print("我不做任何操作的啊")
            return

        if not odds[0]:
            print("这是空字典我要改状态")
            print(res_id)
            self._mark_match_finished(res_id)

    def _mark_match_finished(self, match_id, notify=False):
        '''
        Set status=0 in ``st_ball_status`` and status=2 in the result
        tables for ``match_id``, then commit.

        When ``notify`` is true, a ``comendnotice`` row is also upserted.
        All values are passed as query parameters. On failure the
        transaction is rolled back and the exception re-raised, so the
        shared connection stays usable for the remaining responses.
        '''
        if not match_id:
            self.logger.warning("no eid found in request url, skipping update")
            return

        utime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        cursor = self.conn.cursor
        try:
            cursor.execute(
                "update st_ball_status set status=0, update_time=%s "
                "where match_id=%s",
                (utime, match_id))
            if notify:
                cursor.execute(
                    "insert into comendnotice(status, game_code, match_id,done_time) "
                    "values (%s,%s, %s, %s) "
                    "on conflict(match_id) do update set done_time = %s",
                    (4, 'zq', match_id, utime, utime))
            for table in self._RESULT_TABLES:
                # Table names come from the constant tuple above, never
                # from external input.
                cursor.execute(
                    "update {} set status=2 where match_id=%s".format(table),
                    (match_id,))
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise

    def spider_closed(self, spider):
        '''Signal handler: close the database connection.'''
        print("我要关闭了")
        self.conn.close_conn()
- # new_datas = json.loads(response.text).get('eg', "")
- # if new_datas:
- # new_data = new_datas.get("es", "")
- # result = new_data[0]
- # o_dic = result['o']
- # if response.text == "null" or not o_dic:
- # print("没有数据")
- # print(response.body)
- # if response.text == "null":
- # print("这个网页没有数据的")
|