Using a Python user-defined function in a Java Flink job

Question

Is there any way to use a Python user-defined function within a Java Flink job? Or, failing that, any way to pass something like the result of a transformation done by Flink in Java to a Python user-defined function, so that I can apply some machine learning to it?

I know that from PyFlink you can do something like this:

table_env.register_java_function("hash_code", "my.java.function.HashCode")

But I need to do the opposite: add the Python function from Java. Alternatively, how can I pass the result of a Java transformation directly to a Python UDF Flink job?

I hope these questions are not too crazy, but I need to know whether there is some way for the Flink DataStream API to communicate with the Python Table API while keeping Java as the main language. That is, from Java I need to do Source -> Transformations -> Sink, but some of these transformations could trigger a Python function, or a Python function would wait for some Java transformation to finish and then do something with the stream result.

I hope someone understands what I'm trying to do here.

Kind regards!

Answer

Here is an example of this integration. Assuming Flink 1.11 is the current version, you need this dependency in your pom.xml:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.11.2</version>
  <scope>provided</scope>
</dependency>

Create the environment:

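// imports used by the Java snippets in this answer (Flink 1.11)
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import static org.apache.flink.table.api.Expressions.$;

private StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

private StreamTableEnvironment tableEnv = getTableAPIEnv(env);

/* this SingleOutputStreamOperator will contain the result of consuming the defined source */
private SingleOutputStreamOperator<Event> stream;

public static StreamTableEnvironment getTableAPIEnv(StreamExecutionEnvironment env) {
    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    // replace the placeholder paths with your script and your Python interpreter
    tableEnv.getConfig().getConfiguration().setString("python.files", "path/function.py");
    tableEnv.getConfig().getConfiguration().setString("python.client.executable", "path/python");
    tableEnv.getConfig().getConfiguration().setString("python.executable", "path/python");
    tableEnv.getConfig().getConfiguration().setString("taskmanager.memory.task.off-heap.size", "79mb");
    /* register the function: 'function.FunctionName' is <script name without .py>.<Python function name> */
    tableEnv.executeSql("CREATE TEMPORARY SYSTEM FUNCTION FunctionName AS 'function.FunctionName' LANGUAGE PYTHON");
    return tableEnv;
}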
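The DDL statement above refers to the Python function by a `<module>.<function>` path: the module is the file registered under `python.files` without its `.py` extension, followed by the name of the decorated Python function. As a rough sketch, assuming you want to derive that string from the script path instead of hard-coding it, a small helper could build it. `PythonUdfDdl.create` is hypothetical and not part of Flink:

```java
import java.nio.file.Paths;

/** Hypothetical helper (not a Flink API) that builds the Python UDF DDL shown above. */
final class PythonUdfDdl {
    private PythonUdfDdl() {}

    static String create(String sqlName, String pythonFile, String pythonFunction) {
        // "path/function.py" -> "function.py"
        String fileName = Paths.get(pythonFile).getFileName().toString();
        if (!fileName.endsWith(".py")) {
            throw new IllegalArgumentException("expected a .py file: " + pythonFile);
        }
        // "function.py" -> module name "function"
        String module = fileName.substring(0, fileName.length() - ".py".length());
        return "CREATE TEMPORARY SYSTEM FUNCTION " + sqlName
                + " AS '" + module + "." + pythonFunction + "' LANGUAGE PYTHON";
    }
}
```

With it, the registration line would become `tableEnv.executeSql(PythonUdfDdl.create("FunctionName", "path/function.py", "FunctionName"));`, which keeps the DDL in sync with the value passed to `python.files`.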

Start with the transformations that you want to do, for example:

SingleOutputStreamOperator<EventProfile> profiles = createUserProfile(stream.keyBy(k -> k.id));

/* The result of the ProcessFunction in createUserProfile() is sent to the Python function, which updates some values of the profile and returns them to a Java function in Flink, for example a map function */
profiles = turnIntoTable(profiles).map((MapFunction<Row, EventProfile>) x -> {
  /* your custom code here: build an EventProfile from the Row x and return it */
});
profiles.addSink(new YourCustomSinkFunction());

/* this function processes each Event and creates an EventProfile for this example, but you can also use other operators (map, flatMap, etc.) */
private SingleOutputStreamOperator<EventProfile> createUserProfile(KeyedStream<Event, String> stream) {
    return stream.process(new UserProfileProcessFunction());
}


/* This function receives a SingleOutputStreamOperator and sends each record to the Python function through the Table API. It returns Rows with a single String field (the result_type declared by the Python UDF; change it there if you need another type), which are then mapped back into the EventProfile class */
private DataStream<Row> turnIntoTable(SingleOutputStreamOperator<EventProfile> rowInput) {
    Table events = tableEnv.fromDataStream(rowInput,
            $("id"), $("noOfHits"), $("timestamp"))
            .select("FunctionName(id, noOfHits, timestamp)");
    return tableEnv.toAppendStream(events, Row.class);
}

Finally, execute the job:

env.execute("Job Name");

An example of the Python function called FunctionName in the function.py script:

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(
    input_types=[
        DataTypes.STRING(), DataTypes.INT(), DataTypes.TIMESTAMP(precision=3)
    ],
    result_type=DataTypes.STRING()
)
def FunctionName(id, noOfHits, timestamp):
    # function code here
    return f"{id}|{noOfHits}|{timestamp}"
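The Python function returns one pipe-delimited `STRING`, so the `map` step in the Java job has to split it back into typed values; the answer leaves that mapping as a placeholder. A minimal sketch of the parsing, assuming the `id|noOfHits|timestamp` layout produced above and that `id` never contains a `|`. `ProfileLine` is a hypothetical holder, not the answer's `EventProfile`. Python formats a `datetime` as `yyyy-MM-dd HH:mm:ss[.ffffff]`, which `java.sql.Timestamp.valueOf` accepts:

```java
import java.sql.Timestamp;

/** Hypothetical value holder for one result string of the Python FunctionName UDF. */
final class ProfileLine {
    final String id;
    final int noOfHits;
    final Timestamp timestamp;

    ProfileLine(String id, int noOfHits, Timestamp timestamp) {
        this.id = id;
        this.noOfHits = noOfHits;
        this.timestamp = timestamp;
    }

    /** Parses "id|noOfHits|timestamp" as returned by the Python UDF. */
    static ProfileLine parse(String line) {
        // limit -1 keeps trailing empty fields, so a truncated line is still rejected
        String[] parts = line.split("\\|", -1);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected 3 fields, got " + parts.length + ": " + line);
        }
        return new ProfileLine(
                parts[0],
                Integer.parseInt(parts[1]),
                // str(datetime) in Python: "2020-11-05 10:15:30.123000" or "2020-11-05 10:15:30"
                Timestamp.valueOf(parts[2]));
    }
}
```

Inside the `map` lambda, the string would first be read from the single-field row, for example `ProfileLine p = ProfileLine.parse((String) x.getField(0));`, and then copied into an `EventProfile`. A structured result type would avoid the string round trip, but the pipe-delimited string matches the UDF as written.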
