Iterating Through Rows of a DataFrame and Setting Values in Spark

Problem Description

I would like to preface this question with I'm a Spark Noob (just started reading a book 4 days ago). Nevertheless, I'm trying to port over something I wrote with the help of the Pandas library in Python so that I can take advantage of the cluster we just spun up. The data in the pandas dataframe df looks like this:

+---------------------+-----------+-------+-------------+----------------------+
|      TimeStamp      | Customer  | User  | Application | TimeSinceApplication |
+---------------------+-----------+-------+-------------+----------------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1        |                  NaN |
| 2017-01-01 00:01:01 | customer1 | user2 | app2        |                  NaN |
| 2017-01-01 00:02:01 | customer1 | user1 | app2        |                  NaN |
| 2017-01-01 00:03:01 | customer1 | user1 | app1        |                  NaN |
+---------------------+-----------+-------+-------------+----------------------+
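
A minimal sketch that rebuilds the sample frame above, so the loop below can be run end to end. The construction itself is illustrative; the column names and values are taken from the table:

import pandas as pd
import numpy as np

df = pd.DataFrame({
    'TimeStamp': pd.to_datetime(['2017-01-01 00:00:01', '2017-01-01 00:01:01',
                                 '2017-01-01 00:02:01', '2017-01-01 00:03:01']),
    'Customer': ['customer1'] * 4,
    'User': ['user1', 'user2', 'user1', 'user1'],
    'Application': ['app1', 'app2', 'app2', 'app1'],
    'TimeSinceApplication': np.nan,  # filled in by the loop below
})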

In Python, I wrote the following:

unique_users = df.User.unique().tolist()
for user in unique_users:
    access_events_for_user = df[df.User == user].copy()
    indexes_for_access_events = access_events_for_user.index
    applications_used = dict()  # application -> timestamp of the previous access
    for current_access_event_index in indexes_for_access_events:
        current_access_event_ts = df.loc[current_access_event_index].TimeStamp
        application = df.loc[current_access_event_index].Application
        if application in applications_used:
            time_since = (current_access_event_ts -
                          applications_used[application]).total_seconds()
            df.loc[current_access_event_index, 'TimeSinceApplication'] = time_since
        else:
            # First access to this application by this user: default to 30 days
            df.loc[current_access_event_index, 'TimeSinceApplication'] = 2592000
        applications_used[application] = current_access_event_ts

It spits out something like this:

+---------------------+-----------+-------+-------------+----------------------+
|      TimeStamp      | Customer  | User  | Application | TimeSinceApplication |
+---------------------+-----------+-------+-------------+----------------------+
| 2017-01-01 00:00:01 | customer1 | user1 | app1        |              2592000 |
| 2017-01-01 00:01:01 | customer1 | user2 | app2        |              2592000 |
| 2017-01-01 00:02:01 | customer1 | user1 | app2        |              2592000 |
| 2017-01-01 00:03:01 | customer1 | user1 | app1        |                  180 |
+---------------------+-----------+-------+-------------+----------------------+

Basically, I'm trying to get the time since the user visited the application. If it's the first time the user has accessed the application, I just set it to the default of 30 days. We can partition the data by customer and order it by timestamp so that it is in order. I'm just unsure of how to do this without calling a collect() in Spark like the answers in here, which would defeat the purpose of Spark. Is this even possible?

Recommended Answer

This is approaching the limit of complexity that's possible with the DataFrame API. Someone else may be able to suggest a method of doing this with DataFrames, but personally I think the RDD API is much more suited to this. Here's an example to give you an idea of how to structure your algorithms for Spark:

from datetime import datetime, timedelta

# Assumes a SparkContext `sc` is already available (e.g. in the pyspark shell).
data = [(datetime(2017, 1, 1, 0, 0, 1), 'customer1', 'user1', 'app1'),
        (datetime(2017, 1, 1, 0, 1, 1), 'customer1', 'user2', 'app2'),
        (datetime(2017, 1, 1, 0, 2, 1), 'customer1', 'user1', 'app2'),
        (datetime(2017, 1, 1, 0, 3, 1), 'customer1', 'user1', 'app1')]

rdd = sc.parallelize(data)

def toTimeSince(row):
    cust_user_app, timestamps = row
    timestamps = sorted(timestamps)
    # The first access to an application defaults to 30 days.
    result = [(timestamps[0], *cust_user_app, timedelta(30))]
    previous_timestamp = timestamps[0]
    for timestamp in timestamps[1:]:
        result.append((timestamp, *cust_user_app, timestamp - previous_timestamp))
        previous_timestamp = timestamp
    return result

(rdd
 .map(lambda row: (row[1:], [row[0]])) # Data looks like ((customer, user, app), [timestamp])
 .reduceByKey(lambda a, b: a + b) # Data looks like ((customer, user, app), list_of_timestamps)
 .flatMap(toTimeSince) # Data looks like (timestamp, customer, user, app, time_since_previous)
 .collect())

Result:

[(datetime.datetime(2017, 1, 1, 0, 1, 1), 'customer1', 'user2', 'app2', datetime.timedelta(30)),
 (datetime.datetime(2017, 1, 1, 0, 2, 1), 'customer1', 'user1', 'app2', datetime.timedelta(30)),
 (datetime.datetime(2017, 1, 1, 0, 0, 1), 'customer1', 'user1', 'app1', datetime.timedelta(30)),
 (datetime.datetime(2017, 1, 1, 0, 3, 1), 'customer1', 'user1', 'app1', datetime.timedelta(0, 180))]

The key points are:

  • The algorithm as you have described it is not inherently suited to Spark - there is a strong dependence between rows (every row must be calculated by comparing it to another row), which is difficult to parallelise.
  • My suggestion is to use Spark to aggregate lists of timestamps for records with the same customer, user and application. It is then easy to sort the timestamps for each customer-user-application combination and expand them back out into the dataset you want.
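
As for the DataFrame route mentioned above, the natural fit would be a window function. A minimal sketch, not from the original answer, assuming an active SparkSession and that df has been loaded as a Spark DataFrame with the columns from the question:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# One window per customer-user-application combination, ordered by time.
w = Window.partitionBy('Customer', 'User', 'Application').orderBy('TimeStamp')

result = df.withColumn(
    'TimeSinceApplication',
    F.coalesce(
        # Casting a timestamp to long yields epoch seconds, so the difference
        # is the number of seconds since the previous access.
        F.col('TimeStamp').cast('long') - F.lag('TimeStamp').over(w).cast('long'),
        F.lit(2592000)  # no previous access to this application: default to 30 days
    )
)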
