slice function in DStream (Spark Streaming) not working


Problem description

Spark Streaming provides a sliding window function to get the RDDs for the last k. But I want to try to use the slice function to get the RDDs for the last k, in a case where I want to query RDDs over a range of time before the current time.

from datetime import datetime, timedelta

delta = timedelta(seconds=30)
datates = datamap.slice(datetime.now() - delta, datetime.now())

And I get this error when executing the code:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/home/hduser/spark-1.5.0/<ipython-input-1364-f8d325e33d4c> in <module>()
----> 1 datates = datamap.slice(datetime.now()-delta,datetime.now())

/home/hduser/spark-1.5.0/python/pyspark/streaming/dstream.pyc in slice(self, begin, end)
    411         `begin`, `end` could be datetime.datetime() or unix_timestamp
    412         """
--> 413         jrdds = self._jdstream.slice(self._jtime(begin), self._jtime(end))
    414         return [RDD(jrdd, self._sc, self._jrdd_deserializer) for jrdd in jrdds]
    415

/home/hduser/spark-1.5.0/python/pyspark/streaming/dstream.pyc in _jdstream(self)
    629
    630         jfunc = TransformFunction(self._sc, self.func, self.prev._jrdd_deserializer)
--> 631         dstream = self._sc._jvm.PythonTransformedDStream(self.prev._jdstream.dstream(), jfunc)
    632         self._jdstream_val = dstream.asJavaDStream()
    633         return self._jdstream_val

/home/hduser/spark-1.5.0/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    699         answer = self._gateway_client.send_command(command)
    700         return_value = get_return_value(answer, self._gateway_client, None,
--> 701                 self._fqn)
    702
    703         for temp_arg in temp_args:

/home/hduser/spark-1.5.0/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling None.org.apache.spark.streaming.api.python.PythonTransformedDStream.
: java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported
        at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224)
        at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
        at org.apache.spark.streaming.api.python.PythonDStream.<init>(PythonDStream.scala:172)
        at org.apache.spark.streaming.api.python.PythonTransformedDStream.<init>(PythonDStream.scala:189)
        at sun.reflect.GeneratedConstructorAccessor80.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
        at py4j.Gateway.invoke(Gateway.java:214)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)

How can I solve this error? Thank you.

Answer

Based on the error message,

"Adding new inputs, transformations, and output operations after stopping a context is not supported"

it looks like ssc.stop() was called instead of ssc.awaitTermination(). Please provide more information about how the Spark StreamingContext (ssc) is set up in your program.
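
As the traceback shows, slice() had to build a new PythonTransformedDStream on the JVM side, which counts as adding a transformation and is rejected once the context has been stopped. So the DStream needs to be materialized by an output operation before ssc.start(), and slice() must run while the context is still active. Below is a minimal sketch of a working lifecycle; the socket source on localhost:9999, the 1-second batch interval, and the 60-second remember window are illustrative assumptions, not details from the original question:

from datetime import datetime, timedelta
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="SliceExample")     # assumed app name, for illustration
ssc = StreamingContext(sc, 1)                 # 1-second batch interval (assumption)
ssc.remember(60)                              # keep the last 60s of RDDs around for slice()

lines = ssc.socketTextStream("localhost", 9999)   # assumed source
datamap = lines.map(lambda line: (line, 1))
datamap.pprint()                              # output op: materializes datamap before start

ssc.start()
ssc.awaitTerminationOrTimeout(35)             # block a while; the context stays active

# The context is still running here, so slice() can return the batch RDDs
delta = timedelta(seconds=30)
datates = datamap.slice(datetime.now() - delta, datetime.now())
print(sum(rdd.count() for rdd in datates))

ssc.stop(stopSparkContext=True)

If ssc.stop() runs before the slice() call (or the DStream was never used in an output operation), pyspark tries to create the transformed DStream at that point and you get exactly the IllegalStateException shown above.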
