Extract specific columns from a text file to make a DataFrame in Scala
Problem description
I need to clean some data in Scala. I have the following raw data, stored in a text file:
06:36:15.718068 IP 10.0.0.1.5001 > 10.0.0.2.41516: Flags [.], ack 346, win 163, options [nop,nop,TS val 1654418 ecr 1654418], length 0
06:36:15.718078 IP 10.0.0.2.41516 > 10.0.0.1.5001: Flags [.], seq 1:65161, ack 0, win 58, options [nop,nop,TS val 1654418 ecr 1654418], length 65160
I need to have all of it in a DataFrame laid out as follows:
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|time_stamp_0   |sender_ip_1|receiver_ip_2|s_port_3|r_port_4|acknum_5|winnum_6|len_7|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|06:36:15.718068|10.0.0.1   |10.0.0.2     |5001    |41516   |346     |163     |0    |
|06:36:15.718078|10.0.0.2   |10.0.0.1     |41516   |5001    |0       |58      |65160|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
I used the following code to try to build the above DataFrame.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import scala.util.Try

val customSchema = StructType(Array(
  StructField("time_stamp_0", StringType, true),
  StructField("sender_ip_1", StringType, true),
  StructField("receiver_ip_2", StringType, true),
  StructField("s_port_3", StringType, true),
  StructField("r_port_4", StringType, true),
  StructField("acknum_5", StringType, true),
  StructField("winnum_6", StringType, true),
  StructField("len_7", IntegerType, true)))
// make train dataframe
val Dstream_Train = sc.textFile("/Users/xxxxxx/Desktop/xxxxx/Test/trace8.txt")
val Row_Dstream_Train = Dstream_Train.map(line => line.split(">")).map(array => {
  val first = Try(array(0).trim.split(" ")(0)) getOrElse ""
  val second = Try(array(1).trim.split(" ")(0)) getOrElse ""
  val third = Try(array(2).trim.split(" ")(0).replace(":", "")) getOrElse ""
  val fourth = Try(array(3).trim.split(" ")(0)) getOrElse ""
  val fifth = Try(array(4).trim.split(" ")(0)) getOrElse ""
  val sixth = Try(array(5).trim.split(" ")(0)) getOrElse ""
  val seventh = Try(array(6).trim.split(" ")(0)) getOrElse ""
  val eighth = Try(array(7).trim.split(" ")(0)) getOrElse ""
  val firstFixed = first.take(first.lastIndexOf("."))
  val secondfix = second.take(second.lastIndexOf("."))
  val thirdFixed = third.take(third.lastIndexOf("."))
  Row.fromSeq(Seq(firstFixed, secondfix, thirdFixed, fourth, fifth, sixth, seventh, eighth))
})
val Frist_Dataframe = session.createDataFrame(Row_Dstream_Train, customSchema)
But the problem is that nothing is extracted for the third column! Can you please guide me as to why the third column comes out empty? Thanks
Recommended answer
Your input data is not of fixed length, so getting the result you need is a bit tricky. Given the input data you provided, the following can be a solution. You can adapt it as your needs grow.
val Row_Dstream_Train = Dstream_Train.map(line => line.split(",")).map(array => {
  // array(0) holds "<time> IP <src>.<port> > <dst>.<port>: Flags [...]"
  val array1 = array(0).trim.split("IP")
  val array2 = array1(1).split(">")
  val array3 = array2(1).split(":")
  // A "seq" field, when present, shifts ack and win one position to the right
  val acknum5 = if (array(1).contains("seq")) array(2) else array(1)
  val winnum6 = if (array(1).contains("seq")) array(3) else array(2)
  // In the sample lines the payload length is the last comma-separated field ("length N")
  val len7 = array.last.trim
  val first = Try(array1(0).trim) getOrElse ""
  val second = Try(array2(0).trim) getOrElse ""
  val third = Try(array3(0).trim) getOrElse ""
  val sixth = Try(acknum5.trim.split(" ")(1)) getOrElse ""
  val seventh = Try(winnum6.trim.split(" ")(1)) getOrElse ""
  val eighth = Try(len7.split(" ")(1).toInt) getOrElse 0
  // Split "<ip>.<port>" at the last dot into address and port
  val secondfix = second.take(second.lastIndexOf("."))
  val sport3 = second.substring(second.lastIndexOf(".") + 1, second.length)
  val thirdFixed = third.take(third.lastIndexOf("."))
  val rport4 = third.substring(third.lastIndexOf(".") + 1, third.length)
  Row.fromSeq(Seq(first, secondfix, thirdFixed, sport3, rport4, sixth, seventh, eighth))
})
val Frist_Dataframe = sqlContext.createDataFrame(Row_Dstream_Train, customSchema)
You will get the following output:
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|time_stamp_0   |sender_ip_1|receiver_ip_2|s_port_3|r_port_4|acknum_5|winnum_6|len_7|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
|06:36:15.718068|10.0.0.1   |10.0.0.2     |5001    |41516   |346     |163     |0    |
|06:36:15.718078|10.0.0.2   |10.0.0.1     |41516   |5001    |0       |58      |65160|
+---------------+-----------+-------------+--------+--------+--------+--------+-----+
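As for why the third column was empty in the question's code: each line contains a single ">", so line.split(">") returns only two pieces. Any lookup at index 2 or higher throws inside Try and silently falls back to "". The following is a minimal sketch in plain Scala (no Spark) that reproduces this on the first sample line. The names SplitDemo, parts and field are hypothetical and exist only for this illustration.

```scala
import scala.util.Try

object SplitDemo {
  // First sample line from the question
  val line: String =
    "06:36:15.718068 IP 10.0.0.1.5001 > 10.0.0.2.41516: Flags [.], ack 346, win 163, " +
      "options [nop,nop,TS val 1654418 ecr 1654418], length 0"

  // The line has exactly one ">", so this array has two elements
  val parts: Array[String] = line.split(">")

  // Same lookup pattern as in the question: an out-of-range index
  // throws inside Try and is replaced by the empty string
  def field(i: Int): String = Try(parts(i).trim.split(" ")(0)).getOrElse("")

  def main(args: Array[String]): Unit = {
    println(s"pieces: ${parts.length}")
    println(s"field(0) = '${field(0)}'")
    println(s"field(1) = '${field(1)}'")
    println(s"field(2) = '${field(2)}'")
  }
}
```

Here field(1) is already the receiver address with its port, and field(2) onward is empty, which is why the later columns never get filled. Note also that calling take(lastIndexOf(".")) on the timestamp cuts off the microseconds.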
I hope the solution is helpful.
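Because the fields are not at fixed positions, another option is to look each field up by its keyword rather than by index. The sketch below is plain Scala and assumes every line follows the format of the two sample lines. The names TcpdumpLineParser, Packet and parse are hypothetical and not part of the original answer.

```scala
import scala.util.Try
import scala.util.matching.Regex

object TcpdumpLineParser {
  // Hypothetical container; the fields mirror the columns of customSchema
  final case class Packet(timeStamp: String, senderIp: String, receiverIp: String,
                          sPort: String, rPort: String,
                          ackNum: String, winNum: String, len: Int)

  // "<time> IP <a.b.c.d>.<port> > <a.b.c.d>.<port>:"
  private val Header: Regex =
    """^(\S+) IP (\d+\.\d+\.\d+\.\d+)\.(\d+) > (\d+\.\d+\.\d+\.\d+)\.(\d+):""".r
  private val Ack: Regex = """\back (\d+)""".r
  private val Win: Regex = """\bwin (\d+)""".r
  private val Length: Regex = """\blength (\d+)""".r

  // First captured group of the first match, or "" when the keyword is absent
  private def first(re: Regex, s: String): String =
    re.findFirstMatchIn(s).map(_.group(1)).getOrElse("")

  // Returns None when the line does not start with the expected header
  def parse(line: String): Option[Packet] =
    Header.findFirstMatchIn(line).map { m =>
      Packet(
        timeStamp = m.group(1),
        senderIp = m.group(2),
        receiverIp = m.group(4),
        sPort = m.group(3),
        rPort = m.group(5),
        ackNum = first(Ack, line),
        winNum = first(Win, line),
        len = Try(first(Length, line).toInt).getOrElse(0))
    }
}
```

Matching by keyword means an optional field such as seq does not shift the positions of ack, win or length. In Spark, something like Dstream_Train.flatMap(TcpdumpLineParser.parse) followed by a map to Row should plug into the same createDataFrame call, though that wiring is untested here.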