How to apply any sort of Map Transformation on Spark Dataframe in Python


Question

I am using Spark Structured Streaming; the code is as follows:

import time
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, from_json, udf
from pyspark.sql.types import (StructType, StructField, FloatType,
                               IntegerType, StringType, TimestampType)


def convert_timestamp_to_datetime(timestamp):
    return datetime.fromtimestamp(timestamp)


def extract():
    spark = SparkSession \
        .builder \
        .appName("StructuredNetworkWordCount") \
        .getOrCreate()

    json_schema = StructType() \
        .add(StructField("TIMESTAMP", FloatType(), True)) \
        .add(StructField("index", IntegerType(), True)) \
        .add(StructField("CUSTOMER_ID", StringType(), True)) \
        .add(StructField("CODE_ID", StringType(), True)) \
        .add(StructField("PROCESS", StringType(), True))

    # Read the Kafka topic as a streaming DataFrame
    my_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "simple_json_12_10trx") \
        .option("startingOffsets", "earliest") \
        .load()

    # Parse the Kafka value as JSON and convert the epoch timestamp to a datetime
    my_df = my_df.select(from_json(col('value').cast('string'), json_schema).alias("json"))
    convert_timestamp_datetime_udf = udf(lambda x: convert_timestamp_to_datetime(x), TimestampType())
    return my_df.select('json.*', convert_timestamp_datetime_udf('json.TIMESTAMP').alias('DATETIME'))


def transform_load(my_df, epoch_id):
    # Aggregate each micro-batch, then hand the result to update()
    update_obj = my_df.groupBy('CUSTOMER_ID').agg(
        F.count('CUSTOMER_ID').alias('count_t'),
        F.collect_set('CODE_ID').alias('unique_CODE'))
    update_obj.show()
    update(update_obj)


if __name__ == '__main__':
    start = time.time()
    df = extract()
    query = df.writeStream \
        .outputMode('append') \
        .foreachBatch(transform_load) \
        .start() \
        .awaitTermination()

I want to access each row of the Spark DataFrame in a distributed way, so I must use a map transformation. I added this simple code to test Spark's map, but I do not receive any output in the console. In fact, func never runs.

def func(df):
    df.take(3)

def update(df):
    df.rdd.map(func, preservesPartitioning=False)

Would you please guide me on what is wrong here?

Thanks a lot.

Answer

The problem is solved.

I forgot to use an action after map. Also, I cannot use df.take(3), because func does not receive a DataFrame: map passes each Row of the RDD, and a Row has no take attribute. I changed the code like this:

def func(x):
    # x is a single Row of the micro-batch DataFrame
    print(x.CUSTOMER_ID)

def update(df):
    # count() forces the lazy map() to actually execute
    df.rdd.map(func, preservesPartitioning=False).count()

count() is the action I use to force the map to run and see its result.
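
Because RDD transformations such as map are lazy, nothing executes until some action is triggered. As an alternative to calling count() purely to force execution, a minimal sketch could use rdd.foreach, which is itself an action; it assumes the same func and the per-batch DataFrame from above, and note that in a cluster the print output appears in the executor logs rather than the driver console:

def func(x):
    # x is a pyspark.sql.Row; this print lands in the executor logs on a cluster
    print(x.CUSTOMER_ID)

def update(df):
    # foreach is an action, so func runs on every row without a dummy count()
    df.rdd.foreach(func)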
