PySpark: read, map and reduce from multiline record textfile with newAPIHadoopFile
Problem description
I'm trying to solve a problem that is similar to this post. My original data is a text file that contains values (observations) of several sensors. Each observation is given with a timestamp, but the sensor name is given only once rather than on each line, and there are several sensors in one file.
Time MHist::852-YF-007
2016-05-10 00:00:00 0
2016-05-09 23:59:00 0
2016-05-09 23:58:00 0
2016-05-09 23:57:00 0
2016-05-09 23:56:00 0
2016-05-09 23:55:00 0
2016-05-09 23:54:00 0
2016-05-09 23:53:00 0
2016-05-09 23:52:00 0
2016-05-09 23:51:00 0
2016-05-09 23:50:00 0
2016-05-09 23:49:00 0
2016-05-09 23:48:00 0
2016-05-09 23:47:00 0
2016-05-09 23:46:00 0
2016-05-09 23:45:00 0
2016-05-09 23:44:00 0
2016-05-09 23:43:00 0
2016-05-09 23:42:00 0
Time MHist::852-YF-008
2016-05-10 00:00:00 0
2016-05-09 23:59:00 0
2016-05-09 23:58:00 0
2016-05-09 23:57:00 0
2016-05-09 23:56:00 0
2016-05-09 23:55:00 0
2016-05-09 23:54:00 0
2016-05-09 23:53:00 0
2016-05-09 23:52:00 0
2016-05-09 23:51:00 0
2016-05-09 23:50:00 0
2016-05-09 23:49:00 0
2016-05-09 23:48:00 0
2016-05-09 23:47:00 0
2016-05-09 23:46:00 0
2016-05-09 23:45:00 0
2016-05-09 23:44:00 0
2016-05-09 23:43:00 0
2016-05-09 23:42:00 0
Therefore I want to configure Hadoop to split the file at those lines where the sensor information is given, then read the sensor name (e.g. 852-YF-007 and 852-YF-008) from those lines and use MapReduce to read the values for each sensor accordingly.
I did this in Python (Jupyter Notebook):
sheet = sc.newAPIHadoopFile(
'/user/me/sample.txt',
'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
'org.apache.hadoop.io.LongWritable',
'org.apache.hadoop.io.Text',
conf={'textinputformat.record.delimiter': 'Time\tMHist'}
)
sf = sheet.filter(lambda (k, v): v)
sf = sf.map(lambda (k, v): v.splitlines())
sf.take(50)
The output is like this:
[[u'::852-YF-007\t',
u'2016-05-10 00:00:00\t0',
u'2016-05-09 23:59:00\t0',
u'2016-05-09 23:58:00\t0',
u'2016-05-09 23:57:00\t0',
u'2016-05-09 23:56:00\t0',
u'2016-05-09 23:55:00\t0',
u'2016-05-09 23:54:00\t0',
u'2016-05-09 23:53:00\t0',
u'2016-05-09 23:52:00\t0',
u'2016-05-09 23:51:00\t0',
u'2016-05-09 23:50:00\t0',
u'2016-05-09 23:49:00\t0',
u'2016-05-09 23:48:00\t0',
u'2016-05-09 23:47:00\t0',
u'2016-05-09 23:46:00\t0',
u'2016-05-09 23:45:00\t0',
u'2016-05-09 23:44:00\t0',
u'2016-05-09 23:43:00\t0',
u'2016-05-09 23:42:00\t0'],
[u'::852-YF-008\t',
u'2016-05-10 00:00:00\t0',
u'2016-05-09 23:59:00\t0',
u'2016-05-09 23:58:00\t0',
u'2016-05-09 23:57:00\t0',
u'2016-05-09 23:56:00\t0',
u'2016-05-09 23:55:00\t0',
u'2016-05-09 23:54:00\t0',
u'2016-05-09 23:53:00\t0',
u'2016-05-09 23:52:00\t0',
u'2016-05-09 23:51:00\t0',
u'2016-05-09 23:50:00\t0',
u'2016-05-09 23:49:00\t0',
u'2016-05-09 23:48:00\t0',
u'2016-05-09 23:47:00\t0',
u'2016-05-09 23:46:00\t0',
u'2016-05-09 23:45:00\t0',
u'2016-05-09 23:44:00\t0',
u'2016-05-09 23:43:00\t0',
u'2016-05-09 23:42:00\t0']]
My question is how to process this further to extract the sensor name and collect the value lines for that sensor, somewhat like this:
852-YF-007 --> array of sensor_lines
852-YF-008 --> array of sensor_lines
The lines themselves will then be split into timestamps and values later on, but I'm more interested in splitting the sensor names from the lines.
Recommended answer

Personally I would:
extend the delimiter with '::':
sheet = sc.newAPIHadoopFile(
    path,
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
    conf={'textinputformat.record.delimiter': 'Time\tMHist::'}
)
drop keys:
values = sheet.values()
filter out empty entries:
non_empty = values.filter(lambda x: x)
split:
grouped_lines = non_empty.map(lambda x: x.splitlines())  # records are unicode, so str.splitlines would fail here
separate keys and values:
from operator import itemgetter

pairs = grouped_lines.map(itemgetter(0, slice(1, None)))
and finally split values:
pairs.flatMapValues(lambda xs: [x.split("\t") for x in xs])
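Chained together, the step-by-step version reads as one pipeline (a sketch assuming the extended delimiter above; the name pairs is reused here for the final result):

from operator import itemgetter

pairs = (sheet
         .values()                            # drop the LongWritable offsets
         .filter(lambda x: x)                 # remove empty records
         .map(lambda x: x.splitlines())       # one list of lines per sensor block
         .map(itemgetter(0, slice(1, None)))  # (sensor header, data lines)
         .flatMapValues(lambda xs: [x.split("\t") for x in xs]))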
All of that can be done with a single function, of course:
import dateutil.parser

def process(pair):
    _, content = pair
    clean = [x.strip() for x in content.strip().splitlines()]
    if not clean:
        return  # bare return: a generator cannot return a value in Python 2
    k, vs = clean[0], clean[1:]  # sensor name, then its data lines
    for v in vs:
        try:
            ds, x = v.split("\t")
            yield k, (dateutil.parser.parse(ds), float(x))  # or int(x)
        except ValueError:
            pass
sheet.flatMap(process)
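If you then want the shape asked for in the question (one entry per sensor with a list of its parsed (timestamp, value) pairs), a minimal follow-up sketch, assuming the sheet RDD and process function above:

# Group the parsed (sensor, (timestamp, value)) pairs per sensor.
# Note: groupByKey materializes all values of a key in memory, so for
# very long per-sensor series consider aggregateByKey instead.
by_sensor = sheet.flatMap(process).groupByKey().mapValues(list)
by_sensor.take(2)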