Python ウェブクローラー:myip.ms

16004 ワード

ターゲットサイト:https://myip.ms 難易度:星10個。非常に強力なクローラー対策(IPアドレスのブロック)を備えている。
'''

------------------------------
https://myip.ms/browse/web_hosting/1/countryID/ALA%5EASM

------------------------------
'''

import os
import csv
import time
import random
import requests
from lxml import etree
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException


class LoopOver(Exception):
    """Signal that iteration over a scraped result page must stop.

    Raised by ``Spider.parse_page`` when a row has no matching detail row
    or when the page holds fewer than 50 rows.
    """

    def __init__(self, *args, **kwargs):
        # Keyword arguments are accepted and ignored so that any call shape
        # is valid; positional arguments remain available through ``args``.
        super().__init__(*args)


class Spider:
    """Selenium-driven scraper for the web-hosting listings on myip.ms.

    Country names and ids are read from ``country.csv``; each listing page
    is loaded in Chrome, parsed with lxml and appended to CSV files.
    """

    # Either of the two result tables the site renders.
    _TABLE_XPATH = '//*[@id="sites_tbl" or @id ="web_hosting_tbl"]'
    # Element whose text contains "a Robot" when the captcha page is shown.
    _ROBOT_BANNER_XPATH = '/html/body/div[2]/div/div/div/center'
    _CAPTCHA_BUTTON_XPATH = '//*[@id="captcha_submit"]'
    # Characters removed from names before they are used as file names.
    _FILENAME_STRIP = str.maketrans('', '', '\\/:*?"<>|')
    # Countries whose rows go to their own CSV instead of ``data.csv``.
    _PER_COUNTRY_FILES = frozenset(
        {'British Indian Ocean Territory', 'Brunei', 'Bulgaria'})

    def __init__(self):
        # Input / output file locations.
        self.path = '.'
        self.inputfilename = 'country.csv'
        self.csvfilename = 'datas.csv'
        self.logfilename = 'run.log'

        # Set by run(); initialised so the ``time`` property never fails.
        self.runtime = 0.0

        self._new_browser()

        # Listing URL; the placeholder receives the country id.
        self.listurl = 'https://myip.ms/browse/web_hosting/1/countryID/{}'

        self.host = 'https://myip.ms'

        # NOTE(review): the HTML markup of both templates was missing from
        # the source this was rebuilt from; the tags below are a best-guess
        # reconstruction around the surviving text and placeholders (kept in
        # their original order) -- confirm against the real templates.
        self.tempalte = '''
<h1>{}</h1>
<table border="1">
{}
<tr><th>No</th><th>Hosting Company</th><th>Website/s</th><th>Total Websites use this company IPs</th><th>TOP Websites use this company IPs</th><th>Record Update Time</th></tr>
</table>
'''
        self.tempalte_page = '''
<table border="1">
{}
<tr><th>No</th><th>Web Site</th><th>Website IP Address</th><th>Website IPV6 Address</th><th>World Site Popular</th><th>World Site Popular Rating</th><th>DNS Records</th><th>Record Update Time</th></tr>
</table>
'''

    def _new_browser(self):
        """Start a Chrome session and bind a 20-second explicit wait to it."""
        options = webdriver.ChromeOptions()
        self.browser = webdriver.Chrome(options=options)
        self.wait = WebDriverWait(self.browser, 20)

    def close(self):
        """Quit the browser session if one was started."""
        browser = getattr(self, 'browser', None)
        if browser is not None:
            browser.quit()

    @staticmethod
    def _first_text(nodes):
        """Return the first XPath result, stripped and without newlines/spaces.

        Returns ``''`` when *nodes* is empty.
        """
        if not nodes:
            return ''
        return nodes[0].strip().replace('\n', '').replace(' ', '')

    def turn2filename(self, dst):
        """Return *dst* with the characters ``\\ / : * ? " < > |`` removed."""
        return dst.translate(self._FILENAME_STRIP)

    def run(self, start=115, stop=116):
        """Scrape the listing page of every country in ``datas[start:stop]``.

        The defaults reproduce the slice that was previously hard-coded.
        Each parsed row is appended to ``<country>.csv`` for the countries
        in ``_PER_COUNTRY_FILES`` and to ``data.csv`` otherwise. The elapsed
        time is stored in ``self.runtime``.
        """
        started = time.time()
        self.get_input()
        for row in self.datas[start:stop]:
            country, country_id = row[:2]
            url = self.listurl.format(country_id)
            print('>>> ', country, url)
            for item in self.parse_list(self.get_list(url)):
                # The placeholder row carries no country cell of its own.
                if item[1] == '- No Records Found -':
                    item[0] = country
                if country in self._PER_COUNTRY_FILES:
                    target = self.turn2filename(country) + '.csv'
                else:
                    target = 'data.csv'
                self.save_data(item=item, filename=target)
            self.runtime = time.time() - started
            print(' {}'.format(self.runtime))
        self.runtime = time.time() - started

    def get_input(self):
        """Load the non-empty rows of ``self.inputfilename`` into ``self.datas``."""
        with open(self.inputfilename, 'r', encoding='utf_8', newline='') as f:
            self.datas = [row for row in csv.reader(f) if row]

    def mkurl(self, kw):
        """Yield the listing URL for *kw* (a single URL)."""
        yield self.listurl.format(kw)

    def _fetch(self, url, captcha_pause, retry_pause, report_wait_error=False):
        """Load *url* until a result table is present and return the browser.

        When the table does not appear, the captcha button is clicked if the
        robot banner is shown, and the load is retried. If the browser ended
        up on a different URL it is restarted first. Retries are unbounded,
        as in the two methods this helper replaces.
        """
        while True:
            try:
                self.browser.get(url)
                try:
                    self.wait.until(EC.presence_of_element_located(
                        (By.XPATH, self._TABLE_XPATH)))
                except Exception as error:
                    if report_wait_error:
                        print('//*[@id="sites_tbl" or @id ="web_hosting_tbl"] error', error)
                    banner = self.browser.find_element(
                        By.XPATH, self._ROBOT_BANNER_XPATH).text
                    if 'a Robot' in banner:
                        self.browser.find_element(
                            By.XPATH, self._CAPTCHA_BUTTON_XPATH).click()
                        time.sleep(captcha_pause)
                    # Re-raise so the outer handler schedules a retry.
                    raise
                return self.browser
            except Exception as error:
                # Deliberate catch-all: any failure leads to another attempt.
                print('error >>> ', error)
                if self.browser.current_url != url:
                    self.browser.quit()
                    self._new_browser()
                time.sleep(retry_pause)

    def get_list(self, url):
        """Load a listing page and return the browser once its table exists."""
        return self._fetch(url, captcha_pause=1, retry_pause=1)

    def parse_list(self, response):
        """Yield one list per hosting-company row of the listing page.

        *response* must expose ``page_source``. Each item is
        ``[country, no, company, website, total sites, top sites,
        update time, detail url]``.
        """
        first = self._first_text
        tree = etree.HTML(response.page_source)
        rows = tree.xpath(
            '//*[@id="web_hosting_tbl"]/tbody/tr[not(contains(@class,"expand"))]')
        for tr in rows:
            # Not passed through _first_text: run() compares this value with
            # '- No Records Found -', whose inner spaces must be kept.
            number_cell = tr.xpath('./td[1]/text()')
            number = number_cell[0].strip() if number_cell else ''
            page_url = first(tr.xpath('./td[2]/a/@href'))
            yield [
                first(tr.xpath('./td[3]/a/text()')),
                number,
                first(tr.xpath('./td[2]/a/text()')),
                first(tr.xpath('./td[4]/a/text()')),
                first(tr.xpath('./td[5]/a/text()')),
                first(tr.xpath('./td[6]/a/text()')),
                first(tr.xpath('./td[7]/text()')),
                self.host + page_url,
            ]

    def get_page(self, url):
        """Load a detail page and return the browser once its table exists."""
        return self._fetch(url, captcha_pause=5, retry_pause=100,
                           report_wait_error=True)

    def parse_page(self, response):
        """Yield one list per website row of a detail page.

        Each visible row is paired with the "expand" row at the same
        position, which holds the IPv6 address, popularity, DNS records and
        update time.

        Raises:
            LoopOver: when a row has no matching expand row, or after the
                rows were yielded if the page held fewer than 50 rows (the
                page source is then saved to ``error.html``).
        """
        first = self._first_text
        text = response.page_source
        tree = etree.HTML(text)
        # Query both row sets once instead of once per row.
        rows = tree.xpath(
            self._TABLE_XPATH + '/tbody/tr[not(contains(@class,"expand"))]')
        details = tree.xpath(
            self._TABLE_XPATH + '/tbody/tr[contains(@class,"expand")]')
        row_count = len(rows)
        print('len is ', row_count)

        for index, tr in enumerate(rows):
            if index >= len(details):
                raise LoopOver
            tre = details[index]
            dns_records = '\n'.join(tre.xpath(
                './td[1]/div[@class="stitle"]/b[contains(text(),"DNS")]'
                '/../following-sibling::*[1]//a/text()'))
            yield [
                first(tr.xpath('./td[1]/text()')),
                first(tr.xpath('./td[2]/a/text()')),
                first(tr.xpath('./td[3]/a/text()')),
                first(tre.xpath(
                    './td[1]/div[@class="stitle"]/b[contains(text(),"IPv6")]'
                    '/../following-sibling::*[1]//a/text()')),
                first(tre.xpath(
                    './td[1]/div/span[@class="bold arial grey"]/text()')),
                first(tr.xpath('./td[7]/span/text()')),
                dns_records,
                first(tre.xpath(
                    './td[1]/div[@class="stitle"]'
                    '/b[contains(text(),"Record Update Time")]'
                    '/../following-sibling::div/text()')),
            ]

        if row_count < 50:
            with open('error.html', 'w', encoding='utf-8') as f:
                f.write(text)
            raise LoopOver

    def save_data(self, filename=None, path=None, item=None):
        """Append *item* as one CSV row to ``path/filename``.

        *filename* defaults to ``self.csvfilename`` and *path* to
        ``self.path``.
        """
        if not filename:
            filename = self.csvfilename
        if not path:
            path = self.path
        with open(os.path.join(path, filename), 'a', encoding='utf_8',
                  newline='') as csvfile:
            csv.writer(csvfile).writerow(item)

    def save_log(self, info):
        """Append *info* followed by a local timestamp to the log file."""
        stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        with open(self.logfilename, 'a', encoding='utf-8') as f:
            f.write(info + ' ' + stamp + '\n')

    def save_html_list(self, country, items, filename=None, path=None):
        """Append an HTML table of *items* for *country* to ``main.html``.

        *filename* and *path* are accepted but unused, as before.
        """
        from html import escape  # scraped text must not be emitted raw

        rows = []
        for item in items:
            cells = []
            for index, value in enumerate(item):
                text = str(value)
                if index == 1:
                    # NOTE(review): the link markup is reconstructed from the
                    # three format arguments that survived (country,
                    # sanitised name, label) -- confirm the href layout.
                    cells.append('<td><a href="./{}/{}.html">{}</a></td>'.format(
                        escape(str(country)),
                        escape(self.turn2filename(text)),
                        escape(text)))
                else:
                    cells.append('<td>{}</td>'.format(escape(text)))
            rows.append('<tr>' + ''.join(cells) + '</tr>')
        with open('main.html', 'a', encoding='utf-8') as f:
            f.write(self.tempalte.format(escape(str(country)), ''.join(rows)))

    def save_html_page(self, country, items, filename=None, path=None, it=None):
        """Write an HTML table of *items* to ``path/filename``.

        *path* is created when missing and defaults to ``self.path``.
        *country* and *it* are accepted but unused, as before.

        Raises:
            ValueError: if *filename* is empty.
        """
        from html import escape  # scraped text must not be emitted raw

        if not filename:
            raise ValueError('filename is required')
        if not path:
            path = self.path
        os.makedirs(path, exist_ok=True)
        rows = [
            '<tr>' + ''.join(
                '<td>{}</td>'.format(escape(str(value))) for value in item
            ) + '</tr>'
            for item in items
        ]
        # Write to the directory that was just checked/created, rather than
        # to the same name re-rooted under './'.
        with open(os.path.join(path, filename), 'w', encoding='utf-8') as f:
            f.write(self.tempalte_page.format(''.join(rows)))

    @property
    def time(self):
        """Formatted elapsed time of the last ``run()``."""
        return ' :{} '.format(self.runtime)


if __name__ == '__main__':
    spider = Spider()
    try:
        spider.run()
        print(spider.time)
    finally:
        # Always release the Chrome session, even when the run fails.
        spider.close()