spark job (scala) write type Date to Cassandra


Question


I'm using DSE 5.1 (Spark 2.0.2.6 and Cassandra 3.10.0.1652)

My Cassandra table:

CREATE TABLE ks.tbl (
    dk int,
    date date,
    ck int,
    val int,
    PRIMARY KEY (dk, date, ck)
) WITH CLUSTERING ORDER BY (date DESC, ck ASC);

with the following data:

 dk | date       | ck | val
----+------------+----+-----
  1 | 2017-01-01 |  1 | 100
  1 | 2017-01-01 |  2 | 200

My code must read this data and write the same thing but with yesterday's date (it compiles successfully):

package com.datastax.spark.example

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
import com.github.nscala_time.time._
import com.github.nscala_time.time.Imports._

object test extends App {

  val conf = new SparkConf().setAppName("DSE calculus app TEST")
  val sc = new SparkContext(conf)

  val yesterday= (DateTime.now - 1.days).toString(StaticDateTimeFormat.forPattern("yyyy-MM-dd"))

  val tbl = sc.cassandraTable("ks","tbl").select("dk","date","ck","val").where("dk=1")

  tbl.map(row => (row.getInt("dk"),yesterday,row.getInt("ck"),row.getInt("val"))).saveToCassandra("ks","tbl")

  sc.stop()
  sys.exit(0)
}

When I run this app:

dse spark-submit --class com.datastax.spark.example.test test-assembly-0.1.jar

It fails to properly write to Cassandra. It seems the date variable is not inserted in the map correctly. The error I get is:

Error:
WARN  2017-05-08 22:23:16,472 org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, <IP of one of my nodes>): java.io.IOException: Failed to write statements to ks.tbl.
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:207)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:175)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
        at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:175)
        at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:162)
        at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:149)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)

However, when I insert a date (string) directly in the map statement as follows, the code does insert the data correctly:

tbl.map(row => (row.getInt("dk"),"2017-02-02",row.getInt("ck"),row.getInt("val"))).saveToCassandra("ks","tbl")

It also inserts the data correctly if I set yesterday to an integer (days since epoch). That would be optimal, but I can't get 'yesterday' to behave this way.

EDIT: Actually, this does not insert the data correctly. No matter whether I set 'yesterday' to 1 or 100,000,000, it always inserts the epoch ('1970-01-01').

The code that fails behaves correctly, and as I would expect, in the DSE Spark console.

I just can't figure out what I'm doing wrong. Any help is welcome.

EDIT 2: The executor 0 stderr log does show that it's trying to insert a null value into the date column; that's obviously not possible since it's a clustering column.

Solution

When writing code for a Spark job, it's important to realize when particular variables are set and when they are serialized. Let's take a look at a note from the App trait docs:

Caveats

It should be noted that this trait is implemented using the DelayedInit functionality, which means that fields of the object will not have been initialized before the main method has been executed.

This means that references to variables defined in the body of the App may not be initialized on the executors when the code is actually run.

My guess is that the lambda you have written contains a reference to a val which is initialized in the delayed-init portion of the App class. This means the serialized version of the code on the executor, which doesn't run the main method, gets the uninitialized version of the value (null).
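
To make this concrete, here is a minimal, Spark-free sketch of the same pitfall (my own illustration, not from the question; it assumes Scala 2.x, where App is still implemented with DelayedInit, and the object names are hypothetical):

object FieldDemo extends App {
  // Because App uses DelayedInit, this assignment is deferred: it only
  // runs when FieldDemo's main method is actually executed.
  val message = "initialized"
}

object Probe {
  def main(args: Array[String]): Unit = {
    // Touching the field constructs the FieldDemo singleton, but its
    // deferred body never runs here, so the field keeps its JVM default.
    println(FieldDemo.message) // prints: null
  }
}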

Switching the constant to a lazy val (or moving it into a separate object or class) would fix this issue by making sure the value is initialized remotely (lazy val) or serialized already initialized (separate class/object).
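
For illustration, here is a rough sketch of the "separate object with an explicit main method" option, adapted from the code in the question (untested against DSE 5.1, but using only the same libraries the question already imports):

package com.datastax.spark.example

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}
import com.github.nscala_time.time._
import com.github.nscala_time.time.Imports._

object test {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DSE calculus app TEST")
    val sc = new SparkContext(conf)

    // A local val inside main is initialized on the driver before the
    // closure capturing it is serialized, so executors see the real
    // string instead of null.
    val yesterday = (DateTime.now - 1.days)
      .toString(StaticDateTimeFormat.forPattern("yyyy-MM-dd"))

    sc.cassandraTable("ks", "tbl")
      .select("dk", "date", "ck", "val")
      .where("dk=1")
      .map(row => (row.getInt("dk"), yesterday, row.getInt("ck"), row.getInt("val")))
      .saveToCassandra("ks", "tbl")

    sc.stop()
  }
}

The other option is a one-line change: keep extends App and declare lazy val yesterday = ..., so the value is computed on first access wherever the closure is deserialized.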
