錆:CCV処理
13255 ワード
このブログ記事はCSVの処理についてですが、それは退屈な音ですが、私はどのように読んで、わずか数秒でほぼ1 GBのCSVファイルを書く上で私の経験を共有したい.
シリーズの一部としてServerless Rust 他の部分をチェックアウトできます.
Part 1 and 2 さびやvscodeの設定方法を説明します.
Part 3 並列AWS SQSメッセージで処理する方法.
Part 4 受信されるAWS SQSメッセージのためにAWSステップ機能を実行する方法.
Part 5 AWS appconfigで設定を注入する方法.
読みたいIMDb Datasets とタイトルを処理します.基礎.東理大理私が遊ぶことができるようにgzAmazon Neptune .
私はこのブログの記事ではアマゾンのネプチューンについて話しません.
このデータセットのファイルが大規模であるので、私はすでに異なる言語の過去の経験からそれが問題でありえたということをすでに知っていました.それで、私はRustで彼の経験を活用して、アドバイスを求めました.ニコラスは私をすぐにライブラリといくつかのメソッドを使用するように指示した.
私の使用CSV クレート.
そこには優れたドキュメントがあり、ここでいくつかのリンクを見つけることができます. https://rust-lang-nursery.github.io/rust-cookbook/encoding/csv.html https://docs.rs/csv/latest/csv/ https://docs.rs/csv/latest/csv/tutorial/index.html
私が言ったように、私はストリームからデータを読んで、処理するために提案に続いて始めました、そして、ニコラスは私を示して親切でしたthis .
私はAmazonのネプチューンで使用できる何かについてオリジナルのCSVを変更する必要がありました.
カーゴ.toml dependencies:
CSVのフォーマットとバッファのサイズのため、この変換による行は混合されました.
例えば、私たちがオリジナルのCSVが以下であると仮定するならば:
私はストリームをCSV ReaderBuilderに強制しようとしました.
私は、私が圧縮ファイルをダウンロードして、コードのこの部分のためにフォルダにCSVを持っていると仮定します.
このCSVデータを読むには、次のように設定します. ヘッダを有効にする.これは最初の行をスキップします. からの区切り文字を「タブ」に変更します. エスケープバックスラッシュ. いくつかの奇妙な形式であるので、柔軟な長さのレコードを許可します. 任意のレコードを扱う代わりに、Serdeを使用して特定の型のレコードを逆シリアル化します.例えば、私はserdeannotation to attributes 元のCSVをマップするために.
このためにはcsv::invalid_option :
シリアライズの結果は空の値を持つCSVになります("-")
現在、AWSラムダは512 MBの一時的な記憶装置を持っているので、この使用ケースはファイルサイズのために合わないでしょう.しかし、我々がより重要な一時記憶を1日持つならば、我々はそれをAWSラムダの中に走らせることができました.
もう一つはAmazon EFS , 他のAWSサービスによって消費されるように設計された完全に管理された、柔軟で、共有されたファイルシステム.それは発表されましたJun 16, 2020 . AWSラムダは自動的にファイルシステムをマウントし、データを読み書きするためのローカルパスを提供します.もっと読みたいなら、素晴らしい記事がありますhere .
もう一つの選択肢はAWS Batch と一緒にスポットインスタンスでAWS Step Functions サービスインテグレーションを活用すると、ジョブ(. sync)パターンが実行されます.AWSバッチSubmitjobを呼んだ後に、ワークフローは休止します.ジョブが完了すると、ステップ関数は次の状態に進む.
CSVクライトは素晴らしい仕事をし、それは信じられないほど高速です.私はこのスクリプトをリリースモードで実行することができます.
シリーズの一部としてServerless Rust 他の部分をチェックアウトできます.
Part 1 and 2 さびやvscodeの設定方法を説明します.
Part 3 並列AWS SQSメッセージで処理する方法.
Part 4 受信されるAWS SQSメッセージのためにAWSステップ機能を実行する方法.
Part 5 AWS appconfigで設定を注入する方法.
問題
読みたいIMDb Datasets とタイトルを処理します.基礎.東理大理私が遊ぶことができるようにgzAmazon Neptune .
私はこのブログの記事ではアマゾンのネプチューンについて話しません.
このデータセットのファイルが大規模であるので、私はすでに異なる言語の過去の経験からそれが問題でありえたということをすでに知っていました.それで、私はRustで彼の経験を活用して、アドバイスを求めました.ニコラスは私をすぐにライブラリといくつかのメソッドを使用するように指示した.
CSV
私の使用CSV クレート.
そこには優れたドキュメントがあり、ここでいくつかのリンクを見つけることができます.
処理- 1を取る
私が言ったように、私はストリームからデータを読んで、処理するために提案に続いて始めました、そして、ニコラスは私を示して親切でしたthis .
私はAmazonのネプチューンで使用できる何かについてオリジナルのCSVを変更する必要がありました.
println!("{}", String::from_utf8_lossy(&buffer[..len]));
新しいCSVを書くのに必要なコードが必要です.カーゴ.toml dependencies:
[dependencies]
async-compression = { version = "0.3.12", features = ["all", "tokio"] }
csv = "1.1.6"
hyper = { version = "0.14", features = ["full"] }
hyper-tls = "0.5.0"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1.8"
tokio-util = { version = "0.6.9", features = ["full"] }
完全なコードは次のとおりです.use csv;
use hyper::Client;
use hyper_tls::HttpsConnector;
use std::io;
use tokio::io::AsyncReadExt;
use tokio_stream::StreamExt;
use tokio_util::io::StreamReader;
const LINK: &str = "https://datasets.imdbws.com/title.basics.tsv.gz";
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let https = HttpsConnector::new();
let client = Client::builder().build::<_, hyper::Body>(https);
let res = client.get(LINK.parse()?).await?;
let body = res
.into_body()
.map(|result| result.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e)));
let body = StreamReader::new(body);
let mut decoder = async_compression::tokio::bufread::GzipDecoder::new(body);
let mut buffer = [0; 1024];
let mut wtr = csv::Writer::from_path("./export/title.csv")?;
wtr.write_record(&[
"~id",
"~label",
"titleType",
"primaryTitle",
"originalTitle",
"isAdult",
"startYear",
"endYear",
"runtimeMinutes",
"genres",
])?;
loop {
let len = decoder.read(&mut buffer).await?;
if len == 0 {
break;
}
let line = String::from_utf8_lossy(&buffer[..len]);
let line: Vec<&str> = line.split("\t").collect();
wtr.write_record(&[
line[0], "movies", line[1], line[2], line[3], line[4], line[5], line[6], line[7],
line[8],
])?;
}
wtr.flush()?;
Ok(())
}
コードのこの部分(品質を気にしないでください): let line = String::from_utf8_lossy(&buffer[..len]);
let line: Vec<&str> = line.split("\t").collect();
wtr.write_record(&[
line[0], "movies", line[1], line[2], line[3], line[4], line[5], line[6], line[7],
line[8],
])?;
このコードはバイトを文字列に変換するときに問題があります.CSVのフォーマットとバッファのサイズのため、この変換による行は混合されました.
例えば、私たちがオリジナルのCSVが以下であると仮定するならば:
tconst titleType primaryTitle originalTitle isAdult startYear endYear runtimeMinutes genres
tt0000001 short Carmencita Carmencita 0 1894 \N 1 Documentary,Short
tt0000002 short Le clown et ses chiens Le clown et ses chiens 0 1892 \N 5 Animation,Short
tt0000003 short Pauvre Pierrot Pauvre Pierrot 0 1892 \N 4 Animation,Comedy,Romance
tt0000004 short Un bon bock Un bon bock 0 1892 \N 12 Animation,Short
tt0000005 short Blacksmith Scene Blacksmith Scene 0 1893 \N 1 Comedy,Short
tt0000006 short Chinese Opium Den Chinese Opium Den 0 1894 \N 1 Short
tt0000007 short Corbett and Courtney Before the Kinetograph Corbett and Courtney Before the Kinetograph 0 1894 \N 1 Short,Sport
行は//get the trailer of the new row
tt0000001 short Carmencita Carmencita 0 1894 \N 1 Documentary,Short
tt0000002
or
//incomplete
Courtney Before the Kinetograph
問題はどうやって図書館を使ったのか、何かを見逃してしまったのかもしれないが、解決策を見つけることができなかった.処理- 2を取る
私はストリームをCSV ReaderBuilderに強制しようとしました.
loop {
let len = decoder.read(&mut buffer).await?;
if len == 0 {
break;
}
let mut rdr = csv::ReaderBuilder::new()
.has_headers(true)
.delimiter(b'\t')
.flexible(true)
.from_reader(&buffer[..len]);
for result in rdr.records() {
let record = result?;
wtr.serialize(Record {
id: record[0].to_string(),
label: "movies".to_string(),
title_type: record[1].to_string(),
primary_title: record[2].to_string(),
original_title: record[3].to_string(),
is_adult: record[4].to_string().to_bool(),
start_year: record[5].parse::<u16>().unwrap_or_default(),
end_year: record[6].parse::<u16>().unwrap_or_default(),
runtime_minutes: record[7].parse::<u16>().unwrap_or_default(),
genres: record[8].to_string(),
})?;
}
}
しかし、問題は同じ不完全なデータでした、しかし、若干の状況で、私はそれを働かせました、しかし、コードが醜かったので、私は幸せでありませんでした.処理-ファイナルテイク.今のところ
私は、私が圧縮ファイルをダウンロードして、コードのこの部分のためにフォルダにCSVを持っていると仮定します.
use csv;
use serde::{Deserialize, Deserializer, Serialize};
use std::error::Error;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let mut wtr = csv::WriterBuilder::default()
.has_headers(false)
.from_path("./export/title.csv")?;
wtr.write_record(&[
"~id",
"~label",
"titleType",
"primaryTitle",
"originalTitle",
"isAdult",
"startYear",
"endYear",
"runtimeMinutes",
"genres",
])?;
let mut rdr = csv::ReaderBuilder::new()
.has_headers(true)
.delimiter(b'\t')
.double_quote(false)
.escape(Some(b'\\'))
.flexible(true)
//.comment(Some(b'#'))
.from_path("./import/title.basics.tsv")?;
for result in rdr.deserialize() {
let record: Record = result?;
wtr.serialize(record)?;
}
wtr.flush()?;
Ok(())
}
#[derive(Debug, Deserialize, Serialize)]
struct Record {
#[serde(alias = "tconst")]
id: String,
#[serde(default = "default_label")]
label: String,
#[serde(alias = "titleType")]
title_type: String,
#[serde(alias = "primaryTitle")]
primary_title: String,
#[serde(alias = "originalTitle")]
original_title: String,
#[serde(alias = "isAdult")]
#[serde(deserialize_with = "bool_from_string")]
is_adult: bool,
#[serde(alias = "startYear")]
#[serde(deserialize_with = "csv::invalid_option")]
start_year: Option<u16>,
#[serde(alias = "endYear")]
#[serde(deserialize_with = "csv::invalid_option")]
end_year: Option<u16>,
#[serde(alias = "runtimeMinutes")]
#[serde(deserialize_with = "csv::invalid_option")]
runtime_minutes: Option<u16>,
#[serde(alias = "genres")]
#[serde(deserialize_with = "csv::invalid_option")]
genres: Option<String>,
}
fn default_label() -> String {
"movies".to_string()
}
/// Deserialize bool from String with custom value mapping
fn bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
where
D: Deserializer<'de>,
{
match String::deserialize(deserializer)?.as_ref() {
"1" => Ok(true),
"0" => Ok(false),
_ => Ok(false),
}
}
CSVファイルを読むためにCSV ::ReaderBuilderを使用することに決めます.このCSVデータを読むには、次のように設定します.
#[derive(Debug, Deserialize, Serialize)]
struct Record {
#[serde(alias = "tconst")]
id: String,
.....
オリジナルのCSVの「ISAPE」列は「1」と「0」の形式で、純粋なBooleanではないので、変換する必要があります./// Deserialize bool from String with custom value mapping
fn bool_from_string<'de, D>(deserializer: D) -> Result<bool, D::Error>
where
D: Deserializer<'de>,
{
match String::deserialize(deserializer)?.as_ref() {
"1" => Ok(true),
"0" => Ok(false),
_ => Ok(false),
}
}
最後に、他のデータが元のCSVで必須ではないので、それは/Nとして表示されます.ターゲットが異なるタイプの場合は間違った逆シリアル化を処理する必要があります.このためにはcsv::invalid_option :
#[serde(deserialize_with = "csv::invalid_option")]
このフィールドのどんな逆シリアル化エラーもNone値に変換するようにSerdeに伝えます.シリアライズの結果は空の値を持つCSVになります("-")
~id,~label,titleType,primaryTitle,originalTitle,isAdult,startYear,endYear,runtimeMinutes,genres
tt0000001,movies,short,Carmencita,Carmencita,false,1894,,1,"Documentary,Short"
結論
現在、AWSラムダは512 MBの一時的な記憶装置を持っているので、この使用ケースはファイルサイズのために合わないでしょう.しかし、我々がより重要な一時記憶を1日持つならば、我々はそれをAWSラムダの中に走らせることができました.
もう一つはAmazon EFS , 他のAWSサービスによって消費されるように設計された完全に管理された、柔軟で、共有されたファイルシステム.それは発表されましたJun 16, 2020 . AWSラムダは自動的にファイルシステムをマウントし、データを読み書きするためのローカルパスを提供します.もっと読みたいなら、素晴らしい記事がありますhere .
もう一つの選択肢はAWS Batch と一緒にスポットインスタンスでAWS Step Functions サービスインテグレーションを活用すると、ジョブ(. sync)パターンが実行されます.AWSバッチSubmitjobを呼んだ後に、ワークフローは休止します.ジョブが完了すると、ステップ関数は次の状態に進む.
CSVクライトは素晴らしい仕事をし、それは信じられないほど高速です.私はこのスクリプトをリリースモードで実行することができます.
Reference
この問題について(錆:CCV処理), 我々は、より多くの情報をここで見つけました https://dev.to/aws-builders/rust-csv-processing-1ndaテキストは自由に共有またはコピーできます。ただし、このドキュメントのURLは参考URLとして残しておいてください。
Collection and Share based on the CC Protocol