Spark Streaming from Kafka has error numRecords must not be negative


Problem description

This is a strange error, because I am still pushing data to Kafka and consuming messages from it, and Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative is a strange message as well. I searched but could not find any related resources.

Let me explain my cluster. I have 1 server as the master and agents running Mesos, on which I set up 3 Kafka brokers as shown below. Then I run the Spark job on that cluster. I am using Spark 1.5.2.

brokers:
  id: 0
  active: true
  state: running
  resources: cpus:1.00, mem:1024, heap:512, port:31000
  failover: delay:1m, max-delay:10m
  stickiness: period:10m, hostname:test-master
  task:
    id: broker-0-c32082d0-a544-4260-b7c4-0239d99f0972
    state: running
    endpoint: test-master:31000
  metrics:
    collected: 2016-01-25 17:46:47+08
    under-replicated-partitions: 0
    offline-partitions-count: 0
    is-active-controller: 1

  id: 1
  active: true
  state: running
  resources: cpus:1.00, mem:1024, heap:512, port:31001
  failover: delay:1m, max-delay:10m
  stickiness: period:10m, hostname:test-master
  task:
    id: broker-1-7b30d6ad-6b19-4420-b743-c6f7f1adfb07
    state: running
    endpoint: test-master:31001
  metrics:
    collected: 2016-01-25 17:46:31+08
    under-replicated-partitions: 0
    offline-partitions-count: 0
    is-active-controller: 0

  id: 2
  active: true
  state: running
  resources: cpus:1.00, mem:1024, heap:512, port:31002
  failover: delay:1m, max-delay:10m
  stickiness: period:10m, hostname:test-master
  task:
    id: broker-2-8ef6437b-79b2-4183-8653-17cf2fe4591f
    state: running
    endpoint: test-master:31002
  metrics:
    collected: 2016-01-25 17:46:38+08
    under-replicated-partitions: 0
    offline-partitions-count: 0
    is-active-controller: 0

Then I run the Spark Streaming job, which gets data from Kafka and parses it.
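
The question does not include the streaming job itself, so here is a minimal sketch of what a Spark 1.5.2 direct-stream consumer for this setup might look like. The broker list, topic name, consumer group, and 30-second batch interval are taken from the logs below; everything else (object and variable names, the parsing stub) is an assumption for illustration only.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object BidEventStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("bid-event-stream")
    // 30-second batches, matching "Slide time = 30000 ms" in the log
    val ssc = new StreamingContext(conf, Seconds(30))

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "test-master:31000,test-master:31001,test-master:31002",
      "group.id"             -> "bid_event_consumer_group_zk",
      "zookeeper.connect"    -> "test-master:2181",
      "auto.offset.reset"    -> "smallest"
    )

    // Direct (receiver-less) stream over the bid_event topic
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("bid_event"))

    stream.foreachRDD { rdd =>
      // parsing of each message value would happen here
      rdd.map(_._2).foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}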

I check that the brokers are working using:

kafkacat -b test-master:31001,test-master:31000,test-master:31002 -t bid_event

It gets data, but when I run the Spark job I get this error:

16/01/25 17:44:52 INFO SparkContext: Running Spark version 1.5.2
16/01/25 17:44:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/25 17:44:52 INFO SecurityManager: Changing view acls to: ubuntu
16/01/25 17:44:52 INFO SecurityManager: Changing modify acls to: ubuntu
16/01/25 17:44:52 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ubuntu); users with modify permissions: Set(ubuntu)
16/01/25 17:44:53 INFO Slf4jLogger: Slf4jLogger started
16/01/25 17:44:53 INFO Remoting: Starting remoting
16/01/25 17:44:53 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.xxx.xxx.25:51816]
16/01/25 17:44:53 INFO Utils: Successfully started service 'sparkDriver' on port 51816.
16/01/25 17:44:53 INFO SparkEnv: Registering MapOutputTracker
16/01/25 17:44:53 INFO SparkEnv: Registering BlockManagerMaster
16/01/25 17:44:53 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-97e9787b-3a67-4d00-aff6-a5e02b271a74
16/01/25 17:44:53 INFO MemoryStore: MemoryStore started with capacity 441.9 MB
16/01/25 17:44:53 INFO HttpFileServer: HTTP File server directory is /tmp/spark-442ac339-2ac7-427f-9e6b-5b5cb18a54dd/httpd-4724a937-4d03-4bd0-99f1-7b9f1129291e
16/01/25 17:44:53 INFO HttpServer: Starting HTTP Server
16/01/25 17:44:53 INFO Utils: Successfully started service 'HTTP file server' on port 51817.
16/01/25 17:44:53 INFO SparkEnv: Registering OutputCommitCoordinator
16/01/25 17:44:53 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/01/25 17:44:53 INFO SparkUI: Started SparkUI at http://10.xxx.xxx.25:4040
16/01/25 17:44:54 INFO SparkContext: Added JAR file:/home/ubuntu/spark-jobs/./rtb_spark-assembly-1.0-deps.jar at http://10.xxx.xxx.25:51817/jars/rtb_spark-assembly-1.0-deps.jar with timestamp 1453715094219
16/01/25 17:44:54 INFO SparkContext: Added JAR file:/home/ubuntu/spark-jobs/./rtb-spark.jar at http://10.xxx.xxx.25:51817/jars/rtb-spark.jar with timestamp 1453715094222
16/01/25 17:44:54 INFO Utils: Copying /home/ubuntu/spark-jobs/./test.conf to /tmp/spark-442ac339-2ac7-427f-9e6b-5b5cb18a54dd/userFiles-cdef27e0-c357-4ebb-adcf-ccf963ff9d60/test.conf
16/01/25 17:44:54 INFO SparkContext: Added file file:/home/ubuntu/spark-jobs/./test.conf at http://10.xxx.xxx.25:51817/files/test.conf with timestamp 1453715094309
16/01/25 17:44:54 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
2016-01-25 17:44:54,444:5202(0x7f2d7604f700):ZOO_INFO@log_env@712: Client environment:zookeeper.version=zookeeper C client 3.4.5
2016-01-25 17:44:54,444:5202(0x7f2d7604f700):ZOO_INFO@log_env@716: Client environment:host.name=knx-rtb-server-google-test
2016-01-25 17:44:54,444:5202(0x7f2d7604f700):ZOO_INFO@log_env@723: Client environment:os.name=Linux
2016-01-25 17:44:54,444:5202(0x7f2d7604f700):ZOO_INFO@log_env@724: Client environment:os.arch=3.13.0-76-generic
2016-01-25 17:44:54,444:5202(0x7f2d7604f700):ZOO_INFO@log_env@725: Client environment:os.version=#120~precise1-Ubuntu SMP Tue Jan 19 11:09:43 UTC 2016
I0125 17:44:54.444169  5444 sched.cpp:166] Version: 0.26.0
2016-01-25 17:44:54,444:5202(0x7f2d7604f700):ZOO_INFO@log_env@733: Client environment:user.name=ubuntu
2016-01-25 17:44:54,444:5202(0x7f2d7604f700):ZOO_INFO@log_env@741: Client environment:user.home=/home/ubuntu
2016-01-25 17:44:54,444:5202(0x7f2d7604f700):ZOO_INFO@log_env@753: Client environment:user.dir=/home/ubuntu/spark-jobs
2016-01-25 17:44:54,444:5202(0x7f2d7604f700):ZOO_INFO@zookeeper_init@786: Initiating client connection, host=test-master:2181 sessionTimeout=10000 watcher=0x7f2ded821210 sessionId=0 sessionPasswd=<null> context=0x7f2d54001470 flags=0
2016-01-25 17:44:54,444:5202(0x7f2d6e6fb700):ZOO_INFO@check_events@1703: initiated connection to server [10.xxx.xxx.25:2181]
2016-01-25 17:44:54,446:5202(0x7f2d6e6fb700):ZOO_INFO@check_events@1750: session establishment complete on server [10.xxx.xxx.25:2181], sessionId=0x15278112832012c, negotiated timeout=10000
I0125 17:44:54.447082  5439 group.cpp:331] Group process (group(1)@10.xxx.xxx.25:28249) connected to ZooKeeper
I0125 17:44:54.447120  5439 group.cpp:805] Syncing group operations: queue size (joins, cancels, datas) = (0, 0, 0)
I0125 17:44:54.447140  5439 group.cpp:403] Trying to create path '/mesos' in ZooKeeper
I0125 17:44:54.448109  5439 detector.cpp:156] Detected a new leader: (id='28')
I0125 17:44:54.448246  5439 group.cpp:674] Trying to get '/mesos/json.info_0000000028' in ZooKeeper
I0125 17:44:54.448755  5440 detector.cpp:482] A new leading master (UPID=master@10.xxx.xxx.25:5050) is detected
I0125 17:44:54.448832  5440 sched.cpp:264] New master detected at master@10.xxx.xxx.25:5050
I0125 17:44:54.448977  5440 sched.cpp:274] No credentials provided. Attempting to register without authentication
I0125 17:44:54.449766  5440 sched.cpp:643] Framework registered with a636c17f-2b0d-46f7-9b15-5a3d6e9918a4-0003
16/01/25 17:44:54 INFO MesosSchedulerBackend: Registered as framework ID a636c17f-2b0d-46f7-9b15-5a3d6e9918a4-0003
16/01/25 17:44:54 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 51820.
16/01/25 17:44:54 INFO NettyBlockTransferService: Server created on 51820
16/01/25 17:44:54 INFO BlockManagerMaster: Trying to register BlockManager
16/01/25 17:44:54 INFO BlockManagerMasterEndpoint: Registering block manager 10.xxx.xxx.25:51820 with 441.9 MB RAM, BlockManagerId(driver, 10.xxx.xxx.25, 51820)
16/01/25 17:44:54 INFO BlockManagerMaster: Registered BlockManager
16/01/25 17:44:55 INFO ForEachDStream: metadataCleanupDelay = -1
16/01/25 17:44:55 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
16/01/25 17:44:55 INFO DirectKafkaInputDStream: Slide time = 30000 ms
16/01/25 17:44:55 INFO DirectKafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/01/25 17:44:55 INFO DirectKafkaInputDStream: Checkpoint interval = null
16/01/25 17:44:55 INFO DirectKafkaInputDStream: Remember duration = 30000 ms
16/01/25 17:44:55 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.DirectKafkaInputDStream@270235cb
16/01/25 17:44:55 INFO ForEachDStream: Slide time = 30000 ms
16/01/25 17:44:55 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/01/25 17:44:55 INFO ForEachDStream: Checkpoint interval = null
16/01/25 17:44:55 INFO ForEachDStream: Remember duration = 30000 ms
16/01/25 17:44:55 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@219b66f
16/01/25 17:44:55 INFO RecurringTimer: Started timer for JobGenerator at time 1453715100000
16/01/25 17:44:55 INFO JobGenerator: Started JobGenerator at 1453715100000 ms
16/01/25 17:44:55 INFO JobScheduler: Started JobScheduler
16/01/25 17:44:55 INFO StreamingContext: StreamingContext started
16/01/25 17:45:00 INFO VerifiableProperties: Verifying properties
16/01/25 17:45:00 INFO VerifiableProperties: Property auto.commit.interval.ms is overridden to 1000
16/01/25 17:45:00 INFO VerifiableProperties: Property auto.offset.reset is overridden to smallest
16/01/25 17:45:00 INFO VerifiableProperties: Property group.id is overridden to bid_event_consumer_group_zk
16/01/25 17:45:00 INFO VerifiableProperties: Property zookeeper.connect is overridden to test-master:2181
16/01/25 17:45:00 INFO VerifiableProperties: Property zookeeper.session.timeout.ms is overridden to 400
16/01/25 17:45:00 INFO VerifiableProperties: Property zookeeper.sync.time.ms is overridden to 200
16/01/25 17:45:00 ERROR JobScheduler: Error generating jobs for time 1453715100000 ms
java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38)
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: numRecords must not be negative
    at scala.Predef$.require(Predef.scala:233)
    at org.apache.spark.streaming.scheduler.StreamInputInfo.<init>(InputInfoTracker.scala:38)
    at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:165)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:247)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$2.apply(JobGenerator.scala:245)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:245)
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
16/01/25 17:45:00 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
16/01/25 17:45:00 INFO JobGenerator: Stopping JobGenerator immediately
16/01/25 17:45:00 INFO RecurringTimer: Stopped timer for JobGenerator after time 1453715100000
16/01/25 17:45:00 INFO JobGenerator: Stopped JobGenerator
16/01/25 17:45:00 INFO JobScheduler: Stopped JobScheduler
16/01/25 17:45:00 INFO StreamingContext: StreamingContext stopped successfully
16/01/25 17:45:00 INFO SparkContext: Invoking stop() from shutdown hook
16/01/25 17:45:00 INFO SparkUI: Stopped Spark web UI at http://10.xxx.xxx.25:4040
16/01/25 17:45:00 INFO DAGScheduler: Stopping DAGScheduler
I0125 17:45:00.281819  5579 sched.cpp:1805] Asked to stop the driver
I0125 17:45:00.281951  5437 sched.cpp:1043] Stopping framework 'a636c17f-2b0d-46f7-9b15-5a3d6e9918a4-0003'
16/01/25 17:45:00 INFO MesosSchedulerBackend: driver.run() returned with code DRIVER_STOPPED
16/01/25 17:45:00 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/01/25 17:45:00 INFO MemoryStore: MemoryStore cleared
16/01/25 17:45:00 INFO BlockManager: BlockManager stopped
16/01/25 17:45:00 INFO BlockManagerMaster: BlockManagerMaster stopped
16/01/25 17:45:00 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/01/25 17:45:00 INFO SparkContext: Successfully stopped SparkContext
16/01/25 17:45:00 INFO ShutdownHookManager: Shutdown hook called
16/01/25 17:45:00 INFO ShutdownHookManager: Deleting directory /tmp/spark-442ac339-2ac7-427f-9e6b-5b5cb18a54dd

Recommended answer

Check your Kafka topic offsets. The offsets you have provided in the code might be out of range.

That is, they could be smaller than the earliest offset or greater than the latest offset.
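
To verify whether the offsets the job starts from are actually inside the topic's valid range, one option with the old Kafka 0.8 Scala API (the one Spark 1.5.x's Kafka integration builds on) is to ask a broker for the earliest and latest offsets of each partition and clamp any stored or hard-coded offset to that range before building the direct stream. This is only a sketch under that assumption; the helper name, port, and the example offset value are illustrative, not from the original answer.

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

// Hypothetical helper: returns (earliest, latest) offsets for one topic partition.
def offsetRange(host: String, port: Int, topic: String, partition: Int): (Long, Long) = {
  val consumer = new SimpleConsumer(host, port, 10000, 64 * 1024, "offset-check")
  try {
    val tp = TopicAndPartition(topic, partition)
    def fetch(time: Long): Long = {
      val request = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(time, 1)))
      consumer.getOffsetsBefore(request).partitionErrorAndOffsets(tp).offsets.head
    }
    (fetch(OffsetRequest.EarliestTime), fetch(OffsetRequest.LatestTime))
  } finally {
    consumer.close()
  }
}

// Example: clamp an offset restored from ZooKeeper (or hard-coded in the job)
// into the valid range before using it as a fromOffset for createDirectStream.
val (earliest, latest) = offsetRange("test-master", 31000, "bid_event", 0)
val storedOffset: Long = 12345L // whatever the job would otherwise start from
val safeOffset = math.min(math.max(storedOffset, earliest), latest)

If the stored offset is greater than the latest offset (for example after the topic was deleted and recreated), the direct stream computes a negative record count for the batch, which is exactly the "numRecords must not be negative" failure seen in the log.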

