Extract specific columns from a text file to make a DataFrame in Scala

Problem description

I need to clean some data in Scala. I have the following raw data, which is stored in a text file:

06:36:15.718068 IP 10.0.0.1.5001 > 10.0.0.2.41516: Flags [.], ack 346, win 163, options [nop,nop,TS val 1654418 ecr 1654418], length 0
06:36:15.718078 IP 10.0.0.2.41516 > 10.0.0.1.5001: Flags [.], seq 1:65161, ack 0, win 58, options [nop,nop,TS val 1654418 ecr 1654418], length 65160

I need to have all of it in a DataFrame in the following form:

+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|time_stamp_0   |sender_ip_1|receiver_ip_2|s_port_3|r_port_4|acknum_5|winnum_6|len_7|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|06:36:15.718068|10.0.0.1   |10.0.0.2     |5001    |41516   |346     |163     |0    |
|06:36:15.718078|10.0.0.2   |10.0.0.1     |41516   |5001    |0       |58      |65160|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+

I used the following code to try to build the DataFrame above.

    import scala.util.Try
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val customSchema = StructType(Array(
      StructField("time_stamp_0", StringType, true),
      StructField("sender_ip_1", StringType, true),
      StructField("receiver_ip_2", StringType, true),
      StructField("s_port_3", StringType, true),
      StructField("r_port_4", StringType, true),
      StructField("acknum_5", StringType, true),
      StructField("winnum_6", StringType, true),
      StructField("len_7", IntegerType, true)))

    // Make the train DataFrame
    val Dstream_Train = sc.textFile("/Users/xxxxxx/Desktop/xxxxx/Test/trace8.txt")
    val Row_Dstream_Train = Dstream_Train.map(line => line.split(">")).map(array => {
      val first = Try(array(0).trim.split(" ")(0)) getOrElse ""
      val second = Try(array(1).trim.split(" ")(0)) getOrElse ""
      val third = Try(array(2).trim.split(" ")(0).replace(":", "")) getOrElse ""
      val fourth = Try(array(3).trim.split(" ")(0)) getOrElse ""
      val fifth = Try(array(4).trim.split(" ")(0)) getOrElse ""
      val sixth = Try(array(5).trim.split(" ")(0)) getOrElse ""
      val seventh = Try(array(6).trim.split(" ")(0)) getOrElse ""
      val eighth = Try(array(7).trim.split(" ")(0)) getOrElse ""

      val firstFixed = first.take(first.lastIndexOf("."))
      val secondfix = second.take(second.lastIndexOf("."))
      val thirdFixed = third.take(third.lastIndexOf("."))
      Row.fromSeq(Seq(firstFixed, secondfix, thirdFixed, fourth, fifth, sixth, seventh, eighth))
    })
    val Frist_Dataframe = session.createDataFrame(Row_Dstream_Train, customSchema)

But the problem is that nothing is extracted from the third column! Can you please guide me on why the third column comes out empty? Thanks.

Recommended answer

Your input data is not fixed-length, so getting the result you need is a bit tricky. Based on the input data you provided, the following can be a solution. You can adapt it as your needs grow.
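As a side note on the empty third column: the check below is a minimal plain-Scala sketch (no Spark needed) that applies the question's own `split(">")` expressions to the first sample line. The object name `SplitCheck` is made up for illustration. Each line contains only one `>`, so the split yields two pieces, `parts(2)` throws, and `Try(...)` quietly falls back to the empty string.

```scala
import scala.util.Try

// Hypothetical helper object, only used to reproduce the behaviour in the question.
object SplitCheck {
  val line: String =
    "06:36:15.718068 IP 10.0.0.1.5001 > 10.0.0.2.41516: Flags [.], ack 346, " +
      "win 163, options [nop,nop,TS val 1654418 ecr 1654418], length 0"

  // Splitting on ">" gives exactly two pieces: the text before and after the arrow.
  val parts: Array[String] = line.split(">")

  // Same expressions as in the question.
  val first: String = Try(parts(0).trim.split(" ")(0)).getOrElse("")
  val second: String = Try(parts(1).trim.split(" ")(0)).getOrElse("")
  // parts(2) does not exist, so Try swallows the exception and "" is returned.
  val third: String = Try(parts(2).trim.split(" ")(0).replace(":", "")).getOrElse("")

  def main(args: Array[String]): Unit = {
    println(s"pieces = ${parts.length}")
    println(s"first  = '$first'")
    println(s"second = '$second'")
    println(s"third  = '$third'")
  }
}
```

The same fallback applies to `array(3)` through `array(7)` in the question, which is why the answer's code starts from a comma split and only then splits the first field on `IP`, `>` and `:`.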

import scala.util.Try
import org.apache.spark.sql.Row

// Split on commas first: element 0 holds the timestamp and both endpoints,
// the remaining elements hold the "seq", "ack", "win", ... and "length" fields.
val Row_Dstream_Train = Dstream_Train.map(line => line.split(",")).map(array => {

  val array1 = array(0).trim.split("IP")  // timestamp | "sender > receiver: Flags [.]"
  val array2 = array1(1).split(">")       // sender | "receiver: Flags [.]"
  val array3 = array2(1).split(":")       // receiver | " Flags [.]"

  // When a "seq" field is present, "ack" and "win" shift one position to the right.
  val hasSeq = array(1).contains("seq")
  val acknum5 = if (hasSeq) array(2) else array(1)
  val winnum6 = if (hasSeq) array(3) else array(2)
  // "length N" is the last comma-separated field in the sample lines.
  val len7 = array.last

  val first = Try(array1(0).trim) getOrElse ""
  val second = Try(array2(0).trim) getOrElse ""
  val third = Try(array3(0).trim) getOrElse ""
  val sixth = Try(acknum5.trim.split(" ")(1)) getOrElse ""
  val seventh = Try(winnum6.trim.split(" ")(1)) getOrElse ""
  val eighth = Try(len7.trim.split(" ")(1).toInt) getOrElse 0

  // The text after the last "." is the port; everything before it is the IP address.
  val secondfix = second.take(second.lastIndexOf("."))
  val sport3 = second.substring(second.lastIndexOf(".") + 1, second.length)
  val thirdFixed = third.take(third.lastIndexOf("."))
  val rport4 = third.substring(third.lastIndexOf(".") + 1, third.length)

  Row.fromSeq(Seq(first, secondfix, thirdFixed, sport3, rport4, sixth, seventh, eighth))
})
val Frist_Dataframe = sqlContext.createDataFrame(Row_Dstream_Train, customSchema)

You will get the following output:

+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|time_stamp_0   |sender_ip_1|receiver_ip_2|s_port_3|r_port_4|acknum_5|winnum_6|len_7|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|06:36:15.718068|10.0.0.1   |10.0.0.2     |5001    |41516   |346     |163     |0    |
|06:36:15.718078|10.0.0.2   |10.0.0.1     |41516   |5001    |0       |58      |65160|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
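As an alternative to position-based splitting, the fields could also be pulled out with a single regular expression. The sketch below is not part of the original answer: the `TcpdumpLine` object, the `Packet` case class and the pattern itself are assumptions made for illustration, tuned only to the two sample lines in the question. Lines that do not match (for example, ones without an `ack` field) yield `None` instead of a partially filled row.

```scala
import scala.util.matching.Regex

// Hypothetical parser; field names loosely follow the schema in the question.
object TcpdumpLine {
  final case class Packet(
      timeStamp: String,
      senderIp: String,
      receiverIp: String,
      sPort: String,
      rPort: String,
      ackNum: String,
      winNum: String,
      len: Int)

  // Shape assumed from the samples:
  //   <time> IP <ip>.<port> > <ip>.<port>: ... ack N, ... win N, ... length N
  private val Pattern: Regex =
    ("""(\S+) IP (\d+\.\d+\.\d+\.\d+)\.(\d+) > (\d+\.\d+\.\d+\.\d+)\.(\d+):""" +
      """.*\back (\d+),.*\bwin (\d+),.*\blength (\d+)\s*""").r

  // Returns Some(packet) when the whole line matches the pattern, None otherwise.
  def parse(line: String): Option[Packet] = line match {
    case Pattern(ts, sIp, sPort, rIp, rPort, ack, win, len) =>
      Some(Packet(ts, sIp, rIp, sPort, rPort, ack, win, len.toInt))
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    val sample =
      "06:36:15.718078 IP 10.0.0.2.41516 > 10.0.0.1.5001: Flags [.], seq 1:65161, " +
        "ack 0, win 58, options [nop,nop,TS val 1654418 ecr 1654418], length 65160"
    println(parse(sample))
  }
}
```

Because `parse` returns an `Option`, it could be applied to the RDD with `flatMap` so that unparseable lines are dropped rather than turned into empty strings. Whether that is preferable to keeping placeholder rows depends on how the trace is used afterwards.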

I hope the solution is helpful.
