Spark Java: How to move data from HTTP source to Couchbase sink?

Problem description

I have a .gz file available on a web server that I want to consume in a streaming manner and insert the data into Couchbase. The .gz archive contains a single file, which in turn contains one JSON object per line.

Since Spark doesn't have an HTTP receiver, I wrote one myself (shown below). I'm using the Couchbase Spark connector to do the insertion. However, when run, the job doesn't actually insert anything. I suspect this is due to my inexperience with Spark and not knowing how to start the job and await termination. As you can see below, there are two places where such calls can be made.

The receiver:

public class HttpReceiver extends Receiver<String> {
    private final String url;

    public HttpReceiver(String url) {
        super(MEMORY_AND_DISK());
        this.url = url;
    }

    @Override
    public void onStart() {
        new Thread(() -> receive()).start();
    }

    private void receive() {
        try {
            HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setAllowUserInteraction(false);
            conn.setInstanceFollowRedirects(true);
            conn.setRequestMethod("GET");
            conn.setReadTimeout(60 * 1000);

            InputStream gzipStream = new GZIPInputStream(conn.getInputStream());
            Reader decoder = new InputStreamReader(gzipStream, UTF_8);
            BufferedReader reader = new BufferedReader(decoder);

            String json = null;
            while (!isStopped() && (json = reader.readLine()) != null) {
                store(json);
            }
            reader.close();
            conn.disconnect();
        } catch (IOException e) {
            stop(e.getMessage(), e);
        }
    }

    @Override
    public void onStop() {

    }
}
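
Regarding the request for comments on the receiver: one improvement worth considering is having onStop() release the HTTP connection so the worker thread unblocks promptly when Spark stops the receiver. Below is a hedged sketch (the class name is hypothetical; the Receiver callbacks and StorageLevel come from the standard Spark Streaming API):

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.zip.GZIPInputStream;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

import static java.nio.charset.StandardCharsets.UTF_8;

// Sketch of a variant whose onStop() actively releases the connection.
public class ClosableHttpReceiver extends Receiver<String> {
    private final String url;
    // Set on the executor after onStart(); transient because receivers are serialized.
    private transient volatile HttpURLConnection conn;

    public ClosableHttpReceiver(String url) {
        super(StorageLevel.MEMORY_AND_DISK());
        this.url = url;
    }

    @Override
    public void onStart() {
        new Thread(this::receive).start();
    }

    @Override
    public void onStop() {
        HttpURLConnection c = conn;
        if (c != null) {
            c.disconnect(); // unblocks readLine() in the worker thread
        }
    }

    private void receive() {
        try {
            conn = (HttpURLConnection) new URL(url).openConnection();
            conn.setRequestMethod("GET");
            conn.setReadTimeout(60 * 1000);
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(new GZIPInputStream(conn.getInputStream()), UTF_8))) {
                String json;
                while (!isStopped() && (json = reader.readLine()) != null) {
                    store(json); // hand each JSON line to Spark
                }
            }
        } catch (IOException e) {
            // restart() asks Spark to re-run onStart(); stop(message, e) as in the original also works.
            restart("Error receiving data over HTTP, restarting", e);
        }
    }
}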

Loading the data:

public void load(String url) throws StreamingQueryException, InterruptedException {
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
        JavaReceiverInputDStream<String> lines = ssc.receiverStream(new HttpReceiver(url));

        lines.foreachRDD(rdd ->
                sql.read().json(rdd)
                        .select(new Column("id"),
                                new Column("name"),
                                new Column("rating"),
                                new Column("review_count"),
                                new Column("hours"),
                                new Column("attributes"))
                        .writeStream()
                        .option("idField", "id")
                        .format("com.couchbase.spark.sql")
                        .start()
//                        .awaitTermination(sparkProperties.getTerminationTimeoutMillis())
        );

//        ssc.start();
        ssc.awaitTerminationOrTimeout(sparkProperties.getTerminationTimeoutMillis());
}

The commented lines show my confusion about starting and terminating the jobs. Also, feel free to comment on the receiver if there's something wrong with it or it can be improved.

Using Spark v2.1.0 with Java.
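
For reference (a generic outline, not code from the original post), the usual DStream lifecycle is: declare the transformations and output operations first, then start the streaming context, and only then do receivers begin producing data; awaiting termination blocks afterwards. The timeoutMillis variable below is a placeholder:

// Generic JavaStreamingContext lifecycle (sketch), reusing sc, url and the receiver from above:
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
JavaReceiverInputDStream<String> lines = ssc.receiverStream(new HttpReceiver(url));
lines.foreachRDD(rdd -> { /* batch-style processing of each micro-batch */ });

ssc.start();                                  // receivers only run after this call
ssc.awaitTerminationOrTimeout(timeoutMillis); // block until stopped, failed, or timed out
ssc.stop(false, true);                        // optional: stop gracefully, keep the SparkContext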

Edit 1:

I also tried this implementation:

lines.foreachRDD(rdd ->
          couchbaseWriter(sql.read().json(rdd)
                  .select(new Column("id"),
                          new Column("name"),
                          new Column("rating"),
                          new Column("review_count"),
                          new Column("hours"),
                          new Column("attributes"))
                  .write()
                  .option("idField", "id")
                  .format("com.couchbase.spark.sql"))
                  .couchbase()
  );

  ssc.start();
  ssc.awaitTermination();

But it throws IllegalStateException: SparkContext has been shutdown:

11004 [JobScheduler] ERROR org.apache.spark.streaming.scheduler.JobScheduler  - Error running job streaming job 1488664987000 ms.0
java.lang.IllegalStateException: SparkContext has been shutdown
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1910)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1981)
    at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1088)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.fold(RDD.scala:1082)
    at org.apache.spark.sql.execution.datasources.json.InferSchema$.infer(InferSchema.scala:69)

Edit 2:

Turns out the error from Edit 1 was caused by a @PostDestruct method where I was closing the context. I'm using Spring and the bean is supposed to be a singleton, but somehow Spark causes it to be destroyed before the job finishes. I've now removed the @PostDestruct and made some changes; the following seems to work, but with open questions:

public void load(String dataDirURL, String format) throws StreamingQueryException, InterruptedException {
    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
    JavaReceiverInputDStream<String> lines = ssc.receiverStream(new HttpReceiver(dataDirURL));

    lines.foreachRDD(rdd -> {
        try {
            Dataset<Row> select = sql.read().json(rdd)
                    .select("id", "name", "rating", "review_count", "hours", "attributes");
            couchbaseWriter(select.write()
                    .option("idField", "id")
                    .format(format))
                    .couchbase();
        } catch (Exception e) {
            // Time to time throws AnalysisException: cannot resolve '`id`' given input columns: [];
        }
    });

    ssc.start();
    ssc.awaitTerminationOrTimeout(sparkProperties.getTerminationTimeoutMillis());
}

Open questions:

  1. From time to time it throws AnalysisException: cannot resolve '`id`' given input columns: [];. Is this a problem with my receiver? (See the sketch after the stack trace below.)
  2. When the document already exists, the task fails with the following exception. In my case, I'd simply like to overwrite the doc if present, not blow up.

Lost task 1.0 in stage 2.0 (TID 4, localhost, executor driver): com.couchbase.client.java.error.DocumentAlreadyExistsException
at com.couchbase.client.java.CouchbaseAsyncBucket$13.call(CouchbaseAsyncBucket.java:475)
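
On question 1, a plausible explanation (my assumption, not something stated in the post) is an empty micro-batch: when a batch interval produces no lines, sql.read().json(rdd) infers an empty schema, and the subsequent select cannot resolve any columns, which matches "given input columns: []". A minimal guard would be to skip empty RDDs:

lines.foreachRDD(rdd -> {
    if (rdd.isEmpty()) {
        return; // nothing arrived in this batch interval; avoid empty-schema inference
    }
    Dataset<Row> select = sql.read().json(rdd)
            .select("id", "name", "rating", "review_count", "hours", "attributes");
    // ... write to Couchbase as in the Edit 2 snippet ...
});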

Answer

Answering my own question, this is what I finally have working without any exceptions:

public void load(String dataDirURL, String format) throws InterruptedException {
    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(1000));
    JavaReceiverInputDStream<String> lines = ssc.receiverStream(new HttpReceiver(dataDirURL));

    ObjectMapper objectMapper = new ObjectMapper();

    lines.foreachRDD(rdd -> {
                JavaRDD<RawJsonDocument> docRdd = rdd
                        .filter(content -> !isEmpty(content))
                        .map(content -> {
                            String id = "";
                            String modifiedContent = "";
                            try {
                                ObjectNode node = objectMapper.readValue(content, ObjectNode.class);
                                if (node.has("id")) {
                                    id = node.get("id").textValue();
                                    modifiedContent = objectMapper.writeValueAsString(node.retain(ALLOWED_FIELDS));
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                            } finally {
                                return RawJsonDocument.create(id, modifiedContent);
                            }
                        })
                        .filter(doc -> !isEmpty(doc.id()));
                couchbaseDocumentRDD(docRdd)
                        .saveToCouchbase(UPSERT);
            }
    );

    ssc.start();
    ssc.awaitTerminationOrTimeout(sparkProperties.getTerminationTimeoutMillis());
}
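
For anyone compiling the snippet above: it relies on a few names that are not shown in the answer. The following is a sketch of plausible definitions and imports; the ALLOWED_FIELDS value mirrors the select() used earlier in the question, the StringUtils choice is an assumption, and the connector import paths may vary with the connector version:

import com.couchbase.client.java.document.RawJsonDocument;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.spark.api.java.JavaRDD;

import static com.couchbase.spark.StoreMode.UPSERT;                               // connector's store mode
import static com.couchbase.spark.japi.CouchbaseDocumentRDD.couchbaseDocumentRDD; // connector's Java API wrapper
import static org.apache.commons.lang3.StringUtils.isEmpty;                       // assumption: any null/blank check works

// Assumption: the fields to keep, matching the columns selected earlier in the question.
private static final String[] ALLOWED_FIELDS =
        {"id", "name", "rating", "review_count", "hours", "attributes"};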
