How to get optimal bulk insertion rate in DynamoDb through Executor Framework in Java?


Question

I'm doing a POC on bulk writes (around 5.5k items) against DynamoDB Local using the DynamoDB SDK for Java. I'm aware that each batch write can contain at most 25 write operations, so I'm dividing the whole dataset into chunks of 25 items each and submitting those chunks as callable actions to the Executor framework. Still, the result is not satisfactory: the 5.5k records take more than 100 seconds to insert.

I'm not sure how else I can optimize this. While creating the table I provisioned the WriteCapacityUnits as 400 (not sure what the maximum value I can give is) and experimented with it a bit, but it never made any difference. I have also tried changing the number of threads in the executor.
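Independent of the AWS SDK, the chunking step described above can be sketched in plain Java. The class name `BatchChunker` is my own illustration, not part of the original code; the point of the sketch is that slicing by index guarantees the final (possibly partial) batch is never dropped, which is easy to get wrong in an iterator-driven loop:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchChunker {
    // DynamoDB's BatchWriteItem accepts at most 25 write requests per call.
    static final int MAX_BATCH_SIZE = 25;

    // Split a list of items into sublists of at most MAX_BATCH_SIZE elements.
    // Slicing by index means the trailing partial batch is included automatically.
    static <T> List<List<T>> chunk(List<T> items) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += MAX_BATCH_SIZE) {
            batches.add(items.subList(i, Math.min(i + MAX_BATCH_SIZE, items.size())));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 5500; i++) items.add(i);
        List<List<Integer>> batches = chunk(items);
        System.out.println(batches.size());           // 5500 / 25 = 220 batches
        System.out.println(batches.get(219).size());  // last batch holds 25 items
    }
}
```

Each sublist would then be wrapped in one `BatchWriteItemRequest` before submission to the executor.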

This is the main code that performs the bulk write operation:


    public static void main(String[] args) throws Exception {

        final AmazonDynamoDB aws = new AmazonDynamoDBClient(new BasicAWSCredentials("x", "y"));
        aws.setEndpoint("http://localhost:8000");

        JSONArray employees = readFromFile();
        Iterator<JSONObject> iterator = employees.iterator();

        List<WriteRequest> batchList = new ArrayList<WriteRequest>();

        ExecutorService service = Executors.newFixedThreadPool(20);

        // Group the items into BatchWriteItemRequests of at most 25 write requests each.
        List<BatchWriteItemRequest> listOfBatchItemsRequest = new ArrayList<>();
        while (iterator.hasNext()) {
            PutRequest putRequest = new PutRequest();
            putRequest.setItem(ItemUtils.fromSimpleMap((Map) iterator.next()));
            WriteRequest writeRequest = new WriteRequest();
            writeRequest.setPutRequest(putRequest);
            batchList.add(writeRequest);

            if (batchList.size() == 25) {
                Map<String, List<WriteRequest>> batchTableRequests = new HashMap<String, List<WriteRequest>>();
                batchTableRequests.put("Employee", batchList);
                BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest();
                batchWriteItemRequest.setRequestItems(batchTableRequests);
                listOfBatchItemsRequest.add(batchWriteItemRequest);
                batchList = new ArrayList<WriteRequest>();
            }
        }
        // Flush the final (possibly partial) batch so no items are silently dropped.
        if (!batchList.isEmpty()) {
            Map<String, List<WriteRequest>> batchTableRequests = new HashMap<String, List<WriteRequest>>();
            batchTableRequests.put("Employee", batchList);
            BatchWriteItemRequest batchWriteItemRequest = new BatchWriteItemRequest();
            batchWriteItemRequest.setRequestItems(batchTableRequests);
            listOfBatchItemsRequest.add(batchWriteItemRequest);
        }

        StopWatch watch = new StopWatch();
        watch.start();

        List<Future<BatchWriteItemResult>> futureListOfResults = listOfBatchItemsRequest.stream()
                .map(batchItemsRequest -> service.submit(() -> aws.batchWriteItem(batchItemsRequest)))
                .collect(Collectors.toList());

        service.shutdown();
        // Block until all submitted batches have completed, instead of busy-waiting on isTerminated().
        service.awaitTermination(10, TimeUnit.MINUTES);

        watch.stop();
        System.out.println("Total time taken : " + watch.getTotalTimeSeconds());
    }

This is the code used to create the DynamoDB table:

    public static void main(String[] args) throws Exception {
        AmazonDynamoDBClient client = new AmazonDynamoDBClient().withEndpoint("http://localhost:8000");

        DynamoDB dynamoDB = new DynamoDB(client);
        String tableName = "Employee";
        try {
            System.out.println("Creating the table, wait...");
            Table table = dynamoDB.createTable(tableName, Arrays.asList(new KeySchemaElement("ID", KeyType.HASH)

            ), Arrays.asList(new AttributeDefinition("ID", ScalarAttributeType.S)),
                    new ProvisionedThroughput(1000L, 1000L));
            table.waitForActive();
            System.out.println("Table created successfully.  Status: " + table.getDescription().getTableStatus());

        } catch (Exception e) {
            System.err.println("Cannot create the table: ");
            System.err.println(e.getMessage());
        }
    }


Answer

DynamoDB Local is provided as a tool for developers who need to develop offline against DynamoDB; it is not designed for scale or performance. As such it is not intended for scale testing, and if you need to test bulk loads or other high-velocity workloads it is best to use a real table. The actual cost incurred from dev testing on a live table is usually quite minimal, since the table only needs to be provisioned for high capacity during the test runs.
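To put a rough number on the capacity the answer alludes to: one write capacity unit covers one standard write per second for an item up to 1 KB. The sketch below estimates the WCU needed to finish a load in a target time; the class name, the ~1 KB item size, and the 10-second target are my own hypothetical figures, not from the question:

```java
public class WcuEstimate {
    // One WCU = one standard write per second for an item of up to 1 KB.
    // Larger items consume one WCU per whole KB, hence the ceil on size.
    static long requiredWcu(long itemCount, double itemSizeKb, double targetSeconds) {
        double wcuPerItem = Math.ceil(itemSizeKb);
        return (long) Math.ceil(itemCount * wcuPerItem / targetSeconds);
    }

    public static void main(String[] args) {
        // Hypothetical: 5500 items of ~1 KB, loaded within 10 seconds.
        System.out.println(requiredWcu(5500, 1.0, 10.0)); // 550 WCU
    }
}
```

On a real table you would raise the provisioned throughput (or use on-demand mode) for the duration of the run, then scale it back down.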
