amazon-kinesis相关内容

如何将 Java Kinesis 客户端库与 X-Ray 一起使用?

添加 aws-xray-recorder-sdk-aws-sdk-instrumentor 时,KCL 会引发 SegmentNotFoundException. 据我所知,这是因为 KCL 正在启动他们的 自己的线程 所以我对 AWSXRay.beginSegment() 的调用不适用于在这些线程上运行的检测请求,但我没有直接控制这些线程,所以我无法设置上下文/段. 有什么解决方法吗 ..
发布时间:2021-10-27 18:53:12 其他开发

使用lambda函数将数据发送到kinesis流(在不同的AWS账户中)

我有一个lambda函数,可以写入运动流.但是现在,我想写一个属于另一个AWS账户的运动流.假设我具有所有必需的跨帐户权限,如何将数据发送到此流?调用kinesis构造函数或putRecord函数时应如何更改参数? 解决方案 上面的方法在技术上是可行的,但是硬编码凭据甚至将凭据配置为lambda对我来说似乎有点多余,因为lambda本身要求您具有职务.您需要做的是创建跨帐户信任并使用sts ..
发布时间:2021-04-13 18:40:01 其他开发

Apache Beam:状态规范中的TTL

我们正在从Kinesis中读取内容并写入镶木地板,并使用 StateSpec> 来避免在从上一个保存点正常停止并重新启动管道之后重复处理记录. > 我们看到一些记录被重复,因为它们最终在随后的重新启动时落在了不同的任务管理器上,并且我们使用 StateSpec> 来存储有关已处理记录的有状态信息并避免重复. ..

Kinesis使用者getRecords不能始终返回10,000条记录

我有一个运动学流(20个碎片),大约有1天的数据滞后,这是基于KCL的运动学消费者所消耗的.使用者部署了20个ECS实例,因此每个实例都有一个线程从每个分片中提取数据. 根据文档,看起来一个单独的getRecords调用最多可以获取10,000条记录或最大有效负载大小为10 MB.但是,当我监视使用者日志时,似乎并非所有碎片都达到此限制.使用单个getRecords调用获取的记录在使用者实例 ..

我们如何将大型有效载荷压缩成运动流?

我有一个5 MB的JSON负载,我需要使用放置记录将其推送到Kinesis流.由于Kinesis数据大小限制为1 MB,因此我应该遵循哪些方法来压缩数据以及将要执行的步骤 解决方案 如果压缩后您的json有效负载仍然太大,那么通常会有两个选项: 将拆分为多个较小的有效负载.使用者必须能够根据您的有效载荷的 part id 来重构有效载荷. 将大型有效负载数据存储在流外部,例如在 ..
发布时间:2021-04-03 19:14:17 其他开发

带有AWS Kinesis的Debezium嵌入式引擎-PostgreSQL快照加载和事务元数据流

我想在AWS Kinesis中使用Debezium嵌入式引擎,以便加载PostgreSQL数据库的初始快照,然后连续执行CDC. 我知道,使用Kafka Connect,我可以开箱即用的“交易元数据"主题,以便检查交易边界. 使用Debezium嵌入式引擎和AWS Kinesis进行同样的操作( https://debezium.io/blog/2018/08/30/streaming ..
发布时间:2021-04-03 19:14:14 其他开发

AWS firehose lambda函数调用给出了错误的输出结构格式

当我使用put操作将数据对象插入aws firhose流时,它可以正常工作.由于我的firehose流上启用了lambda函数.因此,调用了lambda函数但给了我输出结构响应错误: "errorMessage":“无效的输出结构:请检查函数,并确保已处理的记录包含Dropped,Ok或ProcessingFailed的有效结果状态. 所以现在我以这种方式创建了lambda函数,以实现正确的 ..

使用Python解析和渲染Kinesis Video Streams并获得输入帧的图像表示

我建立了一个管道,在其中将视频实时流传输到Kinesis Video Stream(KVS),Kinesis Video Stream(KVS)将帧发送到Amazon Rekognition以进行人脸识别,再将帧发送到Kinesis Data Stream(KDS).最后,KDS将结果发送到lambda. 对于进行了面部识别的框架,我得到以下格式的JSON: get_media_for_fr ..
发布时间:2021-04-03 19:14:09 Python

Apache Beam如何管理运动学检查点?

我有一个用Apache Beam(使用Spark Runner)开发的流传输管道,该管道从kinesis流中读取数据. 我正在寻找Apache Beam中用于管理运动学检查点的选项(即定期存储运动学流的当前位置),以便它使系统从故障中恢复并继续处理流停止的地方. 是否有可供Apache Beam支持的运动学检查点的设置,类似于Spark Streaming(参考链接- Sequence ..
发布时间:2021-04-03 19:14:03 其他开发

Flink检查点的大小增长超过20GB,检查点时间超过1分钟

首要: 我是Flink的新手(了解原理并能够创建我需要的任何基本流工作) 我使用Kinesis Analytics运行Flink作业,默认情况下,它使用间隔为1分钟的增量检查点. Flink作业正在使用FlinkKinesisConsumer和自定义反序列化器(将字节反序列化为一个简单的Java对象,在整个作业中使用)从Kinesis流中读取事件 我想存档的只是简单地计算过去24小 ..

Apache Spark Kinesis Integration:已连接,但未收到任何记录

tldr; ,因为它不接收数据,所以不能使用Kinesis Spark Streaming集成. 已建立测试流,nodejs应用每秒发送1条简单记录. 在环境中使用docker-compose,AWS凭证设置具有主节点和工作节点(4个核心)的标准Spark 1.5.2集群 spark-streaming-kinesis-asl-assembly_2.10-1.5.2.jar 已下载并添加 ..
发布时间:2021-04-03 19:13:47 其他开发