Reading and graphing data read from huge files
Question
We have pretty large files, on the order of 1-1.5 GB combined (mostly log files), with raw data that is easily parseable to a csv, which is subsequently supposed to be graphed to generate a set of graph images.
Currently, we are using bash scripts to turn the raw data into a csv file with just the numbers that need to be graphed, and then feeding it into a gnuplot script. But this process is extremely slow. I tried to speed up the bash scripts by replacing some piped cut, tr, etc. invocations with a single awk command; although this improved the speed, the whole thing is still very slow.
So, I am starting to believe there are better tools for this process. I am currently looking to rewrite this process in python+numpy or R. A friend of mine suggested using the JVM, and if I am to do that, I will use clojure, but am not sure how the JVM will perform.
I don't have much experience in dealing with these kinds of problems, so any advice on how to proceed would be great. Thanks.
Edit: Also, I will want to store (to disk) the generated intermediate data, i.e., the csv, so I don't have to re-generate it, should I choose I want a different looking graph.
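Caching the intermediate csv is straightforward with Python's stdlib csv module; a minimal sketch (the function names and the all-float row layout are assumptions, not part of the question):

```python
import csv

def write_intermediate(path, rows):
    """Persist the computed rows so different graphs can be rendered
    later without re-parsing the original log files."""
    with open(path, "w", newline="") as out:
        writer = csv.writer(out)
        for row in rows:
            writer.writerow(row)

def read_intermediate(path):
    """Reload the cached csv as lists of floats, ready for plotting."""
    with open(path, newline="") as src:
        return [[float(v) for v in row] for row in csv.reader(src)]
```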
Edit 2: The raw data files have one record per line, whose fields are separated by a delimiter (|). Not all fields are numeric. Each field I need in the output csv is obtained by applying a certain formula to the input records, which may use multiple fields from the input data. The output csv will have 3-4 fields per line, and I need graphs that plot the 1-2, 1-3, and 1-4 field pairs in (maybe) a bar chart. I hope that gives a better picture.
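A per-record transform like the one described might look as follows; the field positions and the formula are purely hypothetical placeholders, since the question does not give the real ones:

```python
def parse_record(line):
    """Split one '|'-delimited log record and derive the output csv fields.

    The indices and the derived-value formula below are illustrative only;
    substitute the real extraction logic for your log format.
    """
    fields = line.rstrip("\n").split("|")
    key = fields[0]                      # non-numeric key field (e.g. timestamp)
    value_a = float(fields[2])           # hypothetical numeric field
    value_b = float(fields[5])           # hypothetical numeric field
    derived = value_a / value_b if value_b else 0.0  # example formula
    return [key, value_a, value_b, derived]
```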
Edit 3: I have modified @adirau's script a little and it seems to be working pretty well. I have come far enough that I am reading data, sending to a pool of processor threads (pseudo processing, append thread name to data), and aggregating it into an output file, through another collector thread.
PS: I am not sure about the tagging of this question, feel free to correct it.
Python sounds like a good choice because it has a good threading API (though the implementation is questionable), matplotlib, and pylab. I am missing some more specs from your end, but maybe this could be a good starting point for you: matplotlib: async plotting with threads. I would go for a single thread to handle the bulk disk I/O reads and synchronous queueing to a pool of threads for data processing (if you have fixed record lengths, things may get faster by precomputing read offsets and passing just the offsets to the thread pool). With the disk I/O thread I would mmap the data source files and read a predefined number of bytes plus one more read, to grab the remaining bytes up to the end of the current input line; the number of bytes should be chosen somewhere near your average line length. Next comes pool feeding via the queue, and the data processing/plotting that takes place in the thread pool. I don't have a good picture here (of what you are plotting, exactly), but I hope this helps.
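The fixed-record-length optimization mentioned above can be sketched briefly; since record lengths are known, every offset is computable without scanning, and a worker can seek directly to its record (function names here are illustrative):

```python
import os

def record_offsets(path, reclen):
    """With fixed-length records, the byte offset of every record is known
    up front, so only offsets need to be handed to the thread pool."""
    size = os.path.getsize(path)
    return list(range(0, size, reclen))

def read_record(f, offset, reclen):
    """Fetch a single record given its precomputed offset."""
    f.seek(offset)
    return f.read(reclen)
```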
EDIT: there's file.readlines([sizehint]) to grab multiple lines at once; it may not be that fast, though, since the docs say it uses readline() internally.
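Whatever its internals, readlines(sizehint) does make batching the producer side easy; a small sketch of feeding roughly sizehint-byte batches to workers (the generator name is an assumption):

```python
def batched_lines(path, sizehint=65536):
    """Yield batches of lines, each batch roughly `sizehint` bytes,
    so workers receive chunks of lines instead of one line at a time."""
    with open(path, "r") as f:
        while True:
            batch = f.readlines(sizehint)
            if not batch:
                break
            yield batch
```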
EDIT: a quick code skeleton:
import mmap
import sys
import threading
from collections import deque
from threading import Thread

class processor(Thread):
    """
    processor gets a batch of data at a time from the diskio thread
    """
    def __init__(self, q):
        Thread.__init__(self, name="plotter", daemon=True)
        self._queue = q

    def run(self):
        # block until a batch arrives, then plot each parsed record
        while True:
            for data in self.feed(self._queue.get()):
                self.plot(data)

    def parseline(self, line):
        """return a data struct ready for plotting"""
        raise NotImplementedError

    def feed(self, databuf):
        # yield one ready-to-plot data struct at a time
        for line in databuf:
            yield self.parseline(line)

    def plot(self, data):
        """integrate
        https://www.esclab.tw/wiki/index.php/Matplotlib#Asynchronous_plotting_with_threads
        maybe
        """

class sharedq(object):
    """a bounded blocking queue; the stdlib queue.Queue offers
    the same semantics and may serve you better"""
    def __init__(self, maxsize=8192):
        self.queue = deque()
        self.barrier = threading.RLock()
        self.read_c = threading.Condition(self.barrier)
        self.write_c = threading.Condition(self.barrier)
        self.msz = maxsize

    def put(self, item):
        with self.barrier:
            while len(self.queue) >= self.msz:
                self.write_c.wait()
            self.queue.append(item)
            self.read_c.notify()

    def get(self):
        with self.barrier:
            while not self.queue:
                self.read_c.wait()
            item = self.queue.popleft()
            self.write_c.notify()
            return item

if __name__ == "__main__":
    q = sharedq()
    batchsize = 1024  # lines handed to a worker per batch
    for i in range(8):
        p = processor(q)
        p.start()
    for fn in sys.argv[1:]:
        with open(fn, "rb") as f:
            # mmap objects have readline() but no readlines(),
            # so batches are assembled line by line
            m = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
            batch = []
            for line in iter(m.readline, b""):
                batch.append(line)
                if len(batch) >= batchsize:
                    q.put(batch)
                    batch = []
            if batch:
                q.put(batch)
    # joining/cleanup code is still desirable here
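To adapt the skeleton to the record format from Edit 2, only the parse step needs filling in. A self-contained sketch of one worker wired to a collector queue, using the stdlib queue.Queue for brevity; the class name, field positions, formula, and None-sentinel shutdown are all assumptions, not part of the original answer:

```python
from queue import Queue
from threading import Thread

class logprocessor(Thread):
    """Parses '|'-delimited records from input batches and hands the
    derived rows to a collector queue."""
    def __init__(self, inq, outq):
        Thread.__init__(self, daemon=True)
        self.inq, self.outq = inq, outq

    def parseline(self, line):
        fields = line.rstrip("\n").split("|")
        # hypothetical positions and formula -- substitute the real ones
        return (float(fields[1]), float(fields[2]),
                float(fields[1]) * float(fields[2]))

    def run(self):
        while True:
            batch = self.inq.get()
            if batch is None:  # sentinel: shut down cleanly
                self.inq.task_done()
                break
            for line in batch:
                self.outq.put(self.parseline(line))
            self.inq.task_done()
```

A collector thread (as in Edit 3) would then drain outq and append rows to the intermediate csv.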