星火定制流丢弃大部分数据 [英] Spark custom streaming dropping most of the data

查看:192
本文介绍了星火定制流丢弃大部分数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在下面的火花利用客户接收流的例子作为<一个火花网站上提供给href=\"https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java\"相对=nofollow>星火顾客接收器。

不过,这项工作似乎放弃了我的大部分数据。任何数据量我流,它成功地在消费者接收。然而,当我做任何地图/ flatmap操作,我只看到10行数据。这是总是这样,无论我多么的数据流。

我修改这个程序从的ActiveMQ 队列中读取。如果我看的ActiveMQ Web界面,火花作业成功消耗了我的所有生成数据。然而,每批次仅10个数据被处理。我试图改变批量大小不同的价值观和尝试了当地以及6节点火花集群 - 到处都是相同的结果。

这是很无奈,因为我不知道为什么被处理的数据量有限。有什么事,我在这里丢失?

这是我的星火计划项目。自定义接收器包括在内。此外,我真的不产生任何套接字连接。相反,我硬编码用于测试目的的消息。相同的行为时流创建套接字连接的。

  / *
 *一个或多个下授权给Apache软件基金会(ASF)
 * contributor许可证协议。请参阅使用分布式通知文件
 *本作品的关于版权的所有权信息。
 *本asf文件许可证可以在apache许可证2.0版
 *(以下简称许可证);你可能不使用这个文件除了在合规
 *许可证。您可以在获得许可证的副本
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 *除非适用法律要求或书面同意,软件
 *许可下发布的分布在原样的基础,
 *无担保或任何形式的条件,无论是前preSS或暗示的保证。
 *请参阅许可证的特定语言的管理权限和
 *根据许可证的限制。
 * /包com.rzt.main;进口com.google.common.collect.Lists;进口org.apache.spark.SparkConf;
进口org.apache.spark.api.java.function.FlatMapFunction;
进口org.apache.spark.api.java.function.Function2;
进口org.apache.spark.api.java.function.PairFunction;
进口org.apache.spark.storage.StorageLevel;
进口org.apache.spark.streaming.Duration;
进口org.apache.spark.streaming.api.java.JavaDStream;
进口org.apache.spark.streaming.api.java.JavaPairDStream;
进口org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
进口org.apache.spark.streaming.api.java.JavaStreamingContext;
进口org.apache.spark.streaming.receiver.Receiver;
进口scala.Tuple2;进口java.io.BufferedReader中;
进口java.io.InputStreamReader中;
进口java.net.ConnectException;
进口的java.net.Socket;
进口的java.util.regex.Pattern;/ **
 *是通过套接字接收数据的自定义接收器。接收的字节数
 *间preTED文字和\\ n分隔行被看作是记录。他们
 *然后计数并打印。
 *
 *用途:TestReceiv3&LT;主&GT; &LT;&主机GT; &LT;港口&GT; &LT;主&GT;是星火主
 * URL。在本地模式,&LT;主&GT;应该是局部[N]'其中n&GT; 1.&LT;&主机GT;和
 *&LT;港口&GT;火花流将连接到接收数据的TCP服务器。
 *
 *要在本地机器上运行,你需要先运行一个服务器Netcat的`$
 * NC -lk 9999`,然后运行的例子`$斌/运行例子
 * org.apache.spark.examples.streaming.TestReceiv3本地主机9999`
 * /公共类TestReceiv3扩展接收器&LT;串GT; {
    私有静态最终模式空间= Pattern.compile();    公共静态无效的主要(字串[] args){        //创建以1第二批大小的情况下
        SparkConf sparkConf =新SparkConf()setAppName(TestReceiv3)setMaster(本地[4])。;
        JavaStreamingContext SSC =新JavaStreamingContext(sparkConf,新的持续时间(1000));        //与目标IP自定义接收器创建一个输入流:端口
        //算
        在\\输入流//话ñ分隔文本(例如,通过产生的NC)
        JavaReceiverInputDStream&LT;串GT;线= ssc.receiverStream(新TestReceiv3(TEST,1));
        JavaDStream&LT;串GT;字= lines.flatMap(新FlatMapFunction&LT;字符串,字符串&GT;(){
            公众可迭代&LT;串GT;调用(字符串x){
                的System.out.println(收到消息+ X);
                返回Lists.newArrayList(X);
            }
        });        words.print();
        ssc.start();
        ssc.awaitTermination();
    }    // =============接收器code,它通过套接字接收数据
    // ==============    字符串主机= NULL;
    INT端口= -1;    公共TestReceiv3(字符串HOST_,诠释port_){
        超级(StorageLevel.MEMORY_AND_DISK_2());
        主机= HOST_;
        端口= port_;
    }    公共无效调用onStart(){
        //开始,超过一个连接接收数据线
        新的Thread(){
            @覆盖
            公共无效的run(){
                接收();
            }
        }。开始();
    }    公共无效的onStop(){
        //没有什么很多工作要做,因为线程调用接收()
        //旨在阻止自身isStopped()返回false
    }    / **创建一个套接字连接,直到停止接收器接收数据* /
    私人无效接收(){
        Socket套接字= NULL;
        字符串userInput = NULL;        尝试{            INT I = 0;
            //直到停止或折断继续读书连接
            而(真){
                我++;
                存储(MESSAGE+ I);
                如果(我== 1000)
                    打破;
            }            //试图重新启动再次连接时,服务器处于活动状态
            //再次
            重新启动(试图重新连接);
        }赶上(的Throwable t)的{
            重新启动(接收数据时出错,T);
        }
    }
}


解决方案

您所看到的输出从 words.print未来() DStream.print 只打印DSTREAM的前10个元素。
从<一个href=\"http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.dstream.DStream\"相对=nofollow>文档:


  

高清打印():单位


  
  

打印在此DSTREAM产生的每个RDD的前十个元素。
  这是一个输出操作者,所以这DSTREAM将被登记为
  输出流并有兑现。


您将需要存储的流数据的地方(就像使用 DStream.saveAsTextFiles(...),以检查它在它的全部。

I'm following the example for spark streaming using customer receiver as given in the spark site available at Spark customer receiver.

However, the job seems to drop most my data. Whatever the amount of data I stream, it is successfully received at the consumer. However, when I do any map/ flatmap operations on it, I just see 10 rows of data. This is always the case no matter how much data I stream.

I have modified this program to read from ActiveMQ queue. If I look at ActiveMQ web interface, the spark job successfully consumes all the data I generate. HOWEVER, only 10 data per batch is processed. I tried changing batch size to various values and tried it on local as well as 6 node spark cluster - everywhere the same result.

It's really frustrating as I don't know why a limited amount of data is being processed. Is there something I'm missing here ?

This is my spark program. The custom receiver is included. Also I'm not really creating any socket connection. Instead, I'm hard coding the message for test purposes. Behaves the same as when socket connection is created for stream.

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.rzt.main;

import com.google.common.collect.Lists;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.receiver.Receiver;
import scala.Tuple2;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.net.Socket;
import java.util.regex.Pattern;

/**
 * Custom Receiver that receives data over a socket. Received bytes is
 * interpreted as text and \n delimited lines are considered as records. They
 * are then counted and printed.
 *
 * Usage: TestReceiv3 <master> <hostname> <port> <master> is the Spark master
 * URL. In local mode, <master> should be 'local[n]' with n > 1. <hostname> and
 * <port> of the TCP server that Spark Streaming would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server `$
 * nc -lk 9999` and then run the example `$ bin/run-example
 * org.apache.spark.examples.streaming.TestReceiv3 localhost 9999`
 */

public class TestReceiv3 extends Receiver<String> {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) {

        // Create the context with a 1 second batch size
        SparkConf sparkConf = new SparkConf().setAppName("TestReceiv3").setMaster("local[4]");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));

        // Create a input stream with the custom receiver on target ip:port and
        // count the
        // words in input stream of \n delimited text (eg. generated by 'nc')
        JavaReceiverInputDStream<String> lines = ssc.receiverStream(new TestReceiv3("TEST", 1));
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String x) {
                System.out.println("Message received" + x);
                return Lists.newArrayList(x);
            }
        });

        words.print();
        ssc.start();
        ssc.awaitTermination();
    }

    // ============= Receiver code that receives data over a socket
    // ==============

    String host = null;
    int port = -1;

    public TestReceiv3(String host_, int port_) {
        super(StorageLevel.MEMORY_AND_DISK_2());
        host = host_;
        port = port_;
    }

    public void onStart() {
        // Start the thread that receives data over a connection
        new Thread() {
            @Override
            public void run() {
                receive();
            }
        }.start();
    }

    public void onStop() {
        // There is nothing much to do as the thread calling receive()
        // is designed to stop by itself isStopped() returns false
    }

    /** Create a socket connection and receive data until receiver is stopped */
    private void receive() {
        Socket socket = null;
        String userInput = null;

        try {

            int i = 0;
            // Until stopped or connection broken continue reading
            while (true) {
                i++;
                store("MESSAGE " + i);
                if (i == 1000)
                    break;
            }

            // Restart in an attempt to connect again when server is active
            // again
            restart("Trying to connect again");
        } catch (Throwable t) {
            restart("Error receiving data", t);
        }
    }
}

解决方案

The output you are seeing is coming from words.print(). DStream.print only prints the first 10 elements of the DStream. From the docs:

def print(): Unit

Print the first ten elements of each RDD generated in this DStream. This is an output operator, so this DStream will be registered as an output stream and there materialized.

You will need to store the streaming data somewhere (like using DStream.saveAsTextFiles(...) in order to inspect it in its totality.

这篇关于星火定制流丢弃大部分数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆