使用pyflink从本地系统以批处理模式读取CSV文件 [英] Reading a csv file in batch mode using pyflink from local system

查看:24
本文介绍了使用pyflink从本地系统以批处理模式读取CSV文件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在编写pyflink作业时尝试读取已建立的CSV文件。我使用文件系统连接器来获取数据,但是在DDL上执行EXECUTE_SQL()并稍后在表上执行查询之后,我收到一个错误,说明它无法获取下一个结果。我无法解决此错误。我已经检查了CSV文件,它是完全正确的,并且可以与 pandas 一起工作,但是在这里我不明白为什么它不能获取下一行。请查找附加的代码以供参考。

from pyflink.common.serialization import SimpleStringEncoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink
from pyflink.table import EnvironmentSettings, StreamTableEnvironment, BatchTableEnvironment
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit
import pandas as pd
from inspect import getmembers, isfunction
import os
## CREATE THE ENVIRONMENT

# create a blink batch TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
table_env = BatchTableEnvironment.create(environment_settings=env_settings)
#connector for ingesting the data
source_ddl = """
                CREATE TABLE MyUserTable (
                        timestamp_info TIMESTAMP(3),
                        column_a FLOAT,
                        column_b FLOAT,
                        column_c INT,
                        ) WITH (
                          'connector' = 'filesystem',          
                          'path' = 'file:///Users/abc/Projects/temp.csv', 
                          'format' = 'csv'

                        )"""


#connector for data output/sink
sink_ddl = """
                CREATE TABLE results (
                            timestamp_info TIMESTAMP(3),
                            score FLOAT)
                            WITH (
                                'connector' = 'filesystem',
                                'path' = 'file:///Users/abc/Projects/results.csv',
                                'format' = 'csv'
                            )"""

#make the table corresponding to the schema mentioned
source_table = table_env.execute_sql(source_ddl)
sink_table = table_env.execute_sql(sink_ddl)

#convert the sql table to table API
table_path = table_env.from_path("MyUserTable")

# execute SELECT statement
table_result2 = table_env.execute_sql("SELECT timestamp_info,column_a FROM MyUserTable")
table_result2.print()

即将出现的错误:-

WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/avil.a/opt/anaconda3/lib/python3.8/site-packages/pyflink/lib/flink-dist_2.11-1.12.2.jar) to field java.lang.Class.ANNOTATION
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
+-------------------------+--------------------------------+
|          timestamp_info |                       column_a |
+-------------------------+--------------------------------+
Traceback (most recent call last):
  File "local_implementation.py", line 51, in <module>
    table_result2.print()
  File "/Users/avil.a/opt/anaconda3/lib/python3.8/site-packages/pyflink/table/table_result.py", line 219, in print
    self._j_table_result.print()
  File "/Users/avil.a/opt/anaconda3/lib/python3.8/site-packages/py4j/java_gateway.py", line 1285, in __call__
    return_value = get_return_value(
  File "/Users/avil.a/opt/anaconda3/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 147, in deco
    return f(*a, **kw)
  File "/Users/avil.a/opt/anaconda3/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o46.print.
: java.lang.RuntimeException: Failed to fetch next result
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
    at org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:117)
    at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
    at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:149)
    at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:154)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.io.IOException: Failed to fetch job execution result
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:169)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118)
    at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
    ... 16 more
Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2086)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:167)
    ... 18 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
    at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680)
    at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658)
    at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2158)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:114)
    at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:166)
    ... 18 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Failed to deserialize CSV row.
    at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:257)
    at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:162)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
Caused by: java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
    at java.sql/java.sql.Timestamp.valueOf(Timestamp.java:196)
    at org.apache.flink.formats.csv.CsvToRowDataConverters.convertToTimestamp(CsvToRowDataConverters.java:250)
    at org.apache.flink.formats.csv.CsvToRowDataConverters.lambda$createNullableConverter$ac6e531e$1(CsvToRowDataConverters.java:113)
    at org.apache.flink.formats.csv.CsvToRowDataConverters.lambda$createRowConverter$18bb1dd$1(CsvToRowDataConverters.java:98)
    at org.apache.flink.formats.csv.CsvFileSystemFormatFactory$CsvInputFormat.nextRecord(CsvFileSystemFormatFactory.java:251)

推荐答案

时间戳格式与csv文件中的时间索引不匹配。此外,当像这样读取CSV文件时,它将获取第一行并尝试解析它,如果您的CSV文件中有头,那么它将给出错误,因为它一开始需要时间戳格式,但得到的字符串无法解析为所需的格式。因此,请确保CSV文件中没有标头。

这篇关于使用pyflink从本地系统以批处理模式读取CSV文件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆