Align data in one column with another row, based on the last time some condition was true

Problem description

I'm trying to parse millions of lines of log files that suffer from an unfortunate deficiency. Data relating to a single event can be split across log entries, but there is no direct link with which to realign the data from several rows into a single row; instead I have to infer the relationship.

Brief background:

  1. There are 4 objects that I care about, each of which will be modified many times.
  2. There is a thread pool of 8 threads; a thread will pick up one of those objects at random and begin processing it. This event is identified by thing_n, A, B, and C all having non-null values, and I can get the thread number from this logged event too.
  3. Somewhere later in the log, there will be an entry reporting how many iterations that thread carried out. This event contains no other information (i.e. it does not report the thing_n it operated on).
  4. thread_num/thing_n pairings change constantly.
  5. Any number of threads can log any number of events between points 2 and 3, so you cannot simply .shift() the Iterations column to realign the data into a single row.

Somehow I need to realign the Iterations column with the previous (and only the previous) row in which thing_I_care_about, A, B, and C are all non-null and the thread_num matches. There are timestamps (not in my MCVE), and all events are sorted in ascending order, if that helps.

Example input:

   thing_I_care_about  thread_num    A    B    C      Iterations
0  thing_1             2             X    X    X      NaN
1  NaN                 2             X    X    NaN    NaN
2  thing_2             3             NaN  X    X      NaN
3  NaN                 2             NaN  NaN  NaN    110.0
4  thing_3             7             X    X    X      NaN
5  thing_4             5             X    X    NaN    NaN
6  NaN                 7             NaN  NaN  NaN    150.0

Example output:

   thing_I_care_about  thread_num    A    B    C      Realigned Iterations
0  thing_1             2             X    X    X      110.0
1  NaN                 2             X    X    NaN    NaN
2  thing_2             3             NaN  X    X      NaN
3  NaN                 2             NaN  NaN  NaN    NaN
4  thing_3             7             X    X    X      150.0
5  thing_4             5             X    X    NaN    NaN
6  NaN                 7             NaN  NaN  NaN    NaN

I can manage a pure Python approach (at the bottom), but this analysis will be run repeatedly on demand and has to process hundreds of millions of such events. Conceptually, the only way I can think of doing this in Pandas is:

  1. groupby() thread_num and sort each group by its timestamp.
  2. Try somehow to get a DataFrame for each thread with alternating notnull([thing_n, A, B, C, thread_num]) and notnull([thread_num, Iterations]) rows, so that I could shift(-1) them to realign the data.
  3. Somehow tie this back to the original DataFrame.
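
The three steps above can be sketched roughly as follows. This is only one possible reading of the idea, not a tested solution for the real logs: the helper name `realign_by_shift` and the `is_start`/`is_done` masks are made up for illustration, the rows are assumed to be in time order already, and an "iterations" row is assumed to be any row with a non-null Iterations value that is not itself a start row.

```python
import numpy as np
import pandas as pd

COLS = ['thing_I_care_about', 'A', 'B', 'C']

def realign_by_shift(df):
    """Hypothetical helper: move each Iterations value up to the start row
    that immediately precedes it within the same thread."""
    is_start = df[COLS].notna().all(axis=1)          # thread picks up an object
    is_done = df['Iterations'].notna() & ~is_start   # thread reports its count
    # Keep only the two event types, so that per thread a start row is
    # directly followed by the next start or iterations row.
    events = pd.DataFrame({
        'thread_num': df['thread_num'],
        'iters': df['Iterations'].where(is_done),
    })[is_start | is_done]
    # Within each thread, pull the next event's value up by one row.
    nxt = events.groupby('thread_num')['iters'].shift(-1)
    # Keep the shifted value on start rows only, then tie it back to the
    # original index (all other rows become NaN).
    nxt = nxt[is_start.loc[events.index]]
    return nxt.reindex(df.index)

# The example input from the question
df = pd.DataFrame({
    'thing_I_care_about': ['thing_1', np.nan, 'thing_2', np.nan,
                           'thing_3', 'thing_4', np.nan],
    'thread_num': [2, 2, 3, 2, 7, 5, 7],
    'A': ['X', 'X', np.nan, np.nan, 'X', 'X', np.nan],
    'B': ['X', 'X', 'X', np.nan, 'X', 'X', np.nan],
    'C': ['X', np.nan, 'X', np.nan, 'X', np.nan, np.nan],
    'Iterations': [np.nan, np.nan, np.nan, 110, np.nan, np.nan, 150],
})
df['realigned_iterations'] = realign_by_shift(df)
print(df)
```

Because the shift happens inside each thread_num group, a start row that is followed by another start row for the same thread gets NaN, which matches the "previous and only the previous" requirement.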

However, I cannot seem to make progress with this approach. Is there a smart way of doing this, or am I stuck processing this part in Python?

Pure Python approach:

import numpy as np
import pandas as pd

raw_data = [['thing_I_care_about', 'thread_num', 'A', 'B', 'C', 'Iterations'],
            ['thing_1', 2, 'X', 'X', 'X', np.nan],
            [np.nan, 2, 'X', 'X', np.nan, np.nan],
            ['thing_2', 3, np.nan, 'X', 'X', np.nan],
            [np.nan, 2, np.nan, np.nan, np.nan, 110],
            ['thing_3', 7, 'X', 'X', 'X', np.nan],
            ['thing_4', 5, 'X', 'X', np.nan, np.nan],
            [np.nan, 7, np.nan, np.nan, np.nan, 150]]

data = pd.DataFrame(raw_data[1:], columns=raw_data[0])
print("Input format")
print(data)

# Map each column name to its position in a row
header_dict = {item: x for x, item in enumerate(data.columns)}

# Take the data out of the DataFrame as a 2-D object array
# (to_numpy() replaces as_matrix(), which newer pandas releases no longer provide)
data_list = data.to_numpy()

# Track the row in which each thread starts its process
active_threads = {}

# List that becomes the realigned Iterations column in the DataFrame at the end
realigned_data = [np.nan] * len(data_list)

for x, entry in enumerate(data_list):
    thread_num = int(entry[header_dict['thread_num']])

    # Start event: thing_I_care_about, A, B and C are all non-null
    if all(pd.notnull(entry[header_dict[col]])
           for col in ('thing_I_care_about', 'A', 'B', 'C')):
        active_threads[thread_num] = x

    # Iterations event: write the count onto this thread's latest start row
    elif pd.notnull(entry[header_dict['Iterations']]) and thread_num in active_threads:
        realigned_data[active_threads[thread_num]] = entry[header_dict['Iterations']]

data['realigned_iterations'] = realigned_data
print("Output format")
print(data)

Recommended answer

IIUC, I think you can do it this way. Create two masks: the first marks the rows where the Iterations value currently sits, and the second is True on the record that you want the Iterations value moved to. Then group on the reversed cumsum of the first mask, put the group's Iterations value on every record in the group, and use the second mask with where to keep it only on the target rows.
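
To make the grouping step easier to follow, here is a small illustration of what a reversed cumsum over the first mask produces. The boolean values are written out by hand to match the example input (rows 3 and 6 are the iterations-only rows); nothing else is assumed.

```python
import pandas as pd

# True on the rows that hold only an Iterations value (rows 3 and 6)
mask = pd.Series([False, False, False, True, False, False, True])

# Reversing before cumsum counts the True rows from the bottom up, so every
# row gets the same label as the next iterations-only row at or below it.
labels = mask[::-1].cumsum()

# Shown in the original row order: rows 0-3 share one label, rows 4-6 another
print(labels.sort_index().tolist())
```

groupby aligns these labels on the index, so the reversed order of `labels` does not matter when it is passed to `df.groupby(...)`.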

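# df is assumed to be the question's input DataFrame (named data there)

# Rows that carry only the iteration count (everything else is null)
mask = (df['thing_I_care_about'].isnull() &
        df['A'].isnull() &
        df['B'].isnull() &
        df['C'].isnull())

# Rows where a thread starts work: the rows the value should move to
fmask = (df['thing_I_care_about'].notnull() &
         df['A'].notnull() &
         df['B'].notnull() &
         df['C'].notnull())

# Label groups with a cumsum taken from the bottom up, so each mask row closes
# its group; broadcast that row's Iterations, then keep it only where fmask holds
df.assign(Iterations=df.groupby(mask[::-1].cumsum())['Iterations']
                       .transform(lambda x: x.iloc[-1])
                       .where(fmask))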

Output:

  thing_I_care_about  thread_num    A    B    C  Iterations
0            thing_1           2    X    X    X       110.0
1                NaN           2    X    X  NaN         NaN
2            thing_2           3  NaN    X    X         NaN
3                NaN           2  NaN  NaN  NaN         NaN
4            thing_3           7    X    X    X       150.0
5            thing_4           5    X    X  NaN         NaN
6                NaN           7  NaN  NaN  NaN         NaN
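
One caveat worth noting: the snippet above never looks at thread_num, so if another thread logs a complete start row between a start row and its iterations row, that other row lands in the same group and would receive the value as well. A possible thread-aware variant is sketched below. It is an illustration rather than part of the original answer: the name `realign_per_thread` and the intermediate names are hypothetical, and it assumes the rows are already in time order.

```python
import numpy as np
import pandas as pd

def realign_per_thread(df):
    """Hypothetical helper: write each Iterations value onto the most recent
    start row logged by the same thread."""
    cols = ['thing_I_care_about', 'A', 'B', 'C']
    is_start = df[cols].notna().all(axis=1)
    is_done = df['Iterations'].notna() & ~is_start
    # Positional row number, kept only on start rows and then carried forward
    # within each thread: every row learns where its thread last started.
    pos = pd.Series(np.arange(len(df), dtype=float), index=df.index)
    last_start = pos.where(is_start).groupby(df['thread_num']).ffill()
    # Pair each iterations row with its target start row; drop rows whose
    # thread has not started anything yet.
    hits = pd.DataFrame({'target': last_start[is_done],
                         'value': df.loc[is_done, 'Iterations']})
    hits = hits.dropna(subset=['target']).drop_duplicates('target', keep='last')
    out = np.full(len(df), np.nan)
    out[hits['target'].astype(int).to_numpy()] = hits['value'].to_numpy()
    return pd.Series(out, index=df.index, name='realigned_iterations')

# Two threads whose start and iterations rows interleave
interleaved = pd.DataFrame({
    'thing_I_care_about': ['thing_1', 'thing_2', np.nan, np.nan],
    'thread_num': [2, 3, 2, 3],
    'A': ['X', 'X', np.nan, np.nan],
    'B': ['X', 'X', np.nan, np.nan],
    'C': ['X', 'X', np.nan, np.nan],
    'Iterations': [np.nan, np.nan, 110, 90],
})
print(realign_per_thread(interleaved))
```

Keeping the last hit per target mirrors the loop in the question, where a later iterations entry for the same thread overwrites an earlier one on the same start row.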
