How to integrate Spark and Kafka for direct stream
Problem description
I am having difficulties creating a basic Spark Streaming application.
Right now, I am trying it on my local machine.
I have done the following setup.
- Set up ZooKeeper
- Set up Kafka (version: kafka_2.10-0.9.0.1)
- Created a topic using the command below:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
- Started a producer and a consumer in two different cmd terminals using the commands below:
Producer:
kafka-console-producer.bat --broker-list localhost:9092 --topic test
Consumer:
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
Now the data I enter in the producer terminal shows up in the consumer console.
Now I am trying to integrate Kafka into Apache Spark Streaming.
Below is sample code that I adapted from the official documentation: Kafka & Spark Setup and Kafka & Spark Integration.
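import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

public class KafkaStreamingTry {
    static Map<String, Object> kafkaParams = new HashMap<>();

    public static void main(String[] args) throws InterruptedException {
        // Create a local StreamingContext that uses all available cores and a batch interval of 10 seconds
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("Sampleapp");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "0");
        kafkaParams.put("auto.offset.reset", "earliest"); // no committed offset for the group: start from the earliest (like --from-beginning)
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("test");

        final JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
            );
        System.out.println("Direct Stream created? ");

        stream.mapToPair(
            new PairFunction<ConsumerRecord<String, String>, String, String>() {
                @Override
                public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
                    System.out.println("record key : " + record.key() + " value is : " + record.value());
                    return new Tuple2<>(record.key(), record.value());
                }
            });
        System.out.println("Reached the end.");
    }
}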
If I run this program, I get the following log:
"C:\Program Files\Java\jdk1.8.0_11\bin\java" [IntelliJ IDEA Community Edition 2017.1.4 agent options and full Maven -classpath omitted] kafkatry
(The omitted classpath mixes Scala 2.10 and 2.11 builds and Spark 2.1.0/2.1.1: spark-core_2.10, spark-streaming_2.10 and spark-tags_2.10 at 2.1.0; spark-streaming-kafka-0-10_2.10, spark-sql_2.10, spark-sql-kafka-0-10_2.11 and spark-tags_2.11 at 2.1.1; kafka_2.10 and kafka-clients at 0.10.2.1.)
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/07/03 01:23:55 INFO SparkContext: Running Spark version 2.1.0
17/07/03 01:23:55 WARN SparkContext: Support for Scala 2.10 is deprecated as of Spark 2.1.0
17/07/03 01:23:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/07/03 01:23:56 INFO SecurityManager: Changing view acls to: driftking9987
17/07/03 01:23:56 INFO SecurityManager: Changing modify acls to: driftking9987
17/07/03 01:23:56 INFO SecurityManager: Changing view acls groups to:
17/07/03 01:23:56 INFO SecurityManager: Changing modify acls groups to:
17/07/03 01:23:56 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(driftking9987); groups with view permissions: Set(); users with modify permissions: Set(driftking9987); groups with modify permissions: Set()
17/07/03 01:23:57 INFO Utils: Successfully started service 'sparkDriver' on port 51353.
17/07/03 01:23:57 INFO SparkEnv: Registering MapOutputTracker
17/07/03 01:23:57 INFO SparkEnv: Registering BlockManagerMaster
17/07/03 01:23:57 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
17/07/03 01:23:57 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
17/07/03 01:23:58 INFO DiskBlockManager: Created local directory at C:\Users\driftking9987\AppData\Local\Temp\blockmgr-4535fb48-e3c2-4af7-a57a-c5c54c2d9ed1
17/07/03 01:23:58 INFO MemoryStore: MemoryStore started with capacity 352.5 MB
17/07/03 01:23:58 INFO SparkEnv: Registering OutputCommitCoordinator
17/07/03 01:23:58 INFO Utils: Successfully started service 'SparkUI' on port 4040.
17/07/03 01:23:58 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.56.1:4040
17/07/03 01:23:59 INFO Executor: Starting executor ID driver on host localhost
17/07/03 01:23:59 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 51374.
17/07/03 01:23:59 INFO NettyBlockTransferService: Server created on 192.168.56.1:51374
17/07/03 01:23:59 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
17/07/03 01:23:59 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.56.1, 51374, None)
17/07/03 01:23:59 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.56.1:51374 with 352.5 MB RAM, BlockManagerId(driver, 192.168.56.1, 51374, None)
17/07/03 01:23:59 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.56.1, 51374, None)
17/07/03 01:23:59 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.56.1, 51374, None)
17/07/03 01:24:00 WARN KafkaUtils: overriding enable.auto.commit to false for executor
17/07/03 01:24:00 WARN KafkaUtils: overriding auto.offset.reset to none for executor
17/07/03 01:24:00 WARN KafkaUtils: overriding executor group.id to spark-executor-0
17/07/03 01:24:00 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135
17/07/03 01:24:00 INFO SparkContext: Invoking stop() from shutdown hook
17/07/03 01:24:00 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040
17/07/03 01:24:00 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/07/03 01:24:00 INFO MemoryStore: MemoryStore cleared
17/07/03 01:24:00 INFO BlockManager: BlockManager stopped
17/07/03 01:24:00 INFO BlockManagerMaster: BlockManagerMaster stopped
17/07/03 01:24:00 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/07/03 01:24:00 INFO SparkContext: Successfully stopped SparkContext
17/07/03 01:24:00 INFO ShutdownHookManager: Shutdown hook called
17/07/03 01:24:00 INFO ShutdownHookManager: Deleting directory C:\Users\driftking9987\AppData\Local\Temp\spark-cd270e2f-257c-4878-88b4-8f908d65f76a
Process finished with exit code 0
Now, if I add
jssc.start();// Start the computation
jssc.awaitTermination();
it gives the following error:
17/07/03 01:26:46 ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
at KafkaStreamingTry.main(KafkaStreamingTry.java:74)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:513)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:573)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:572)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
at KafkaStreamingTry.main(KafkaStreamingTry.java:74)
17/07/03 01:26:46 INFO SparkContext: Invoking stop() from shutdown hook
17/07/03 01:26:46 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040
17/07/03 01:26:46 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
17/07/03 01:26:46 INFO MemoryStore: MemoryStore cleared
17/07/03 01:26:46 INFO BlockManager: BlockManager stopped
17/07/03 01:26:46 INFO BlockManagerMaster: BlockManagerMaster stopped
17/07/03 01:26:46 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
17/07/03 01:26:46 INFO SparkContext: Successfully stopped SparkContext
17/07/03 01:26:46 INFO ShutdownHookManager: Shutdown hook called
This is expected.
Can you please tell me how I can make it run? My requirement is that whenever I enter any data in the producer terminal, the Java application should acknowledge it and print it at least once. I will try to figure out the calculation part on the JSON that I will receive myself.
This is a question I asked before, and I am trying to follow the architecture suggested there.
Thanks
POM.XML
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cs</groupId>
<artifactId>sparktest</artifactId>
<version>1.0-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>2.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.6</version>
</dependency>
<!--For Kafka integration-->
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.10</artifactId>
<version>2.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.10 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>2.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10_2.11 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
</project>
Recommended answer
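mapToPair is only a transformation: it describes how each batch should be computed, but nothing runs until an output operation consumes the result. That is why jssc.start() fails with "No output operations registered, so nothing to execute". After creating the direct stream with Kafka, create a JavaPairDStream from it, then iterate over the JavaPairDStream with foreachRDD (an output operation) and print the key and value of your Kafka messages. Finally, start the context and wait for termination.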
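// requires: import org.apache.spark.streaming.api.java.JavaPairDStream;
JavaPairDStream<String, String> jPairDStream = stream.mapToPair(
    new PairFunction<ConsumerRecord<String, String>, String, String>() {
        @Override
        public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception {
            return new Tuple2<>(record.key(), record.value());
        }
    });

// foreachRDD is an output operation; registering it is what lets jssc.start() succeed.
// (jPairDStream.print() would also register an output operation.)
jPairDStream.foreachRDD(jPairRDD -> {
    // Runs on the executors; with local[*] they share the driver's JVM, so this prints to the console.
    jPairRDD.foreach(tuple -> {
        System.out.println("key=" + tuple._1() + " value=" + tuple._2());
    });
});

jssc.start();            // start receiving and processing batches
jssc.awaitTermination(); // block until the streaming job is stopped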
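To see why the snippet above works while the question's version fails, it helps to separate transformations from output operations. The following is a toy model in plain Java, not Spark code, and the class MiniStream and its methods are hypothetical names. Here map only records how to compute a batch (like mapToPair), foreach registers an output operation (like foreachRDD or print()), and start() refuses to run when no output operation exists, mirroring the message in the question's stack trace.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/** Toy model of a streaming graph (hypothetical names, not Spark's API). */
class MiniStream<T> {
    private final Supplier<List<T>> batch;  // computes this stream's elements for one batch
    private final List<Runnable> outputs;   // output operations shared by the whole graph

    private MiniStream(Supplier<List<T>> batch, List<Runnable> outputs) {
        this.batch = batch;
        this.outputs = outputs;
    }

    /** Source stream that replays the given records on every batch. */
    static <T> MiniStream<T> of(List<T> records) {
        return new MiniStream<>(() -> records, new ArrayList<>());
    }

    /** Transformation: only records how to compute the result; nothing runs yet. */
    <R> MiniStream<R> map(Function<T, R> f) {
        return new MiniStream<>(() -> {
            List<R> out = new ArrayList<>();
            for (T t : batch.get()) {
                out.add(f.apply(t));
            }
            return out;
        }, outputs);
    }

    /** Output operation: registers work that start() will actually execute. */
    void foreach(Consumer<T> action) {
        outputs.add(() -> batch.get().forEach(action));
    }

    /** Runs one batch; fails the way Spark does when no output operation exists. */
    void start() {
        if (outputs.isEmpty()) {
            throw new IllegalArgumentException(
                "requirement failed: No output operations registered, so nothing to execute");
        }
        outputs.forEach(Runnable::run);
    }
}
```

In the toy, calling map alone never invokes the mapping function, which is also why the println inside the question's PairFunction never fires: without an output operation, no batch is ever computed.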
The command you are using with kafka-console-producer.bat
produces messages whose key is null. To produce messages with both a key and a value on the topic test,
use the command below, then enter a comma-separated key and value in the console, like key1,value1
kafka-console-producer.bat --property parse.key=true --property key.separator=, --broker-list localhost:9092 --topic test
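With parse.key=true, the console producer splits each line at the first occurrence of key.separator: everything before it becomes the key, the rest becomes the value, and by default a line without the separator is rejected. This matters for the JSON the question plans to send, because JSON usually contains commas. The plain-Java helper below approximates that rule so you can check which key and value a given input line would produce; ConsoleLineSplitter is a hypothetical name, and this is a sketch of the behavior, not Kafka's own code.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

/** Hypothetical helper approximating how the console producer splits "key<sep>value" lines. */
final class ConsoleLineSplitter {
    private ConsoleLineSplitter() {}

    /** Splits at the FIRST separator; throws if the line contains no separator. */
    static Map.Entry<String, String> split(String line, String separator) {
        int idx = line.indexOf(separator);
        if (idx < 0) {
            throw new IllegalArgumentException("No key found on line: " + line);
        }
        String key = line.substring(0, idx);                       // text before the separator
        String value = line.substring(idx + separator.length());   // everything after it, commas included
        return new SimpleImmutableEntry<>(key, value);
    }
}
```

For example, typing k1,{"a":1,"b":2} yields the key k1 and the full JSON as the value, whereas typing the JSON alone yields the key {"a":1 and the value "b":2}. So when sending JSON, always type a key first, or choose a separator that cannot occur in the key.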
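Your pom file uses different versions for the Spark artifacts (2.1.0 and 2.1.1) and mixes _2.10 and _2.11 Scala builds. Make sure all Spark artifacts use the same Spark version and the same Scala suffix. You need the following dependencies to run your program. Also note that the Spark Streaming + Kafka integration guide states that the 0.10 integration (spark-streaming-kafka-0-10) is not compatible with brokers older than 0.10, so the kafka_2.10-0.9.0.1 broker from the question will likely need to be upgraded as well.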
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>