Transforming PySpark RDD with Scala

Problem Description

TL;DR - I have what looks like a DStream of Strings in a PySpark application. I want to send it as a DStream[String] to a Scala library. Strings are not converted by Py4j, though.

I'm working on a PySpark application that pulls data from Kafka using Spark Streaming. My messages are strings and I would like to call a method in Scala code, passing it a DStream[String] instance. However, I'm unable to receive proper JVM strings in the Scala code. It looks to me like the Python strings are not converted into Java strings but, instead, are serialized.

My question is: how do I get Java strings out of the DStream object?

Here is the simplest Python code I came up with:

from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))

from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda tuple: tuple[1])

ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)

ssc.start()

I'm running this code in PySpark, passing it the path to my JAR:

pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar

On the Scala side, I have:

package com.seigneurin

import org.apache.spark.streaming.api.java.JavaDStream

object MyPythonHelper {
  def doSomething(jdstream: JavaDStream[String]) = {
    val dstream = jdstream.dstream
    dstream.foreachRDD(rdd => {
      rdd.foreach(println)
    })
  }
}

Now, let's say I send some data into Kafka:

echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN

The println statement in the Scala code prints something that looks like:

[B@758aa4d9

I would expect to get foo bar.

Now, if I replace the simple println statement in the Scala code with the following:

rdd.foreach(v => println(v.getClass.getCanonicalName))

I get:

java.lang.ClassCastException: [B cannot be cast to java.lang.String

This suggests that the strings are actually passed as arrays of bytes.

If I simply try to convert this array of bytes into a string (I know I'm not even specifying the encoding):

def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
  val dstream = jdstream.dstream
  dstream.foreachRDD(rdd => {
    rdd.foreach(bytes => println(new String(bytes)))
  })
}

I get something that looks like (special characters might be stripped off):

�]qXfoo barqa.

This suggests the Python string was serialized (pickled?). How could I retrieve a proper Java string instead?

Solution

Long story short, there is no supported way to do something like this. Don't try this in production. You've been warned.

In general, Spark doesn't use Py4j for anything other than some basic RPC calls on the driver, and it doesn't start a Py4j gateway on any other machine. When it is required (mostly in MLlib and some parts of SQL), Spark uses Pyrolite to serialize objects passed between the JVM and Python.
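
Purely to illustrate that mechanism (not as a recommended approach), the bytes seen in the question could in principle be decoded on the JVM side with Pyrolite's Unpickler, which ships with Spark. This is a minimal sketch, reusing the package name from the question; the UnpicklingHelper object is hypothetical, and it assumes each element is a complete pickle payload containing a batch of records, which depends on the Python-side serializer in use:

package com.seigneurin

import scala.collection.JavaConverters._
import net.razorvine.pickle.Unpickler
import org.apache.spark.streaming.api.java.JavaDStream

object UnpicklingHelper {
  def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
    jdstream.dstream.foreachRDD(rdd => {
      rdd.foreach(bytes => {
        // Each element is typically a pickled batch, i.e. a java.util.List of records
        new Unpickler().loads(bytes) match {
          case batch: java.util.List[_] => batch.asScala.foreach(println)
          case other => println(other)
        }
      })
    })
  }
}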

This part of the API is either private (Scala) or internal (Python), and as such it is not intended for general usage. You can theoretically access it anyway, either per batch:

package dummy

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.streaming.api.java.JavaDStream
import org.apache.spark.sql.DataFrame

object PythonRDDHelper {
  def go(rdd: JavaRDD[Any]) = {
    // Keep only the String elements, then bring a small sample to the driver and print it
    rdd.rdd.collect {
      case s: String => s
    }.take(5).foreach(println)
  }
}

or for the entire stream:

object PythonDStreamHelper {
  def go(stream: JavaDStream[Any]) = {
    // Apply the same String filtering to every batch and print a sample
    stream.dstream.transform(_.collect {
      case s: String => s
    }).print
  }
}

or exposing individual batches as DataFrames (probably the least evil option):

object PythonDataFrameHelper {
  def go(df: DataFrame) = {
    df.show
  }
}

and use these wrappers as follows:

from pyspark.streaming import StreamingContext
from pyspark.mllib.common import _to_java_object_rdd
from pyspark.rdd import RDD

ssc = StreamingContext(spark.sparkContext, 10)
spark.catalog.listTables()

q = ssc.queueStream([sc.parallelize(["foo", "bar"]) for _ in range(10)]) 

# Reserialize RDD as Java RDD<Object> and pass 
# to Scala sink (only for output)
q.foreachRDD(lambda rdd: ssc._jvm.dummy.PythonRDDHelper.go(
    _to_java_object_rdd(rdd)
))

# Reserialize and convert to JavaDStream<Object>
# This is the only option which allows further transformations
# on DStream
ssc._jvm.dummy.PythonDStreamHelper.go(
    q.transform(lambda rdd: RDD(  # Reserialize but keep as Python RDD
        _to_java_object_rdd(rdd), ssc.sparkContext
    ))._jdstream
)

# Convert to DataFrame and pass to Scala sink.
# Arguably there are relatively few moving parts here. 
q.foreachRDD(lambda rdd: 
    ssc._jvm.dummy.PythonDataFrameHelper.go(
        rdd.map(lambda x: (x, )).toDF()._jdf
    )
)

ssc.start()
ssc.awaitTerminationOrTimeout(30)
ssc.stop()

This is not supported and untested, and as such it is rather useless for anything other than experimenting with the Spark API.
