amazon-kinesis相关内容

Spring Aws Kinesis Binder ProvisionedThroughputExceededException,同时在批处理模式下消费消息

我正在使用批处理模式从 kinesis 流中提取记录.我们正在使用 spring aws kinesis binder. 大多数时候我们无法从流中提取消息.只有某些时候我们能够从流中提取消息. 我的配置如下所示 我的配置 弹簧:云:溪流:运动:粘合剂:锁:租期:30读取容量:1写入容量:1检查点:读取容量:1写入容量:1绑定:InStreamGroupOne:消费者:liste ..

AWS 和 AWS 之间的流式日志数据延迟是多少?谷歌云服务?

有没有人经历过: 将流式/微批处理日志数据从 Amazon 发送到 BigQuery 以进行处理,并可以阐明任何延迟问题? 将(微批量)日志从 Google DataFlow 发送到 Amazon(Kinesis/S3/DynamoDB) 有人可以提供有关延迟的信息吗? 谢谢 解决方案 在问题 1 中,我相信您对 BigQuery 提取延迟感兴趣.根据 将数据流式传输到 ..

使用 Jmeter 的 HTTP 请求将记录放入 Amazon Kinesis

我使用 jmeter 为我的网络服务 REST 创建 HTTP 请求.现在我想使用 PutRecords 方法将此请求发送到 Amazon kinesis,但我不知道如何创建请求,特别是如何在 kinesis 中设置用于签名和身份验证的 Headers 字段.有人使用过 Rest 请求吗?谢谢 解决方案 这个问题很老了,我不记得所有了,但如果有人需要,这是代码:BeanShell 采样器 ..
发布时间:2022-01-04 12:54:29 其他开发

如果记录顺序无关紧要,我可以使用单个 Kinesis 分片并行调用 Lambda 函数吗?

我有一个应用程序,我只需要 1 个 Kinesis 分片的带宽,但我需要并行调用许多 lambda 函数来跟上记录处理.我的记录大小处于高端(其中一些超过 1000 KB 的限制),但传入速率仅为 1 MB/s,因为我使用单个 EC2 实例来填充流.由于每条记录都包含一个内部时间戳,因此我不关心按顺序处理它们.基本上,我有几个月的数据需要迁移,我想并行迁移. 处理后的记录为可以处理 1000 ..
发布时间:2021-11-27 10:24:33 其他开发

使用 Kinesis Analytics 构建实时会话

某处是否有示例,或者有人可以解释如何使用 Kinesis Analytics 构建实时会话.(即会话) 这里提到这可能:https://aws.amazon.com/blogs/aws/amazon-kinesis-analytics-process-streaming-data-in-real-time-with-sql/在自定义窗口的讨论中但没有给出示例. 这通常是在 SQL 中使 ..
发布时间:2021-11-27 10:24:24 其他开发

Firehose JSON ->S3 实木复合地板 ->ETL Spark,错误:无法推断 Parquet 的架构

看起来这应该很容易,就像它是这组功能的核心用例一样,但它却是一个接一个的问题. 最新的尝试是通过 Glue Dev 端点(PySpark 和 Scala 端点)运行命令. 按照此处的说明操作:https://docs.aws.amazon.com/glue/latest/dg/dev-endpoint-tutorial-repl.html 导入系统从 pyspark.context ..
发布时间:2021-11-27 10:24:15 其他开发

如何将 AWS Kinesis Video Stream GetMedia API 输出解码为 mp3/wav?

我现在使用 GetMedia API 通过 AWS Connect 服务将数据摄取到(Kinesis Video Stream)KVS 能够提取有效负载,但如何将此输出转换为 mp3/wav?我想将此输出摄取到 AWS Transcribe 服务,以获取 AWS Connect 服务摄取到 KVS 的音频呼叫的文本格式. 以下代码的Payload输出如下: 00#AWS_KINESIS ..

分片 [shardId-000000000000] 未关闭.如果我们在重新分片操作正在进行时构建了分片列表,就会发生这种情况

我在从 Amazon kinesis Stream 获取数据时收到此错误.我正在做以下步骤 创建亚马逊 kinesis Steam 使用AmazonKinesisClient 的putRecord api 放入数据. 然后使用 Worker Of KCL 库从流中获取数据. 解决方案 有几种可能性. 在您下令创建流后,您是否等待完成的时间足够长?有时,创建分片可能需要 10 ..
发布时间:2021-11-27 10:23:53 其他开发

我可以在不删除流的情况下从 amazon Kinesis 删除数据记录或分片吗?

我知道 Kinesis Stream 中的数据记录将在 24 小时内自动删除.但是在我的应用程序中,当我将一些数据写入流时,第二次如果我想写入一些其他数据,首先插入的数据应该被删除.请任何人帮助我,因为我刚开始使用 AWS Kinesis Stream...我没有从 Kinesis Service API 得到任何帮助... 解决方案 您不能从流中删除以前插入的数据,但可以使用 KCL 读 ..
发布时间:2021-11-27 10:23:29 Java开发

如何在多个记录处理器之间平衡 kinesis 分片?

我目前正在编写 Golang 版本的简单 Kinesis 客户端库 (KCL).我希望它用于我的简单 KCL 的功能之一是跨多个记录处理器和 EC2 实例的负载平衡分片.例如,我有两个记录处理器(将在单独的 EC2 实例中运行)和四个 Kinesis 分片.负载平衡功能将允许每个记录处理器处理两个 Kinesis 分片. 我读到 Java KCL 实现了这个,但我在库中找不到实现.我的问题是 ..
发布时间:2021-11-27 10:23:18 其他开发

Kinesis 流待处理消息计数

我正在尝试将 AWS Kinesis 流用于我们的数据流之一.我想出于操作目的监视我的流中的待处理消息(根据积压向下游扩展),但无法找到任何在我的流中提供(大约)待处理消息的 API. 这看起来很奇怪,因为消息在 7 天后过期,如果生产者和消费者被隔离并且无法通信,你怎么知道消息即将过期.你如何处理这个问题? 谢谢! 解决方案 Kinesis 中没有“待处理"消息这样的概念.所 ..
发布时间:2021-11-27 10:23:08 其他开发

Kinesis 分区键始终位于同一个分片中

我有一个包含 2 个分片的 kinesis 流,如下所示: {“流描述":{"StreamStatus": "ACTIVE","StreamName": "我的流",“碎片":[{"ShardId": "shardId-000000000001",“哈希键范围":{"EndingHashKey": "17014118346046923173168730371587",“起始哈希键":“0"},{ ..
发布时间:2021-11-27 10:22:48 其他开发

AWS Lambda 性能问题

我使用与 aws lambda(java) 集成的 aws api 网关,但我发现这种方法存在一些严重的问题.删除服务器并使您的应用程序开箱即用的概念非常好,但这是我面临的问题.我的 lambda 正在做 2 件简单的事情 - 验证从客户端收到的有效负载,然后将其发送到 kinesis 流以从另一个 lambda 进行进一步处理(您会问为什么我不直接发送到流并且只对所有使用 1 个 lambda的 ..

使用 AWS Kinesis Firehose 写入 S3 存储桶中的特定文件夹

我希望能够根据数据中的内容将数据发送到 kinesis firehose.例如,如果我发送此 JSON 数据: {"name": "约翰",“身份证":345} 我想根据 id 过滤数据并将其发送到我的 s3 存储桶的子文件夹,例如:S3://myS3Bucket/345_2018_03_05.使用 Kinesis Firehose 或 AWS Lambda 是否可以做到这一点? 我现在 ..