Python: asynchronous batch inserts into an SQLite database (the same approach works for MySQL)

import pymysql
from scrapy.conf import settings  # legacy global-settings import; newer Scrapy exposes settings via crawler.settings
import time
import logging
from io import BytesIO
import base64
import re
from PIL import Image
import sqlite3
from twisted.enterprise import adbapi
from copy import copy


class BaidustreetpanoimgPipeline(object):
    def __init__(self, sqlite_file, sqlite_table):
        # mysql
        self.conn = pymysql.connect(host=settings["MYSQL_HOST"],
                                    user=settings["MYSQL_USER"],
                                    password=settings["MYSQL_PASSWD"],
                                    db=settings["MYSQL_DBNAME"],
                                    port=settings["MYSQL_PORT"],
                                    cursorclass=settings["MYSQL_CURSORCLASS"]  # cursor class = DictCursor, so rows come back as dicts
                                    )
        self.cursor = self.conn.cursor()
        self.mysql_table_points = settings["MYSQL_TABLEPOINTS"]
        # sqlite
        self.sqlite_file = sqlite_file
        self.sqlite_table = sqlite_table
        # self.sqlite_conn = sqlite3.connect(self.sqlite_file)
        # self.cur = self.sqlite_conn.cursor()
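        # check_same_thread=False is required here: the pool executes queries on
        # its own worker threads, not on the thread that created the connection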
        self.dbpool = adbapi.ConnectionPool('sqlite3',
                                            database=self.sqlite_file,
                                            check_same_thread=False)

    @classmethod
    def from_crawler(cls, crawler):
        # pull the SQLite settings out of the crawler and hand them to __init__
        return cls(
            sqlite_file=crawler.settings.get('SQLITE_FILE'),  # defined in settings.py
            sqlite_table=crawler.settings.get('SQLITE_TABLE', 'items')
        )
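
For reference, from_crawler expects the keys below in the project's settings.py. This is only a sketch: the values and the pipeline path are placeholders, and just the key names come from the code above.

# settings.py -- placeholder values; key names match the pipeline above
import pymysql.cursors

SQLITE_FILE = 'street_pano.db'
SQLITE_TABLE = 'items'

MYSQL_HOST = '127.0.0.1'
MYSQL_PORT = 3306
MYSQL_USER = 'root'
MYSQL_PASSWD = 'secret'
MYSQL_DBNAME = 'pano'
MYSQL_TABLEPOINTS = 'points'
MYSQL_CURSORCLASS = pymysql.cursors.DictCursor  # open_spider pops keys from each row, so rows must be dicts

ITEM_PIPELINES = {
    'myproject.pipelines.BaidustreetpanoimgPipeline': 300,  # path is project-specific
}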

    def open_spider(self, spider):
        # load the PanoID rows for this task from the MySQL points table
        if spider.table_name is not None:
            self.mysql_table_points = spider.table_name
            self.sqlite_table = spider.table_name
        select_sql_url = "select * from {0} where TaskName=%s;".format(self.mysql_table_points)
        self.cursor.execute(select_sql_url, (spider.task_name,))  # note the trailing comma: params must be a tuple
        arr_all_panoid = self.cursor.fetchall()
        for p in arr_all_panoid:
            p.pop("TaskName")
            p.pop("ImageWidth")
            spider.panoid_list.append(p)
        self.cursor.close()
        self.conn.close()
        logging.info("******** PanoIDs loaded **********" + str(time.time()))

    def close_spider(self, spider):
        d = None
        if len(spider.pgs) > 0:
            logging.info("******** flushing remaining rows **********" + str(len(spider.pgs)))
            bulkdata = copy(spider.pgs)
            d = self.dbpool.runInteraction(self.bulk_insert_to_sqlite, spider.itemkeys, bulkdata)
            # self.bulk_insert_to_sqlite(spider.itemkeys, spider.pgs)  # synchronous alternative
            # clear the buffer
            del spider.pgs[:]
        logging.info("******** spider closed **********" + str(time.time()))
        # returning the Deferred (or None) lets Scrapy wait for the final batch before shutdown
        return d

    def process_item(self, item, spider):
        self.montage_pics(spider, item)
        if len(spider.pgs) >= 1000:  # flush once 1000 rows are buffered
            bulkdata = copy(spider.pgs)  # copy, so the buffer can be cleared while the insert runs
            self.dbpool.runInteraction(self.bulk_insert_to_sqlite, spider.itemkeys, bulkdata)
            # clear the buffer
            del spider.pgs[:]
        return item

    # batch insert; runs on a ConnectionPool worker thread inside a transaction
    def bulk_insert_to_sqlite(self, tx, itemkeys, bulkdata):
        try:
            insert_sql = "insert into {0}({1}) values ({2})".format(self.sqlite_table,
                                                                    ', '.join(itemkeys),
                                                                    ', '.join(['?'] * len(itemkeys)))
            tx.executemany(insert_sql, bulkdata)
        except sqlite3.Error as why:
            logging.error(why.args[0])
        logging.info("*************** batch insert finished **************")

    # single-item version, superseded by the batch insert above
    # def deal_pics(self, spider, item):
    #     # self.cur = self.sqlite_conn.cursor()
    #     self.montage_pics(spider, item)
    #     self.sqlite_conn.commit()
    #     # self.cur.close()

    def montage_pics(self, spider, item):
        # fetch the Redis keys of all tiles belonging to this panorama
        listkeys = spider.rds.keys(pattern="{}*".format(item["PanoID"]))
        if len(listkeys) == 32:  # proceed only once all 32 tiles (8 columns x 4 rows) have arrived
            # sort the keys by the trailing numeric indices in the key name
            listkeys.sort(key=lambda x: tuple(int(i) for i in re.findall(r'\d+', str(x))[-3:]))
            # fetch the tile data for the sorted keys and paste the tiles onto one canvas
            total_width = 8 * 512
            total_height = 4 * 512
            new_image = Image.new('RGB', (total_width, total_height))
            x_offset = 0
            y_offset = 0
            count = 1
            images = map(Image.open, [BytesIO(base64.b64decode(data)) for data in spider.rds.mget(listkeys)])
            for subitem in images:
                new_image.paste(subitem, (x_offset, y_offset))
                x_offset += subitem.size[0]
                if count % 8 == 0:  # a row of 8 tiles is full: wrap to the next row
                    x_offset = 0
                    y_offset += subitem.size[1]
                    count = 1
                else:
                    count += 1
            imgByteArr = BytesIO()
            # new_image.resize((3328, 1664), Image.ANTIALIAS)
            # new_image.save(imgByteArr, format='JPEG')  # quality
            out = new_image.resize((3328, 1664), Image.LANCZOS)  # ANTIALIAS was an alias for LANCZOS and was removed in Pillow 10
            out.save(imgByteArr, format='JPEG')  # quality
            imgByteArr = imgByteArr.getvalue()
            new_image.close()
            logging.info("*** montage finished ***")
            item.update({'Pic': sqlite3.Binary(imgByteArr)})
            if spider.itemkeys is None or spider.itemkeys == "":
                spider.itemkeys = list(item.keys())  # capture the column order once, on the first completed item
            # buffer the row as a tuple of values
            spider.pgs.append(tuple(item.values()))
            # delete this panorama's tiles from Redis
            if len(spider.rds.keys(pattern='{0}*'.format(item["PanoID"]))) > 0:
                spider.rds.delete(*spider.rds.keys(pattern='{0}*'.format(item["PanoID"])))
                logging.info("************ Redis tiles cleared; buffered rows: ***********" + str(len(spider.pgs)))
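
The montage itself is plain Pillow arithmetic: 32 tiles of 512x512 pasted row-major into an 8x4 grid, then downscaled. A self-contained sketch, with synthetic tiles standing in for the base64-decoded values pulled from Redis:

from io import BytesIO
from PIL import Image

# 32 synthetic 512x512 tiles stand in for the decoded Redis values
tiles = [Image.new('RGB', (512, 512), (8 * i, 64, 128)) for i in range(32)]

canvas = Image.new('RGB', (8 * 512, 4 * 512))
for idx, tile in enumerate(tiles):
    # row-major placement: 8 tiles per row, 4 rows
    canvas.paste(tile, ((idx % 8) * 512, (idx // 8) * 512))

buf = BytesIO()
canvas.resize((3328, 1664), Image.LANCZOS).save(buf, format='JPEG')
jpeg_bytes = buf.getvalue()  # this is what ends up in sqlite3.Binary(...)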