PyFlink 向量化 UDF 抛出 NullPointerException [英] PyFlink Vectorized UDF throws NullPointerException

查看:77
本文介绍了PyFlink 向量化 UDF 抛出 NullPointerException的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个 ML 模型,它接受两个 numpy.ndarray - usersitems - 并返回一个 numpy.ndarray predictions.在普通的 Python 代码中,我会这样做:

I have a ML model that takes two numpy.ndarray - users and items - and returns an numpy.ndarray predictions. In normal Python code, I would do:

model = load_model()

df = load_data() # the DataFrame includes 4 columns, namely, user_id, movie_id, rating, and timestamp

users = df.user_id.values
items = df.movie_id.values

predictions = model(users, items)

我正在考虑将此代码移植到 Flink 中以利用其分布式特性.我的假设是:通过在多个 Flink 节点上分配预测工作负载,我应该能够更快地运行整个预测.

I am looking into porting this code into Flink to leverage its distributed nature. My assumption is: by distributing the prediction workload on multiple Flink nodes, I should be able to run the whole prediction faster.

所以我编写了一个 PyFlink 作业.注意我实现了一个名为 predict 的 UDF 来运行预测.

So I compose a PyFlink job. Note I implement an UDF called predict to run the prediction.

# batch_prediction.py

model = load_model()

settings = EnvironmentSettings.new_instance().use_blink_planner().build()
exec_env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(exec_env, environment_settings=settings)

SOURCE_DDL = """
CREATE TABLE source (
    user_id INT,
    movie_id INT,
    rating TINYINT,
    event_ms BIGINT
) WITH (
    'connector' = 'filesystem',
    'format' = 'csv',
    'csv.field-delimiter' = '\t',
    'path' = 'ml-100k/u1.test'
)
"""

SINK_DDL = """
CREATE TABLE sink (
    prediction DOUBLE
) WITH (
    'connector' = 'print'
)
"""

t_env.execute_sql(SOURCE_DDL)
t_env.execute_sql(SINK_DDL)
t_env.execute_sql(
    "INSERT INTO sink SELECT PREDICT(user_id, movie_id) FROM source"
).wait()

这是 UDF.

# batch_prediction.py (cont)

@udf(result_type=DataTypes.DOUBLE())
def predict(user, item):
    return model([user], [item]).item()

t_env.create_temporary_function("predict", predict)

作业运行良好.然而,预测实际上在 source 表的每一行上运行,这不是高效的.相反,我想将 80,000 (user_id, movie_id) 对分成 100 个批次,每个批次有 800 行.该作业触发 model(users, items) 函数 100 次(= # of batch),其中 usersitems 都有 800 个元素.

The job runs fine. However, the prediction actually runs on each and every row of the source table, which is not performant. Instead, I want to split the 80,000 (user_id, movie_id) pairs into, let's say, 100 batches, with each batch having 800 rows. The job triggers the model(users, items) function 100 times (= # of batch), where both users and items have 800 elements.

我找不到办法做到这一点.通过查看 docs,矢量化的用户定义函数可以工作.

I couldn't find a way to do this. By looking at the docs, vectorized user-defined functions may work.

# batch_prediction.py (snippet)

# I add the func_type="pandas"
@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def predict(user, item):
    ...

不幸的是,它没有.

> python batch_prediction.py
...
Traceback (most recent call last):
  File "batch_prediction.py", line 55, in <module>
    "INSERT INTO sink SELECT PREDICT(user_id, movie_id) FROM source"
  File "/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/pyflink/table/table_result.py", line 76, in wait
    get_method(self._j_table_result, "await")()
  File "/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/usr/local/anaconda3/envs/flink-ml/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o51.await.
: java.util.concurrent.ExecutionException: org.apache.flink.table.api.TableException: Failed to wait job finish
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.table.api.internal.TableResultImpl.awaitInternal(TableResultImpl.java:119)
    at org.apache.flink.table.api.internal.TableResultImpl.await(TableResultImpl.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: Failed to wait job finish
    at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:59)
    at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:355)
    at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.isFirstRowReady(TableResultImpl.java:368)
    at org.apache.flink.table.api.internal.TableResultImpl.lambda$awaitInternal$1(TableResultImpl.java:107)
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1640)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
    at org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:57)
    ... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:229)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:996)
    at akka.dispatch.OnComplete.internal(Future.scala:264)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:217)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:208)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:610)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:419)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:286)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:201)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    ... 4 more
Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1108)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1082)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1213)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$17(StreamTask.java:1202)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
Caused by: TimerException{java.lang.RuntimeException: Failed to close remote bundle}
    ... 13 more
Caused by: java.lang.RuntimeException: Failed to close remote bundle
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:371)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:325)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:291)
    at org.apache.flink.table.runtime.operators.python.scalar.arrow.RowDataArrowPythonScalarFunctionOperator.invokeFinishBundle(RowDataArrowPythonScalarFunctionOperator.java:77)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
    ... 12 more
Caused by: java.lang.NullPointerException
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:369)
    ... 18 more

错误消息不是很有帮助.任何人都可以帮忙吗?谢谢!

The error messages are not very helpful. Can anyone help? Thanks!

注意:可以在此处找到源代码.要运行代码,您需要在本地安装 Anaconda,然后:

Note: source code can be found here. To run the code, you will need Anaconda locally, then:

conda env create -f environment.yml
conda activate flink-ml

推荐答案

感谢 Apache Flink 社区的 Dian Fu.见 线程.

Credits to Dian Fu from Apache Flink community. See thread.

对于 Pandas UDF,每个输入参数的输入类型是 Pandas.Series,结果类型也应该是 Pandas.Series.此外,结果的长度应与输入相同.您能否检查一下您的 Pandas UDF 实现是否属于这种情况?

For Pandas UDF, the input type for each input argument is Pandas.Series and the result type should also be a Pandas.Series. Besides, the length of the result should be the same as the inputs. Could you check if this is the case for your Pandas UDF implementation?

然后我决定为我的 UDF 添加一个 pytest 单元测试来验证输入和输出类型.方法如下:

Then I decide to add a pytest unit test for my UDF to verify the input and output type. Here is how:

import pandas as pd

from udf_def import predict


def test_predict():
    f = predict._func
    users = pd.Series([1, 2, 3])
    items = pd.Series([1, 4, 9])
    preds = f(users, items)
    assert isinstance(preds, pd.Series)
    assert len(preds) == 3

它揭示了实现错误.然后我修复我的 UDF 实现以通过单元测试.

It uncovers the implementation mistake. Then I fix my UDF implementation to pass the unit test.

from model_def import MatrixFactorization

@udf(result_type=DataTypes.DOUBLE(), func_type="pandas")
def predict(users, items):
    n_users, n_items = 943, 1682
    model = MatrixFactorization(n_users, n_items)
    model.load_state_dict(torch.load("model.pth"))
    return pd.Series(model(users, items).detach().numpy())

使用新的 UDF 实现,我可以运行批量预测.

With the new UDF implementation, I am able to run the batch prediction.

> python batch_prediction.py
1> +I(-2.3802385330200195)
1> +I(20.000154495239258)
1> +I(-30.704544067382812)
1> +I(-11.602800369262695)
1> +I(10.998968124389648)
...

可以在此处找到更新的源代码.

The updated source code can be found here.

这篇关于PyFlink 向量化 UDF 抛出 NullPointerException的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆