java.lang.IllegalArgumentException: requirement failed: Columns not found in Double

Problem description

I am working in Spark, and I have many csv files that contain lines; a line looks like this:

2017,16,16,51,1,1,4,-79.6,-101.90,-98.900

It can contain more or fewer fields, depending on the csv file.

Each file corresponds to a Cassandra table into which I need to insert all the lines the file contains, so what I basically do is take each line, split its elements, and put them in a List[Double]:

sc.stop
import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf


val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
val sc = new SparkContext(conf)
val nameTable = "artport"
val ligne = "20171,16,165481,51,1,1,4,-79.6000,-101.7000,-98.9000"
val linetoinsert : List[String] = ligne.split(",").toList
var ainserer : Array[Double] = new Array[Double](linetoinsert.length)
for (l <- 0 until linetoinsert.length) { ainserer(l) = linetoinsert(l).toDouble }
val liste = ainserer.toList
val rdd = sc.parallelize(liste)
rdd.saveToCassandra("db", nameTable) //db is the name of my keyspace in cassandra

I get this error when I run the code:

java.lang.IllegalArgumentException: requirement failed: Columns not found in Double: [collecttime, sbnid, enodebid, rackid, shelfid, slotid, channelid, c373910000, c373910001, c373910002]
  at scala.Predef$.require(Predef.scala:224)
  at com.datastax.spark.connector.mapper.DefaultColumnMapper.columnMapForWriting(DefaultColumnMapper.scala:108)
  at com.datastax.spark.connector.writer.MappedToGettableDataConverter$$anon$1.<init>(MappedToGettableDataConverter.scala:37)
  at com.datastax.spark.connector.writer.MappedToGettableDataConverter$.apply(MappedToGettableDataConverter.scala:28)
  at com.datastax.spark.connector.writer.DefaultRowWriter.<init>(DefaultRowWriter.scala:17)
  at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:31)
  at com.datastax.spark.connector.writer.DefaultRowWriter$$anon$1.rowWriter(DefaultRowWriter.scala:29)
  at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:382)
  at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:35)
  ... 60 elided

I figured out that the insertion works if my RDD is of type:

rdd: org.apache.spark.rdd.RDD[(Double, Double, Double, Double, Double, Double, Double, Double, Double, Double)]

But what my code produces is an org.apache.spark.rdd.RDD[Double].

I can't use a Scala Tuple9, for example, because I don't know before execution how many elements my list will contain; that approach also wouldn't fit my problem anyway, because some of my csv files have more than 100 columns and tuples stop at Tuple22.
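For reference, when the column count is known in advance and is at most 22, the tuple route does work, because the connector maps tuple elements to table columns by position. A minimal sketch, assuming the same 10-column artport table and the db keyspace from the question:

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
val sc = new SparkContext(conf)
val ligne = "20171,16,165481,51,1,1,4,-79.6000,-101.7000,-98.9000"

// build a Tuple10 so each element lines up with one of the 10 table columns
val row = ligne.split(",").map(_.toDouble) match {
  case Array(a, b, c, d, e, f, g, h, i, j) => (a, b, c, d, e, f, g, h, i, j)
}

// an RDD[(Double, ..., Double)] of matching arity can be saved directly
sc.parallelize(Seq(row)).saveToCassandra("db", "artport")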

Thanks for your help.

Answer

As @SergGr mentioned, a Cassandra table has a schema with known columns, and the connector maps the named fields of each RDD element to those columns; a bare Double has no named fields, hence the error. So you need to map your Array to the Cassandra schema before saving, and you can use a case class for this. Try the following code; I assume every column in the Cassandra table is of type Double.

//create a case class equivalent to your Cassandra table
case class Schema(collecttime: Double,
                  sbnid: Double,
                  enodebid: Double,
                  rackid: Double,
                  shelfid: Double,
                  slotid: Double,
                  channelid: Double,
                  c373910000: Double,
                  c373910001: Double,
                  c373910002: Double)
object test {

  import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
    val sc = new SparkContext(conf)
    val nameTable = "artport"
    val ligne = "20171,16,165481,51,1,1,4,-79.6000,-101.7000,-98.9000"
    //parse the ligne string into a Schema case class
    val schema = parseString(ligne)
    //get RDD[Schema]
    val rdd = sc.parallelize(Seq(schema))
    //now you can save this RDD to cassandra
    rdd.saveToCassandra("db", nameTable)
  }

  //function to parse a csv line into the Schema case class
  def parseString(s: String): Schema = {
    //split the line and convert each field to Double
    val Array(collecttime, sbnid, enodebid, rackid, shelfid, slotid,
      channelid, c373910000, c373910001, c373910002, _*) = s.split(",").map(_.toDouble)

    //map those fields to the Schema case class
    Schema(collecttime,
      sbnid,
      enodebid,
      rackid,
      shelfid,
      slotid,
      channelid,
      c373910000,
      c373910001,
      c373910002)
  }
}
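Note that, unlike tuples, case classes have not been limited to 22 fields since Scala 2.11, so this pattern should also extend to your csv files with more than 100 columns; you would still need one case class per distinct table schema.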

