Combining files

Problem Description

I am new to Scala. I have two RDDs, and I need to separate out my training and testing data. One file contains all of the data, and another contains just the testing data. I need to remove the testing data from my complete data set.

The complete data file is of the format (userID, MovID, Rating, Timestamp):

res8: Array[String] = Array(1, 31, 2.5, 1260759144)

The test data file is of the format (userID, MovID):

res10: Array[String] = Array(1, 1172)

How do I generate ratings_train so that it does not contain the cases that match the testing data set? I am using the following function, but the returned list comes back empty:

def create_training(data: RDD[String], ratings_test: RDD[String]): ListBuffer[Array[String]] = {
  val ratings_split = dropheader(data).map(line => line.split(","))
  val ratings_testing = dropheader(ratings_test).map(line => line.split(",")).collect()
  var ratings_train = new ListBuffer[Array[String]]()
  ratings_split.foreach(x => {
    ratings_testing.foreach(y => {
      if (x(0) != y(0) || x(1) != y(1)) {
        ratings_train += x
      }
    })
  })
  ratings_train
}
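
The dropheader helper used here is not shown in the question; a minimal sketch of what it presumably does (strip the header line from a file read as an RDD of lines), with the implementation itself being an assumption:

import org.apache.spark.rdd.RDD

// Assumed implementation: drop the header row, i.e. the first line of the file
// (the first element of the first partition).
def dropheader(data: RDD[String]): RDD[String] =
  data.mapPartitionsWithIndex { (idx, lines) =>
    if (idx == 0) lines.drop(1) else lines
  }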

I changed the code, but I am running into memory issues.

Answer

This might work.

def create_training(data: RDD[String], ratings_test: RDD[String]): RDD[Array[String]] = {
  val ratings_split = dropheader(data).map(line => line.split(","))
  // Collect the (small) test set to the driver: an RDD cannot be referenced
  // from inside another RDD's transformation.
  val ratings_testing = dropheader(ratings_test).map(line => line.split(",")).collect()

  // Keep a row only if it matches none of the test rows on (userID, MovID).
  ratings_split.filter(x =>
    !ratings_testing.exists(y => x(0) == y(0) && x(1) == y(1))
  )
}

  1. The code snippet you posted is not logically correct. A row should only end up in the final data if it has no match in the test data, but your code picks a row whenever it fails to match any single test row. We should instead check that a row matches none of the test rows before deciding it is a valid training row.
  2. You are using RDDs, but not exploring their full power. I guess you are reading the input from a CSV file. You can then structure your data with Spark's DataFrame API instead of splitting the strings on commas and processing the rows manually; a minimal sketch follows this list. These links may help: https://www.tutorialspoint.com/spark_sql/spark_sql_dataframes.htm , http://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes
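
As a rough illustration of point 2, here is a minimal DataFrame sketch that keeps every row of the full data whose (userID, MovID) pair does not appear in the test data, using a left anti join. The file paths, the header option, and the column names are assumptions rather than details from the question:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("CombineFiles") // app name and master are placeholders
  .master("local[*]")
  .getOrCreate()

// Assumed: both CSV files carry a header row with these column names.
val ratings = spark.read.option("header", "true").csv("ratings.csv")           // full data, path assumed
val ratings_test = spark.read.option("header", "true").csv("ratings_test.csv") // test data, path assumed

// Keep only the rows whose (userID, MovID) has no match in the test data.
val ratings_train = ratings.join(ratings_test, Seq("userID", "MovID"), "left_anti")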

Using a regular expression:

  def main(args: Array[String]): Unit = {
    // sample of the complete data set
    val data = spark.sparkContext.parallelize(Seq(
      //      "userID, MovID, Rating, Timestamp",
      "1, 31, 2.5, 1260759144",
      "2, 31, 2.5, 1260759144"))

    val ratings_test = spark.sparkContext.parallelize(Seq(
      //      "userID, MovID",
      "1, 31",
      "2, 30",
      "30, 2"
    ))

    val result = getData(data, ratings_test).collect()
    // the result will only contain "2, 31, 2.5, 1260759144"
  }

  def getData(data: RDD[String], ratings_test: RDD[String]): RDD[String] = {
    val ratings = dropheader(data)
    val ratings_testing = dropheader(ratings_test)

    // Broadcast the test ratings to all Spark executors; the test set is collected up front
    // so that we do not have to call collect (or reference another RDD) inside the filter logic.
    val ratings_testing_bc = spark.sparkContext.broadcast(ratings_testing.collect.toSet)

    ratings.filter(rating => {
      !ratings_testing_bc.value.exists(testRating => regexMatch(rating, testRating))
    })
  }

  def regexMatch(data: String, testData: String): Boolean = {
    // Regular expression capturing the first two columns (userID, MovID)
    val regex = """^([^,]*), ([^,\r\n]*),?""".r

    (regex.findFirstMatchIn(data), regex.findFirstMatchIn(testData)) match {
      case (Some(d), Some(t)) =>
        d.group(1) == t.group(1) && d.group(2) == t.group(2)
      case _ =>
        // Lines that do not have the expected format never count as a match.
        false
    }
  }
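
For completeness: the snippets above assume that a SparkSession named spark and the dropheader helper are already in scope. A minimal, hypothetical wiring that runs getData on the actual files (file names and paths are assumptions) could look like this:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("CombineFiles") // placeholder app name
  .master("local[*]")      // assumption: local execution while testing
  .getOrCreate()

// Read the complete and test files as plain text RDDs and derive the training split.
val data: RDD[String] = spark.sparkContext.textFile("ratings.csv")              // path assumed
val ratings_test: RDD[String] = spark.sparkContext.textFile("ratings_test.csv") // path assumed

val ratings_train = getData(data, ratings_test)
ratings_train.saveAsTextFile("ratings_train") // output path assumed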
