spark-streaming相关内容
我正在使用 spark directStream api 从 Kafka 读取数据.我的代码如下: val sparkConf = new SparkConf().setAppName("testdirectStreaming")val sc = new SparkContext(sparkConf)val ssc = new StreamingContext(sc, Seconds(2))va
..
我们使用 Apache Spark 1.5.1 和 kafka_2.10-0.8.2.1 以及 Kafka DirectStream API 使用 Spark 从 Kafka 获取数据. 我们使用以下设置在 Kafka 中创建了主题 ReplicationFactor :1 和 Replica : 1 当所有 Kafka 实例都在运行时,Spark 作业工作正常.然而,当集群中的
..
在我的场景中,我有几个数据集不时出现,我需要在我们的平台中摄取它们.摄取过程涉及几个转换步骤.其中之一是 Spark.到目前为止,我特别使用火花结构化流媒体.基础设施还涉及 kafka,spark 结构化流从中读取数据. 我想知道是否有一种方法可以检测到某个主题在一段时间内没有其他东西可以消费时决定停止工作.那就是我想在消耗该特定数据集所需的时间内运行它,然后停止它.出于特定原因,我们决定不
..
我正在使用 Kafka 的结构化流源(集成指南),如前所述,它没有提交任何偏移量. 我的目标之一是监控它(检查它是否落后等).即使它没有提交偏移量,它也会通过不时查询 kafka 并检查下一个要处理的偏移量来处理它们.根据文档,偏移量被写入 HDFS,因此在发生故障时可以恢复,但问题是: 它们存放在哪里?如果不提交偏移量,是否有任何方法可以监视火花流(结构化)的 kafka 消耗(从程
..
我会尽量简化我要解决的问题.我有一个从 JSON 文件中读取的员工数据流,其架构如下: StructType([ \StructField("timeStamp", TimestampType()),\StructField("emp_id", LongType()),\StructField(“on_duty", LongType()) ])# on_duty 是一个 int boolean-
..
我一直在做火花流作业,通过 kafka 消费和生产数据.我用的是directDstream,所以必须自己管理offset,我们采用redis来写和读offset.现在有一个问题,当我启动我的客户端时,我的客户端需要从redis中获取offset,而不是kafka中存在的offset它自己.如何显示我编写的代码?现在我已经在下面编写了代码: kafka_stream = KafkaUtils.c
..
我正在为 Spark 流的实现而苦苦挣扎. 来自 kafka 的消息看起来像这样,但有更多的字段 {"event":"sensordata", "source":"sensors", "payload": {"actual data as a json}}{"event":"databasedata", "mysql":"sensors", "payload": {"json 形式的实际数
..
我正在尝试为 Spark 流准备应用程序(Spark 2.1、Kafka 0.10) 我需要从Kafka主题“输入"读取数据,找到正确的数据并将结果写入主题“输出" 我可以基于 KafkaUtils.createDirectStream 方法从 Kafka 读取数据. 我将 RDD 转换为 json 并准备过滤器: val messages = KafkaUtils.creat
..
如何使用直接流 API 为 kafka Spark 流指定消费者组 ID. HashMapkafkaParams = new HashMap();kafkaParams.put("metadata.broker.list", brokers);kafkaParams.put("auto.offset.reset", "最大");kafkaParams.put("group.id", "app1"
..
我正在尝试将数据从 kafka 传递到 Spark 流. 这就是我迄今为止所做的: 安装了 kafka 和 spark 使用默认属性配置启动 zookeeper 使用默认属性配置启动 kafka 服务器 开始kafka生产者 开始kafka消费者 从生产者向消费者发送消息.工作正常. 编写 kafka-spark.py 以接收来自 kafka 的消息以进行 spark.
..
我正在尝试使用 createDirectStream 方法打开 Kafka(尝试过 0.11.0.2 和 1.0.1 版)流并收到此 AbstractMethodError 错误: 线程“main"中的异常 java.lang.AbstractMethodError在 org.apache.spark.internal.Logging$class.initializeLogIfNecessary
..
我正在尝试从 kafka 接收流数据.在这个过程中,我能够接收流数据并将其存储到 JavaPairInputDStream 中.现在我需要分析这些数据而不将其存储到任何数据库中.所以我想将此 JavaPairInputDStream 转换为 DataSet 或 DataFrame 到目前为止我尝试的是: import java.util.Arrays;导入 java.util.Collec
..
我想创建一个可以实时读取日志的系统,并使用apache spark来处理它.我很困惑,如果我应该使用 kafka 或水槽之类的东西将日志传递给 Spark 流,还是应该使用套接字传递日志.我已经浏览了 Spark 流文档中的示例程序 - Spark 流示例.但是,如果有人能指导我更好地将日志传递给火花流,我将不胜感激.对我来说,这是一个新的领域. 解决方案 Apache Flume 可能有
..
我正在尝试设置 Spark Streaming 以从 Kafka 队列获取消息.我收到以下错误: py4j.protocol.Py4JJavaError:调用 o30.createDirectStream 时发生错误.: org.apache.spark.SparkException: java.nio.channels.ClosedChannelExceptionorg.apache.spar
..
我正在从消息应用程序收集数据,我目前正在使用 Flume,它每天发送大约 5000 万条记录 我想使用 Kafka,使用 Spark Streaming 从 Kafka 消费并将其持久化到 hadoop 并使用 impala 进行查询 我尝试过的每种方法都有问题.. 方法 1 - 将 RDD 保存为 parquet,将外部 hive parquet 表指向 parquet 目录
..
关于 Kafka 主题分区 -> 火花流资源利用,我有一些用例需要更清楚地说明. 我使用 spark 独立模式,所以我只有“执行程序总数"和“执行程序内存"的设置.据我所知,根据文档,将并行性引入 Spark 流的方法是使用分区的 Kafka 主题 -> 当我使用 spark-kafka 直接流集成时,RDD 将具有与 kafka 相同数量的分区. 因此,如果主题中有 1 个分区和 1
..
我们正在从 Kafka 接收火花流中的数据.一旦在 Spark Streaming 中开始执行,它只执行一个批次,其余批次在 Kafka 中开始排队. 我们的数据是独立的,可以并行处理. 我们尝试了具有多个执行器、内核、背压和其他配置的多个配置,但到目前为止没有任何效果.有很多消息排队,一次只处理了一个微批次,其余的留在队列中. 我们希望最大程度地实现并行性,以便没有任何微批处理
..
我有一个 kafka 生产者以 格式发送大量数据 {'1000':{'3':{'seq': '1','状态':'2','CMD': '异或'}},'1001':{'5':{'seq': '2','状态':'2','CMD': '或'}},'1003':{'5':{'seq': '3','状态':'4','CMD': '异或'}}} ....我想要的数据在最后一个循环中:{'seq': '1'
..
当我尝试执行我的 kafka spark 项目时.我收到以下错误: 线程“main"中的异常 java.lang.NoClassDefFoundError: org/spark_project/guava/cache/CacheLoader在 org.apache.spark.SparkConf.loadFromSystemProperties(SparkConf.scala:73)在 org.
..
我有一个使用 Spark Streaming 的项目,我正在使用“spark-submit"运行它,但我遇到了这个错误: 15/01/14 10:34:18 ERROR ReceiverTracker:流 0 的取消注册接收器:启动接收器 0 时出错 - java.lang.AbstractMethodError在 org.apache.spark.Logging$class.log(Loggi
..