トリッキーなデータフローEP.2:MongoDBビューからドキュメントをインポートする


これは私のトリッキーなデータフローシリーズの2番目のエピソードです.そこで、私はGoogle Cloud Dataflowでパイプラインを実行している間、私が直面した最も重要な問題のいくつかを提示します、そして、私は彼らを克服しました.
今回は、完全に異なる種類のデータベースについて話しましょう
MongoDBは現在、DBの世界ではかなり広く、市場で最もよく知られているNOSQLデータベースです.それで、1つが予想するように、データフローSDKはAを持ちましたMongoDB connector ready to ease the usage of MongoDB as a datasource .
それはmongodbコレクションに読み書きする能力を提供していますので、私は(MongoDBにはあまり馴染みのないNA - AVI - ve ME)、この単純なパイプラインを実装するために必要とされたすべてのものだと考えました.

しかし、もちろんそうでなければ、ブログ記事を書くポイントがないでしょう.

それで、あなたはビューを尋ねたいです。


私がウォームアップとしてした最初のバージョンでは、コレクションから直接文書を読みますMongoDbIO.read().withUri(...).withDatabase(...).withCollection(...) そして、本当の問題に直面しませんでした.しかし、私はその時の重要性を理解していなかった.
ソースMongoDBインスタンスがアトラスでホストされたので.MongoDbIO was not allowed to run the default splitVector() command したがってit was mandatory to add withBucketAuto(true) clause to download the collection.
私は、コレクションの代わりにビューネームを使用しようとしたときに起こった困難を予想していませんでした.

[WARNING] org.apache.beam.sdk.Pipeline$PipelineExecutionException: com.mongodb.MongoCommandException: Command failed with error 166 (CommandNotSupportedOnView): 'Namespace [myview] is a view, not a collection' on server [***]


それで、MongoDBは私の見解について知っています、私がこの見解を要求したいと思うのを理解します、しかし、いいえ、それは私にそれから文書を取り出させません.実際には、ビューからドキュメントを取得する簡単な方法がなかったことが判明.確かに良い説明がありますが、見つけられませんでした.とてもイライラする.

あなたは、その感覚を知っています..(写真Wikipedia/nlan 86 )
実際には、MongoDBのビューは、SQL Worldの通常のビューとして簡単ではありません.MongoDBビューは、集約パイプラインで処理されたコレクションドキュメントの結果です.そして、Mongodbioは読書コレクションの上で集約質問を実行することができますAggregationQuery それは.withQueryFn() . ソリューションが表示され始めました.
  • コレクションから読み込む
  • ビューオプションから集約定義を取得する
  • アグリゲーションパイプラインをwithQueryFn
  • MongoDBは提供されたパイプラインを通してドキュメントを処理します
  • 計画に従いましょう!

    ビューの集約パイプラインを取得する


    パイプラインを取得するには、Mongo Javaクライアントを直接使用し、コレクション情報を取得する必要があります.かなり詳細です.
    static List<BsonDocument> retrieveViewPipeline(Options options) {
            if (Strings.isNullOrEmpty(options.getView())) {
                LOG.debug("No view in options");
                return new ArrayList<>();
            }
            com.mongodb.MongoClientOptions.Builder optionsBuilder = new com.mongodb.MongoClientOptions.Builder();
            optionsBuilder.maxConnectionIdleTime(60000);
            MongoClient mongoClient = new MongoClient(new MongoClientURI("mongodb+srv://" + options.getMongoDBUri(),
                    optionsBuilder));
    
            List<Document> viewPipeline = null;
            for (Document collecInfosDoc : mongoClient.getDatabase(options.getDatabase()).listCollections()) {
                if (collecInfosDoc.getString("name").equalsIgnoreCase(options.getView())) {
                    viewPipeline = collecInfosDoc.get("options", Document.class).getList("pipeline", Document.class);
                    break;
                }
            }
            checkArgument(viewPipeline != null, String.format("%s view not found", options.getView()));
    
            return viewPipeline.stream().map((doc) -> doc.toBsonDocument(BsonDocument.class,
                    MongoClient.getDefaultCodecRegistry())).collect(Collectors.toList());
        }
    

    パイプラインをMongodbioに渡す


    前述のように、Mongodbioは集約を扱う方法を持っています.withQueryFn . しかし、実際にこのメソッドhas a little bug in the current version (2.27) パイプラインが複数のステップを持つとき:

    ライン71 :パイプラインの最後の段階のための厳しい時間
    もちろん、単純な回避策があります.単に役に立たないアイテムをパイプラインリストに追加しますbucket() ステージ
    if (viewPipeline.size() > 1) {
        viewPipeline.add(new BsonDocument());
    }
    
    このように構成されたソースコネクタを使用すると、ビュードキュメントを取得できます.
    PCollectionTuple mongoDocs =
        pipeline.apply("Read from MongoDB",
            MongoDbIO.read()
            .withUri("mongodb+srv://" + options.getMongoDBUri())         
            .withDatabase(options.getDatabase())                        
            .withCollection(options.getCollection())
            .withBucketAuto(true) 
            .withQueryFn(
                AggregationQuery.create()
                    .withMongoDbPipeline(viewPipeline))
        )
    

    でも待って!それは巨大なコレクションで動作しますか?


    ついに!今すぐあなたのテストデータセットからドキュメントを取得することができます今すぐあなたの本当の、巨大な、MongoDBビューであなたの光沢のある新しいパイプラインをテストする準備ができて感じている.それから.

    com.mongodb.MongoCommandException:
    Command failed with error 16819 (Location16819): ‘Sort exceeded memory limit of 104857600 bytes, but did not opt in to external sorting. Aborting operation. Pass allowDiskUse:true to opt in.’


    ... まだ終わっていないことがわかる.少なくともエラーメッセージは非常に明確です:MongoDBインスタンスの集約パイプラインを処理するとき、メモリ(RAM)の制限が超えています.悲しいことに、この制限は設定できません.唯一の回避策はMongoDBがスワップファイルを使うことを可能にすることですallowDiskUse: true 集約パイプラインと一緒に.
    このパラメータは簡単にMongo JavaクライアントのおかげでアクセスできますAggregateIterable.allowDiskUse() . 問題は、悲しいことに、このメソッドはまだMongodbioで公開されていません.There is a feature request for it しかし、それは現在ロードマップでありません.
    残念ながらallowDiskUse() MongoDBビームコネクタの2つの場所で必要です.
  • MongoDbIO.buildAutoBuckets
  • AggregateIterable<Document> buckets = mongoCollection.aggregate(aggregates).allowDiskUse(true);
    
  • AggregationQuery.apply()
  • return collection.aggregate(mongoDbPipeline()).allowDiskUse(true).iterator();
    
    したがって、これらのクラスを現在編集する唯一の方法はフォークをフォークするか複製することです.完璧ではありませんが、少なくともパイプライン依存関係にあるクリーンアップを行うことができます.
        <!-- MongoDB connector -->
        <!-- Because of limitations, a fork of this lib is used -->
        <!--<dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-sdks-java-io-mongodb</artifactId>
          <version>${beam.version}</version>
        </dependency>-->
        <!-- The fork needs the Mongo-java driver -->
        <dependency>
          <groupId>org.mongodb</groupId>
          <artifactId>mongo-java-driver</artifactId>
          <version>3.12.7</version>
        </dependency>
    
    必要なのはmongo Javaドライバ
    この長い話は幸せな終わりを持ちます:AllowDiskuseとSWAPファイルのおかげで、あなたのカスタムMongodbioコネクタは、現在どんなサイズのMongoDBビューでも照会することができます!
    これが2回目のエピソードです.滞在は次のいずれかのためにチューニング、私は現在のGCPワークフロー、あなたのデータフローパイプラインを調整する便利な方法を紹介します