Apache Spark Kinesis Integration: connected, but no records received


Problem description

tldr; Can't use Kinesis Spark Streaming integration, because it receives no data.

  1. Testing stream is set up, nodejs app sends 1 simple record per second.
  2. Standard Spark 1.5.2 cluster is set up with master and worker nodes (4 cores) with docker-compose, AWS credentials in environment
  3. spark-streaming-kinesis-asl-assembly_2.10-1.5.2.jar is downloaded and added to classpath
  4. job.py or job.jar (just reads and prints) submitted.
  5. Everything seems to be okay, but no records whatsoever are received.

From time to time the KCL worker thread says "Sleeping ..." - it might be failing silently (I checked all the stderr I could find, but no hints). Maybe a swallowed OutOfMemoryError... but I doubt it, given the volume of 1 record per second.



    -------------------------------------------
    Time: 1448645109000 ms
    -------------------------------------------

    15/11/27 17:25:09 INFO JobScheduler: Finished job streaming job 1448645109000 ms.0 from job set of time 1448645109000 ms
    15/11/27 17:25:09 INFO KinesisBackedBlockRDD: Removing RDD 102 from persistence list
    15/11/27 17:25:09 INFO JobScheduler: Total delay: 0.002 s for time 1448645109000 ms (execution: 0.001 s)
    15/11/27 17:25:09 INFO BlockManager: Removing RDD 102
    15/11/27 17:25:09 INFO KinesisInputDStream: Removing blocks of RDD KinesisBackedBlockRDD[102] at createStream at NewClass.java:25 of time 1448645109000 ms
    15/11/27 17:25:09 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1448645107000 ms)
    15/11/27 17:25:09 INFO InputInfoTracker: remove old batch metadata: 1448645107000 ms
    15/11/27 17:25:10 INFO JobScheduler: Added jobs for time 1448645110000 ms
    15/11/27 17:25:10 INFO JobScheduler: Starting job streaming job 1448645110000 ms.0 from job set of time 1448645110000 ms
    -------------------------------------------
    Time: 1448645110000 ms
    -------------------------------------------
          <----- Some data expected to show up here!
    15/11/27 17:25:10 INFO JobScheduler: Finished job streaming job 1448645110000 ms.0 from job set of time 1448645110000 ms
    15/11/27 17:25:10 INFO JobScheduler: Total delay: 0.003 s for time 1448645110000 ms (execution: 0.001 s)
    15/11/27 17:25:10 INFO KinesisBackedBlockRDD: Removing RDD 103 from persistence list
    15/11/27 17:25:10 INFO KinesisInputDStream: Removing blocks of RDD KinesisBackedBlockRDD[103] at createStream at NewClass.java:25 of time 1448645110000 ms
    15/11/27 17:25:10 INFO BlockManager: Removing RDD 103
    15/11/27 17:25:10 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1448645108000 ms)
    15/11/27 17:25:10 INFO InputInfoTracker: remove old batch metadata: 1448645108000 ms
    15/11/27 17:25:11 INFO JobScheduler: Added jobs for time 1448645111000 ms
    15/11/27 17:25:11 INFO JobScheduler: Starting job streaming job 1448645111000 ms.0 from job set of time 1448645111000 ms

Please let me know any hints, I'd really like to use Spark for real time analytics... everything but this small detail of not receiving data :) seems to be ok.

PS: I find it strange that Spark somehow ignores my settings for storage level (MEMORY_AND_DISK_2) and checkpoint interval (20,000 ms):



    15/11/27 17:23:26 INFO KinesisInputDStream: metadataCleanupDelay = -1
    15/11/27 17:23:26 INFO KinesisInputDStream: Slide time = 1000 ms
    15/11/27 17:23:26 INFO KinesisInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
    15/11/27 17:23:26 INFO KinesisInputDStream: Checkpoint interval = null
    15/11/27 17:23:26 INFO KinesisInputDStream: Remember duration = 1000 ms
    15/11/27 17:23:26 INFO KinesisInputDStream: Initialized and validated org.apache.spark.streaming.kinesis.KinesisInputDStream@74b21a6

Source code (Java):



    import org.apache.spark.SparkConf;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kinesis.KinesisUtils;
    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;

    public class NewClass {

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("appname").setMaster("local[3]");
            JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
            JavaReceiverInputDStream<byte[]> kinesisStream = KinesisUtils.createStream(
                    ssc, "webassist-test", "test", "https://kinesis.us-west-1.amazonaws.com", "us-west-1",
                    InitialPositionInStream.LATEST,
                    new Duration(20000),
                    StorageLevel.MEMORY_AND_DISK_2()
            );
            kinesisStream.print();
            ssc.start();
            ssc.awaitTermination();
        }
    }

Python code (tried both pprinting before and sending to MongoDB):



    from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
    from pyspark import SparkContext, StorageLevel
    from pyspark.streaming import StreamingContext
    from sys import argv

    sc = SparkContext(appName="webassist-test")
    ssc = StreamingContext(sc, 5)

    stream = KinesisUtils.createStream(ssc,
         "appname",
         "test",
         "https://kinesis.us-west-1.amazonaws.com",
         "us-west-1",
         InitialPositionInStream.LATEST,
         5,
         StorageLevel.MEMORY_AND_DISK_2)

    stream.pprint()
    ssc.start()
    ssc.awaitTermination()

Note: I also tried sending data to MongoDB with stream.foreachRDD(lambda rdd: rdd.foreachPartition(send_partition)) but not pasting it here, since you'd need a MongoDB instance and it's not related to the problem - no records come in on the input already.

One more thing - the KCL never commits. The corresponding DynamoDB looks like this:


    leaseKey              checkpoint  leaseCounter  leaseOwner            ownerSwitchesSinceCheckpoint
    shardId-000000000000  LATEST      614           localhost:d92516...   8
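
In the KCL lease table, the checkpoint column normally holds a real sequence number once the worker has checkpointed; it keeps the sentinel initial-position value until then. A quick way to read a row like the one above (a hypothetical helper, not part of the KCL API):

```python
# Hypothetical helper (not part of the KCL API): the 'checkpoint' column
# stays at the sentinel initial position ("LATEST" or "TRIM_HORIZON")
# until the worker checkpoints a real sequence number.
def lease_has_progress(checkpoint_value):
    return checkpoint_value not in ("LATEST", "TRIM_HORIZON")

row = {"leaseKey": "shardId-000000000000", "checkpoint": "LATEST"}
print(lease_has_progress(row["checkpoint"]))  # False: no record ever checkpointed
```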

Command used to submit:

    spark-submit --executor-memory 1024m --master spark://IpAddress:7077 /path/test.py

In the MasterUI I can see:

 Input Rate
   Receivers: 1 / 1 active
   Avg: 0.00 events/sec
 KinesisReceiver-0
   Avg: 0.00 events/sec
...
 Completed Batches (last 76 out of 76)

Thanks for any help!

Answer

I've had issues with no record activity being shown in Spark Streaming in the past when connecting with Kinesis.

I'd try these things to get more feedback/a different behaviour from Spark:

  1. Make sure that you force the evaluation of your DStream transformation operations with output operations like foreachRDD, print, saveAs...
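
The laziness this point guards against can be illustrated without Spark at all; the sketch below uses plain Python generators as a stand-in for DStream transformations (an analogy, not Spark code):

```python
collected = []

def transform(stream):
    # like DStream.map/filter: only *defines* work, executes nothing
    return (x * 2 for x in stream)

def output_action(stream):
    # like foreachRDD/pprint: an output operation that forces evaluation
    for x in stream:
        collected.append(x)

pipeline = transform(iter([1, 2, 3]))
# at this point 'collected' is still empty: no output operation has run yet
output_action(pipeline)
print(collected)  # [2, 4, 6]
```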

Create a new KCL application in DynamoDB by using a new name for the "Kinesis app name" parameter when creating the stream, or purge the existing DynamoDB table.

Switch between TRIM_HORIZON and LATEST for initial position when creating the stream.

Restart the context when you try these changes.

EDIT after code was added: Perhaps I'm missing something obvious, but I cannot spot anything wrong with your source code. Do you have n+1 CPUs running this application (n being the number of Kinesis shards)?
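
That core check can be made concrete; each receiver-based input stream pins one core full-time, so a rough rule of thumb (a sketch; the exact scheduling is Spark's) is:

```python
def enough_cores(total_cores, num_receiver_streams):
    # Each receiver-based input stream occupies one core; at least one
    # more core must remain free to actually process the received batches.
    return total_cores >= num_receiver_streams + 1

# e.g. master local[3] with one Kinesis receiver:
print(enough_cores(3, 1))  # True
print(enough_cores(1, 1))  # False: the receiver starves the processing
```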

If you run a KCL application (Java/Python/...) reading from the shards in your docker instance, does it work? Perhaps there's something wrong with your network configuration, but I'd expect some error messages pointing it out.

If this is important enough / you have a bit of time, you can quickly implement a KCL reader in your Docker instance, which will let you compare with your Spark application. Some URLs:

Python

Java

Python example
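
As a minimal stand-in for a full KCL reader, a plain polling loop over the Kinesis data API can confirm whether records reach the shard at all. The sketch below accepts any client exposing boto-style `get_shard_iterator`/`get_records` calls; a fake in-memory client is used here so the logic is self-contained, but a real `boto3.client('kinesis')` can be swapped in:

```python
def drain_shard(client, stream_name, shard_id,
                iterator_type="TRIM_HORIZON", max_batches=5):
    """Poll one shard and return the raw record payloads seen."""
    it = client.get_shard_iterator(
        StreamName=stream_name, ShardId=shard_id,
        ShardIteratorType=iterator_type)["ShardIterator"]
    payloads = []
    for _ in range(max_batches):
        resp = client.get_records(ShardIterator=it, Limit=100)
        payloads.extend(rec["Data"] for rec in resp["Records"])
        it = resp.get("NextShardIterator")
        if not it:
            break
    return payloads

# Fake in-memory client standing in for boto3, just to exercise the loop:
class FakeKinesis:
    def __init__(self, batches):
        self.batches = list(batches)
    def get_shard_iterator(self, **kwargs):
        return {"ShardIterator": "it-0"}
    def get_records(self, ShardIterator, Limit):
        batch = self.batches.pop(0) if self.batches else []
        nxt = "it-next" if self.batches else None
        return {"Records": [{"Data": d} for d in batch],
                "NextShardIterator": nxt}

records = drain_shard(FakeKinesis([[b"a", b"b"], [b"c"]]),
                      "webassist-test", "shardId-000000000000")
print(records)  # [b'a', b'b', b'c']
```

If this loop returns payloads against the real stream while the Spark job shows none, the problem sits on the Spark/KCL side rather than in the network path or the producer.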

Another option is to run your Spark Streaming application in a different cluster and to compare.

P.S.: I'm currently using Spark Streaming 1.5.2 with Kinesis in different clusters and it processes records / shows activity as expected.
