Python で MySQL に格納されたファイルパスを読み取り、該当ファイルをダウンロードして内容を解析し、七牛雲（Qiniu）にアップロードした上で、解析結果を Elasticsearch に投入するスクリプトです。

43748 ワード

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import ConfigParser
import json
import os
import re
from re import sub
import sys
import time
import requests
from pdfminer.converter import PDFPageAggregator
from pdfminer.layout import LTTextBoxHorizontal, LAParams
from pdfminer.pdfdocument import PDFDocument
from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
from pdfminer.pdfpage import PDFPage
from pdfminer.pdfpage import PDFTextExtractionNotAllowed
from pdfminer.pdfparser import PDFParser
from qiniu import Auth
from qiniu import etag
from qiniu import put_file
import log_config
from OP_Mysql import get_connection
from HTMLParser import HTMLParser
import random

# Python 2 only: force UTF-8 as the default codec so implicit str/unicode
# conversions of Chinese text do not raise UnicodeDecodeError.
reload(sys)
sys.setdefaultencoding('utf-8')
# Module-wide logger (provided by the companion log_config module).
logger = log_config.getlogger('analysis_neeq_content', 'analysis_neeq_content.log')
# All runtime settings come from mysql.conf in the working directory.
conf = ConfigParser.ConfigParser()
conf.read("mysql.conf")
# Sharding: this instance only processes rows where
# neeq_id % neeq_server_num == neeq_remainder.
neeq_remainder = conf.get("basic_config", "neeq_remainder")
neeq_server_num = conf.get("basic_config", "neeq_server_num")
# Upper bound: only rows with neeq_id <= neeq_start_id are processed.
neeq_start_id = conf.get("basic_config", "neeq_start_id")
# Output directory and base name for the Elasticsearch bulk JSON files.
neeq_json_path = conf.get("basic_config", "neeq_json_path")
neeq_json = conf.get("basic_config", "neeq_json")
json_suffix = '.json'
# Path of the "es_id,file_addr;" index file (a file path, despite the name).
neeq_id = conf.get("basic_config", "neeq_id")
# Local directory where downloaded documents are staged.
neeq_file_path = conf.get("basic_config", "neeq_file_path")
# Qiniu cloud-storage credentials and target bucket.
access_key = conf.get("basic_config", "access_key")
secret_key = conf.get("basic_config", "secret_key")
bucket = conf.get("basic_config", "bucket")

class analysis:
    """Pipeline worker: pull NEEQ announcement rows from MySQL, fetch and
    parse the referenced documents, upload them to Qiniu, and stage the
    parsed content as JSON lines for Elasticsearch.
    """

    def __init__(self):
        # Suffix counter used when the bulk-JSON output file rolls over.
        self.count = 0
        # Base name of the current bulk-JSON output file.
        self.neeq_json = neeq_json
        # Headers sent with every download request to www.neeq.com.cn.
        self.headers = {'Host': 'www.neeq.com.cn',
                        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.79 Safari/537.36'
                        }
        self.create_init_dirtory()

    # Make sure the working directories exist before any download starts.
    def create_init_dirtory(self):
        for dir_path in (neeq_json_path, neeq_file_path):
            if not os.path.exists(dir_path):
                os.makedirs(dir_path)

    # mysql     
    # Main driver: page through this shard's unprocessed rows in batches of
    # 1000, enrich each row with parsed file content, and write the results.
    def get_data(self):
        """Select rows from ssb_insight_neeq with pro_status = 0 that belong
        to this shard (neeq_id % neeq_server_num == neeq_remainder and
        neeq_id <= neeq_start_id), download and parse each row's document,
        upload it to Qiniu, then append the JSON documents and the
        es_id/file-address records to their respective output files.
        """
        with get_connection() as db:
            # Count matching rows first so we know how many 1000-row pages
            # to read.  %% escapes the SQL modulo operator for pymysql.
            count = r"SELECT COUNT(*) as num FROM ssb_insight_neeq WHERE pro_status = 0 AND neeq_id <= %s and %s = (neeq_id %% %s)"
            logger.info("now excute sql script sql = %s" % count)
            try:
                # Shard parameters are passed as bind variables.
                db.cursor.execute(count, [neeq_start_id, neeq_remainder, neeq_server_num])
                counts = db.cursor.fetchall()
                num = counts[0]['num']
                logger.info("now rows num = %s" % num)
                # Ceiling division by 1000 (Python 2 integer division).
                if 0 != num % 1000:
                    pages = num / 1000 + 1
                else:
                    pages = num / 1000
                start_rows = 1000
                for i in range(0, pages):
                    start_page = i * 1000

                    # Join company info for industry fields; LIMIT offset,count
                    # pages through the result set.
                    sql = "SELECT t.sec_code,t.sec_name,t.title,t.doc_type,t.doc_type_key,c.industry1,c.industry2," \
                          "t.url,t.public_time,t.content,t.pro_status,t.module,t.es_id FROM ssb_insight_neeq t " \
                          "LEFT JOIN ssb_d_listed_company c ON t.sec_code = c.secCode WHERE t.pro_status = 0 and t.neeq_id <= %s " \
                          "AND %s = (t.neeq_id %% %s) ORDER BY t.neeq_id DESC LIMIT %s ,%s"

                    db.cursor.execute(sql, [neeq_start_id, neeq_remainder, neeq_server_num, start_page, start_rows])
                    result_datas = db.cursor.fetchall()
                    # Buffers for this 1000-row page: ES bulk items and the
                    # es_id -> local-file-path index records.
                    json_data = []
                    es_id_file_addr = []
                    for row in result_datas:
                        item = {}
                        es_obj = {}
                        # ES document body; 'content' is replaced below with
                        # the text parsed from the downloaded file.
                        result = {'secCode': row['sec_code'],
                                  'secName': row['sec_name'],
                                  'title': row['title'],
                                  'docType': row['doc_type'].split(','),
                                  'docTypeKey': row['doc_type_key'].split(','),
                                  'url': row['url'],
                                  'publicTime': row['public_time'],
                                  'industry1': row['industry1'],
                                  'industry2': row['industry2'],
                                  'content': row['content'],
                                  'proStatus': bool(row['pro_status']),
                                  'module': row['module'],
                                  }
                        file_url = row['url']
                        self.download_file(file_url)
                        # Local file name = last path segment of the URL.
                        file_name = re.findall(r".*/(.*)", file_url)[0]
                        file_paths = neeq_file_path + file_name
                        if os.path.exists(file_paths):
                            content = self.analysis_file_content(file_paths)
                            self.upload_qiniu(file_paths)
                            self.del_file(file_paths)
                            # NOTE(review): the file is uploaded and deleted
                            # before this check, so empty-content rows are
                            # skipped locally but still uploaded to Qiniu.
                            if content == '':
                                continue
                            result['content'] = content

                        else:
                            # Download failed (file never appeared on disk).
                            logger.warn("file_url %s download fail" % file_url)
                            continue
                        item['id'] = row['es_id']
                        item['data'] = result
                        json_data.append(item)
                        es_obj['es_id'] = row['es_id']
                        es_obj['file_addr'] = file_paths
                        es_id_file_addr.append(es_obj)
                    self.write_json_file(json_data)
                    self.write_es_id_file_addr(es_id_file_addr)
            except Exception as e:
                # NOTE(review): one failure aborts all remaining pages, and
                # "fecth" is a typo kept as-is (runtime string).
                logger.error("Error: unable to fecth data Exception %s" % e)

    def write_json_file(self, json_data):
        #    
        json_path = neeq_json_path + self.neeq_json + json_suffix
        rows = self.get_json_rows(json_path)
        if rows > 100000:
            self.count = self.count + 1
            self.neeq_json = neeq_json + str(self.count)
            json_path = neeq_json_path + self.neeq_json + json_suffix
        with open(json_path, 'a') as es_file:
            for jsonitem in json_data:
                jsondatar = json.dumps(jsonitem, ensure_ascii=True)
                es_file.write(jsondatar+"
") def write_es_id_file_addr(self, es_id_data): # es_id, with open(neeq_id, 'a') as es_id_file: for jsonitem in es_id_data: es_id_file.write(jsonitem['es_id']+","+jsonitem['file_addr']+";"+"
") # json , def get_json_rows(self, json_path): count = 0 if not os.path.exists(json_path): return 0 thefile = open(json_path, 'rb') while True: buffer = thefile.read(8192 * 1024) if not buffer: break count += buffer.count('
') thefile.close() return count # def upload_qiniu(self, file_path_name): q = Auth(access_key, secret_key) # Token, token = q.upload_token(bucket, file_path_name, 3600) # ret, info = put_file(token, file_path_name, file_path_name) # logger.info(info) if info.status_code != 200: logger.info("file upload qiniuyun fail %s" % file_path_name) # def del_file(self, file_path_name): if os.path.exists(file_path_name): os.remove(file_path_name) else: logger.info("%s " % file_path_name) # def download_file(self, file_url): time.sleep(random.uniform(1, 2)) retry = 0 try: while retry < 3: file_name = re.findall(r".*/(.*)", file_url)[0] response = requests.get(file_url, stream=True, headers=self.headers, timeout=5) if response.status_code == requests.codes.ok: with open(neeq_file_path + file_name, "wb") as code: for chunk in response.iter_content(chunk_size=1024): if chunk: code.write(chunk) break except Exception as e: logger.exception(e) retry += 1 # def analysis_file_content(self, filename): content = '' fenzhihouzhui = re.findall(r'.*(\..*)', str(filename))[0] if fenzhihouzhui == '.pdf' or fenzhihouzhui == '.PDF': content = self.analysis_pdf_file_content(filename) elif fenzhihouzhui == '.html' or fenzhihouzhui == '.HTML': content = self.analysi_html_file_content(filename) return content def analysis_pdf_file_content(self, filename): content = '' try: fileobject = open(filename, 'rb') parser = PDFParser(fileobject) document = PDFDocument(parser) if not document.is_extractable: raise PDFTextExtractionNotAllowed else: rsrcmgr = PDFResourceManager() laparams = LAParams() device = PDFPageAggregator(rsrcmgr, laparams=laparams) interpreter = PDFPageInterpreter(rsrcmgr, device) for page in PDFPage.create_pages(document): interpreter.process_page(page) layout = device.get_result() for x in layout: if isinstance(x, LTTextBoxHorizontal): results = x.get_text().encode('utf-8') content += results fileobject.close() except Exception as e: logger.error("analysis pdf file fail : %s" % e) 
return content def analysi_html_file_content(self, filename): content_open = open(filename, 'rb') contents = content_open.read() print contents contents = dehtml(contents) class pythonNToTxt(HTMLParser): def __init__(self): HTMLParser.__init__(self) self.__text = [] def handle_data(self, data): text = data.strip() if len(text) > 0: text = sub('[ \t\r
]+
', ' ', text) self.__text.append(text + ' ') def handle_starttag(self, tag, attrs): if tag == 'p': self.__text.append('

') elif tag == 'br': self.__text.append('
') def handle_startendtag(self, tag, attrs): if tag == 'br': self.__text.append('

') def text(self): return ''.join(self.__text).strip() def dehtml(text): try: parser = pythonNToTxt() parser.feed(text) parser.close() return parser.text() except Exception as e: logger.error("html analysis excepiton : %s" % e) return text logger.info("analysis neeq content start,now params neeq_remainder=%s,neeq_start_id =%s,neeq_json = %s,neeq_id = %s ,neeq_file_path = %s" % (neeq_remainder, neeq_start_id, neeq_json, neeq_id, neeq_file_path)) analysis = analysis() analysis.get_data()
#!/usr/bin/env python
# -*- coding: utf-8 -*
import sys
import log_config
import ConfigParser
import pymysql
from DBUtils.PooledDB import PooledDB
# Python 2 only: make UTF-8 the default codec for implicit conversions.
reload(sys)
sys.setdefaultencoding('utf-8')
# MySQL connection settings come from the [mysql] section of mysql.conf.
conf = ConfigParser.ConfigParser()
conf.read("mysql.conf")
user = conf.get("mysql", "user")
password = conf.get("mysql", "password")
database = conf.get("mysql", "database")
host = conf.get("mysql", "host")
# Port is read as a string; converted to int where the pool is built.
port = conf.get("mysql", "port")
charset = "utf8"


class OPMysql(object):
    """Context-manager wrapper around a shared pymysql connection pool.

    Usage: ``with get_connection() as db: db.cursor.execute(...)``.
    Entering borrows a pooled connection and opens a dict cursor; exiting
    closes the cursor and returns the connection to the pool.
    """

    # Class-level pool, created lazily on first use and shared by all
    # instances.
    __pool = None

    def __init__(self):
        # Nothing to set up here: connection and cursor are acquired in
        # __enter__ so construction stays cheap.
        pass

    def __enter__(self):
        """Borrow a pooled connection and open a DictCursor on it."""
        self.conn = self.getmysqlconn()
        self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
        return self

    def __exit__(self, typeq, value, trace):
        """Close the cursor and hand the connection back to the pool."""
        self.cursor.close()
        self.conn.close()

    @staticmethod
    def getmysqlconn():
        """Return a connection from the shared pool, building it once."""
        if OPMysql.__pool is None:
            OPMysql.__pool = PooledDB(
                creator=pymysql,
                mincached=1,
                maxcached=10,
                host=host,
                user=user,
                passwd=password,
                db=database,
                port=int(port),
                charset=charset,
            )
        return OPMysql.__pool.connection()


def get_connection():
    """Factory for OPMysql context managers.

    Intended use: ``with get_connection() as db: ...``.
    """
    wrapper = OPMysql()
    return wrapper

ログモジュール（log_config）については前回の記事で紹介したものをそのまま使用します。以下は mysql.conf の内容です。
#------mysql basic config
[mysql]
user=<MySQLユーザー名>
password=<MySQLパスワード>
database=<データベース名>
host=<MySQLサーバーのIPアドレス>
port =3306

[basic_config]
#---------------neeq config
# このサーバーが担当する剰余値 (neeq_id % neeq_server_num がこの値の行を処理する)
neeq_remainder = 0
# 処理を分担するサーバーの台数
neeq_server_num = 6
neeq_start_id = 1000
neeq_json_path = neeq/json/
neeq_json = neeq
neeq_id = neeq/neeq_id.txt
neeq_file_path = neeq/file/
bucket = <七牛雲のバケット名>
access_key = <七牛雲のaccess_key>
secret_key = <七牛雲のsecret_key>

 
転載先:https://www.cnblogs.com/keepMoveForevery/p/10400131.html