Unable to write to S3 using an S3 sink with StreamExecutionEnvironment - Apache Flink 1.1.4

Problem description

I have created a simple Apache Flink project that reads data from a Kafka topic and writes it to an S3 bucket. The job runs without errors and successfully reads every message from the Kafka topic, but nothing is ever written to my S3 bucket. Since there are no errors, it is difficult to debug what is going on. Below are my project and my configuration. This only happens when I use a StreamExecutionEnvironment; if I write to S3 with a regular batch ExecutionEnvironment, it works.
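For reference, a minimal sketch of the kind of batch job that does produce a file in S3 (the bucket path and the in-memory input are placeholders, not my actual job):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class S3BatchTest {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // A small in-memory data set stands in for the real input.
        DataSet<String> data = env.fromElements("a", "b", "c");

        // The same s3:// URI works here, since the batch job closes
        // the output file when it finishes.
        data.writeAsText("s3://flink-data/batch-test.txt").setParallelism(1);

        env.execute("Batch write to S3");
    }
}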

S3 Test Java program

import java.util.UUID;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class S3Test {

    public static void main(String[] args) throws Exception {
        // parse input arguments from a properties file
        final ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);

        if (parameterTool.getNumberOfParameters() < 4) {
            System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " +
                    "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
            return;
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
        env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface

        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<String>(
                        parameterTool.getRequired("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        // write the Kafka stream to a text file in S3
        //messageStream.print();
        String id = UUID.randomUUID().toString();
        messageStream.writeAsText("s3://flink-data/" + id + ".txt").setParallelism(1);

        env.execute("Write to S3 Example");
    }
}

pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk</artifactId>
        <version>1.7.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.7.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.2.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpcore</artifactId>
        <version>4.2.5</version>
    </dependency>

    <!-- Apache Kafka Dependencies -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.9.0.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

</dependencies>
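Note: the RollingSink used in the answer below ships in a separate connector module. If it cannot be resolved with the dependencies above, adding the filesystem connector should cover it (my assumption for the matching Scala 2.10 / 1.1.4 artifact):

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>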

core-site.xml (Hadoop configuration)

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>

    <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
    </property>

    <!-- Comma separated list of local directories used to buffer
         large results prior to transmitting them to S3. -->
    <property>
        <name>fs.s3a.buffer.dir</name>
        <value>/tmp</value>
    </property>

    <!-- set your AWS access key ID using the key defined in org.apache.hadoop.fs.s3a.Constants -->
    <property>
        <name>fs.s3a.access.key</name>
        <value>***************</value>
    </property>

    <!-- set your AWS secret key -->
    <property>
        <name>fs.s3a.secret.key</name>
        <value>****************</value>
    </property>
</configuration>
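For Flink to pick up this Hadoop configuration, the directory containing core-site.xml also needs to be registered in flink-conf.yaml. A minimal sketch, with the path being a placeholder for your own Hadoop configuration directory:

fs.hdfs.hadoopconf: /path/to/hadoop/conf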

Answer

Persisting from a Kafka topic to S3 via Flink requires the use of the RollingSink. The RollingSink uses a Bucketer to determine the names of the directories the part files are saved to; DateTimeBucketer is the default, but you can also create a custom one. A part file is saved and closed whenever the maximum batch size is reached, and a new part file is then started. (That closing step matters: a plain text sink keeps its file open for the lifetime of the streaming job, so the object never becomes visible in S3, which is presumably why the writeAsText approach above showed nothing.) The code below works:

import java.util.Map;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class TestRollingSink {

    public static void main(String[] args) {
        // ConfigUtils is the author's own helper for loading a YAML config into a map.
        Map<String, String> configs = ConfigUtils.loadConfigs("/Users/path/to/config.yaml");

        final ParameterTool parameterTool = ParameterTool.fromMap(configs);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setGlobalJobParameters(parameterTool);

        DataStream<String> parsed = env
                .addSource(new FlinkKafkaConsumer09<String>(
                        parameterTool.getRequired("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        env.enableCheckpointing(2000, CheckpointingMode.AT_LEAST_ONCE);

        // The RollingSink closes part files as it goes, so data becomes
        // visible in S3 while the streaming job is still running.
        RollingSink<String> sink = new RollingSink<String>("s3://flink-test/" + "TEST");
        sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
        sink.setWriter(new StringWriter<String>());
        sink.setBatchSize(200); // roll to a new part file after 200 bytes
        sink.setPendingPrefix("file-");
        sink.setPendingSuffix(".txt");

        parsed.print();
        parsed.addSink(sink).setParallelism(1);

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
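Since a custom Bucketer is possible, here is a minimal sketch of one, assuming the Flink 1.1.x Bucketer interface with its shouldStartNewBucket/getNextBucketPath methods. It writes everything into a single fixed subdirectory instead of time-based buckets:

import org.apache.flink.streaming.connectors.fs.Bucketer;
import org.apache.hadoop.fs.Path;

// Sketch of a custom Bucketer that puts all part files into one fixed bucket.
public class SingleBucketBucketer implements Bucketer {

    private static final long serialVersionUID = 1L;

    @Override
    public boolean shouldStartNewBucket(Path basePath, Path currentBucketPath) {
        // Never roll to a new directory; the sink still rolls part files by size.
        return false;
    }

    @Override
    public Path getNextBucketPath(Path basePath) {
        return new Path(basePath, "all-data");
    }
}

It would then be plugged in with sink.setBucketer(new SingleBucketBucketer()).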
