Python: MySQL に格納されたファイルパスを読み取ってファイルをダウンロードし、内容を解析して七牛雲にアップロードし、解析結果を Elasticsearch に取り込む
43748 ワード
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import ConfigParser
import json
import os
import re
from re import sub
import sys
import time
import requests
from pdfminer.converter import PDFPageAggregator
from pdfminer.layout import LTTextBoxHorizontal, LAParams
from pdfminer.pdfdocument import PDFDocument
from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
from pdfminer.pdfpage import PDFPage
from pdfminer.pdfpage import PDFTextExtractionNotAllowed
from pdfminer.pdfparser import PDFParser
from qiniu import Auth
from qiniu import etag
from qiniu import put_file
import log_config
from OP_Mysql import get_connection
from HTMLParser import HTMLParser
import random
reload(sys)
sys.setdefaultencoding('utf-8')  # Python 2: allow implicit utf-8 str/unicode mixing
logger = log_config.getlogger('analysis_neeq_content', 'analysis_neeq_content.log')
# Sharding / path settings read from mysql.conf [basic_config]
conf = ConfigParser.ConfigParser()
conf.read("mysql.conf")
neeq_remainder = conf.get("basic_config", "neeq_remainder")  # this server's shard remainder
neeq_server_num = conf.get("basic_config", "neeq_server_num")  # total servers sharing the work
neeq_start_id = conf.get("basic_config", "neeq_start_id")  # only rows with neeq_id <= this are processed
neeq_json_path = conf.get("basic_config", "neeq_json_path")  # output directory for ES json files
neeq_json = conf.get("basic_config", "neeq_json")  # base name of the output json files
json_suffix = '.json'
neeq_id = conf.get("basic_config", "neeq_id")  # path of the es_id -> file_addr map file
neeq_file_path = conf.get("basic_config", "neeq_file_path")  # download directory
# Qiniu cloud storage credentials
access_key = conf.get("basic_config", "access_key")
secret_key = conf.get("basic_config", "secret_key")
bucket = conf.get("basic_config", "bucket")
class analysis:
    """Downloads the NEEQ filings referenced in MySQL, extracts their text,
    uploads the originals to Qiniu and buffers ES-ready json documents."""

    def __init__(self):
        # rolling counter appended to the json file name when a file fills up
        self.count = 0
        self.neeq_json = neeq_json
        # headers sent with every download request to www.neeq.com.cn
        self.headers = {
            'Host': 'www.neeq.com.cn',
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.79 Safari/537.36'
        }
        self.create_init_dirtory()
#
def create_init_dirtory(self):
if not os.path.exists(neeq_json_path):
os.makedirs(neeq_json_path)
if not os.path.exists(neeq_file_path):
os.makedirs(neeq_file_path)
    # Main driver: pull unprocessed rows from MySQL and process them page by page.
    def get_data(self):
        """Fetch unprocessed ssb_insight_neeq rows for this shard, download and
        parse each referenced file, upload it to Qiniu, then flush the results
        through write_json_file / write_es_id_file_addr once per 1000-row page.

        Sharding: a row belongs to this server when
        neeq_id % neeq_server_num == neeq_remainder.
        """
        with get_connection() as db:
            # Count the rows this shard must handle ('%%' escapes the SQL modulo
            # operator so pymysql's '%s' placeholder substitution is not confused).
            count = r"SELECT COUNT(*) as num FROM ssb_insight_neeq WHERE pro_status = 0 AND neeq_id <= %s and %s = (neeq_id %% %s)"
            logger.info("now excute sql script sql = %s" % count)
            try:
                db.cursor.execute(count, [neeq_start_id, neeq_remainder, neeq_server_num])
                counts = db.cursor.fetchall()
                num = counts[0]['num']
                logger.info("now rows num = %s" % num)
                # Ceiling of num / 1000 pages.
                # NOTE(review): integer division — this file targets Python 2.
                if 0 != num % 1000:
                    pages = num / 1000 + 1
                else:
                    pages = num / 1000
                start_rows = 1000  # page size (LIMIT row count)
                for i in range(0, pages):
                    start_page = i * 1000  # LIMIT offset of this page
                    sql = "SELECT t.sec_code,t.sec_name,t.title,t.doc_type,t.doc_type_key,c.industry1,c.industry2," \
                          "t.url,t.public_time,t.content,t.pro_status,t.module,t.es_id FROM ssb_insight_neeq t " \
                          "LEFT JOIN ssb_d_listed_company c ON t.sec_code = c.secCode WHERE t.pro_status = 0 and t.neeq_id <= %s " \
                          "AND %s = (t.neeq_id %% %s) ORDER BY t.neeq_id DESC LIMIT %s ,%s"
                    db.cursor.execute(sql, [neeq_start_id, neeq_remainder, neeq_server_num, start_page, start_rows])
                    result_datas = db.cursor.fetchall()
                    # Per-page buffers, flushed after the row loop below.
                    json_data = []
                    es_id_file_addr = []
                    for row in result_datas:
                        item = {}
                        es_obj = {}
                        # Document body to be indexed into Elasticsearch.
                        result = {'secCode': row['sec_code'],
                                  'secName': row['sec_name'],
                                  'title': row['title'],
                                  'docType': row['doc_type'].split(','),
                                  'docTypeKey': row['doc_type_key'].split(','),
                                  'url': row['url'],
                                  'publicTime': row['public_time'],
                                  'industry1': row['industry1'],
                                  'industry2': row['industry2'],
                                  'content': row['content'],
                                  'proStatus': bool(row['pro_status']),
                                  'module': row['module'],
                                  }
                        file_url = row['url']
                        self.download_file(file_url)
                        # File name = last path segment of the URL.
                        file_name = re.findall(r".*/(.*)", file_url)[0]
                        file_paths = neeq_file_path + file_name
                        if os.path.exists(file_paths):
                            content = self.analysis_file_content(file_paths)
                            self.upload_qiniu(file_paths)
                            self.del_file(file_paths)
                            if content == '':
                                # Nothing extracted: skip this row entirely.
                                continue
                            result['content'] = content
                        else:
                            logger.warn("file_url %s download fail" % file_url)
                            continue
                        item['id'] = row['es_id']
                        item['data'] = result
                        json_data.append(item)
                        es_obj['es_id'] = row['es_id']
                        # The local path doubles as the Qiniu object key.
                        es_obj['file_addr'] = file_paths
                        es_id_file_addr.append(es_obj)
                    self.write_json_file(json_data)
                    self.write_es_id_file_addr(es_id_file_addr)
            except Exception as e:
                logger.error("Error: unable to fecth data Exception %s" % e)
def write_json_file(self, json_data):
#
json_path = neeq_json_path + self.neeq_json + json_suffix
rows = self.get_json_rows(json_path)
if rows > 100000:
self.count = self.count + 1
self.neeq_json = neeq_json + str(self.count)
json_path = neeq_json_path + self.neeq_json + json_suffix
with open(json_path, 'a') as es_file:
for jsonitem in json_data:
jsondatar = json.dumps(jsonitem, ensure_ascii=True)
es_file.write(jsondatar+"
")
def write_es_id_file_addr(self, es_id_data):
# es_id,
with open(neeq_id, 'a') as es_id_file:
for jsonitem in es_id_data:
es_id_file.write(jsonitem['es_id']+","+jsonitem['file_addr']+";"+"
")
# json ,
def get_json_rows(self, json_path):
count = 0
if not os.path.exists(json_path):
return 0
thefile = open(json_path, 'rb')
while True:
buffer = thefile.read(8192 * 1024)
if not buffer:
break
count += buffer.count('
')
thefile.close()
return count
#
def upload_qiniu(self, file_path_name):
q = Auth(access_key, secret_key)
# Token,
token = q.upload_token(bucket, file_path_name, 3600)
#
ret, info = put_file(token, file_path_name, file_path_name)
# logger.info(info)
if info.status_code != 200:
logger.info("file upload qiniuyun fail %s" % file_path_name)
#
def del_file(self, file_path_name):
if os.path.exists(file_path_name):
os.remove(file_path_name)
else:
logger.info("%s " % file_path_name)
#
def download_file(self, file_url):
time.sleep(random.uniform(1, 2))
retry = 0
try:
while retry < 3:
file_name = re.findall(r".*/(.*)", file_url)[0]
response = requests.get(file_url, stream=True, headers=self.headers, timeout=5)
if response.status_code == requests.codes.ok:
with open(neeq_file_path + file_name, "wb") as code:
for chunk in response.iter_content(chunk_size=1024):
if chunk:
code.write(chunk)
break
except Exception as e:
logger.exception(e)
retry += 1
#
def analysis_file_content(self, filename):
content = ''
fenzhihouzhui = re.findall(r'.*(\..*)', str(filename))[0]
if fenzhihouzhui == '.pdf' or fenzhihouzhui == '.PDF':
content = self.analysis_pdf_file_content(filename)
elif fenzhihouzhui == '.html' or fenzhihouzhui == '.HTML':
content = self.analysi_html_file_content(filename)
return content
    def analysis_pdf_file_content(self, filename):
        """Extract text from a PDF using pdfminer's layout analysis.

        Concatenates the UTF-8 text of every LTTextBoxHorizontal on every page;
        returns '' (or whatever was accumulated) when parsing fails.
        """
        content = ''
        try:
            fileobject = open(filename, 'rb')
            parser = PDFParser(fileobject)
            document = PDFDocument(parser)
            if not document.is_extractable:
                # The PDF forbids text extraction.
                raise PDFTextExtractionNotAllowed
            else:
                rsrcmgr = PDFResourceManager()
                laparams = LAParams()
                device = PDFPageAggregator(rsrcmgr, laparams=laparams)
                interpreter = PDFPageInterpreter(rsrcmgr, device)
                for page in PDFPage.create_pages(document):
                    interpreter.process_page(page)
                    layout = device.get_result()
                    for x in layout:
                        if isinstance(x, LTTextBoxHorizontal):
                            results = x.get_text().encode('utf-8')
                            content += results
            # NOTE(review): fileobject leaks if an exception fires before this
            # line — consider a with-block.
            fileobject.close()
        except Exception as e:
            logger.error("analysis pdf file fail : %s" % e)
        return content
def analysi_html_file_content(self, filename):
content_open = open(filename, 'rb')
contents = content_open.read()
print contents
contents = dehtml(contents)
class pythonNToTxt(HTMLParser):
    """HTML-to-plain-text converter: collapses whitespace inside text nodes and
    turns <p>/<br> into newlines.

    Fix: the '\n' literals inside the regex and the appends were garbled into
    real line breaks in the pasted source; restored.
    """
    def __init__(self):
        HTMLParser.__init__(self)
        self.__text = []

    def handle_data(self, data):
        # Collapse runs of whitespace to single spaces; drop pure-whitespace nodes.
        text = data.strip()
        if len(text) > 0:
            text = sub('[ \t\r\n]+', ' ', text)
            self.__text.append(text + ' ')

    def handle_starttag(self, tag, attrs):
        if tag == 'p':
            self.__text.append('\n')
        elif tag == 'br':
            self.__text.append('\n')

    def handle_startendtag(self, tag, attrs):
        # Self-closing <br/> also breaks the line.
        if tag == 'br':
            self.__text.append('\n')

    def text(self):
        """Return the accumulated plain text, trimmed at both ends."""
        return ''.join(self.__text).strip()
def dehtml(text):
    """Strip HTML markup from text via pythonNToTxt; if the parser fails for
    any reason, log the error and return the input unchanged."""
    try:
        converter = pythonNToTxt()
        converter.feed(text)
        converter.close()
        return converter.text()
    except Exception as e:
        logger.error("html analysis excepiton : %s" % e)
        return text
logger.info("analysis neeq content start,now params neeq_remainder=%s,neeq_start_id =%s,neeq_json = %s,neeq_id = %s ,neeq_file_path = %s" % (neeq_remainder, neeq_start_id, neeq_json, neeq_id, neeq_file_path))
# Fix: the instance was bound to the name `analysis`, shadowing its own class.
worker = analysis()
worker.get_data()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import log_config
import ConfigParser
import pymysql
from DBUtils.PooledDB import PooledDB
reload(sys)
sys.setdefaultencoding('utf-8')  # Python 2: force utf-8 as the default codec
# MySQL connection settings read from mysql.conf [mysql]
conf = ConfigParser.ConfigParser()
conf.read("mysql.conf")
user = conf.get("mysql", "user")
password = conf.get("mysql", "password")
database = conf.get("mysql", "database")
host = conf.get("mysql", "host")
port = conf.get("mysql", "port")  # kept as a string; cast to int where used
charset = "utf8"
class OPMysql(object):
    """Context manager that borrows a pymysql connection from a shared
    DBUtils PooledDB pool and exposes a dict-style cursor as `self.cursor`."""
    # Class-level pool singleton, created lazily on first use.
    __pool = None

    def __init__(self):
        # Nothing to do here; connection and cursor are acquired in __enter__.
        pass

    def __enter__(self):
        # Borrow a pooled connection and open a DictCursor (rows as dicts).
        self.conn = self.getmysqlconn()
        self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
        return self

    def __exit__(self, typeq, value, trace):
        # Close the cursor and return the connection to the pool
        # (PooledDB's close() releases rather than destroys it).
        self.cursor.close()
        self.conn.close()

    # Lazily build the module-wide pool, then hand out one connection from it.
    @staticmethod
    def getmysqlconn():
        if OPMysql.__pool is None:
            # NOTE(review): not thread-safe — concurrent first calls could each
            # build a pool; only the last assignment wins.
            __pool_a = PooledDB(creator=pymysql, mincached=1, maxcached=10, host=host, user=user, passwd=password, db=database, port=int(port), charset=charset)
            OPMysql.__pool = __pool_a
        return OPMysql.__pool.connection()
def get_connection():
    """Return a fresh OPMysql context manager bound to the shared pool."""
    db = OPMysql()
    return db
ログモジュール (log_config) については前回の記事を参照してください。
#------mysql basic config
[mysql]
user=
password=
database=
host= mysqlIp
port =3306
[basic_config]
#---------------neeq config
# remainder identifying this server's shard (rows where neeq_id % neeq_server_num equals this value)
neeq_remainder = 0
# total number of servers sharing the workload
neeq_server_num = 6
neeq_start_id = 1000
neeq_json_path = neeq/json/
neeq_json = neeq
neeq_id = neeq/neeq_id.txt
neeq_file_path = neeq/file/
bucket = bucket
access_key = access_key
secret_key = secret_key
転載先:https://www.cnblogs.com/keepMoveForevery/p/10400131.html