How to fanout an AWS kinesis stream?

Problem description

I'd like to fanout/chain/replicate an input AWS Kinesis stream to N new Kinesis streams, so that each record written to the input Kinesis stream will appear in each of the N streams.

Is there an AWS service or an open source solution?

I prefer not to write code to do that if there's a ready-made solution. AWS Kinesis Firehose is not a solution because it can't output to Kinesis. Perhaps an AWS Lambda solution, if that won't be too expensive to run?

Solution

There are two ways you could accomplish fan-out of an Amazon Kinesis stream:

  • Use Amazon Kinesis Analytics to copy records to additional streams
  • Trigger an AWS Lambda function to copy records to another stream

Option 1: Using Amazon Kinesis Analytics to fan-out

You can use Amazon Kinesis Analytics to generate a new stream from an existing stream.

From the Amazon Kinesis Analytics documentation:

Amazon Kinesis Analytics applications continuously read and process streaming data in real-time. You write application code using SQL to process the incoming streaming data and produce output. Then, Amazon Kinesis Analytics writes the output to a configured destination.

Fan-out is mentioned in the Application Code section:

You can also write SQL queries that run independently of each other. For example, you can write two SQL statements that query the same in-application stream, but send output into different in-application streams.

I managed to implement this as follows:

  • Created three streams: input, output1, output2
  • Created two Amazon Kinesis Analytics applications: copy1, copy2
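
For reference, the streams themselves can be created ahead of time with the same boto library used in the test code below. This is a minimal sketch, not part of the original answer, and it assumes the us-west-2 region and one shard per stream:

from boto import kinesis

conn = kinesis.connect_to_region("us-west-2")

# Create the input stream and the two fan-out target streams, one shard each.
for name in ("input", "output1", "output2"):
    conn.create_stream(name, 1)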

The Amazon Kinesis Analytics SQL application looks like this:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(log VARCHAR(16));

CREATE OR REPLACE PUMP "COPY_PUMP1" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "log" FROM "SOURCE_SQL_STREAM_001";

This code creates a pump (think of it as a continual select statement) that selects from the input stream and outputs to the output1 stream. I created another identical application that outputs to the output2 stream.
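
The SQL is identical in both applications; what differs is the Kinesis stream that each application's DESTINATION_SQL_STREAM is wired to, and that wiring lives in the application's output configuration rather than in the SQL. As a rough sketch only (assuming boto3 and the legacy kinesisanalytics API; the ARNs and role name are placeholders, not from the original answer), wiring copy1 to output1 could look like this:

import boto3

analytics = boto3.client('kinesisanalytics')

# The current application version is required when adding an output.
detail = analytics.describe_application(ApplicationName='copy1')['ApplicationDetail']

# Map the in-application DESTINATION_SQL_STREAM to the external output1 stream.
analytics.add_application_output(
    ApplicationName='copy1',
    CurrentApplicationVersionId=detail['ApplicationVersionId'],
    Output={
        'Name': 'DESTINATION_SQL_STREAM',
        'KinesisStreamsOutput': {
            'ResourceARN': 'arn:aws:kinesis:us-west-2:111111111111:stream/output1',  # placeholder ARN
            'RoleARN': 'arn:aws:iam::111111111111:role/analytics-output-role',       # placeholder role
        },
        'DestinationSchema': {'RecordFormatType': 'JSON'},
    },
)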

To test, I sent data to the input stream:

#!/usr/bin/env python

import json, time
from boto import kinesis

kinesis = kinesis.connect_to_region("us-west-2")
i = 0

while True:
  data={}
  data['log'] =  'Record ' + str(i)
  i += 1
  print data
  kinesis.put_record("input", json.dumps(data), "key")
  time.sleep(2)

I let it run for a while, then displayed the output using this code:

from boto import kinesis

kinesis = kinesis.connect_to_region("us-west-2")
iterator = kinesis.get_shard_iterator('output1', 'shardId-000000000000', 'TRIM_HORIZON')['ShardIterator']
records = kinesis.get_records(iterator, 5)
print [r['Data'] for r in records['Records']]

The output was:

[u'{"LOG":"Record 0"}', u'{"LOG":"Record 1"}', u'{"LOG":"Record 2"}', u'{"LOG":"Record 3"}', u'{"LOG":"Record 4"}']

I ran it again for output2 and the identical output was shown.

Option 2: Using AWS Lambda

If you are fanning-out to many streams, a more efficient method might be to create an AWS Lambda function:

  • Triggered by Amazon Kinesis stream records
  • That writes records to multiple Amazon Kinesis 'output' streams

You could even have the Lambda function self-discover the output streams based on a naming convention (e.g. any stream named app-output-*).
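
A minimal sketch of such a handler (assuming a Python Lambda runtime with boto3 available and the hypothetical app-output-* naming convention above; it ignores list_streams pagination and put_records batch-size limits for brevity):

import base64
import fnmatch

import boto3

kinesis = boto3.client('kinesis')

def lambda_handler(event, context):
    # Decode the records delivered by the Kinesis trigger.
    records = [{
        'Data': base64.b64decode(r['kinesis']['data']),
        'PartitionKey': r['kinesis']['partitionKey'],
    } for r in event['Records']]

    # Discover output streams by naming convention (app-output-* is hypothetical).
    outputs = fnmatch.filter(kinesis.list_streams()['StreamNames'], 'app-output-*')

    # Copy the whole batch to every output stream.
    for stream in outputs:
        kinesis.put_records(StreamName=stream, Records=records)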
