KSQL - calculate distance between 2 messages using GEO_DISTANCE


Question

I have a Kafka topic in which each message carries a lat/lon pair and an event timestamp. I created a stream over the topic and would like to calculate the distance between two consecutive points using GEO_DISTANCE. Example:

GpsDateTime            lat              lon
2016-11-30 22:38:36,    32.685757,  -96.735942
2016-11-30 22:39:07,    32.687347,  -96.732841
2016-11-30 22:39:37,    32.68805,   -96.729726 

I would like to create a new stream on top of the one above and enrich it with the distance.

GpsDateTime            lat              lon          Distance
2016-11-30 22:38:36,    32.685757,  -96.735942        0
2016-11-30 22:39:07,    32.687347,  -96.732841        0.340
2016-11-30 22:39:37,    32.68805,   -96.729726        0.302

Is it possible to achieve the desired result using KSQL? Or, how can I refer to the previous message while processing a new one?

Recommended answer

First, do these readings come from some sort of device? If so, do you have a unique ID (UUID) for each device? I would put that into your stream, so it would look like UUID, GpsDateTime, lat, lon.

You will need to create a fairly basic Kafka Streams app. Within this app you store the most recent reading from your stream in a state store (created through a StoreBuilder). Then, when a new message arrives from Kafka, you retrieve that latest value, do your computation, and write the new lat/lon values back to the store.

Of course, I'm not clear whether you want to keep only one lat/lon value, so that all subsequent distances are computed from the first reading, or whether you want a rolling computation that always compares the last reading with the current one.
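To make the two options concrete, here is a minimal plain-Java sketch of the per-message logic. It is not Kafka Streams code: a HashMap stands in for the state store, and the class name DistanceEnricher, the rolling flag and the device ID are made up for illustration. The distance is a haversine great-circle distance in kilometres with an assumed Earth radius of 6371 km, which I believe is close to what GEO_DISTANCE computes by default, but treat that as an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch: the HashMap stands in for a Kafka Streams state store.
// All names here are hypothetical, not part of any Kafka or KSQL API.
class DistanceEnricher {
    static final double EARTH_RADIUS_KM = 6371.0; // assumed mean Earth radius

    // Reference reading ({lat, lon}) per device ID.
    private final Map<String, double[]> store = new HashMap<>();
    // true: compare with the previous reading; false: compare with the first reading.
    private final boolean rolling;

    DistanceEnricher(boolean rolling) {
        this.rolling = rolling;
    }

    // Great-circle (haversine) distance in kilometres between two points in decimal degrees.
    static double haversineKm(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.pow(Math.sin(dLat / 2), 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.pow(Math.sin(dLon / 2), 2);
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(a));
    }

    // Returns the distance for this reading and updates the stored reference as needed.
    double process(String deviceId, double lat, double lon) {
        double[] ref = store.get(deviceId);
        double distance = (ref == null) ? 0.0 : haversineKm(ref[0], ref[1], lat, lon);
        if (ref == null || rolling) {
            // The first reading always becomes the reference; in rolling mode every reading does.
            store.put(deviceId, new double[] {lat, lon});
        }
        return distance;
    }

    public static void main(String[] args) {
        double[][] readings = {
            {32.685757, -96.735942},
            {32.687347, -96.732841},
            {32.68805, -96.729726},
        };
        DistanceEnricher enricher = new DistanceEnricher(true);
        for (double[] r : readings) {
            double km = enricher.process("device-1", r[0], r[1]);
            System.out.printf("%.6f, %.6f -> %.3f km%n", r[0], r[1], km);
        }
    }
}
```

In rolling mode the three sample readings should come out close to the 0, 0.340 and 0.302 in the question's table, which suggests those figures are kilometres between consecutive points. In a real Kafka Streams app the map lookups would become get and put calls on a key-value state store keyed by the device ID.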

Anyway, you can see this pattern in practice at: https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java

This is a word count example, but it can be quickly adapted to your use case.

The static final class WordCountTransformerSupplier (line 78) would become your LatLongDistanceComputation.

You would create the StoreBuilder (line 154) with the proper types (whatever you are storing your lat/lon as).

Line 165 is where each item is actually read from the incoming stream of values.

And of course you need to edit the inputTopic and outputTopic (lines 66-67), amongst a few other things.
