Kinesis Stream with Empty Records in Google Dataproc with Spark 1.6.1 Hadoop 2.7.2


Problem Description

I am trying to connect to an Amazon Kinesis stream from Google Dataproc but am only getting empty RDDs.

Command: spark-submit --verbose --packages org.apache.spark:spark-streaming-kinesis-asl_2.10:1.6.2 demo_kinesis_streaming.py --awsAccessKeyId XXXXX --awsSecretKey XXXX

Detailed Log: https://gist.github.com/sshrestha-datalicious/e3fc8ebb4916f27735a97e9fcc42136c

More Details:
  • Spark 1.6.1
  • Hadoop 2.7.2
  • Assembly used: /usr/lib/spark/lib/spark-assembly-1.6.1-hadoop2.7.2.jar

Surprisingly, it works when I download and use the assembly containing Spark 1.6.1 with Hadoop 2.6.0 and run the following command.

Command: SPARK_HOME=/opt/spark-1.6.1-bin-hadoop2.6 spark-submit --verbose --packages org.apache.spark:spark-streaming-kinesis-asl_2.10:1.6.2 demo_kinesis_streaming.py --awsAccessKeyId XXXXX --awsSecretKey XXXX

I am not sure whether there is a version conflict between the two Hadoop versions and the Kinesis ASL, or whether it has to do with custom settings in Google Dataproc.
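
One way to check whether the Hadoop 2.7.2 assembly bundles its own copy of the AWS SDK classes is to list its contents (a quick diagnostic, assuming the JDK's jar tool is on the PATH):

    jar tf /usr/lib/spark/lib/spark-assembly-1.6.1-hadoop2.7.2.jar | grep 'com/amazonaws' | head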

Any help would be greatly appreciated.

Thanks,
Suren

Answer

Our team was in a similar situation and we managed to solve it:

We are running on the same environment:

  • Dataproc Image Version 1 with Spark 1.6.1 and Hadoop 2.7
  • A simple Spark Streaming Kinesis script that boils down to this:

# Run the script as
# spark-submit  \
#    --packages org.apache.spark:spark-streaming-kinesis-asl_2.10:1.6.1\
#    demo_kinesis_streaming.py\
#    --awsAccessKeyId FOO\
#    --awsSecretKey BAR\
#    ... 

import argparse

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.storagelevel import StorageLevel

from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

ap = argparse.ArgumentParser()
ap.add_argument('--awsAccessKeyId', required=True)
ap.add_argument('--awsSecretKey', required=True)
ap.add_argument('--stream_name')
ap.add_argument('--region')
ap.add_argument('--app_name')
ap = ap.parse_args()

kinesis_application_name = ap.app_name
kinesis_stream_name = ap.stream_name
kinesis_region = ap.region
kinesis_endpoint_url = 'https://kinesis.{}.amazonaws.com'.format(ap.region)

spark_context = SparkContext(appName=kinesis_application_name)
streamingContext = StreamingContext(spark_context, 60)

kinesisStream = KinesisUtils.createStream(
    ssc=streamingContext,
    kinesisAppName=kinesis_application_name,
    streamName=kinesis_stream_name,
    endpointUrl=kinesis_endpoint_url,
    regionName=kinesis_region,
    initialPositionInStream=InitialPositionInStream.TRIM_HORIZON,
    checkpointInterval=60,
    storageLevel=StorageLevel.MEMORY_AND_DISK_2,
    awsAccessKeyId=ap.awsAccessKeyId,
    awsSecretKey=ap.awsSecretKey
)

kinesisStream.pprint()

streamingContext.start()
streamingContext.awaitTermination()

  • The code had been tested and worked on AWS EMR and in a local environment using the same Spark 1.6.1 with Hadoop 2.7 setup. On Dataproc, however, it returned only empty RDDs, no matter how we ran it:

    1. Submitting the job via the gcloud command (see the example below);
    2. ssh-ing into the cluster master node and running in yarn-client mode;
    3. ssh-ing into the cluster master node and running as local[*].
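
    For reference, a gcloud submission along these lines (the cluster name is a placeholder, not the exact command from the original post):

        gcloud dataproc jobs submit pyspark demo_kinesis_streaming.py \
            --cluster my-cluster \
            --properties spark.jars.packages=org.apache.spark:spark-streaming-kinesis-asl_2.10:1.6.1 \
            -- --awsAccessKeyId FOO --awsSecretKey BAR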

  • Upon enabling verbose logging by updating /etc/spark/conf/log4j.properties with the following values:

        log4j.rootCategory=DEBUG, console
        log4j.appender.console=org.apache.log4j.ConsoleAppender
        log4j.appender.console.target=System.err
        log4j.appender.console.layout=org.apache.log4j.PatternLayout
        log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n
        log4j.logger.org.eclipse.jetty=ERROR
        log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
        log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=DEBUG
        log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=DEBUG
        log4j.logger.org.apache.spark=DEBUG 
        log4j.logger.org.apache.hadoop.conf.Configuration.deprecation=DEBUG
        log4j.logger.org.spark-project.jetty.server.handler.ContextHandler=DEBUG
        log4j.logger.org.apache=DEBUG
        log4j.logger.com.amazonaws=DEBUG
    


    We noticed something weird in the log (note that spark-streaming-kinesis-asl_2.10:1.6.1 uses aws-sdk-java/1.9.37 as a dependency, while somehow aws-sdk-java/1.7.4 was used, as suggested by the user-agent):

        16/07/10 06:30:16 DEBUG com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer: PROCESS task encountered execution exception:
        java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: com.amazonaws.services.kinesis.model.GetRecordsResult.getMillisBehindLatest()Ljava/lang/Long;
            at java.util.concurrent.FutureTask.report(FutureTask.java:122)
            at java.util.concurrent.FutureTask.get(FutureTask.java:192)
            at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer.checkAndSubmitNextTask(ShardConsumer.java:137)
            at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardConsumer.consumeShard(ShardConsumer.java:126)
            at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:334)
            at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon$1.run(KinesisReceiver.scala:174)
    
        Caused by: java.lang.NoSuchMethodError: com.amazonaws.services.kinesis.model.GetRecordsResult.getMillisBehindLatest()Ljava/lang/Long;
            at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:119)
            at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:48)
            at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:23)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
    
        content-length:282
        content-type:application/x-amz-json-1.1
        host:kinesis.ap-southeast-2.amazonaws.com
        user-agent:SparkDemo,amazon-kinesis-client-library-java-1.4.0, aws-sdk-java/1.7.4 Linux/3.16.0-4-amd64 OpenJDK_64-Bit_Server_VM/25.91-b14/1.8.0_91
        x-amz-date:20160710T063016Z
        x-amz-target:Kinesis_20131202.GetRecords
    


    It seems that Dataproc builds its own Spark with a much older AWS SDK as a dependency, and it blows up when used in conjunction with code that requires a much newer AWS SDK, though we are not sure exactly which module caused this error.
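
    To confirm which jar a conflicting class is actually loaded from, a quick probe through the JVM gateway can help (a diagnostic sketch, not part of the original post; it reuses the spark_context from the script above):

        # Ask the JVM where the suspect Kinesis model class came from. If the
        # location points at the Spark/Hadoop assembly rather than the Kinesis
        # ASL package, the older bundled AWS SDK is shadowing the newer one.
        cls = spark_context._jvm.java.lang.Class.forName(
            'com.amazonaws.services.kinesis.model.GetRecordsResult')
        print(cls.getProtectionDomain().getCodeSource().getLocation())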

    Update: Based on @DennisHuo's comment, this behaviour is caused by Hadoop's leaky classpath: https://github.com/apache/hadoop/blob/branch-2.7.2/hadoop-project/pom.xml#L650

    To make things worse, the AWS KCL 1.4.0 (used by Spark 1.6.1) silently suppresses any runtime error instead of throwing a RuntimeException, which caused a lot of headaches while debugging.

    Eventually, our solution was to build org.apache.spark:spark-streaming-kinesis-asl_2.10:1.6.1 with all of its com.amazonaws.* classes shaded.

    Build the JAR with the following pom (an updated spark/extras/kinesis-asl/pom.xml) and ship the new JAR with the --jars flag in spark-submit:

    <?xml version="1.0" encoding="UTF-8"?>
    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <parent>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-parent_2.10</artifactId>
        <version>1.6.1</version>
        <relativePath>../../pom.xml</relativePath>
      </parent>
    
      <!-- Kinesis integration is not included by default due to ASL-licensed code. -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kinesis-asl_2.10</artifactId>
      <packaging>jar</packaging>
      <name>Spark Kinesis Integration</name>
    
      <properties>
        <sbt.project.name>streaming-kinesis-asl</sbt.project.name>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_${scala.binary.version}</artifactId>
          <version>${project.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_${scala.binary.version}</artifactId>
          <version>${project.version}</version>
          <type>test-jar</type>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_${scala.binary.version}</artifactId>
          <version>${project.version}</version>
          <type>test-jar</type>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>com.amazonaws</groupId>
          <artifactId>amazon-kinesis-client</artifactId>
          <version>${aws.kinesis.client.version}</version>
        </dependency>
        <dependency>
          <groupId>com.amazonaws</groupId>
          <artifactId>amazon-kinesis-producer</artifactId>
          <version>${aws.kinesis.producer.version}</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.mockito</groupId>
          <artifactId>mockito-core</artifactId>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.scalacheck</groupId>
          <artifactId>scalacheck_${scala.binary.version}</artifactId>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-test-tags_${scala.binary.version}</artifactId>
        </dependency>
      </dependencies>
    
      <build>
        <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
        <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
    
        <plugins>
            <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-shade-plugin</artifactId>
              <configuration>
                <shadedArtifactAttached>false</shadedArtifactAttached>
    
                <artifactSet>
                  <includes>
                    <!-- At a minimum we must include this to force effective pom generation -->
                    <include>org.spark-project.spark:unused</include>
                    <include>com.amazonaws:*</include>
                  </includes>
                </artifactSet>
    
                <relocations>
                  <relocation>
                    <pattern>com.amazonaws</pattern>
                    <shadedPattern>foo.bar.YO.com.amazonaws</shadedPattern>
                    <includes>
                      <include>com.amazonaws.**</include>
                    </includes>
                  </relocation>
                </relocations>
    
              </configuration>
              <executions>
                <execution>
                  <phase>package</phase>
                  <goals>
                    <goal>shade</goal>
                  </goals>
                </execution>
              </executions>
            </plugin>
        </plugins>
      </build>
    </project>
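
    For completeness, the build-and-submit flow looks roughly like this (a sketch; the module path and the output jar name may differ depending on your Spark source checkout):

        # From the root of a Spark 1.6.1 source checkout
        mvn -pl extras/kinesis-asl -am package -DskipTests

        # Submit with the shaded jar instead of --packages
        spark-submit \
            --jars extras/kinesis-asl/target/spark-streaming-kinesis-asl_2.10-1.6.1.jar \
            demo_kinesis_streaming.py \
            --awsAccessKeyId FOO --awsSecretKey BAR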
    
