錆:CCV処理


このブログ記事はCSVの処理についてですが、それは退屈な音ですが、私はどのように読んで、わずか数秒でほぼ1 GBのCSVファイルを書く上で私の経験を共有したい.
シリーズの一部としてServerless Rust 他の部分をチェックアウトできます.
Part 1 and 2 さびやvscodeの設定方法を説明します.
Part 3 並列AWS SQSメッセージで処理する方法.
Part 4 受信されるAWS SQSメッセージのためにAWSステップ機能を実行する方法.
Part 5 AWS appconfigで設定を注入する方法.

問題


読みたいIMDb Datasets とタイトルを処理します.基礎.東理大理私が遊ぶことができるようにgzAmazon Neptune .
私はこのブログの記事ではアマゾンのネプチューンについて話しません.
このデータセットのファイルが大規模であるので、私はすでに異なる言語の過去の経験からそれが問題でありえたということをすでに知っていました.それで、私はRustで彼の経験を活用して、アドバイスを求めました.ニコラスは私をすぐにライブラリといくつかのメソッドを使用するように指示した.

CSV


私の使用CSV クレート.
そこには優れたドキュメントがあり、ここでいくつかのリンクを見つけることができます.
  • https://rust-lang-nursery.github.io/rust-cookbook/encoding/csv.html
  • https://docs.rs/csv/latest/csv/
  • https://docs.rs/csv/latest/csv/tutorial/index.html
  • 処理- 1を取る


    私が言ったように、私はストリームからデータを読んで、処理するために提案に続いて始めました、そして、ニコラスは私を示して親切でしたthis .
    私はAmazonのネプチューンで使用できる何かについてオリジナルのCSVを変更する必要がありました.
    println!("{}", String::from_utf8_lossy(&buffer[..len]));
    
    新しいCSVを書くのに必要なコードが必要です.
    カーゴ.toml dependencies:
    [dependencies]
    async-compression = { version = "0.3.12", features = ["all", "tokio"] }
    csv = "1.1.6"
    hyper = { version = "0.14", features = ["full"] }
    hyper-tls = "0.5.0"
    serde =  { version = "1.0", features = ["derive"] }
    tokio = { version = "1", features = ["full"] }
    tokio-stream = "0.1.8"
    tokio-util = { version = "0.6.9", features = ["full"] } 
    
    完全なコードは次のとおりです.
    use csv;
    use hyper::Client;
    use hyper_tls::HttpsConnector;
    use std::io;
    use tokio::io::AsyncReadExt;
    use tokio_stream::StreamExt;
    use tokio_util::io::StreamReader;
    const LINK: &str = "https://datasets.imdbws.com/title.basics.tsv.gz";
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let https = HttpsConnector::new();
        let client = Client::builder().build::<_, hyper::Body>(https);
        let res = client.get(LINK.parse()?).await?;
        let body = res
            .into_body()
            .map(|result| result.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)));
        let body = StreamReader::new(body);
        let mut decoder = async_compression::tokio::bufread::GzipDecoder::new(body);
        let mut buffer = [0; 1024];
        let mut wtr = csv::Writer::from_path("./export/title.csv")?;
        wtr.write_record(&[
            "~id",
            "~label",
            "titleType",
            "primaryTitle",
            "originalTitle",
            "isAdult",
            "startYear",
            "endYear",
            "runtimeMinutes",
            "genres",
        ])?;
        loop {
            let len = decoder.read(&mut buffer).await?;
            if len == 0 {
                break;
            }
            let line = String::from_utf8_lossy(&buffer[..len]);
            let line: Vec<&str> = line.split("\t").collect();
            wtr.write_record(&[
                line[0], "movies", line[1], line[2], line[3], line[4], line[5], line[6], line[7],
                line[8],
            ])?;
        }
         wtr.flush()?;
        Ok(())
    }
    
    コードのこの部分(品質を気にしないでください):
      let line = String::from_utf8_lossy(&buffer[..len]);
            let line: Vec<&str> = line.split("\t").collect();
            wtr.write_record(&[
                line[0], "movies", line[1], line[2], line[3], line[4], line[5], line[6], line[7],
                line[8],
            ])?;
    
    このコードはバイトを文字列に変換するときに問題があります.
    CSVのフォーマットとバッファのサイズのため、この変換による行は混合されました.
    例えば、私たちがオリジナルのCSVが以下であると仮定するならば:
    tconst  titleType   primaryTitle    originalTitle   isAdult startYear   endYear runtimeMinutes  genres
    tt0000001   short   Carmencita  Carmencita  0   1894    \N  1   Documentary,Short
    tt0000002   short   Le clown et ses chiens  Le clown et ses chiens  0   1892    \N  5   Animation,Short
    tt0000003   short   Pauvre Pierrot  Pauvre Pierrot  0   1892    \N  4   Animation,Comedy,Romance
    tt0000004   short   Un bon bock Un bon bock 0   1892    \N  12  Animation,Short
    tt0000005   short   Blacksmith Scene    Blacksmith Scene    0   1893    \N  1   Comedy,Short
    tt0000006   short   Chinese Opium Den   Chinese Opium Den   0   1894    \N  1   Short
    tt0000007   short   Corbett and Courtney Before the Kinetograph Corbett and Courtney Before the Kinetograph 0   1894    \N  1   Short,Sport
    
    行は
    //get the trailer of the new row
    tt0000001   short   Carmencita  Carmencita  0   1894    \N  1   Documentary,Short
    tt0000002
    
    or
    //incomplete
    Courtney Before the Kinetograph
    
    問題はどうやって図書館を使ったのか、何かを見逃してしまったのかもしれないが、解決策を見つけることができなかった.

    処理- 2を取る


    私はストリームをCSV ReaderBuilderに強制しようとしました.
    loop {
            let len = decoder.read(&mut buffer).await?;
            if len == 0 {
                break;
            }
    
            let mut rdr = csv::ReaderBuilder::new()
                .has_headers(true)
                .delimiter(b'\t')
                .flexible(true)
                .from_reader(&buffer[..len]);
    
            for result in rdr.records() {
                 let record = result?;
                 wtr.serialize(Record {
                        id: record[0].to_string(),
                        label: "movies".to_string(),
                        title_type: record[1].to_string(),
                        primary_title: record[2].to_string(),
                        original_title: record[3].to_string(),
                        is_adult: record[4].to_string().to_bool(),
                        start_year: record[5].parse::<u16>().unwrap_or_default(),
                        end_year: record[6].parse::<u16>().unwrap_or_default(),
                        runtime_minutes: record[7].parse::<u16>().unwrap_or_default(),
                        genres: record[8].to_string(),
                    })?;
            }
        }
    
    しかし、問題は同じ不完全なデータでした、しかし、若干の状況で、私はそれを働かせました、しかし、コードが醜かったので、私は幸せでありませんでした.

    処理-ファイナルテイク.今のところ


    私は、私が圧縮ファイルをダウンロードして、コードのこの部分のためにフォルダにCSVを持っていると仮定します.
    use csv;
    use serde::{Deserialize, Deserializer, Serialize};
    use std::error::Error;
    
    #[tokio::main]
    async fn main() -> Result<(), Box<dyn Error>> {
        let mut wtr = csv::WriterBuilder::default()
            .has_headers(false)
            .from_path("./export/title.csv")?;
    
        wtr.write_record(&[
            "~id",
            "~label",
            "titleType",
            "primaryTitle",
            "originalTitle",
            "isAdult",
            "startYear",
            "endYear",
            "runtimeMinutes",
            "genres",
        ])?;
    
        let mut rdr = csv::ReaderBuilder::new()
            .has_headers(true)
            .delimiter(b'\t')
            .double_quote(false)
            .escape(Some(b'\\'))
            .flexible(true)
            //.comment(Some(b'#'))
            .from_path("./import/title.basics.tsv")?;
    
        for result in rdr.deserialize() {
            let record: Record = result?;
            wtr.serialize(record)?;
        }
    
        wtr.flush()?;
    
        Ok(())
    }
    
    #[derive(Debug, Deserialize, Serialize)]
    struct Record {
        #[serde(alias = "tconst")]
        id: String,
    
        #[serde(default = "default_label")]
        label: String,
    
        #[serde(alias = "titleType")]
        title_type: String,
    
        #[serde(alias = "primaryTitle")]
        primary_title: String,
    
        #[serde(alias = "originalTitle")]
        original_title: String,
    
        #[serde(alias = "isAdult")]
        #[serde(deserialize_with = "bool_from_string")]
        is_adult: bool,
    
        #[serde(alias = "startYear")]
        #[serde(deserialize_with = "csv::invalid_option")]
        start_year: Option<u16>,
    
        #[serde(alias = "endYear")]
        #[serde(deserialize_with = "csv::invalid_option")]
        end_year: Option<u16>,
    
        #[serde(alias = "runtimeMinutes")]
        #[serde(deserialize_with = "csv::invalid_option")]
        runtime_minutes: Option<u16>,
    
        #[serde(alias = "genres")]
        #[serde(deserialize_with = "csv::invalid_option")]
        genres: Option<String>,
    }
    
    fn default_label() -> String {
        "movies".to_string()
    }
    
    /// Deserialize bool from String with custom value mapping
    fn bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
    where
        D: Deserializer<'de>,
    {
        match String::deserialize(deserializer)?.as_ref() {
            "1" => Ok(true),
            "0" => Ok(false),
            _ => Ok(false),
        }
    }
    
    
    CSVファイルを読むためにCSV ::ReaderBuilderを使用することに決めます.
    このCSVデータを読むには、次のように設定します.
  • ヘッダを有効にする.これは最初の行をスキップします.
  • からの区切り文字を「タブ」に変更します.
  • エスケープバックスラッシュ.
  • いくつかの奇妙な形式であるので、柔軟な長さのレコードを許可します.
  • 任意のレコードを扱う代わりに、Serdeを使用して特定の型のレコードを逆シリアル化します.例えば、私はserdeannotation to attributes 元のCSVをマップするために.
    #[derive(Debug, Deserialize, Serialize)]
    struct Record {
        #[serde(alias = "tconst")]
        id: String,
      .....
    
    オリジナルのCSVの「ISAPE」列は「1」と「0」の形式で、純粋なBooleanではないので、変換する必要があります.
    /// Deserialize bool from String with custom value mapping
    fn bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
    where
        D: Deserializer<'de>,
    {
        match String::deserialize(deserializer)?.as_ref() {
            "1" => Ok(true),
            "0" => Ok(false),
            _ => Ok(false),
        }
    }
    
    最後に、他のデータが元のCSVで必須ではないので、それは/Nとして表示されます.ターゲットが異なるタイプの場合は間違った逆シリアル化を処理する必要があります.
    このためにはcsv::invalid_option :
    #[serde(deserialize_with = "csv::invalid_option")]
    
    このフィールドのどんな逆シリアル化エラーもNone値に変換するようにSerdeに伝えます.
    シリアライズの結果は空の値を持つCSVになります("-")
    ~id,~label,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
    tt0000001,movies,short,Carmencita,Carmencita,false,1894,,1,"Documentary,Short"
    

    結論


    現在、AWSラムダは512 MBの一時的な記憶装置を持っているので、この使用ケースはファイルサイズのために合わないでしょう.しかし、我々がより重要な一時記憶を1日持つならば、我々はそれをAWSラムダの中に走らせることができました.
    もう一つはAmazon EFS , 他のAWSサービスによって消費されるように設計された完全に管理された、柔軟で、共有されたファイルシステム.それは発表されましたJun 16, 2020 . AWSラムダは自動的にファイルシステムをマウントし、データを読み書きするためのローカルパスを提供します.もっと読みたいなら、素晴らしい記事がありますhere .
    もう一つの選択肢はAWS Batch と一緒にスポットインスタンスでAWS Step Functions サービスインテグレーションを活用すると、ジョブ(. sync)パターンが実行されます.AWSバッチSubmitjobを呼んだ後に、ワークフローは休止します.ジョブが完了すると、ステップ関数は次の状態に進む.
    CSVクライトは素晴らしい仕事をし、それは信じられないほど高速です.私はこのスクリプトをリリースモードで実行することができます.