並列スキャンとバッチ書込みを用いたダイナモテーブル間の効率的な複製


最近、我々はダイナモテーブルから大量のデータを別のアカウントにコピーする必要があった.元々、私たちは、より多くの記録が得られる必要があるかどうかチェックするために、ExclusiveStartKey/LastValuatedKeyマーカーでアイテムを得るためにスキャン機能を使用しました.次に、putitem API呼び出しを使ってデータを宛先テーブルに挿入します.テーブルに数十万枚のレコードがあったので、数時間かかってコピーしました.その時点で我々はデータをコピーするためのより速い方法を使用する必要があることを決めた.並列走査とバッチ書き込みを用いてコピー時間を大幅に改善した.このポストはそれらを使用する方法の例を提供し、使用するメソッドの結果を比較します.
デモ目的のために、私は同じアカウントを使用しています.私は、プログラム的に単純なDynamoDBテーブルを作成する方法を書きました.それはいくつかの都市で2020年の気温に関するデータを含みます
private static async Task<TableDescription> CreateTable(string name)
{
    var request = new CreateTableRequest
    {
        AttributeDefinitions = new List<AttributeDefinition>()
        {
            new AttributeDefinition{
                AttributeName = "City",
                AttributeType = "S"
            },
            new AttributeDefinition{
                AttributeName = "Date",
                AttributeType = "S"
            }
        },
        TableName = name,
        ProvisionedThroughput = new ProvisionedThroughput
        {
            ReadCapacityUnits = 15,
            WriteCapacityUnits = 15
        },
        KeySchema = new List<KeySchemaElement>
        {
            new KeySchemaElement
            {
                AttributeName="City",
                KeyType="HASH"
            },
            new KeySchemaElement
            {
                AttributeName="Date",
                KeyType="Range"
            }
        }
    };
    var response = await client.CreateTableAsync(request);
    return response.TableDescription;
}
私は上記のメソッドを使っていくつかのテーブルを作成します.
await CreateTable(sourceTableName);
await CreateTable(destinationTableNameSlow);
await CreateTable(destinationTableNameFast);

var describeTableRequest = new DescribeTableRequest
{
    TableName = sourceTableName
};

DescribeTableResponse describeTableResponse;
do
{
    System.Threading.Thread.Sleep(1000);
    describeTableResponse = await client.DescribeTableAsync(describeTableRequest);
}
while (describeTableResponse.Table.TableStatus != TableStatus.ACTIVE);
次に、ソーステーブルに温度に関するデータを設定します.
for (var i = 0; i <= 100; i++)
{
    var putItemRequest = new PutItemRequest
    {
        TableName = sourceTableName,
        Item = new Dictionary<string, AttributeValue>()
        {
            {"City", new AttributeValue{S=cities[(new Random()).Next(cities.Length)]  } },
            {"Date", new AttributeValue{S=GetRandom2020Date()  } },
            {"Highest", new AttributeValue{N=(new Random()).Next(20,30).ToString()  } },
            {"Lowest", new AttributeValue{N=(new Random()).Next(1,10).ToString()  } }
        }
    };
    await client.PutItemAsync(putItemRequest);
}
今、最も興味深い部分が始まります.まず、データをゆっくりコピーする方法を呼び出します.私はスキャンとputitem API呼び出しを使用します.
private static async Task CopySlowly()
{
    Stopwatch sw = new Stopwatch();
    sw.Start();
    var request = new ScanRequest
    {
        TableName = sourceTableName
    };

    var result = await client.ScanAsync(request, default);
    foreach (var item in result.Items)
    {
        var putItemRequest = new PutItemRequest
        {
            TableName = destinationTableNameSlow,
            Item = item
        };

        await client.PutItemAsync(putItemRequest, default);
    }
    sw.Stop();
    Console.Write($"Copy slow - {sw.ElapsedMilliseconds} milliseconds elapsed");
    Console.ReadLine();
}
デモテーブルには100項目しかないので、スキャン操作でexclusivestartkey/lastevaluatedkeyを使用していません.スキャンがデータの1 MBの最大値を得るとき、それらは確かに大きいテーブルに必要です.
次に、パラレルスキャンを使用してデータをコピーする別の方法を呼び出します.私はtotalsegment変数を使用してどのように多くの並列ワーカースレッドを作成するかを指定します.この場合、3に設定されていましたが、あなたが好きなだけ持っているかもしれません)
private static void CopyFast()
{
    Stopwatch sw = new Stopwatch();            
    sw.Start();
    Task[] tasks = new Task[totalSegments];
    for (int segment = 0; segment < totalSegments; segment++)
    {
        int tmpSegment = segment;
        tasks[segment] = Task.Run(() => ScanSegment(tmpSegment));
    }

    Task.WaitAll(tasks);
    sw.Stop();

    Console.WriteLine($"Copy fast - {sw.ElapsedMilliseconds} milliseconds elapsed");
    Console.ReadLine();
}
private static async Task ScanSegment(int tmpSegment)
{
    var request = new ScanRequest
    {
        TableName = sourceTableName,
        Segment = tmpSegment,
        TotalSegments = totalSegments,
    };

    var result = await client.ScanAsync(request);

    for (var i = 0; i < result.Items.Count; i += 25)
    {
        var items = result.Items.Skip(i).Take(25).ToArray();
        var req = new BatchWriteItemRequest
        {
            RequestItems = new Dictionary<string, List<WriteRequest>>
            {
                {
                    destinationTableNameFast,
                    items.Select(i => new WriteRequest(new PutRequest(i))).ToList()
                }
            }
        };

        await client.BatchWriteItemAsync(req);
    }
}
単純なスキャンは一度に1つのパーティションにアクセスしますが、パラレルスキャンを使用する場合、いくつかのワーカースレッドが作成され、それらの各々がテーブルのあるセグメントをスキャンします.BatchWriteItem操作を使用すると、一度に最大25アイテムのputitem要求を作成することができます.
その結果、コピー速度の違いが目立つ.

もう一つのポイントは、このタスクは、私が取った(と合格!)のAWS開発者の准試験の準備に役立つということです2021年2月には、並列テストとBatchWriteItemについての実際のテストでいくつかの質問があった、私は非常に私は仕事でこのシナリオを出くわしていたと正しい答えを知っていたことを嬉しく思った!
ハッピーラーニング!