エンタープライズ検索エンジン開発のコネクタconnector(八)

33601 ワード

次に、TimedCancelableインタフェースを実装するCancelableBatchクラスと、QueryTraverserクラスの分析を続けます。
CancelableBatchクラスのソースコードは次のとおりです。
/**
 * Runs a single {@link Connector} batch through a {@link Traverser},
 * exposed as a {@link TimedCancelable} so that the batch can be run,
 * cancelled, or timed out through that interface.
 */
class CancelableBatch implements TimedCancelable {
  private static final Logger LOGGER =
      Logger.getLogger(CancelableBatch.class.getName());

  final Traverser traverser;
  final String traverserName;
  final BatchResultRecorder batchResultRecorder;
  final BatchTimeout batchTimeout;
  final BatchSize batchSize;

  /**
   * Creates a {@link CancelableBatch}.
   *
   * @param traverser the {@link Traverser} that actually runs the batch.
   * @param traverserName name of the traverser, used only in log output.
   * @param batchResultRecorder the {@link BatchResultRecorder} that is
   *        handed the {@link BatchResult} once the batch has run.
   * @param batchTimeout the {@link BatchTimeout} notified when this
   *        batch is timed out.
   * @param batchSize hint and constraints as to the number of documents
   *        to process in the batch.
   */
  public CancelableBatch(Traverser traverser, String traverserName,
      BatchResultRecorder batchResultRecorder, BatchTimeout batchTimeout,
      BatchSize batchSize) {
    this.traverser = traverser;
    this.traverserName = traverserName;
    this.batchResultRecorder = batchResultRecorder;
    this.batchTimeout = batchTimeout;
    this.batchSize = batchSize;
  }

  /** Cancels the batch by forwarding the request to the traverser. */
  public void cancel() {
    traverser.cancelBatch();
  }

  /**
   * Reports a timeout to the {@link BatchTimeout}.
   *
   * @param taskHandle handle of the timed-out task; not used here.
   */
  public void timeout(TaskHandle taskHandle) {
    batchTimeout.timeout();
  }

  /**
   * Runs one batch on the traverser and passes the outcome to the
   * {@link BatchResultRecorder}.  An NDC entry naming the traverser is
   * pushed before the batch starts and the NDC is removed when the
   * method exits, whether the batch completed or threw.
   */
  public void run() {
    final String ndcLabel = "Traverse " + traverserName;
    NDC.push(ndcLabel);
    try {
      final String beginMessage = "Begin runBatch; traverserName = "
          + traverserName + "  " + batchSize;
      LOGGER.fine(beginMessage);

      final BatchResult outcome = traverser.runBatch(batchSize);

      final String doneMessage = "Traverser " + traverserName
          + " batchDone with result = " + outcome;
      LOGGER.fine(doneMessage);

      batchResultRecorder.recordResult(outcome);
    } finally {
      NDC.remove();
    }
  }

  /** Returns a short description naming the traverser and batch size. */
  @Override
  public String toString() {
    final String description =
        "CancelableBatch traverser: " + traverser + "  " + batchSize;
    return description;
  }
}

このクラスは、コネクタのバッチを操作するための関連メソッド(cancel、timeout)を提供するほか、主にスレッドで実行されるrunメソッドを実装しています(cancelメソッドは主に、スレッド実行ハンドルであるTaskHandleクラスから呼び出されます)。
runメソッドの中ではtraverser.runBatch(batchSize)メソッドを呼び出します(traverserは、Traverserインタフェースの実装クラスであるQueryTraverserのオブジェクトです)。
実行が完了すると、BatchResultRecorderクラスオブジェクトによって実行結果の情報が記録されます.
引き続きQueryTraverserクラスのソースコードを分析します.
/**
 * Traverser for a repository implemented using a TraversalManager.
 *
 * <p>Each call to {@link #runBatch(BatchSize)} obtains a
 * {@link DocumentList} from the {@link TraversalManager} (starting a new
 * traversal, or resuming from the state held in the
 * {@link TraversalStateStore}), feeds the documents to a {@link Pusher},
 * and then stores the checkpoint returned by the document list.
 *
 * <p>A batch can be cancelled from another thread via
 * {@link #cancelBatch()}; the cancel flag is guarded by a private lock.
 */
public class QueryTraverser implements Traverser {
  private static final Logger LOGGER =
      Logger.getLogger(QueryTraverser.class.getName());

  private final PusherFactory pusherFactory;
  private final TraversalManager queryTraversalManager;
  private final TraversalStateStore stateStore;
  private final String connectorName;
  private final TraversalContext traversalContext;

  // Synchronize access to cancelWork.
  private final Object cancelLock = new Object();
  private boolean cancelWork = false;

  /**
   * Constructs a {@link QueryTraverser}.  If the supplied
   * {@link TraversalManager} is {@link TraversalContextAware}, it is
   * given the {@link TraversalContext}; a failure to do so is logged
   * and otherwise ignored.
   *
   * @param pusherFactory factory for the {@link Pusher} used by each batch.
   * @param traversalManager the connector's {@link TraversalManager}.
   * @param stateStore store used to load and save the traversal checkpoint.
   * @param connectorName name of the connector, used for the Pusher and
   *        in log messages.
   * @param traversalContext supplies the traversal time limit.
   */
  public QueryTraverser(PusherFactory pusherFactory,
      TraversalManager traversalManager, TraversalStateStore stateStore,
      String connectorName, TraversalContext traversalContext) {
    this.pusherFactory = pusherFactory;
    this.queryTraversalManager = traversalManager;
    this.stateStore = stateStore;
    this.connectorName = connectorName;
    this.traversalContext = traversalContext;
    if (queryTraversalManager instanceof TraversalContextAware) {
      TraversalContextAware contextAware =
          (TraversalContextAware)queryTraversalManager;
      try {
        contextAware.setTraversalContext(traversalContext);
      } catch (Exception e) {
        LOGGER.log(Level.WARNING, "Unable to set TraversalContext", e);
      }
    }
  }

  /**
   * Marks this traverser as cancelled.  A running batch notices the flag
   * at the top of its document loop and abandons the batch; the flag is
   * never reset by this class.
   */
  //@Override
  public void cancelBatch() {
    synchronized(cancelLock) {
      cancelWork = true;
    }
    LOGGER.fine("Cancelling traversal for connector " + connectorName);
  }

  /**
   * @return {@code true} if {@link #cancelBatch()} has been called.
   */
  public boolean isCancelled() {
    synchronized(cancelLock) {
      return cancelWork;
    }
  }

  /**
   * Runs one traversal batch.
   *
   * <p>Documents are pulled from the {@link DocumentList} and handed to a
   * {@link Pusher} until the batch maximum is reached, the list is
   * exhausted, the Pusher declines more documents, the time limit from
   * the {@link TraversalContext} passes, or the thread is interrupted or
   * the traverser cancelled.  Individual documents that fail are logged
   * and skipped.  If the batch did not fail and was not cancelled, the
   * document list's checkpoint is stored.
   *
   * @param batchSize hint and maximum for the number of documents to
   *        process in this batch.
   * @return a {@link BatchResult} with
   *         {@code TraversalDelayPolicy.IMMEDIATE}, the document count and
   *         the start/end times on success;
   *         {@code TraversalDelayPolicy.POLL} with a count of 0 if the
   *         traversal manager returned no document list;
   *         {@code TraversalDelayPolicy.ERROR} if the traverser was
   *         cancelled, the state store was unavailable, the batch failed,
   *         or no checkpoint could be obtained.
   */
  //@Override
  public BatchResult runBatch(BatchSize batchSize) {
    final long startTime = System.currentTimeMillis();
    // Multiply by a long literal so the conversion to milliseconds is done
    // in long arithmetic and cannot overflow int for large time limits.
    final long timeoutTime = startTime
        + traversalContext.traversalTimeLimitSeconds() * 1000L;

    if (isCancelled()) {
      LOGGER.warning("Attempting to run a cancelled QueryTraverser");
      return new BatchResult(TraversalDelayPolicy.ERROR);
    }
    try {
      queryTraversalManager.setBatchHint(batchSize.getHint());
    } catch (RepositoryException e) {
      // The hint is advisory; carry on without it.
      LOGGER.log(Level.WARNING, "Unable to set batch hint", e);
    }

    String connectorState;
    try {
      if (stateStore != null) {
        connectorState = stateStore.getTraversalState();
      } else {
        throw new IllegalStateException("null TraversalStateStore");
      }
    } catch (IllegalStateException ise) {
      // We get here if the ConnectorStateStore for connector is disabled.
      // That happens if the connector was deleted while we were asleep.
      // Our connector seems to have been deleted.  Don't process a batch.
      LOGGER.finer("Halting traversal..." + ise.getMessage());
      return new BatchResult(TraversalDelayPolicy.ERROR);
    }

    // No stored state means a fresh traversal; otherwise resume from it.
    DocumentList resultSet = null;
    if (connectorState == null) {
      try {
        LOGGER.finer("Starting traversal...");
        resultSet = queryTraversalManager.startTraversal();
      } catch (Exception e) {
        LOGGER.log(Level.WARNING, "startTraversal threw exception: ", e);
        return new BatchResult(TraversalDelayPolicy.ERROR);
      }
    } else {
      try {
        LOGGER.finer("Resuming traversal...");
        resultSet = queryTraversalManager.resumeTraversal(connectorState);
      } catch (Exception e) {
        LOGGER.log(Level.WARNING, "resumeTraversal threw exception: ", e);
        return new BatchResult(TraversalDelayPolicy.ERROR);
      }
    }

    // If the traversal returns null, that means that the repository has
    // no new content to traverse.
    if (resultSet == null) {
      LOGGER.finer("Result set is NULL, no documents returned for traversal.");
      return new BatchResult(TraversalDelayPolicy.POLL, 0);
    }

    Pusher pusher = null;
    BatchResult result = null;
    int counter = 0;
    try {
      // Get a Pusher for feeding the returned Documents.
      pusher = pusherFactory.newPusher(connectorName);

      while (counter < batchSize.getMaximum()) {
        if (Thread.currentThread().isInterrupted() || isCancelled()) {
          LOGGER.fine("Traversal for connector " + connectorName
                      + " has been interrupted...breaking out of batch run.");
          break;
        }
        if (System.currentTimeMillis() >= timeoutTime) {
          LOGGER.fine("Traversal for connector " + connectorName
              + " is completing due to time limit.");
          break;
        }

        Document nextDocument = null;
        String docid = null;
        try {
          LOGGER.finer("Pulling next document from connector " + connectorName);
          nextDocument = resultSet.nextDocument();
          if (nextDocument == null) {
            break;
          } else {
            // Since there are a couple of places below that could throw
            // exceptions but not exit the while loop, the counter should be
            // incremented here to insure it represents documents returned from
            // the list.  Note the call to nextDocument() could also throw a
            // RepositoryDocumentException signaling a skipped document in which
            // case the call will not be counted against the batch maximum.
            counter++;
            // Fetch DocId to use in messages.
            try {
              docid = Value.getSingleValueString(nextDocument,
                                                 SpiConstants.PROPNAME_DOCID);
            } catch (IllegalArgumentException e1) {
              LOGGER.fine("Unable to get document id for document ("
                          + nextDocument + "): " + e1.getMessage());
            } catch (RepositoryException e1) {
              LOGGER.fine("Unable to get document id for document ("
                          + nextDocument + "): " + e1.getMessage());
            }
          }
          LOGGER.finer("Sending document (" + docid + ") from connector "
              + connectorName + " to Pusher");

          if (!pusher.take(nextDocument)) {
            LOGGER.fine("Traversal for connector " + connectorName
                + " is completing at the request of the Pusher.");
            break;
          }

        } catch (SkippedDocumentException e) {
          /* TODO (bmj): This is a temporary solution and should be replaced.
           * It uses Exceptions for non-exceptional cases.
           */
          // Skip this document.  Proceed on to the next one.
          if (LOGGER.isLoggable(Level.FINER)) {
            LOGGER.log(Level.FINER, "Skipping document (" + docid
                + ") from connector " + connectorName + ": " + e.getMessage());
          }
        } catch (RepositoryDocumentException e) {
          // Skip individual documents that fail.  Proceed on to the next one.
          LOGGER.log(Level.WARNING, "Skipping document (" + docid
              + ") from connector " + connectorName, e);
        } catch (RuntimeException e) {
          // Skip individual documents that fail.  Proceed on to the next one.
          LOGGER.log(Level.WARNING, "Skipping document (" + docid
              + ") from connector " + connectorName, e);
        }
      }
      // No more documents. Wrap up any accumulated feed data and send it off.
      if (!isCancelled()) {
        pusher.flush();
      }
    } catch (OutOfMemoryError e) {
      // The Pusher is still null if newPusher() itself ran out of memory.
      // Without this guard a NullPointerException would escape from this
      // handler before result is set, and the finally block would then
      // checkpoint a batch that was never fed.
      if (pusher != null) {
        pusher.cancel();
      }
      System.runFinalization();
      System.gc();
      result = new BatchResult(TraversalDelayPolicy.ERROR);
      try {
        LOGGER.severe("Out of JVM Heap Space.  Will retry later.");
        LOGGER.log(Level.FINEST, e.getMessage(), e);
      } catch (Throwable ignored) {
        // OutOfMemory state may prevent us from logging the error.
        // Don't make matters worse by rethrowing something meaningless.
      }
    } catch (RepositoryException e) {
      // Drop the entire batch on the floor.  Do not call checkpoint
      // (as there is a discrepancy between what the Connector thinks
      // it has fed, and what actually has been pushed).
      LOGGER.log(Level.SEVERE, "Repository Exception during traversal.", e);
      result = new BatchResult(TraversalDelayPolicy.ERROR);
    } catch (PushException e) {
      LOGGER.log(Level.SEVERE, "Push Exception during traversal.", e);
      // Drop the entire batch on the floor.  Do not call checkpoint
      // (as there is a discrepancy between what the Connector thinks
      // it has fed, and what actually has been pushed).
      result = new BatchResult(TraversalDelayPolicy.ERROR);
    } catch (FeedException e) {
      LOGGER.log(Level.SEVERE, "Feed Exception during traversal.", e);
      // Drop the entire batch on the floor.  Do not call checkpoint
      // (as there is a discrepancy between what the Connector thinks
      // it has fed, and what actually has been pushed).
      result = new BatchResult(TraversalDelayPolicy.ERROR);
    } catch (Throwable t) {
      LOGGER.log(Level.SEVERE, "Uncaught Exception during traversal.", t);
      // Drop the entire batch on the floor.  Do not call checkpoint
      // (as there is a discrepancy between what the Connector thinks
      // it has fed, and what actually has been pushed).
      result = new BatchResult(TraversalDelayPolicy.ERROR);
    } finally {
      // If we have cancelled the work, abandon the batch.
      if (isCancelled()) {
        result = new BatchResult(TraversalDelayPolicy.ERROR);
      }

      // Checkpoint completed work as well as skip past troublesome documents
      // (e.g. documents that are too large and will always fail).
      // A non-null result here means the batch already failed or was
      // cancelled, so no checkpoint is taken in that case.
      if ((result == null) && (checkpointAndSave(resultSet) == null)) {
        // Unable to get a checkpoint, so wait a while, then retry batch.
        result = new BatchResult(TraversalDelayPolicy.ERROR);
      }
    }
    if (result == null) {
      result = new BatchResult(TraversalDelayPolicy.IMMEDIATE, counter,
                               startTime, System.currentTimeMillis());
    } else if (pusher != null) {
      // We are returning an error from this batch. Cancel any feed that
      // might be in progress.
      pusher.cancel();
    }
    return result;
  }

  /**
   * Asks the document list for a checkpoint and, if one is returned,
   * saves it in the {@link TraversalStateStore}.
   *
   * @param pm the {@link DocumentList} processed by the current batch.
   * @return the checkpoint string returned by {@code pm.checkpoint()};
   *         {@code null} if it returned {@code null}, if it threw, or if
   *         the checkpoint could not be stored because the state store is
   *         null or threw {@link IllegalStateException}.
   */
  private String checkpointAndSave(DocumentList pm) {
    String connectorState = null;
    LOGGER.finest("Checkpointing for connector " + connectorName + " ...");
    try {
      connectorState = pm.checkpoint();
    } catch (RepositoryException re) {
      // If checkpoint() throws RepositoryException, it means there is no
      // new checkpoint.  Log the cause so the resulting retry is explainable.
      LOGGER.log(Level.FINE, "Unable to get checkpoint for connector "
          + connectorName, re);
      return null;
    } catch (Exception e) {
      // If checkpoint() throws some general Exception, it is probably
      // an older connector that doesn't understand the newer empty
      // DocumentList and Exception handling from runBatch() model.
      LOGGER.log(Level.FINE, "Unable to get checkpoint for connector "
          + connectorName, e);
      return null;
    }
    try {
      if (connectorState != null) {
        if (stateStore != null) {
          stateStore.storeTraversalState(connectorState);
        } else {
          throw new IllegalStateException("null TraversalStateStore");
        }
        LOGGER.finest("...checkpoint " + connectorState + " created.");
      }
      return connectorState;
    } catch (IllegalStateException ise) {
      // We get here if the ConnectorStateStore for connector is disabled.
      // That happens if the connector was deleted while we were working.
      // Our connector seems to have been deleted.  Don't save a checkpoint.
      LOGGER.finest("...checkpoint " + connectorState + " discarded.");
    }
    return null;
  }
}

 
void cancelBatch()メソッドは実行をキャンセルするために使用されます。前述のとおり、このメソッドはCancelableBatchクラスのcancel()メソッドから呼び出され、そのcancel()メソッドはTaskHandleから呼び出されます。この一連の呼び出しは、最終的にはConnectorCoordinatorImplクラスのvoid resetBatch()メソッドを起点としています。
次に、重要なメソッドであるBatchResult runBatch(BatchSize batchSize)を分析します。ここでは、個々のコネクタのTraversalManagerインタフェース実装クラスを呼び出してデータをポーリングします。
まず、タスクがキャンセルされているかどうかを判断し、キャンセルされている場合はただちに(エラーの結果を返して)戻ります。
次に、保存されている走査状態(チェックポイント)を取得し、状態がなければqueryTraversalManager.startTraversal()を、あればqueryTraversalManager.resumeTraversal(connectorState)を呼び出して、データ結果セットDocumentList resultSetを取得します。
次に、PusherFactoryファクトリからPusher型のオブジェクトを取得し、そのtakeメソッドを呼び出して、アプリケーションセンターにデータを送信します。
最後に、データ結果セットの状態情報(チェックポイント)を保存し、コネクタのデータ収集の結果情報を返します。
new BatchResult(TraversalDelayPolicy.IMMEDIATE, counter,startTime, System.currentTimeMillis());
---------------------------------------------------------------------------
本シリーズ企業検索エンジン開発のコネクタconnector系本人オリジナル
転載は出典のブログ園のハリネズミのおとなしいことを明記してください
本リンクhttp://www.cnblogs.com/chenying99/archive/2013/03/19/2968411.html