Python asynchronous batch insert into an SQLite database (the same approach works for MySQL)
import pymysql
from scrapy.conf import settings  # deprecated in newer Scrapy; prefer crawler.settings in from_crawler
import time
import logging
from io import BytesIO
import base64
import re
from PIL import Image
import sqlite3
from twisted.enterprise import adbapi
from copy import deepcopy, copy
class BaidustreetpanoimgPipeline(object):
    def __init__(self, sqlite_file, sqlite_table):
        # MySQL connection, used only to read the list of panorama points
        self.conn = pymysql.connect(host=settings["MYSQL_HOST"],
                                    user=settings["MYSQL_USER"],
                                    password=settings["MYSQL_PASSWD"],
                                    db=settings["MYSQL_DBNAME"],
                                    port=settings["MYSQL_PORT"],
                                    # must return rows as dicts (e.g. pymysql.cursors.DictCursor),
                                    # since open_spider pops keys from each row
                                    cursorclass=settings["MYSQL_CURSORCLASS"]
                                    )
        self.cursor = self.conn.cursor()
        self.mysql_table_points = settings["MYSQL_TABLEPOINTS"]
        # SQLite: a twisted adbapi connection pool so inserts run on a thread pool
        self.sqlite_file = sqlite_file
        self.sqlite_table = sqlite_table
        # self.sqlite_conn = sqlite3.connect(self.sqlite_file)
        # self.cur = self.sqlite_conn.cursor()
        self.dbpool = adbapi.ConnectionPool('sqlite3',
                                            database=self.sqlite_file,
                                            check_same_thread=False)
    @classmethod
    def from_crawler(cls, crawler):
        # read the SQLite file path and table name from settings.py
        return cls(
            sqlite_file=crawler.settings.get('SQLITE_FILE'),
            sqlite_table=crawler.settings.get('SQLITE_TABLE', 'items')
        )
    def open_spider(self, spider):
        # Load the PanoID records for this task from MySQL and hand them to the spider.
        if spider.table_name is not None:
            self.mysql_table_points = spider.table_name
            self.sqlite_table = spider.table_name
        select_sql_url = "select * from {0} where TaskName=%s;".format(self.mysql_table_points)
        self.cursor.execute(select_sql_url, (spider.task_name,))
        arr_all_panoid = self.cursor.fetchall()
        for p in arr_all_panoid:
            p.pop("TaskName")
            p.pop("ImageWidth")
            spider.panoid_list.append(p)
        self.cursor.close()
        self.conn.close()
        logging.info("******** open_spider finished **********" + str(time.time()))
    def close_spider(self, spider):
        # Flush any rows still buffered when the spider shuts down.
        if len(spider.pgs) > 0:
            logging.info("******** rows left to insert **********" + str(len(spider.pgs)))
            bulkdata = copy(spider.pgs)
            self.dbpool.runInteraction(self.bulk_insert_to_sqlite, spider.itemkeys, bulkdata)
            # self.bulk_insert_to_sqlite(spider.itemkeys, spider.pgs)
            # clear the buffer
            del spider.pgs[:]
        logging.info("******** close_spider **********" + str(time.time()))

    def process_item(self, item, spider):
        self.montage_pics(spider, item)
        # insert in batches of 1000 rows instead of row by row
        if len(spider.pgs) == 1000:
            bulkdata = copy(spider.pgs)
            self.dbpool.runInteraction(self.bulk_insert_to_sqlite, spider.itemkeys, bulkdata)
            # clear the buffer
            del spider.pgs[:]
        return item
    # Batch insert; called on the adbapi thread pool via runInteraction,
    # so it never blocks the crawl.
    def bulk_insert_to_sqlite(self, tx, itemkeys, bulkdata):
        try:
            insert_sql = "insert into {0}({1}) values ({2})".format(self.sqlite_table,
                                                                    ', '.join(itemkeys),
                                                                    ', '.join(['?'] * len(itemkeys)))
            tx.executemany(insert_sql, bulkdata)
        except sqlite3.Error as why:
            logging.info(why.args[0])
            logging.info("*************** bulk insert failed **************")
    # synchronous variant, no longer used:
    # def deal_pics(self, spider, item):
    #     # self.cur = self.sqlite_conn.cursor()
    #     self.montage_pics(spider, item)
    #     self.sqlite_conn.commit()
    #     # self.cur.close()
    def montage_pics(self, spider, item):
        # Fetch the tile keys for this panorama from Redis.
        listkeys = spider.rds.keys(pattern="{}*".format(item["PanoID"]))
        if len(listkeys) == 32:
            # Sort the keys by the numeric tile indices embedded in the key names.
            listkeys.sort(key=lambda x: tuple(int(i) for i in re.findall(r'\d+', str(x))[-3:]))
            # Paste the 32 base64-encoded 512x512 tiles into an 8x4 grid.
            total_width = 8 * 512
            total_height = 4 * 512
            new_image = Image.new('RGB', (total_width, total_height))
            x_offset = 0
            y_offset = 0
            count = 1
            images = map(Image.open, [BytesIO(base64.b64decode(data)) for data in spider.rds.mget(listkeys)])
            for subitem in images:
                new_image.paste(subitem, (x_offset, y_offset))
                x_offset += subitem.size[0]
                if count % 8 == 0:
                    # start a new row after every 8 tiles
                    x_offset = 0
                    y_offset += subitem.size[1]
                    count = 1
                else:
                    count += 1
            imgByteArr = BytesIO()
            # new_image.resize((3328, 1664), Image.ANTIALIAS)
            # new_image.save(imgByteArr, format='JPEG')
            out = new_image.resize((3328, 1664), Image.ANTIALIAS)  # Image.LANCZOS on Pillow >= 10
            out.save(imgByteArr, format='JPEG')
            imgByteArr = imgByteArr.getvalue()
            new_image.close()
            logging.info("*** stitching finished ***")
            item.update({'Pic': sqlite3.Binary(imgByteArr)})
            if spider.itemkeys is None or spider.itemkeys == "":
                spider.itemkeys = list(item.keys())
            # buffer the row as a tuple of values for the next batch insert
            spider.pgs.append(tuple(item.values()))
            # delete this panorama's tiles from Redis once they have been stitched
            if len(spider.rds.keys(pattern='{0}*'.format(item["PanoID"]))) > 0:
                spider.rds.delete(*spider.rds.keys(pattern='{0}*'.format(item["PanoID"])))
            logging.info("************ buffered rows ***********" + str(len(spider.pgs)))
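Scrapy, Redis and PIL aside, the core technique is simply: buffer rows in memory and flush them to SQLite in batches through twisted's adbapi.ConnectionPool, so the executemany runs on a worker thread instead of blocking the event loop. Below is a minimal, self-contained sketch of that pattern; the database file, table, column names and batch size are made-up values for illustration only.

import sqlite3
from twisted.enterprise import adbapi
from twisted.internet import reactor, defer

# Hypothetical names used only in this sketch.
DB_FILE = 'demo.db'
TABLE = 'panos'
COLUMNS = ('PanoID', 'Pic')
BATCH_SIZE = 1000

# check_same_thread=False is needed because the pool runs queries on worker threads.
dbpool = adbapi.ConnectionPool('sqlite3', database=DB_FILE, check_same_thread=False)
row_buffer = []

def bulk_insert(tx, rows):
    # Runs on a pool thread; tx behaves like a DB-API cursor, and
    # runInteraction commits on success and rolls back on error.
    sql = "insert into {0}({1}) values ({2})".format(
        TABLE, ', '.join(COLUMNS), ', '.join(['?'] * len(COLUMNS)))
    tx.executemany(sql, rows)

def add_row(row):
    # Buffer rows and hand a copy of the buffer to the pool once it is full.
    row_buffer.append(row)
    if len(row_buffer) >= BATCH_SIZE:
        rows = list(row_buffer)
        del row_buffer[:]
        return dbpool.runInteraction(bulk_insert, rows)

@defer.inlineCallbacks
def main():
    yield dbpool.runOperation(
        "create table if not exists {0}(PanoID text, Pic blob)".format(TABLE))
    pending = []
    for i in range(2500):
        d = add_row(('pano-{0}'.format(i), sqlite3.Binary(b'\x00')))
        if d is not None:
            pending.append(d)
    # Flush whatever is left in the buffer, then wait for all batches to land.
    if row_buffer:
        pending.append(dbpool.runInteraction(bulk_insert, list(row_buffer)))
        del row_buffer[:]
    yield defer.gatherResults(pending)
    dbpool.close()
    reactor.stop()

reactor.callWhenRunning(main)
reactor.run()

In the Scrapy pipeline above, the same roles are played by spider.pgs (the buffer), process_item (the batching check) and close_spider (the final flush).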