Select all fields as a JSON string as a new field in Flink SQL
Question
I am using the Flink Table API. I have a table definition from which I want to select all fields and convert them to a JSON string in a new field.
My table has three fields: a: String, b: Int, c: Timestamp.
If I run
INSERT INTO kinesis
SELECT a, b, c from my_table
the Kinesis stream receives JSON records:
{
"a" : value,
"b": value,
"c": value
}
However, I want something similar to Spark's functions:
INSERT INTO kinesis
SELECT 'constant_value' as my_source, to_json(struct(*)) as payload from my_table
So the expected result is:
{
"my_source": "constant_value",
"payload": "json string from the first example that has a,b,c"
}
I can't see any to_json or struct() functions in Flink. Is it possible to implement this?
Answer
You might have to implement your own user-defined function.
This is what I did. Here I assume the input to the UDF looks like:
to_json('col1', col1, 'col2', col2)
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;

public class RowToJson extends ScalarFunction {
    public String eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object... row) throws Exception {
        if (row.length % 2 != 0) {
            throw new Exception("Wrong key/value pairs!");
        }
        return IntStream.range(0, row.length)
                .filter(index -> index % 2 == 0)
                .mapToObj(index -> {
                    String name = row[index].toString();
                    Object value = row[index + 1];
                    // Quote everything except numbers and booleans
                    // (this serialization step was elided in the original answer)
                    String encoded = (value instanceof Number || value instanceof Boolean)
                            ? value.toString()
                            : "\"" + value + "\"";
                    return "\"" + name + "\":" + encoded;
                })
                .collect(Collectors.joining(",", "{", "}"));
    }
}
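The pair-to-JSON conversion inside eval can be exercised on its own, without any Flink dependencies. A minimal standalone sketch (the class name and the quoting rule for non-numeric values are my own assumptions, not from the original answer):

```java
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PairsToJson {
    // Turn ('k1', v1, 'k2', v2, ...) varargs into a JSON object string,
    // quoting everything except numbers and booleans.
    public static String toJson(Object... row) {
        if (row.length % 2 != 0) {
            throw new IllegalArgumentException("Wrong key/value pairs!");
        }
        return IntStream.range(0, row.length)
                .filter(i -> i % 2 == 0)
                .mapToObj(i -> {
                    Object value = row[i + 1];
                    String encoded = (value instanceof Number || value instanceof Boolean)
                            ? value.toString()
                            : "\"" + value + "\"";
                    return "\"" + row[i] + "\":" + encoded;
                })
                .collect(Collectors.joining(",", "{", "}"));
    }

    public static void main(String[] args) {
        System.out.println(toJson("a", "hello", "b", 42, "c", true));
        // prints {"a":"hello","b":42,"c":true}
    }
}
```

Note this naive encoder does not escape quotes or backslashes inside string values; a production UDF should use a real JSON library such as Jackson.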
If you expect the UDF to be usable with GROUP BY, you have to extend your UDF class from AggregateFunction:
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.AggregateFunction;

public class RowsToJson extends AggregateFunction<String, List<String>> {
    @Override
    public String getValue(List<String> accumulator) {
        return accumulator.stream().collect(Collectors.joining(",", "[", "]"));
    }

    @Override
    public List<String> createAccumulator() {
        return new ArrayList<String>();
    }

    public void accumulate(List<String> acc, @DataTypeHint(inputGroup = InputGroup.ANY) Object... row) throws Exception {
        if (row.length % 2 != 0) {
            throw new Exception("Wrong key/value pairs!");
        }
        String json = IntStream.range(0, row.length)
                .filter(index -> index % 2 == 0)
                .mapToObj(index -> {
                    String name = row[index].toString();
                    Object value = row[index + 1];
                    // Quote everything except numbers and booleans
                    // (this serialization step was elided in the original answer)
                    String encoded = (value instanceof Number || value instanceof Boolean)
                            ? value.toString()
                            : "\"" + value + "\"";
                    return "\"" + name + "\":" + encoded;
                })
                .collect(Collectors.joining(",", "{", "}"));
        acc.add(json);
    }
}
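The accumulator mechanics above can also be checked without Flink: accumulate buffers one JSON object per row, and getValue joins the buffer into a JSON array. A standalone sketch (class and method names here are illustrative, mirroring the aggregate above):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class JsonArrayAccumulator {
    // Mirrors createAccumulator(): an empty buffer of per-row JSON objects.
    public static List<String> createAccumulator() {
        return new ArrayList<>();
    }

    // Mirrors accumulate(): append one already-serialized row.
    public static void accumulate(List<String> acc, String rowJson) {
        acc.add(rowJson);
    }

    // Mirrors getValue(): join the buffered objects into a JSON array.
    public static String getValue(List<String> acc) {
        return acc.stream().collect(Collectors.joining(",", "[", "]"));
    }

    public static void main(String[] args) {
        List<String> acc = createAccumulator();
        accumulate(acc, "{\"a\":1}");
        accumulate(acc, "{\"a\":2}");
        System.out.println(getValue(acc)); // prints [{"a":1},{"a":2}]
    }
}
```

With this shape, each group produces one JSON array containing the serialized rows of that group.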