Cannot write/save data to Ignite directly from a Spark RDD


Problem description

I am trying to write a DataFrame to Ignite using JDBC.

The Spark version is: 2.1

The Ignite version is: 2.3

JDK: 1.8

Scala: 2.11.8

Here is my code snippet:

def WriteToIgnite(hiveDF:DataFrame,targetTable:String):Unit = {

  val conn = DataSource.conn
  var psmt:PreparedStatement = null

  try {
    OperationIgniteUtil.deleteIgniteData(conn,targetTable)

    hiveDF.foreachPartition({
      partitionOfRecords => {
        partitionOfRecords.foreach(
          row => for ( i <- 0 until row.length ) {
            psmt = OperationIgniteUtil.getInsertStatement(conn, targetTable, hiveDF.schema)
            psmt.setObject(i+1, row.get(i))
            psmt.execute()
          }
        )
      }
    })

  }catch {
    case e: Exception =>  e.printStackTrace()
  } finally {
    conn.close
  }
}

Then I run it on Spark, and it prints this error message:


org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2305)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2305)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2305)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
    at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2304)
    at com.pingan.pilot.ignite.common.OperationIgniteUtil$.WriteToIgnite(OperationIgniteUtil.scala:72)
    at com.pingan.pilot.ignite.etl.HdfsToIgnite$.main(HdfsToIgnite.scala:36)
    at com.pingan.pilot.ignite.etl.HdfsToIgnite.main(HdfsToIgnite.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.ignite.internal.jdbc2.JdbcConnection
Serialization stack:
    - object not serializable (class: org.apache.ignite.internal.jdbc2.JdbcConnection, value: org.apache.ignite.internal.jdbc2.JdbcConnection@7ebc2975)
    - field (class: com.pingan.pilot.ignite.common.OperationIgniteUtil$$anonfun$WriteToIgnite$1, name: conn$1, type: interface java.sql.Connection)
    - object (class com.pingan.pilot.ignite.common.OperationIgniteUtil$$anonfun$WriteToIgnite$1, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 27 more

Does anyone know how to fix it? Thanks!

Recommended answer

The problem here is that you cannot serialize the connection to Ignite, DataSource.conn. The closure you pass to foreachPartition contains the connection as part of its scope, which is why Spark cannot serialize it.
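For reference, a hedged sketch of the usual JDBC-only workaround (not part of the original answer): open the connection inside foreachPartition so it is created on each executor instead of being captured by the closure. DataSource.getConnection() is a hypothetical factory method standing in for however the asker's DataSource builds connections, and the sketch is written as a replacement body for WriteToIgnite:

// Sketch only: assumes DataSource.getConnection() exists and can be called on executors.
val schema = hiveDF.schema                 // capture the schema, not the DataFrame itself

hiveDF.foreachPartition { partitionOfRecords =>
  val conn = DataSource.getConnection()    // hypothetical: connection built on the executor
  val psmt = OperationIgniteUtil.getInsertStatement(conn, targetTable, schema)
  try {
    partitionOfRecords.foreach { row =>
      for (i <- 0 until row.length) psmt.setObject(i + 1, row.get(i))  // bind every column
      psmt.execute()                                                   // one execute per row
    }
  } finally {
    psmt.close()
    conn.close()
  }
}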

Fortunately, Ignite provides a custom implementation of RDD which allows you to save values to it. You first need to create an IgniteContext, then retrieve Ignite's shared RDD, which provides distributed access to Ignite, and use it to save the rows of your DataFrame:

val igniteContext = new IgniteContext(sparkContext, () => new IgniteConfiguration())
...

// Retrieve Ignite's shared RDD
val igniteRdd = igniteContext.fromCache("partitioned")
igniteRdd.saveValues(hiveDF.rdd)
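For completeness, a fuller self-contained sketch of the same approach. The cache name "partitioned", the explicit [String, Row] type parameters (needed for the saveValues call to compile against an RDD[Row]), the source table name, and the no-arg IgniteConfiguration are all assumptions; adjust them for your cluster:

import org.apache.ignite.configuration.IgniteConfiguration
import org.apache.ignite.spark.IgniteContext
import org.apache.spark.sql.{Row, SparkSession}

object HdfsToIgniteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DataFrameToIgnite").enableHiveSupport().getOrCreate()
    val hiveDF = spark.table("some_hive_table")   // hypothetical source table

    // Starts an Ignite client on the driver and, lazily, on each executor.
    val igniteContext = new IgniteContext(spark.sparkContext, () => new IgniteConfiguration())

    // IgniteRDD backed by the cache named "partitioned"; keys are generated by Ignite.
    val igniteRdd = igniteContext.fromCache[String, Row]("partitioned")

    // saveValues streams every Row of the DataFrame into the cache.
    igniteRdd.saveValues(hiveDF.rdd)

    // Stop the driver-side Ignite client; pass true to also stop the clients on the workers.
    igniteContext.close(false)
  }
}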

More information is available in the Apache Ignite documentation.

