Apache Spark Kinesis Sample not working

Problem description

I am trying to run the JavaKinesisWordCountASL example.

The example seems to connect to my Kinesis stream and get data from it (as shown in the log below). However, Spark never invokes the call function passed to unionStreams.flatMap in the example, and no word counts are printed.
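
For reference, the per-batch work that never runs is a plain word count: each Kinesis record is decoded to a string, flatMap splits it into words, and the words are summed per key. The sketch below reproduces that logic in plain Java without Spark, roughly mirroring the example's flatMap/mapToPair/reduceByKey chain. It assumes UTF-8 records and a single-space word separator; the class and method names are hypothetical, and this is not the example's actual code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

class WordCountSketch {
    // Assumption: words are separated by single spaces.
    private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");

    // Hypothetical helper: counts words across one batch of raw Kinesis records.
    static Map<String, Integer> countWords(List<byte[]> records) {
        Map<String, Integer> counts = new TreeMap<>();
        for (byte[] record : records) {
            // Decode the record payload (assumed to be UTF-8 text).
            String line = new String(record, StandardCharsets.UTF_8);
            // flatMap step: one line becomes many words.
            for (String word : WORD_SEPARATOR.split(line)) {
                // reduceByKey step: add 1 for every occurrence of the word.
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<byte[]> batch = Arrays.asList(
            "spark kinesis spark".getBytes(StandardCharsets.UTF_8),
            "kinesis".getBytes(StandardCharsets.UTF_8));
        System.out.println(countWords(batch)); // {kinesis=2, spark=2}
    }
}
```

When the streaming job is healthy, the example prints counts like these once per batch interval; their absence here means the batch jobs themselves never executed.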

I have tried running it with both Java 8 and Java 7. I am running it on an Ubuntu instance. The same example works on my MacBook.

14/11/15 01:59:42 INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
14/11/15 01:59:42 INFO storage.MemoryStore: ensureFreeSpace(264) called with curMem=3512, maxMem=938244833
14/11/15 01:59:42 INFO storage.MemoryStore: Block input-0-1416016781800 stored as values in memory (estimated size 264.0 B, free 894.8 MB)
14/11/15 01:59:42 INFO storage.BlockManagerInfo: Added input-0-1416016781800 in memory on ip-10-80-91-13.ec2.internal:39149 (size: 264.0 B, free: 894.8 MB)
14/11/15 01:59:42 INFO storage.BlockManagerMaster: Updated info of block input-0-1416016781800
14/11/15 01:59:42 INFO scheduler.JobScheduler: Added jobs for time 1416016782000 ms
14/11/15 01:59:42 INFO network.SendingConnection: Initiating connection to [ip-10-80-91-13.ec2.internal/10.80.91.13:39149]
14/11/15 01:59:42 INFO network.SendingConnection: Connected to [ip-10-80-91-13.ec2.internal/10.80.91.13:39149], 1 messages pending
14/11/15 01:59:42 INFO network.ConnectionManager: Accepted connection from [ip-10-80-91-13.ec2.internal/10.80.91.13:56700]
14/11/15 01:59:42 WARN storage.BlockManager: Block input-0-1416016781800 already exists on this machine; not re-adding it
14/11/15 01:59:42 INFO receiver.BlockGenerator: Pushed block input-0-1416016781800
14/11/15 01:59:43 INFO storage.MemoryStore: ensureFreeSpace(256) called with curMem=3776, maxMem=938244833
14/11/15 01:59:43 INFO storage.MemoryStore: Block input-0-1416016782800 stored as values in memory (estimated size 256.0 B, free 894.8 MB)
14/11/15 01:59:43 INFO storage.BlockManagerInfo: Added input-0-1416016782800 in memory on ip-10-80-91-13.ec2.internal:39149 (size: 256.0 B, free: 894.8 MB)
14/11/15 01:59:43 INFO storage.BlockManagerMaster: Updated info of block input-0-1416016782800
14/11/15 01:59:43 WARN storage.BlockManager: Block input-0-1416016782800 already exists on this machine; not re-adding it
14/11/15 01:59:43 INFO receiver.BlockGenerator: Pushed block input-0-1416016782800
14/11/15 01:59:44 INFO scheduler.ReceiverTracker: Stream 0 received 2 blocks
14/11/15 01:59:44 INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
14/11/15 01:59:44 INFO scheduler.JobScheduler: Added jobs for time 1416016784000 ms
14/11/15 01:59:46 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
14/11/15 01:59:46 INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
14/11/15 01:59:46 INFO scheduler.JobScheduler: Added jobs for time 1416016786000 ms
14/11/15 01:59:46 INFO impl.CWPublisherRunnable: Successfully published 17 datums.
14/11/15 01:59:46 INFO storage.MemoryStore: ensureFreeSpace(248) called with curMem=4032, maxMem=938244833
14/11/15 01:59:46 INFO storage.MemoryStore: Block input-1-1416016786000 stored as values in memory (estimated size 248.0 B, free 894.8 MB)
14/11/15 01:59:46 INFO storage.BlockManagerInfo: Added input-1-1416016786000 in memory on ip-10-80-91-13.ec2.internal:39149 (size: 248.0 B, free: 894.8 MB)
14/11/15 01:59:46 INFO storage.BlockManagerMaster: Updated info of block input-1-1416016786000
14/11/15 01:59:46 WARN storage.BlockManager: Block input-1-1416016786000 already exists on this machine; not re-adding it
14/11/15 01:59:46 INFO receiver.BlockGenerator: Pushed block input-1-1416016786000
14/11/15 01:59:46 INFO impl.CWPublisherRunnable: Successfully published 14 datums.
14/11/15 01:59:48 INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks
14/11/15 01:59:48 INFO storage.MemoryStore: ensureFreeSpace(264) called with curMem=4280, maxMem=938244833
14/11/15 01:59:48 INFO scheduler.ReceiverTracker: Stream 1 received 1 blocks
14/11/15 01:59:48 INFO storage.MemoryStore: Block input-0-1416016787800 stored as values in memory (estimated size 264.0 B, free 894.8 MB)
14/11/15 01:59:48 INFO storage.BlockManagerInfo: Added input-0-1416016787800 in memory on ip-10-80-91-13.ec2.internal:39149 (size: 264.0 B, free: 894.8 MB)
14/11/15 01:59:48 INFO storage.BlockManagerMaster: Updated info of block input-0-1416016787800
14/11/15 01:59:48 INFO scheduler.JobScheduler: Added jobs for time 1416016788000 ms
14/11/15 01:59:48 WARN storage.BlockManager: Block input-0-1416016787800 already exists on this machine; not re-adding it
14/11/15 01:59:48 INFO receiver.BlockGenerator: Pushed block input-0-1416016787800
14/11/15 01:59:50 INFO scheduler.ReceiverTracker: Stream 0 received 1 blocks
14/11/15 01:59:50 INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks
14/11/15 01:59:50 INFO scheduler.JobScheduler: Added jobs for time 1416016790000 ms
14/11/15 01:59:51 INFO storage.MemoryStore: ensureFreeSpace(264) called with curMem=4544, maxMem=938244833
14/11/15 01:59:51 INFO storage.MemoryStore: Block input-0-1416016790800 stored as values in memory (estimated size 264.0 B, free 894.8 MB)
14/11/15 01:59:51 INFO storage.BlockManagerInfo: Added input-0-1416016790800 in memory on ip-10-80-91-13.ec2.internal:39149 (size: 264.0 B, free: 894.8 MB)
14/11/15 01:59:51 INFO storage.BlockManagerMaster: Updated info of block input-0-1416016790800
14/11/15 01:59:51 WARN storage.BlockManager: Block input-0-1416016790800 already exists on this machine; not re-adding it
14/11/15 01:59:51 INFO receiver.BlockGenerator: Pushed block input-0-1416016790800
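
The ReceiverTracker lines in this log report blocks for two input streams, Stream 0 and Stream 1, so two receivers are running and storing data. In a longer log, one way to confirm how many receivers are active is to tally those lines. The sketch below does that with a regular expression written against the log format shown above; the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class ReceiverLogTally {
    // Matches ReceiverTracker lines such as "Stream 0 received 2 blocks".
    private static final Pattern RECEIVED =
        Pattern.compile("ReceiverTracker: Stream (\\d+) received (\\d+) blocks");

    // Sums blocks per stream id; the number of keys is the number of receivers seen.
    static Map<Integer, Integer> blocksPerStream(String log) {
        Map<Integer, Integer> totals = new TreeMap<>();
        Matcher m = RECEIVED.matcher(log);
        while (m.find()) {
            int stream = Integer.parseInt(m.group(1));
            int blocks = Integer.parseInt(m.group(2));
            totals.merge(stream, blocks, Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        String log = String.join("\n",
            "14/11/15 01:59:44 INFO scheduler.ReceiverTracker: Stream 0 received 2 blocks",
            "14/11/15 01:59:44 INFO scheduler.ReceiverTracker: Stream 1 received 0 blocks",
            "14/11/15 01:59:48 INFO scheduler.ReceiverTracker: Stream 1 received 1 blocks",
            "14/11/15 01:59:50 INFO scheduler.ReceiverTracker: Stream 0 received 1 blocks");
        System.out.println(blocksPerStream(log)); // {0=3, 1=1}
    }
}
```

Because it uses find() rather than matching whole lines, the sketch also works on a log pasted as one unbroken run of text.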

Recommended answer

This might have something to do with how many worker threads you have. I had the same problem when I ran the app with --master local[2]. I spent many hours searching for an answer and found nothing. Out of curiosity, I changed it to --master local[4], and it worked. I do not know the root cause; maybe someone more familiar with Spark can shed some light on it.

Note: in my case, the Kinesis stream had two shards, so the app created two input streams, one per shard.
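
A likely explanation: the Spark Streaming programming guide notes that every receiver-based input stream permanently occupies one thread, so when running locally the master should be local[n] with n greater than the number of receivers. With two shards the example starts two receivers, and local[2] leaves no thread for the batch jobs (the flatMap and the counting), which is consistent with the log: blocks are stored and jobs are added, but no word counts ever appear. The helper below is a hypothetical sketch that derives a local master string from the shard count, assuming one receiver per shard as described in the note.

```java
class LocalMasterSketch {
    // Hypothetical helper: one receiver thread per shard (assumed, as in the
    // example) plus processingThreads extra threads for the batch jobs.
    static String localMasterFor(int numShards, int processingThreads) {
        if (numShards < 1 || processingThreads < 1) {
            throw new IllegalArgumentException(
                "need at least one shard and one processing thread");
        }
        int receivers = numShards;
        return "local[" + (receivers + processingThreads) + "]";
    }

    public static void main(String[] args) {
        // Two shards -> two receivers; local[2] would leave nothing for processing.
        System.out.println(localMasterFor(2, 1)); // local[3]
    }
}
```

The resulting string can be passed as the --master argument or to SparkConf.setMaster. The local[4] that worked above also satisfies the rule, since 4 is greater than the two receivers.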
