Spark Streaming: read CSV string from Kafka, write to Parquet

Problem description

There are many online examples of reading JSON from Kafka (to write to Parquet), but I cannot figure out how to apply a schema to a CSV string from Kafka.

Streaming data:

customer_1945,cusaccid_995,27999941    
customer_1459,cusaccid_1102,27999942

Schema:

from pyspark.sql.types import StructType, StringType

schema = StructType() \
    .add("customer_id", StringType()) \
    .add("customer_acct_id", StringType()) \
    .add("serv_acct_id", StringType())
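Unlike a JSON record, a CSV record carries no field names, so "applying the schema" amounts to pairing each comma-separated position with the schema field at the same index. The following plain-Python sketch (not Spark code) shows the rows the two sample records should turn into; `apply_schema` and `SCHEMA_FIELDS` are hypothetical names, and it assumes the Kafka values are UTF-8 bytes with one record per message.

```python
import csv

# Field names copied from the StructType schema above (all StringType).
SCHEMA_FIELDS = ["customer_id", "customer_acct_id", "serv_acct_id"]

# The two sample records from the stream, as raw bytes like Kafka's "value" column.
SAMPLE_VALUES = [
    b"customer_1945,cusaccid_995,27999941",
    b"customer_1459,cusaccid_1102,27999942",
]


def apply_schema(value: bytes, fields=SCHEMA_FIELDS) -> dict:
    """Decode one Kafka value and pair each CSV position with a schema field.

    The mapping is purely positional: missing trailing fields become None
    and any extra fields beyond the schema are ignored.
    """
    text = value.decode("utf-8").strip()
    # csv.reader handles quoted fields; a blank line yields an empty row.
    row = next(csv.reader([text]), [])
    return {name: (row[i] if i < len(row) else None) for i, name in enumerate(fields)}


rows = [apply_schema(v) for v in SAMPLE_VALUES]
for r in rows:
    print(r)
```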

Reading the stream:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "xx.xx.xx.xx:9092") \
  .option("subscribe", "test") \
  .load()

I used this for JSON:

from pyspark.sql.functions import col, from_json

interval = df \
  .select(from_json(col("value").cast("string"), schema).alias("json")) \
  .select("json.*")

Then I wrote the result, with the schema applied, to Parquet:

query = interval \
  .writeStream \
  .format("parquet") \
  .option("checkpointLocation", "/user/whatever/checkpoint24") \
  .start("/user/whatever/interval24")

Since I cannot use from_json() for CSV, I don't know how to apply the schema to the DataFrame so that I can use a similar writeStream() command.

Recommended answer

This is how I did it. Instead of using from_json, first extract the CSV string:

# Cast Kafka's binary "value" column to a string, keeping the column name "value"
interval = df.select(col("value").cast("string").alias("value"))

Then split the string into columns. The result can be written to Parquet with the same writeStream statement shown above:

interval2 = interval.selectExpr(
    "split(value, ',')[0] as customer_id",
    "split(value, ',')[1] as customer_acct_id",
    "split(value, ',')[2] as serv_acct_id",
    "split(value, ',')[3] as installed_service_id",
    "split(value, ',')[4] as meter_id",
    "split(value, ',')[5] as channel_number",
    # ... etc.: one expression per remaining CSV field
)
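The selectExpr approach relies on two details of Spark SQL that are easy to miss: `split` treats its second argument as a regular expression, and, with ANSI mode disabled, indexing past the end of the resulting array returns null (with ANSI mode enabled, which newer Spark releases use by default, the out-of-range lookup may raise an error instead). The following is a plain-Python model of that per-record behaviour, not Spark code; `spark_split_get` and `select_expr_model` are hypothetical helpers. It shows that the question's three-field records would give null for the answer's extra columns, and that a quoted field containing a comma is split apart because `split` is not a CSV parser.

```python
import re

# Column names from the selectExpr above (the answer's own, wider record).
COLUMNS = ["customer_id", "customer_acct_id", "serv_acct_id",
           "installed_service_id", "meter_id", "channel_number"]


def spark_split_get(value: str, pattern: str, index: int):
    """Model Spark SQL split(value, pattern)[index] with ANSI mode off.

    The pattern is a regex; an index outside the array yields None (null).
    """
    parts = re.split(pattern, value)
    return parts[index] if 0 <= index < len(parts) else None


def select_expr_model(value: str) -> dict:
    """Apply one split-by-position expression per output column."""
    return {name: spark_split_get(value, ",", i) for i, name in enumerate(COLUMNS)}


# A record from the question: only three fields, so positions 3-5 are None.
row = select_expr_model("customer_1945,cusaccid_995,27999941")

# A quoted field containing a comma gets split apart, unlike a real CSV parser.
quoted = select_expr_model('customer_1,"Smith, J",27999943')

print(row)
print(quoted)
```

If you are on Spark 3.0 or later, `pyspark.sql.functions.from_csv` parses a CSV string column against a DDL-format schema string and is the closest counterpart to `from_json`; check its documentation for the schema forms and parser options it accepts.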
