ストリーミングAPI


ストリーミングAPIはクライアントに送信するデータの大規模なセットが必要な場合です.私が考えることができるアナロジーはこれです.庭に水を入れたいなら、バケツに水を入れて、庭や植物の水を1つずつ手に入れます.これは、庭がクライアントアプリケーションであり、私のバケツを埋めるためにサーバー(ウォーターソース)で待つ必要がありますし、その満たされたデータをクライアントに設定したまま、残りのAPIに似ています.ストリーミングAPIは私が水の源からパイプをリアルタイムで水を得るように接続し、私は完全なデータが取得されるまで待つ必要はありません.メモリに膨大なデータ量を保持する必要がないので、多くのサーバーメモリを節約します.
この例では、データベースからデータをプルしようとしています.データの量が膨大なので、その後のDBへの呼び出しでデータをプルします.よく、このシナリオでトレードオフがあります.
任意のREST APIと同じようにストリーミングコントローラを作成します.
//Controller.java

@Autowired
private TaskExecutor taskExecutor;

//input query: SELECT * FROM TABLE1 WHERE ID_COLUMN > $INDEX LIMIT $BACTHSIZE
@PostMapping("/stream/query")
public ResponseBodyEmitter executeMdx(@RequestBody Input input) {

    final ResponseBodyEmitter emitter = new ResponseBodyEmitter();

    taskExecutor.execute(() -> {
        try {

            final String batchSize = "10";

            for (int i = 0; i < 10; i++) {
                input.setInputQuery(input.getInputQuery().replace("$INDEX", String.valueOf(i + 1))
                        .replace("$BATCHSIZE", batchSize));

                Connector c = _applicationContext.getBean(Connector.class, input);

                String[][] result = c.executeQuery(input);
                if (result == null)
                    break;

                for (String[] x : result) {
                    emitter.send(String.join(",", x));
                }
            }
            emitter.complete();
        } catch (Exception e) {
            emitter.completeWithError(e);
            e.printStackTrace();
        }
        emitter.complete();

    });

        return emitter;
}
私たちのユースケースクライアントアプリケーションは一度に列コンマで区切られた列を求めます.これは非常に粗野な方法です.JSONのバイナリもエミッタを使用して送信できます.また、BatchSizeの計算は、要件に応じて動的に操作することができます注意してください.
TaskExecuteBeanはこのように構成されています.
//ThreadConfig.java
@Configuration
public class ThreadConfig {
    @Bean
    public TaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(4);        
        executor.setThreadNamePrefix("task_executor_thread");
        executor.initialize();
        return executor;
    }
}
あなたが使用している豆を確実にスレッド準備ができていることを確認してください.私は、コネクタが実行可能であることを意味します.
//Connector.java
@Service
@Scope("prototype")
public class Connector implements Runnable {

    @Autowired
    private Logger log;

    @Autowired
    private Grid grid;

    private Input _input;

    public Connector(Input input) {
        _input = input;
    }

    public String[][] executeQuery(Input input) {
        try {
            return grid.executeQuery(input);

        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }

        return null;
    }

    @Override
    public void run() {        
        executeQuery(_input);
    }
}
値をスレッドに渡すには、コンストラクタ引数を使用します.
このサンプルは、ストリーミングAPIに対処するために必要なものをガイドすることができます.すべてのusecasesの正しい方法でないかもしれません.
サーバー側のストリーミングAPIを作成したので、この方法でUIを消費する必要があります.
async function postData(url = '', data = {}) {

    var request = {
        method: 'POST',
        mode: 'cors',
        headers: {
            'Content-Type': 'application/json'
        },
        body: JSON.stringify(data)
    }

    await fetch('http://localhost:8080/stream/query', request)
        .then(response => {
            const reader = response.body.getReader()
            const utf8Decoder = new TextDecoder("utf-8");
            const stream = new ReadableStream({
                start(controller) {
                    function push() {
                        reader.read().then(({
                            done,
                            value
                        }) => {
                            if (done) {
                                controller.close();
                                return;
                            }

                            controller.enqueue(value);
                            let listItem = document.createElement('li');
                            listItem.textContent = utf8Decoder.decode(value);
                            console.log(listItem.textContent);
                            document.body.appendChild(listItem);
                            push();
                        });
                    };

                    push();
                }
            });

            return new Response(stream, {
                headers: {
                    "Content-Type": "text/html"
                }
            });
        });
}

postData('http://localhost:8080/stream/query', {    
    "inputQuery": "SELECT {[AUGW120]:[AUGW420],[AUG20],[Q3FY20]} ON COLUMNS, NON EMPTY CrossJoin ({[Fulfillment Region].children}, CrossJoin ( SUBSET({Descendants ([Product_Org],3)},$INDEX,$BATCHSIZE), {[CSR_Unit_Sales],[UNIT_Ships]}))ON ROWS FROM [LIBASOD2.NEW_DM] WHERE ([Version].[&CVersion],[Site].[Site],[Customer].[Customer], [Geo].[Geo],[Channel].[Channel],[Config].[Config])"
}).then(data => {
    console.log(data);
});
  • 写真でJoshua Sortino
  • もともと投稿bitsmonkey
  • 参考文献
  • baeldung
  • howtodoinjava