Spark Java: How to move data from HTTP source to Couchbase sink?

Question

I have a .gz file available on a web server that I want to consume in a streaming manner and insert the data into Couchbase. The archive contains a single file, which in turn contains one JSON object per line.

Since Spark doesn't have an HTTP receiver, I wrote one myself (shown below). I'm using the Couchbase Spark connector to do the insertion. However, when it runs, the job doesn't actually insert anything. I suspect this is due to my inexperience with Spark and not knowing how to start the job and await its termination. As you can see below, there are two places where such calls can be made.

The receiver:

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.zip.GZIPInputStream;

import org.apache.spark.streaming.receiver.Receiver;

public class HttpReceiver extends Receiver<String> {
    private final String url;

    public HttpReceiver(String url) {
        super(MEMORY_AND_DISK());
        this.url = url;
    }

    @Override
    public void onStart() {
        // Receive on a separate thread so that onStart() returns immediately.
        new Thread(() -> receive()).start();
    }

    private void receive() {
        try {
            HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setAllowUserInteraction(false);
            conn.setInstanceFollowRedirects(true);
            conn.setRequestMethod("GET");
            conn.setReadTimeout(60 * 1000);

            // Decompress the response on the fly and read it line by line.
            InputStream gzipStream = new GZIPInputStream(conn.getInputStream());
            Reader decoder = new InputStreamReader(gzipStream, UTF_8);
            BufferedReader reader = new BufferedReader(decoder);

            String json = null;
            while (!isStopped() && (json = reader.readLine()) != null) {
                store(json); // hand each JSON line to Spark
            }
            reader.close();
            conn.disconnect();
        } catch (IOException e) {
            stop(e.getMessage(), e);
        }
    }

    @Override
    public void onStop() {

    }
}

Loading the data:

public void load(String url) throws StreamingQueryException, InterruptedException {
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
        JavaReceiverInputDStream<String> lines = ssc.receiverStream(new HttpReceiver(url));

        lines.foreachRDD(rdd ->
                sql.read().json(rdd)
                        .select(new Column("id"),
                                new Column("name"),
                                new Column("rating"),
                                new Column("review_count"),
                                new Column("hours"),
                                new Column("attributes"))
                        .writeStream()
                        .option("idField", "id")
                        .format("com.couchbase.spark.sql")
                        .start()
//                        .awaitTermination(sparkProperties.getTerminationTimeoutMillis())
        );

//        ssc.start();
        ssc.awaitTerminationOrTimeout(sparkProperties.getTerminationTimeoutMillis());
}

The commented-out lines show my confusion about starting and terminating the job. Also, feel free to comment on the receiver if there's something wrong with it or if it can be improved.

Using Spark v2.1.0 with Java.
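
For reference, the usual Spark Streaming lifecycle is to declare the receiver stream and all of its output operations first, and only then start the context once and block until termination. A minimal sketch, reusing the names from the load method above (sc, url, sparkProperties and the imports are assumed to be the same):

JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
JavaReceiverInputDStream<String> lines = ssc.receiverStream(new HttpReceiver(url));

// declare all transformations / output operations on `lines` here, then:

ssc.start();  // begin receiving and processing
ssc.awaitTerminationOrTimeout(sparkProperties.getTerminationTimeoutMillis());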

Edit 1:

I also tried this implementation:

lines.foreachRDD(rdd ->
        couchbaseWriter(sql.read().json(rdd)
                .select(new Column("id"),
                        new Column("name"),
                        new Column("rating"),
                        new Column("review_count"),
                        new Column("hours"),
                        new Column("attributes"))
                .write()
                .option("idField", "id")
                .format("com.couchbase.spark.sql"))
        .couchbase()
);

ssc.start();
ssc.awaitTermination();

But it throws IllegalStateException: SparkContext has been shutdown:

11004 [JobScheduler] ERROR org.apache.spark.streaming.scheduler.JobScheduler  - Error running job streaming job 1488664987000 ms.0
java.lang.IllegalStateException: SparkContext has been shutdown
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1981)
    at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1088)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.fold(RDD.scala:1082)
    at org.apache.spark.sql.execution.datasources.json.InferSchema$.infer(InferSchema.scala:69)

Edit 2: It turns out the error from Edit 1 was caused by a @PreDestroy method in which I was closing the context. I'm using Spring, and the bean is supposed to be a singleton, but somehow Spark was causing it to be destroyed before the job finished. I've now removed the @PreDestroy method and made some changes; the following seems to work, but with open questions:

public void load(String dataDirURL, String format) throws StreamingQueryException, InterruptedException {
    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
    JavaReceiverInputDStream<String> lines = ssc.receiverStream(new HttpReceiver(dataDirURL));

    lines.foreachRDD(rdd -> {
        try {
            Dataset<Row> select = sql.read().json(rdd)
                    .select("id", "name", "rating", "review_count", "hours", "attributes");
            couchbaseWriter(select.write()
                    .option("idField", "id")
                    .format(format))
                    .couchbase();
        } catch (Exception e) {
            // From time to time this throws AnalysisException: cannot resolve '`id`' given input columns: [];
        }
    });

    ssc.start();
    ssc.awaitTerminationOrTimeout(sparkProperties.getTerminationTimeoutMillis());
}
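
For context, the removed @PreDestroy hook mentioned above might have looked roughly like the sketch below; the class name and bean wiring are illustrative assumptions, not from the original post. If such a hook fires while the streaming job is still running, later batches fail with the SparkContext has been shutdown error from Edit 1.

import javax.annotation.PreDestroy;

import org.apache.spark.api.java.JavaSparkContext;
import org.springframework.stereotype.Component;

// Hypothetical holder bean; names are illustrative.
@Component
public class SparkContextHolder {

    private final JavaSparkContext sc =
            new JavaSparkContext("local[*]", "couchbase-loader");

    public JavaSparkContext context() {
        return sc;
    }

    // A destroy hook like this stops the SparkContext as soon as the Spring
    // container shuts down, even if a streaming job is still using it.
    @PreDestroy
    public void shutdown() {
        sc.stop();
    }
}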

Open questions:

  1. From time to time it throws AnalysisException: cannot resolve 'id' given input columns: [];. Is this a problem with my receiver? (See the note after the stack trace below.)
  2. When the document already exists, the task fails with the following exception. In my case, I'd simply like to overwrite the document if present, not blow up.

Lost task 1.0 in stage 2.0 (TID 4, localhost, executor driver): com.couchbase.client.java.error.DocumentAlreadyExistsException
at com.couchbase.client.java.CouchbaseAsyncBucket$13.call(CouchbaseAsyncBucket.java:475)
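
A note on question 1: the empty input columns ([]) suggest that the micro-batch itself was empty, in which case sql.read().json(rdd) infers a schema with no columns and the subsequent select cannot resolve them. A minimal guard, assuming the same names as in the load method above, is to skip empty batches before reading them:

lines.foreachRDD(rdd -> {
    // An interval with no received lines yields an empty RDD; JSON schema
    // inference then produces no columns and select("id", ...) fails.
    if (rdd.isEmpty()) {
        return;
    }
    // ... read, select and write the non-empty batch as before ...
});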

Answer

Answering my own question, this is what I finally have working without any exceptions:

public void load(String dataDirURL, String format) throws InterruptedException {
    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
    JavaReceiverInputDStream<String> lines = ssc.receiverStream(new HttpReceiver(dataDirURL));

    ObjectMapper objectMapper = new ObjectMapper();

    lines.foreachRDD(rdd -> {
                JavaRDD<RawJsonDocument> docRdd = rdd
                        .filter(content -> !isEmpty(content))
                        .map(content -> {
                            String id = "";
                            String modifiedContent = "";
                            try {
                                ObjectNode node = objectMapper.readValue(content, ObjectNode.class);
                                if (node.has("id")) {
                                    id = node.get("id").textValue();
                                    modifiedContent = objectMapper.writeValueAsString(node.retain(ALLOWED_FIELDS));
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                            } finally {
                                return RawJsonDocument.create(id, modifiedContent);
                            }
                        })
                        .filter(doc -> !isEmpty(doc.id()));
                couchbaseDocumentRDD(docRdd)
                        .saveToCouchbase(UPSERT);
            }
    );

    ssc.start();
    ssc.awaitTerminationOrTimeout(sparkProperties.getTerminationTimeoutMillis());
}
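
This addresses both open questions: saveToCouchbase(UPSERT) overwrites a document if it already exists instead of failing with DocumentAlreadyExistsException, and building RawJsonDocument instances directly with Jackson avoids DataFrame JSON schema inference altogether, so an empty micro-batch can no longer trigger the cannot resolve 'id' error.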
