JAVAマルチスレッドネットワーク爬虫類のコード実装
プロジェクトが必要なので、ネット爬虫類の小さなDEMOを作りました.
高性能のネットワーク爬虫類を実現するために、まずAPACEのHttpClientを採用してページの採集と解析を行うことを考慮して、HttpClientはURLを通じて遠隔の内容を簡単に得ることができて、例えば1つの小さいプログラム:
ページ解析やアナログ登録なども可能で、かなりの機能です.
次に、ネットワーク爬虫類やネットワーク採集であれば、大量のURLアドレスの収集や分析が必要になる可能性があるので、NoSQLデータベースで実行効率を高める必要があり、Redis、Memcache、BerkeleyDBは良い選択です.ここではBerkeleyDBデータベースを選択します.従来のキューやその他の形式を使用すると、パフォーマンスが向上する可能性がありますが、メモリ消費量が多くなり、条件に合った大きなメモリサーバが見つかるとは限りません.
そして、URLアドレスをフィルタリングして、既読のURLアドレスかどうかを判断し、既読であれば既読データベースに格納し、未読であれば未読データベースに格納するのは、URLアドレスの重複読み出しを避けるためにキューの形に似ている.もちろん,ページ内容が重複しているか否かを判断し,重複ページを読み取る確率を低減する必要がある.
そして,ページを解析し,キーコンテンツとURLアドレスを抽出する.
最後に、パフォーマンスを保証するために、マルチスレッドの実装方式を採用し、マルチサーバのモードで分散アルゴリズムを採用してより高いパフォーマンスを実現することもできる.
上の考え方に従って、小さなプログラムを書きました.
1、一部の配置情報は、CrawlConfigで構成し、これらをxmlファイルとして保存してもよい.ここで収集したのは163サイトである
(1)CrawlConfig.java
(2) CrawlUrl.JAvaはURLアドレスのオブジェクトとして、もちろんURL属性の他にも情報を格納できます
(3)LinkFilter.JAva、URLアドレスとしてのフィルタ
2、BerkelyDBにアクセスするコードを作成する(まずBerkeleyDBをインストールし、BerkeleyDBのJeパッケージを導入してください
(1)AbstractFrontier.java
(2)Frontier.java
(3)並行BDRontierを考慮する.java
3、核心部分:URL取得、ページ解析、ページダウンロードを含み、ページの解析とダウンロードには時間がかかる.
(1)RetievePage.JAva、URLアクセスとページダウンロードを実現
(2)HtmlParserTool.JAva,ページの解析を実現し,URLアドレスを抽出する
(3)MyCrawler.JAvaはページの採集を実現し,ここでは幅優先の採集規則を採用しているが,もちろんより複雑な考慮では深さを設定し,ここでは主にドメイン名接頭辞をフィルタ条件として採用している.またマルチスレッド環境では,データの同期問題を考慮する必要がある.
(4)Runnableインタフェース形式でd額マルチスレッドを実現する
実行結果:
採集開始
……
収集終了プログラム実行時間:25777 ms
最後に採集性能を分析し、前後してLinkQueueキュー単一スレッド、BerkeleyDB単一スレッド、BerkeleyDBマルチスレッド方案を採用してネットワーク採集を行い、テストデータの比較は以下の通りである.
これにより、マルチスレッドは明らかなパフォーマンス向上をもたらすことができます.
高性能のネットワーク爬虫類を実現するために、まずAPACEのHttpClientを採用してページの採集と解析を行うことを考慮して、HttpClientはURLを通じて遠隔の内容を簡単に得ることができて、例えば1つの小さいプログラム:
CloseableHttpClienthttp client = HttpClients.createDefault();
HttpGet httpget = new HttpGet("http://localhost/");
CloseableHttpResponse response = httpclient.execute(httpget);
try
{
HttpEntity entity =response.getEntity();
if (entity != null) {
long len =entity.getContentLength();
if (len != -1 && len <2048) {
System.out.println(EntityUtils.toString(entity));
} else {
// Stream contentout
}
}
}
finally {
response.close();
}
ページ解析やアナログ登録なども可能で、かなりの機能です.
次に、ネットワーク爬虫類やネットワーク採集であれば、大量のURLアドレスの収集や分析が必要になる可能性があるので、NoSQLデータベースで実行効率を高める必要があり、Redis、Memcache、BerkeleyDBは良い選択です.ここではBerkeleyDBデータベースを選択します.従来のキューやその他の形式を使用すると、パフォーマンスが向上する可能性がありますが、メモリ消費量が多くなり、条件に合った大きなメモリサーバが見つかるとは限りません.
そして、URLアドレスをフィルタリングして、既読のURLアドレスかどうかを判断し、既読であれば既読データベースに格納し、未読であれば未読データベースに格納するのは、URLアドレスの重複読み出しを避けるためにキューの形に似ている.もちろん,ページ内容が重複しているか否かを判断し,重複ページを読み取る確率を低減する必要がある.
そして,ページを解析し,キーコンテンツとURLアドレスを抽出する.
最後に、パフォーマンスを保証するために、マルチスレッドの実装方式を採用し、マルチサーバのモードで分散アルゴリズムを採用してより高いパフォーマンスを実現することもできる.
上の考え方に従って、小さなプログラムを書きました.
1、一部の配置情報は、CrawlConfigで構成し、これらをxmlファイルとして保存してもよい.ここで収集したのは163サイトである
(1)CrawlConfig.java
……
public class CrawlConfig {
public static final String CRAWL_PATH = "http://www.163.com";
public static final String CRAWL_LIMIT_PATH = "http://www.163.com";
public static final String CRAWL_VISITED_FRONTIER = "d:\\cache\\hevisited";
public static final String CRAWL_UNVISITED_FRONTIER = "d:\\cache\\heunvisited";
public static final String CRAWL_DOWNLOAD_PATH = "d:\\download\\163\\";
public static final int CRAWL_THREAD_NUM = 6;
}
(2) CrawlUrl.JAvaはURLアドレスのオブジェクトとして、もちろんURL属性の他にも情報を格納できます
……
public class CrawlUrl implements Serializable{
private static final long serialVersionUID = 79332323432323L;
public CrawlUrl() {
}
private String oriUrl; // url
private String url; //url
public String getOriUrl() {
return oriUrl;
}
public void setOriUrl(String oriUrl) {
this.oriUrl = oriUrl;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
}
(3)LinkFilter.JAva、URLアドレスとしてのフィルタ
public interface LinkFilter {
public boolean accept(String url);
}
2、BerkelyDBにアクセスするコードを作成する(まずBerkeleyDBをインストールし、BerkeleyDBのJeパッケージを導入してください
(1)AbstractFrontier.java
……
public abstract class AbstractFrontier {
private Environment env;
private static String CLASS_CATALOG = "java_class_catalog";
protected StoredClassCatalog javaCatalog;
protected Database catalogdatabase;
protected static Database database = null ;
protected String homeDirectory = null;
public AbstractFrontier(String homeDirectory) throws DatabaseException,
FileNotFoundException {
this.homeDirectory = homeDirectory;
System.out.println("open environment: " + homeDirectory);
// , env
EnvironmentConfig envConfig = new EnvironmentConfig();
envConfig.setTransactional(true);
envConfig.setAllowCreate(true);
env = new Environment(new File(homeDirectory), envConfig);
//
DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setTransactional(true);
dbConfig.setAllowCreate(true);
//
catalogdatabase = env.openDatabase(null, CLASS_CATALOG, dbConfig);
javaCatalog = new StoredClassCatalog(catalogdatabase);
//
DatabaseConfig dbConfigTe = new DatabaseConfig();
dbConfigTe.setTransactional(true);
dbConfigTe.setAllowCreate(true);
//
database = env.openDatabase(null, "URL", dbConfig);
}
public void close() throws DatabaseException {
database.close();
javaCatalog.close();
env.close();
}
protected abstract void put(Object key, Object value);
protected abstract Object get(Object key);
protected abstract Object delete(Object key);
}
(2)Frontier.java
……
public interface Frontier {
public CrawlUrl getNext() throws Exception;
public boolean putUrl(CrawlUrl url) throws Exception;
}
(3)並行BDRontierを考慮する.java
……
public class BDBFrontier extends AbstractFrontier implements Frontier{
private StoredMap pendingUrisDB = null;
public static int threads = CrawlConfig.CRAWL_THREAD_NUM;
/**
* Creates a new instance of BDBFrontier.
*
* @param homeDirectory
* @throws DatabaseException
* @throws FileNotFoundException
*/
public BDBFrontier(String homeDirectory) throws DatabaseException,
FileNotFoundException {
super(homeDirectory);
EntryBinding keyBinding = new SerialBinding(javaCatalog, String.class);
EntryBinding valueBinding = new SerialBinding(javaCatalog, CrawlUrl.class);
pendingUrisDB = new StoredMap(database, keyBinding, valueBinding, true);
}
/**
*
* clearAll:
*
*
* @param
* @return void
* @throws
*
*/
public void clearAll() {
if(!pendingUrisDB.isEmpty())
pendingUrisDB.clear();
}
/**
*
* @see com.fc.frontier.Frontier#getNext()
*/
@Override
public synchronized CrawlUrl getNext() throws Exception {
CrawlUrl result = null;
while(true) {
if(!pendingUrisDB.isEmpty()) {
Set entrys = pendingUrisDB.entrySet();
Entry<String, CrawlUrl> entry = (Entry<String,
CrawlUrl>) pendingUrisDB.entrySet().iterator().next();
result = entry.getValue(); //
delete(entry.getKey()); //
System.out.println("get:" + homeDirectory + entrys);
return result;
}
else {
threads --;
if(threads > 0) {
wait();
threads ++;
}
else {
notifyAll();
return null;
}
}
}
}
/**
* url
* @see com.fc.frontier.Frontier#putUrl(com.fc.CrawlUrl)
*/
@Override
public synchronized boolean putUrl(CrawlUrl url) throws Exception {
if(url.getOriUrl() != null && !url.getOriUrl().equals("")
&& !pendingUrisDB.containsKey(url.getOriUrl()))
{
Set entrys = pendingUrisDB.entrySet();
put(url.getOriUrl(), url);
notifyAll();
System.out.println("put:" + homeDirectory + entrys);
return true;
}
return false;
}
public boolean contains(Object key) {
if(pendingUrisDB.containsKey(key))
return true;
return false;
}
/**
*
* @see com.fc.frontier.AbstractFrontier#put(java.lang.Object, java.lang.Object)
*/
@Override
protected synchronized void put(Object key, Object value) {
pendingUrisDB.put(key, value);
}
/**
*
* @see com.fc.frontier.AbstractFrontier#get(java.lang.Object)
*/
@Override
protected synchronized Object get(Object key) {
return pendingUrisDB.get(key);
}
/**
*
* @see com.fc.frontier.AbstractFrontier#delete(java.lang.Object)
*/
@Override
protected synchronized Object delete(Object key) {
return pendingUrisDB.remove(key);
}
/**
*
* calculateUrl:
* Url ,
*
* @param
* @return String
* @throws
*
*/
private String calculateUrl(String url) {
return url;
}
public static void main(String[] strs) {
try {
BDBFrontier bdbFrontier = new BDBFrontier("d:\\cache");
CrawlUrl url = new CrawlUrl();
url.setOriUrl("http://www.163.com");
bdbFrontier.putUrl(url);
System.out.println(((CrawlUrl)bdbFrontier.getNext()).getOriUrl());
bdbFrontier.close();
}catch(Exception e) {
e.printStackTrace();
}
}
}
3、核心部分:URL取得、ページ解析、ページダウンロードを含み、ページの解析とダウンロードには時間がかかる.
(1)RetievePage.JAva、URLアクセスとページダウンロードを実現
……
public class RetrievePage {
private static String USER_AGENT = "Mozilla/4.0 (compatible; MSIE 6.0;
Windows NT 5.1; SV1; QQDownload 1.7; .NET CLR 1.1.4322; CIBA; .NET CLR
2.0.50727";
private static String DEFAULT_CHARSET = "GB2312,utf-8;q=0.7,*;q=0.7";
private static String ACCEPT = "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8";
/**
*
* @param path
* @return
* @throws Exception
* @throws IOException
*/
public static boolean downloadPage(String path) throws Exception,IOException
{
CloseableHttpClient httpclient = HttpClients.createDefault();
HttpGet httpget = new HttpGet(path);
httpget.addHeader("Accept-Charset", DEFAULT_CHARSET);
// httpget.addHeader("Host", host);
httpget.addHeader("Accept", ACCEPT);
httpget.addHeader("User-Agent", USER_AGENT);
RequestConfig requestConfig = RequestConfig.custom() //
.setSocketTimeout(1000)
.setConnectTimeout(1000)
.build();
httpget.setConfig(requestConfig);
CloseableHttpResponse response = httpclient.execute(httpget);
try {
HttpEntity entity = response.getEntity();
StatusLine statusLine = response.getStatusLine();
if(statusLine.getStatusCode() == HttpStatus.SC_MOVED_PERMANENTLY || //
statusLine.getStatusCode() == HttpStatus.SC_MOVED_TEMPORARILY ||
statusLine.getStatusCode() == HttpStatus.SC_SEE_OTHER ||
statusLine.getStatusCode() == HttpStatus.SC_TEMPORARY_REDIRECT)
{
Header header = httpget.getFirstHeader("location");
if(header != null){
String newUrl = header.getValue();
if(newUrl == null || newUrl.equals(""))
{
newUrl = "/";
HttpGet redirect = new HttpGet(newUrl);
}
}
}
if(statusLine.getStatusCode() == HttpStatus.SC_OK) { //
if (entity == null) {
throw new ClientProtocolException("Response contains no content");
}
else {
InputStream instream = entity.getContent();
String filename = getFilenameByUrl(path,entity.getContentType().getValue());
OutputStream outstream = new
FileOutputStream(CrawlConfig.CRAWL_DOWNLOAD_PATH +
filename); //
try {
//System.out.println(convertStreamToString(instream));
int tempByte = -1;
while((tempByte = instream.read())>0)
{
outstream.write(tempByte);
}
return true;
}
catch(Exception e){
e.printStackTrace();
return false;
} finally {
if(instream != null)
{
instream.close();
}
if(outstream != null)
{
outstream.close();
}
}
}
}
return false;
}finally {
response.close();
}
}
public static String getFilenameByUrl(String url, String contentType) {
url = url.substring(7);
if(contentType.indexOf("html") != -1) {
url = url.replaceAll("[\\?/:*|<>\"]","_") + ".html";
return url;
}
else {
url = url.replaceAll("[\\?/:*|<>\"]","_") + contentType.substring(contentType.lastIndexOf('/') + 1);
return url;
}
}
/**
*
* @param is
* @return
*/
public static String convertStreamToString(InputStream is) {
BufferedReader reader = new BufferedReader(new InputStreamReader(is));
StringBuilder sb = new StringBuilder();
String line = null;
try {
while ((line = reader.readLine()) != null) {
sb.append(line + "/n");
}
} catch (IOException e) {
e.printStackTrace();
} finally {
}
return sb.toString();
}
public static void main(String[] args)
{
try{
System.out.println(" ");
RetrievePage.downloadPage("http://www.baidu.com");
System.out.println(" ");
}
catch(HttpException e){
e.printStackTrace();
}
catch(IOException e)
{
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
(2)HtmlParserTool.JAva,ページの解析を実現し,URLアドレスを抽出する
……
public class HtmlParserTool {
public static Set<String> extractLinks(String url, LinkFilter filter){
Set<String> links = new HashSet<String>();
try {
Parser parser = new Parser(url);
parser.setEncoding("gb2312");
NodeFilter frameFilter = new NodeFilter() { //
public boolean accept(Node node) {
if(node.getText().startsWith("frame src=")) {
return true;
}
else {
return false;
}
}
};
OrFilter linkFilter = new OrFilter(new NodeClassFilter(LinkTag.class), frameFilter);
NodeList list = parser.extractAllNodesThatMatch(linkFilter); //
for(int i = 0; i <list.size();i++)
{
Node tag = list.elementAt(i);
if(tag instanceof LinkTag) { //
LinkTag linkTag = (LinkTag) tag;
String linkUrl = linkTag.getLink();//url
String text = linkTag.getLinkText();//
System.out.println(linkUrl + "**********" + text);
if(filter.accept(linkUrl))
links.add(linkUrl);
}
else if (tag instanceof ImageTag) //<img> //
{
ImageTag image = (ImageTag) list.elementAt(i);
System.out.print(image.getImageURL() + "********");//
System.out.println(image.getText());//
if(filter.accept(image.getImageURL()))
links.add(image.getImageURL());
}
else//<frame>
{
// frame src <frame src="test.html"/>
String frame = tag.getText();
int start = frame.indexOf("src=");
frame = frame.substring(start);
int end = frame.indexOf(" ");
if (end == -1)
end = frame.indexOf(">");
frame = frame.substring(5, end - 1);
System.out.println(frame);
if(filter.accept(frame))
links.add(frame);
}
}
return links;
} catch (ParserException e) {
e.printStackTrace();
return null;
}
}
}
(3)MyCrawler.JAvaはページの採集を実現し,ここでは幅優先の採集規則を採用しているが,もちろんより複雑な考慮では深さを設定し,ここでは主にドメイン名接頭辞をフィルタ条件として採用している.またマルチスレッド環境では,データの同期問題を考慮する必要がある.
……
public class MyCrawler {
public static BDBFrontier visitedFrontier;
public static BDBFrontier unvisitedFrontier;
private static int num = 0;
public MyCrawler() {
try{
if(visitedFrontier == null){
visitedFrontier = new BDBFrontier(CrawlConfig.CRAWL_VISITED_FRONTIER); // Nosql
visitedFrontier.clearAll();
}
if(unvisitedFrontier == null) {
unvisitedFrontier = new BDBFrontier(CrawlConfig.CRAWL_UNVISITED_FRONTIER);
unvisitedFrontier.clearAll();
}
}catch(Exception e) {
e.printStackTrace();
}
}
private void initCrawlerWithSeeds(String[] seeds) {
synchronized (this) {
try {
for(int i = 0;i<seeds.length;i++){
CrawlUrl url = new CrawlUrl(); // berkeleyDB
url.setOriUrl(seeds[i]);
unvisitedFrontier.putUrl(url);
}
} catch(Exception e) {
e.printStackTrace();
}
}
}
public void crawling(String[] seeds, int threadId) {
try {
LinkFilter filter = new LinkFilter() {
@Override
public boolean accept(String url) {
Pattern pattern = Pattern.compile("^((https|http|ftp|rtsp|mms)?://)"
+ "+(([0-9a-z_!~*'().&=+$%-]+: )?[0-9a-z_!~*'().&=+$%-]+@)?"
+ "(([0-9]{1,3}\\.){3}[0-9]{1,3}"
+ "|"
+ "([0-9a-z_!~*'()-]+\\.)*"
+ "([0-9a-z][0-9a-z-]{0,61})?[0-9a-z]\\."
+ "[a-z]{2,6})"
+ "(:[0-9]{1,4})?"
+ "((/?)|"
+ "(/[0-9a-z_!~*'().;?:@&=+$,%#-]+)+/?)$");
Matcher matcher = pattern.matcher(url);
boolean isMatch= matcher.matches();
if(isMatch && url.startsWith(CrawlConfig.CRAWL_LIMIT_PATH)) {
return true;
}
else {
return false;
}
}
};
initCrawlerWithSeeds(seeds);
// berkeleyDB
CrawlUrl visitedCrawlUrl = (CrawlUrl)unvisitedFrontier.getNext();
//visitedFrontier.putUrl(visitedCrawlUrl);
do{
System.out.println(" :" + threadId);
if(visitedCrawlUrl == null) {
continue;
}
String visitedUrl = visitedCrawlUrl.getOriUrl();
if(visitedFrontier.contains(visitedUrl)) { //
visitedCrawlUrl = (CrawlUrl)unvisitedFrontier.getNext();
continue;
}
visitedFrontier.putUrl(visitedCrawlUrl);
if(null == visitedUrl || "".equals(visitedUrl.trim())) { //
visitedFrontier.putUrl(visitedCrawlUrl);
visitedCrawlUrl = (CrawlUrl)unvisitedFrontier.getNext();
continue;
}
try{
RetrievePage.downloadPage(visitedUrl); //
Set<String> links = HtmlParserTool.extractLinks(visitedUrl, filter);
for(String link :links) {
if(!visitedFrontier.contains(link)
&&!unvisitedFrontier.contains(link) )
{
CrawlUrl unvisitedCrawlUrl = new CrawlUrl();
unvisitedCrawlUrl.setOriUrl(link);
unvisitedFrontier.putUrl(unvisitedCrawlUrl);
}
}
}catch(ConnectTimeoutException e) { //
visitedFrontier.putUrl(visitedCrawlUrl);
visitedCrawlUrl = (CrawlUrl)unvisitedFrontier.getNext();
num ++;
e.printStackTrace();
continue;
}catch(SocketTimeoutException e) {
visitedFrontier.putUrl(visitedCrawlUrl);
visitedCrawlUrl = (CrawlUrl)unvisitedFrontier.getNext();
num ++;
e.printStackTrace();
continue;
}
visitedCrawlUrl = (CrawlUrl)unvisitedFrontier.getNext();
num ++;
}while(BDBFrontier.threads >0 && num < 1000);
}
catch (IOException e) {
e.printStackTrace();
}
catch(Exception e) {
e.printStackTrace();
}
}
}
(4)Runnableインタフェース形式でd額マルチスレッドを実現する
……
public class MyCrawlerByThread extends MyCrawler implements Runnable{
private int threadId;
public MyCrawlerByThread(int id) {
this.threadId = id;
}
/**
* (non-Javadoc)
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
try {
crawling(new String[]{CrawlConfig.CRAWL_PATH}, threadId);
}catch(Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
try {
long startTime=System.currentTimeMillis();
System.out.println(" ");
ArrayList<Thread> threadList = new ArrayList<Thread>(CrawlConfig.CRAWL_THREAD_NUM);
for(int i = 0 ; i < CrawlConfig.CRAWL_THREAD_NUM; i++) {
MyCrawlerByThread crawler = new MyCrawlerByThread(i);
Thread t = new Thread(crawler);
t.start();
threadList.add(t);
Thread.sleep(10L);
}
while(threadList.size() > 0) {
Thread child = (Thread) threadList.remove(0);
child.join();
}
System.out.println(" ");
long endTime=System.currentTimeMillis();
System.out.println(" : "+(endTime-startTime)+"ms");
} catch(Exception e) {
e.printStackTrace();
}
}
}
実行結果:
採集開始
……
収集終了プログラム実行時間:25777 ms
最後に採集性能を分析し、前後してLinkQueueキュー単一スレッド、BerkeleyDB単一スレッド、BerkeleyDBマルチスレッド方案を採用してネットワーク採集を行い、テストデータの比較は以下の通りである.
これにより、マルチスレッドは明らかなパフォーマンス向上をもたらすことができます.