Processing Data on Spark Structured Streaming before outputting to the console
Question
I'll try to keep it simple. I periodically read some data from a Kafka producer and output the following using Spark Structured Streaming.
I have data output like this:
+------------------------------------------+-------------------+--------------+-----------------+
|window |timestamp |Online_Emp |Available_Emp |
+------------------------------------------+-------------------+--------------+-----------------+
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:27|1 |0 |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:41|1 |0 |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:29|1 |0 |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:20|1 |0 |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:23|2 |0 |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:52|1 |0 |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:08|1 |0 |
|[2017-12-31 16:01:00, 2017-12-31 16:02:00]|2017-12-31 16:01:12|1 |0 |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:02|1 |1 |
|[2017-12-31 16:00:00, 2017-12-31 16:01:00]|2017-12-31 16:00:11|1 |0 |
+------------------------------------------+-------------------+--------------+-----------------+
I want it to output like this:
Time Online_Emp Available_Emp
2017-01-01 00:00:00 52 23
2017-01-01 00:01:00 58 19
2017-01-01 00:02:00 65 28
So basically it counts the employees online per minute (through their unique driver id) and shows how many are available.
Note that one specific employee id may change between available and on_duty within the minute, and we need the final tally as of the end of that minute.
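The intended tally, keep each employee's last reported status within each minute, then count how many are online and how many of those are available, can be sketched in plain Python (the sample events are made up; in the real job this logic belongs in the streaming aggregation):

```python
from collections import defaultdict

# Made-up sample events in the producer's shape: (epoch ms, emp_id, on_duty).
# Employee 12471124 flips from available (0) to on_duty (1) within the minute.
events = [
    (1592669691475, 12471114, 0),
    (1592669692000, 12471124, 0),
    (1592669695000, 12471124, 1),
]

# Keep only the last status each employee reported within each minute.
last_status = {}
for ts_ms, emp_id, on_duty in sorted(events):
    minute = ts_ms // 60000
    last_status[(minute, emp_id)] = on_duty

# minute -> [online employees, available employees]
per_minute = defaultdict(lambda: [0, 0])
for (minute, _emp_id), on_duty in last_status.items():
    per_minute[minute][0] += 1
    if on_duty == 0:
        per_minute[minute][1] += 1
```

Here the minute gets 2 online employees but only 1 available, because 12471124's last status in that minute is on_duty.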
Kafka producer
import json
import json_lines
from time import sleep
from kafka import KafkaProducer

_producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                          value_serializer=lambda x: json.dumps(x).encode('utf-8'))

# schedule.every(1).minutes.do(_producer.send(topic_name, value=( json.loads(json.dumps(dataDict))) ) )

dataDict = {}
with open(filepath, 'r', encoding="utf16") as f:
    for item in json_lines.reader(f):
        dataDict.update({'timeStamp': item['timestamp'],
                         'emp_id': item['emp_id'],
                         'on_duty': item['on_duty']})
        _producer.send(topic_name, value=json.loads(json.dumps(dataDict)))
        sleep(1)
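A small aside on the producer: the value_serializer already JSON-encodes whatever is passed to send(), so the json.loads(json.dumps(...)) wrapper is a round trip that leaves the payload unchanged; sending the dict directly would put the same bytes on the wire:

```python
import json

# The serializer configured on the producer above.
value_serializer = lambda x: json.dumps(x).encode('utf-8')

# A sample record in the shape the producer sends (values made up).
record = {"timeStamp": 1592669691475, "emp_id": 12471114, "on_duty": 0}

# The round trip is a no-op, so both calls yield identical bytes.
assert value_serializer(json.loads(json.dumps(record))) == value_serializer(record)
```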
Spark Structured Streaming
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, LongType

spark = SparkSession.builder.appName("emp_stream").getOrCreate()

schema = StructType([
    StructField("timeStamp", LongType()),
    StructField("emp_id", LongType()),
    StructField("on_duty", LongType())])

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "emp_dstream") \
    .option("startingOffsets", "earliest") \
    .load() \
    .selectExpr("CAST(value AS STRING)") \
    .select(F.from_json(F.col("value"), schema).alias("value")) \
    .select(F.col("value.*")) \
    .withColumn("timestamp", F.from_unixtime(F.col("timeStamp") / 1000)) \
    .groupBy(F.window(F.col("timestamp"), "1 minutes"), F.col("timestamp")) \
    .agg(F.count(F.col("timeStamp")).alias("total_employees"),
         F.collect_list(F.col("on_duty")).alias("on_duty"),
         F.sum(F.when(F.col("on_duty") == 0, F.lit(1)).otherwise(F.lit(0))).alias("not_on_duty")) \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .option("truncate", "false") \
    .start() \
    .awaitTermination()
How can I get the desired output? Would be grateful for any hints or help!
Answer
Your code works perfectly. Please check the Kafka data & Spark streaming output below.
Batch 5 is your final result; ignore the other batches (Batch 0 to 4). Since the query runs in complete output mode, always take the latest batch as the up-to-date result for all data available in Kafka.
Batch: 0
No data in kafka.
Spark Streaming
+------+---------+---------------+-------+-----------+
|window|timestamp|total_employees|on_duty|not_on_duty|
+------+---------+---------------+-------+-----------+
+------+---------+---------------+-------+-----------+
Batch: 1
Published to kafka.
{"timeStamp": 1592669691475, "emp_id": 12471114, "on_duty": 0} //2020-06-20T21:44:51
{"timeStamp": 1592669691475, "emp_id": 12471124, "on_duty": 0} //2020-06-20T21:44:51
{"timeStamp": 1592669691475, "emp_id": 12471134, "on_duty": 0} //2020-06-20T21:44:51
Spark Streaming
+---------------------------------------------+-------------------+---------------+---------+-----------+
|window |timestamp |total_employees|on_duty |not_on_duty|
+---------------------------------------------+-------------------+---------------+---------+-----------+
|[2020-06-20 21:44:00.0,2020-06-20 21:45:00.0]|2020-06-20 21:44:51|3 |[0, 0, 0]|3 |
+---------------------------------------------+-------------------+---------------+---------+-----------+
Batch: 2
Published to kafka.
{"timeStamp": 1592669691475, "emp_id": 12471144, "on_duty": 0} //2020-06-20T21:44:51 // seconds difference
{"timeStamp": 1592669691575, "emp_id": 12471124, "on_duty": 0} //2020-06-20T21:44:51
{"timeStamp": 1592669691575, "emp_id": 12471234, "on_duty": 0} //2020-06-20T21:44:51
{"timeStamp": 1592669691575, "emp_id": 12471334, "on_duty": 1} //2020-06-20T21:44:51
Spark Streaming
+---------------------------------------------+-------------------+---------------+---------------------+-----------+
|window |timestamp |total_employees|on_duty |not_on_duty|
+---------------------------------------------+-------------------+---------------+---------------------+-----------+
|[2020-06-20 21:44:00.0,2020-06-20 21:45:00.0]|2020-06-20 21:44:51|7 |[0, 0, 0, 1, 0, 0, 0]|6 |
+---------------------------------------------+-------------------+---------------+---------------------+-----------+
Batch: 3
Published to kafka.
{"timeStamp": 1592669691575, "emp_id": 12471124, "on_duty": 0} // 2020-06-20T21:44:51
{"timeStamp": 1592669691575, "emp_id": 12471424, "on_duty": 1} // 2020-06-20T21:44:51
{"timeStamp": 1592669631475, "emp_id": 12472188, "on_duty": 1} // 2020-06-20T21:43:51
{"timeStamp": 1592669631475, "emp_id": 12472288, "on_duty": 0} // 2020-06-20T21:43:51
{"timeStamp": 1592669631475, "emp_id": 12472388, "on_duty": 0} // 2020-06-20T21:43:51
{"timeStamp": 1592669631475, "emp_id": 12472488, "on_duty": 1} // 2020-06-20T21:43:51
Spark Streaming
+---------------------------------------------+-------------------+---------------+---------------------------+-----------+
|window |timestamp |total_employees|on_duty |not_on_duty|
+---------------------------------------------+-------------------+---------------+---------------------------+-----------+
|[2020-06-20 21:44:00.0,2020-06-20 21:45:00.0]|2020-06-20 21:44:51|9 |[0, 1, 0, 0, 0, 1, 0, 0, 0]|7 |
|[2020-06-20 21:43:00.0,2020-06-20 21:44:00.0]|2020-06-20 21:43:51|4 |[1, 0, 0, 1] |2 |
+---------------------------------------------+-------------------+---------------+---------------------------+-----------+
Batch: 4
Published to kafka.
{"timeStamp": 1592669691575, "emp_id": 12471524, "on_duty": 0} // 2020-06-20T21:44:51
{"timeStamp": 1592669691575, "emp_id": 12471624, "on_duty": 0} // 2020-06-20T21:44:51
{"timeStamp": 1592669631475, "emp_id": 12471188, "on_duty": 1} // 2020-06-20T21:43:51
{"timeStamp": 1592669631475, "emp_id": 12472288, "on_duty": 0} // 2020-06-20T21:43:51
{"timeStamp": 1592669631475, "emp_id": 12473388, "on_duty": 0} // 2020-06-20T21:43:51
{"timeStamp": 1592669631475, "emp_id": 12474488, "on_duty": 1} // 2020-06-20T21:43:51
Spark Streaming
+---------------------------------------------+-------------------+---------------+---------------------------------+-----------+
|window |timestamp |total_employees|on_duty |not_on_duty|
+---------------------------------------------+-------------------+---------------+---------------------------------+-----------+
|[2020-06-20 21:44:00.0,2020-06-20 21:45:00.0]|2020-06-20 21:44:51|11 |[0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0]|9 |
|[2020-06-20 21:43:00.0,2020-06-20 21:44:00.0]|2020-06-20 21:43:51|8 |[1, 0, 0, 1, 1, 0, 0, 1] |4 |
+---------------------------------------------+-------------------+---------------+---------------------------------+-----------+
Batch: 5
Published to kafka.
{"timeStamp": 1592669571475, "emp_id": 12482185, "on_duty": 1} // 2020-06-20T21:42:51
{"timeStamp": 1592669571475, "emp_id": 12483185, "on_duty": 1} // 2020-06-20T21:42:51
{"timeStamp": 1592669631475, "emp_id": 12484488, "on_duty": 1} // 2020-06-20T21:43:51
{"timeStamp": 1592669691575, "emp_id": 12491524, "on_duty": 0} // 2020-06-20T21:44:51
{"timeStamp": 1592669091575, "emp_id": 12491124, "on_duty": 0} // 2020-06-20T21:34:51
{"timeStamp": 1592669091575, "emp_id": 12491224, "on_duty": 1} // 2020-06-20T21:34:51
Spark Streaming
+---------------------------------------------+-------------------+---------------+------------------------------------+-----------+
|window |timestamp |total_employees|on_duty |not_on_duty|
+---------------------------------------------+-------------------+---------------+------------------------------------+-----------+
|[2020-06-20 21:34:00.0,2020-06-20 21:35:00.0]|2020-06-20 21:34:51|2 |[0, 1] |1 |
|[2020-06-20 21:42:00.0,2020-06-20 21:43:00.0]|2020-06-20 21:42:51|2 |[1, 1] |0 |
|[2020-06-20 21:44:00.0,2020-06-20 21:45:00.0]|2020-06-20 21:44:51|12 |[0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0]|10 |
|[2020-06-20 21:43:00.0,2020-06-20 21:44:00.0]|2020-06-20 21:43:51|9 |[1, 1, 0, 0, 1, 1, 0, 0, 1] |4 |
+---------------------------------------------+-------------------+---------------+------------------------------------+-----------+
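To reshape this final batch into the desired layout, the window's start can serve as the Time column; in the query itself you would group by the window alone (dropping timestamp from groupBy) and select window.start, and if employees must be counted uniquely, use F.approx_count_distinct("emp_id"), since exact countDistinct is not supported in streaming aggregations. The reshaping step, sketched in plain Python on the Batch 5 rows above:

```python
# Batch 5 rows from the output above: (window start, total_employees, not_on_duty).
batch5 = [
    ("2020-06-20 21:34:00", 2, 1),
    ("2020-06-20 21:42:00", 2, 0),
    ("2020-06-20 21:44:00", 12, 10),
    ("2020-06-20 21:43:00", 9, 4),
]

# One row per minute, ordered by time, matching the desired columns.
desired = [
    {"Time": start, "Online_Emp": total, "Available_Emp": avail}
    for start, total, avail in sorted(batch5)
]
```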