KSQL - calculate distance from 2 messages using GEO_DISTANCE

Problem description

I have a Kafka topic, and each message in the topic has a lat/lon pair and an event timestamp. I created a stream over the topic and would like to calculate the distance between two points using GEO_DISTANCE. Example:

GpsDateTime            lat         lon
2016-11-30 22:38:36    32.685757   -96.735942
2016-11-30 22:39:07    32.687347   -96.732841
2016-11-30 22:39:37    32.68805    -96.729726

I would like to create a new stream on top of the stream above and enrich each message with its distance from the previous reading.

GpsDateTime            lat         lon          Distance (km)
2016-11-30 22:38:36    32.685757   -96.735942   0
2016-11-30 22:39:07    32.687347   -96.732841   0.340
2016-11-30 22:39:37    32.68805    -96.729726   0.302
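
The expected distances are consistent with the great-circle (haversine) distance in kilometres between each reading and the one before it. Assuming a spherical Earth with mean radius R = 6371 km, and with latitude φ and longitude λ converted to radians:

```latex
a = \sin^2\!\left(\frac{\Delta\varphi}{2}\right)
  + \cos\varphi_1\,\cos\varphi_2\,\sin^2\!\left(\frac{\Delta\lambda}{2}\right),
\qquad
d = 2R\,\operatorname{atan2}\!\left(\sqrt{a},\,\sqrt{1-a}\right)
```

For the first two rows, Δφ = 0.00159° and Δλ = 0.003101°, which gives d ≈ 0.340 km; for the second and third rows, Δφ = 0.000703° and Δλ = 0.003115°, which gives d ≈ 0.302 km.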

Is it possible to achieve the desired result using KSQL? Or, more generally, how can I refer to the previous message while processing a new one?

Recommended answer

First, do these readings come from some sort of device? If so, do you have a unique ID (UUID) for each device? I would put that into your stream, so it would look like UUID, GpsDateTime, lat, lon.

You will need to create a fairly basic Kafka Streams app. Within this app, you store the most recent reading from your stream in a state store, which you create with a StoreBuilder. Then, when a new message is received from Kafka, you retrieve this latest value, do your computation, and store the new lat/long values in the state store.
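
A minimal sketch of that rolling logic in plain Java, assuming readings are keyed by a device ID as suggested above. A HashMap stands in for the Kafka Streams key-value state store so the example runs without a broker; the class name, the `enrich` method and the haversine helper (spherical Earth, R = 6371 km) are hypothetical and not taken from the linked example.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/**
 * Hypothetical sketch of the rolling-distance logic. The HashMap plays the
 * role of the state store: deviceId -> {lat, lon} of the latest reading.
 */
final class RollingDistanceEnricher {
    private static final double EARTH_RADIUS_KM = 6371.0;

    private final Map<String, double[]> lastReading = new HashMap<>();

    /** Returns km travelled since this device's previous reading (0 for its first). */
    double enrich(String deviceId, double lat, double lon) {
        double[] previous = lastReading.get(deviceId);
        double distance = (previous == null)
                ? 0.0
                : haversineKm(previous[0], previous[1], lat, lon);
        // Overwrite the stored reading so the next message is compared against this one.
        lastReading.put(deviceId, new double[] {lat, lon});
        return distance;
    }

    // Great-circle distance, assuming a spherical Earth of radius 6371 km.
    private static double haversineKm(double lat1, double lon1, double lat2, double lon2) {
        double dPhi = Math.toRadians(lat2 - lat1);
        double dLambda = Math.toRadians(lon2 - lon1);
        double a = Math.pow(Math.sin(dPhi / 2), 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.pow(Math.sin(dLambda / 2), 2);
        return 2 * EARTH_RADIUS_KM * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
    }

    public static void main(String[] args) {
        RollingDistanceEnricher enricher = new RollingDistanceEnricher();
        double[][] readings = {
            {32.685757, -96.735942}, {32.687347, -96.732841}, {32.68805, -96.729726}};
        for (double[] r : readings) {
            // "device-1" is a made-up UUID for the question's sample data.
            double km = enricher.enrich("device-1", r[0], r[1]);
            System.out.println(String.format(Locale.ROOT, "%.3f", km)); // 0.000, 0.340, 0.302
        }
    }
}
```

In the real app, the body of `enrich` would sit in the transformer's transform method, with the state store's `get` and `put` replacing the map calls.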

Of course, I'm not clear whether you want to keep only one lat/long value, with all subsequent distances computed from the first reading, or whether you want a rolling computation that always compares the distance between the last and the current reading.
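
The two interpretations differ only in how the stored reference point is updated. In this hypothetical sketch, again with a HashMap standing in for the state store, `ROLLING` replaces the stored point on every reading, while `ANCHORED_TO_FIRST` keeps the first one forever. Kafka Streams' `KeyValueStore` also has `get`, `put` and `putIfAbsent`, but its `put` returns nothing, so a real rolling transformer would call `get` before `put`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: choosing which earlier reading to measure against. */
final class ReferencePointStore {
    enum Mode { ROLLING, ANCHORED_TO_FIRST }

    private final Mode mode;
    // deviceId -> {lat, lon}; stands in for the Kafka Streams state store.
    private final Map<String, double[]> store = new HashMap<>();

    ReferencePointStore(Mode mode) {
        this.mode = mode;
    }

    /**
     * Returns the point the new reading should be measured against (null for the
     * device's first reading), then updates the store according to the mode.
     */
    double[] referenceFor(String deviceId, double lat, double lon) {
        double[] current = {lat, lon};
        if (mode == Mode.ROLLING) {
            // Rolling: Map.put stores the latest reading and returns the one it replaced.
            return store.put(deviceId, current);
        }
        // Anchored: putIfAbsent only stores the very first reading and returns it afterwards.
        return store.putIfAbsent(deviceId, current);
    }
}
```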

Anyway, you can see this code in practice at: https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java

This is a word-count example, but it can be quickly adapted to your use case.

The static final class WordCountTransformerSupplier (line 78) would become your LatLongDistanceComputation.

You would create the StoreBuilder (line 154) with the proper key and value types (whatever you are storing your lat/lon as).
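
One simple option for those types, offered as an assumption rather than what the linked example does: use the device UUID as a String key and encode the point as a `"lat,lon"` String value, so both sides of the store can use `Serdes.String()`. The `LatLon` class below is hypothetical; a custom Serde for a dedicated type is an equally valid choice.

```java
/**
 * Hypothetical value type for the state store, encoded as "lat,lon" so the
 * store can use a plain String serde for its values.
 */
final class LatLon {
    final double lat;
    final double lon;

    LatLon(double lat, double lon) {
        this.lat = lat;
        this.lon = lon;
    }

    /** Encode for the store; string concatenation uses Double.toString, which round-trips. */
    String toStoreValue() {
        return lat + "," + lon;
    }

    /** Decode a value previously written by toStoreValue(). */
    static LatLon fromStoreValue(String value) {
        String[] parts = value.split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected \"lat,lon\" but got: " + value);
        }
        return new LatLon(Double.parseDouble(parts[0]), Double.parseDouble(parts[1]));
    }
}
```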

Line 165 is where the item is actually being read from the stream of values flowing in.

And of course you need to edit inputTopic and outputTopic (lines 66-67), among a few other things.
