amazon-kinesis相关内容

将请求映射到没有lambda的AWS服务中,并在Api Gateway上使用AWS服务代理集成

所以我有一种情况,我想使用一个端点并将提供的请求直接映射到Kinesis流中. 我能够在aws控制台中手动完成此操作. 但是有没有办法使用无服务器或无服务器插件将集成更改为AWS服务? 我试图找到一种方法来部署与aws服务直接通信的端点,而没有lambda,并且找不到它. 解决方案 已经有一段时间了,但是最近我注意到现在有一​​个插件可以帮助您设置此确切配置,custom: a ..

我们可以编写一个AWS Lambda函数来查询Kinesis Streams吗

我有Amazon Kinesis Streams,其中包含所有点击流数据,我们想编写一个API来查询Kinesis Streams. 我的计划是创建一个API网关,该网关调用一个AWS Lambda函数,该函数将查询Kinesis Streams并将其返回. 是否可以使用Lambda查询Kinesis Streams,还是应该使用Kinesis Analytics和Lambda? ..
发布时间:2020-08-23 03:12:25 其他开发

使用先前的序列号或时间戳运行时,从Kinesis读取将提供空记录

我正在尝试借助 阅读推送到Kinesis流的消息 get_records()和get_shard_iterator()API. 我的生产者在处理结束时会不断推送记录,而消费者也每30分钟就会运行一次cron.因此,我尝试将当前读取的消息的序列号存储在数据库中,并使用AFTER_SEQUENCE_NUMBER分片迭代器以及最后读取的序列号.但是,在推送新消息后,第二次相同(第一次成功读 ..
发布时间:2020-08-23 03:12:22 其他开发

当缓慢的使用者在流处理中产生反压(火花,aws)时,避免数据丢失

我是分布式流处理(Spark)的新手.我已经阅读了一些教程/示例,这些教程/示例涵盖了背压如何导致生产者因过载的消费者而减慢速度的情况.给出的经典示例是摄取和分析推文.当流量出现意外增长而使用户无法承受负载时,他们会施加背压,生产者会通过将速率降低一些来做出响应. 我没有真正看到的是实践中使用什么方法来处理由于整个管道容量较低而无法立即处理的大量传入实时数据? 我想这的答案取决于业务领 ..

Microsoft Azure EventHub中的事件保留

我正在检查有关事件中心中邮件保留的详细信息. 假设我已将tentionalPolicy设置为1天,并且已经发送了一些消息.然后,如果我将邮件的retentionPolicy更改为3天,现有的eventData也会保留3天吗? 解决方案 关于保留策略的另一个重要细节-EventHubs 不在消息级别应用保留策略.它在文件系统级别. EventHubs是高吞吐量事件获取管道.简而言之,它是云上 ..
发布时间:2020-08-23 03:11:18 其他开发

AWS Kinesis,并发Lambda处理,保证有序

我有一个Lambda,事件源指向Kinesis Stream使用者(具有任意数量的分片) 我想确保Lambda按顺序而不是同时处理流中具有相同“分区键"的项目. (这被用作对象的标识,我不希望多个Lambda在同一对象上同时执行逻辑.) 例如,如果流中的项目具有分区键: 1,2,1,3,4,1,2,1 如果我们采用从左到右的处理顺序,则Lambda将同时使用分区键1,2、3 ..

Kinesis Streams和Flink

我有一个关于在Kinesis流中分片数据的问题.在将用户数据发送到我的运动流时,我想使用随机分区键,以便分片中的数据均匀分布.为了简化此问题,我想通过在Flink应用程序中键入userId来聚合用户数据. 我的问题是:如果分片是随机分区的,那么一个userId的数据分布在多个Kinesis分片上,Flink可以处理读取的多个分片,然后重新分发数据,以便单个userId的所有数据流到同一聚合器 ..
发布时间:2020-08-23 03:09:06 其他开发

Kinesis最大分片读取数/秒和多个使用者

因此,我有一个AWS Kinesis流,可以在其中为多个使用者发布事件.对于他们中的大多数来说,接收热数据很重要-这意味着其中许多人可能会同时轮询和读取最新数据.根据AWS文档,增加分片数量将提高并行度,而每个分片每秒读取数最大为5/sec.我的问题是是否(以及如何?)添加更多的分片是否可以帮助我的所有使用者都处于最新状态并尝试从同一分片读取新的传入数据的情况?似乎“每秒读取数"限制会自动对您可以 ..
发布时间:2020-08-23 03:09:02 其他开发

在AWS Kinesis Analytics SQL中分析滞后的滚动窗口

我有一个用例,它似乎应该得到Kinesis Analytics SQL的支持,但是我似乎无法弄清楚. 这是我的情况: 我有一个输入数据流,其中每个事件都有一个event_time 字段和device_id字段. 我想按event_time和device_id汇总数据.这里event_time是作为源数据中的字段提供的,不是将行添加到Kinesis Analytics应用程序的ROWT ..
发布时间:2020-08-23 03:08:58 其他开发

Amazon-Kinesis:记录每个碎片

我有一个Amazon Kinesis流,包含多个分片.分片的数量(因此也就是消费者的数量)不是一个常数. 我想将不常见的事件类型广播到流中的每个消费者. 生产者是否有办法广播记录,即发现碎片并将记录放到每个记录上? 解决方案 您可以执行此操作!有点... 使用参数"ExplicitHashKey"的技巧. 这使您可以设置用于记录的哈希键,因此可以选择数据正在处理的分 ..
发布时间:2020-08-23 03:06:52 Java开发

Firehose JSON-> S3实木复合地板-> ETL Spark,错误:无法推断Parquet的架构

这似乎很容易,就像这是这套功能的核心用例一样,但这是一个又一个问题. 最新的方法是尝试通过Glue Dev端点(PySpark和Scala端点)运行命令. 按照此处的说明进行操作: https://docs.aws.amazon.com/glue/latest/dg/dev-endpoint-tutorial-repl.html import sys from pyspark.c ..
发布时间:2020-08-23 03:06:49 其他开发

Flink Kinesis使用者未存储最后成功处理的序列号

我们正在使用Flink Kinesis Consumer将来自Kinesis流的数据消费到我们的Flink应用程序中. KCL库使用DynamoDB表存储最后成功处理的Kinesis流序列号.以便下次启动应用程序时,它将从上次停止的地方恢复. 但是,似乎Flink Kinesis Consumer没有维护任何此类序列号.在任何持久性存储中.因此,我们需要依靠ShardIteratort ..
发布时间:2020-08-23 03:06:45 其他开发

将数据从Amazon Aurora同步到Redshift

我正在尝试在AWS Aurora和Redshift之间建立同步.实现同步的最佳方法是什么? 可能的同步方式可以是:- 查询表以查找表中的更改(因为我仅执行插入操作,更新无关紧要),将这些更改导出到S3存储桶中的平面文件,并使用Redshift copy命令插入到Redshift中./p> 使用 python Publisher 和Boto3将更改发布到Kinesis流中然后在Fi ..

AWS Firehose换行符

关于在Firehose中添加换行符,我已经阅读了很多类似的问题,但它们都是围绕在源代码中添加换行符的.问题是我无权访问源,并且第三方正在将数据管道传输到我们的Kinesis实例,并且无法将'\ n'添加到源中. 我尝试使用以下代码进行firehose数据转换: 'use strict'; console.log('Loading function'); exports.handler ..