spark-streaming相关内容
SparkStreaming 上下文从 RabbitMQ 读取一个流,间隔为 30 秒.我想修改 cassandra 中存在的相应行的几列的值,然后想将数据存储回 Cassandra.为此,我需要检查特定主键的行是否存在于 Cassandra 中,如果是,获取它并执行必要的操作.但问题是,我在驱动程序上创建 StreamingContext 并在 Worker 上执行操作.因此,他们无法获得 St
..
我对Spark的fileStream()方法的理解是它以三种类型为参数:Key、Value和Format.对于文本文件,适当的类型是:LongWritable、Text 和 TextInputFormat. 首先,我想了解这些类型的本质.直觉上,我猜在这种情况下 Key 是文件的行号,而 Value 是该行上的文本.因此,在以下文本文件示例中: 你好测试另一个测试 DStream 的第一
..
我一直在尝试在 YARN client 模式下运行 spark-shell,但是我收到了很多 ClosedChannelException 错误.我正在为 Hadoop 2.6 使用 spark 2.0.0 版本. 以下是例外: $ spark-2.0.0-bin-hadoop2.6/bin/spark-shell --master yarn --deploy-mode 客户端将默认日志级
..
我在通过火花流从 azure blob 读取数据时遇到问题 JavaDStreamlines = ssc.textFileStream("hdfs://ip:8020/directory"); 上面的代码适用于 HDFS,但无法从 Azure blob 读取文件 https://blobstorage.blob.core.windows.net/containerid/folder1/ 以
..
出于结帐目的,我尝试将 Amazon S3 存储桶设置为检查点文件. val checkpointDir = "s3a://bucket-name/checkpoint.txt"val sc = 新的 SparkContext(conf)sc.setLocalProperty("spark.default.parallelism", "30")sc.hadoopConfiguration.set
..
目标:通过 Spark 流从 Kinesis 读取数据并将数据以 Parquet 格式存储到 S3. 情况:应用程序最初运行良好,运行批次为 1 小时,处理时间平均不到 30 分钟.出于某种原因,可以说应用程序崩溃了,我们尝试从检查点重新启动.处理现在需要永远并且不会继续前进.我们试图以 1 分钟的批处理间隔测试相同的东西,处理运行良好,批处理需要 1.2 分钟才能完成.当我们从检查点恢复时,每
..
说明 我们在 Scala 中有一个 Spark Streaming 1.5.2 应用程序,它从 Kinesis Stream 读取 JSON 事件,执行一些转换/聚合并将结果写入不同的 S3 前缀.当前批处理间隔为 60 秒.我们有 3000-7000 个事件/秒.我们正在使用检查点来保护我们免于丢失聚合. 它运行良好有一段时间了,从异常中恢复,甚至集群重新启动.我们最近重新编译了 S
..
我正在构建一个 REST API,它在 Spark 集群中启动一些计算,并以分块的结果流进行响应.给定有计算结果的 Spark 流,我可以使用 dstream.foreachRDD() 将数据发送出 Spark.我正在使用 akka-http 发送分块的 HTTP 响应: val requestHandler: HttpRequest =>HttpResponse = {case HttpR
..
我有一个安装了 Spark 2.0 和 Zeppelin 0.6.1 的集群.由于 TwitterUtils.scala 类已从 Spark 项目移至 Apache Bahir,我无法再在我的 Zeppelin 笔记本中使用 TwitterUtils. 这是我的笔记本的片段: 依赖加载: %depz.resetz.load("org.apache.bahir:spark-stream
..
我刚刚从 http://zeppelin-project.org/docs/tutorial/tutorial.html(“流数据教程"部分).我现在遇到的问题是该应用程序似乎只能在本地工作.如果我将 Spark 解释器设置“master"从“local[*]"更改为“spark://master:7077",当我执行相同的 SQL 语句时,应用程序将不再产生任何结果.我做错了什么吗?我已经重新启
..
我的操作系统是 OS X 10.11.6.我正在运行 Spark 2.0、Zeppelin 0.6、Scala 2.11 当我在 Zeppelin 中运行此代码时,我收到了来自 Jackson 的异常.当我在 spark-shell 中运行此代码时 - 也不例外. val filestream = ssc.textFileStream("/Users/davidlaxer/first-ed
..
我使用的是 spark-sql 2.4.x 版本,Cassandra-3.x 版本使用的是 datastax-spark-cassandra-connector.与 kafka 一起. 我有一个来自 kafka 主题的财务数据的场景.data(基础数据集)包含 companyId, year , prev_year 字段信息. 如果列 year === prev_year 那么我需要加
..
我收到错误 SQLContext.gerorCreate is not a value of object org.apache.spark.SQLContext.这是我的代码 import org.apache.spark.SparkConf导入 org.apache.spark.streaming.StreamingContext导入 org.apache.spark.streaming.S
..
我使用的是 Spark Structure Streaming,代码如下: def convert_timestamp_to_datetime(timestamp):返回 datetime.fromtimestamp(timestamp)定义提取():火花 = SparkSession \.builder \.appName("StructuredNetworkWordCount") \.ge
..
我正在尝试查看我的 DataFrame 中的内容.. 这是火花代码 from pyspark.sql import SparkSession导入 pyspark.sql.functions 作为 psf导入日志导入时间火花 = SparkSession \.builder \.appName("控制台示例") \.getOrCreate()logging.info("开始监听主机..")线
..
我正在尝试解析本质上是动态的 Json 结构并加载到数据库中.但是在 json 内部有动态键的情况下面临困难.下面是我的示例 json:尝试使用爆炸功能但没有帮助.这里描述了类似的东西 如何解析嵌套 JSON 结果中的动态 JSON 键? {“_id":{“planId":“5f34dab0c661d8337097afb9",“版本":{“$numberLong":“1"},“句号":{“姓名
..
我在 Java 8 中使用 spark-sql-2.4.1 版本. 我有 raw_df 列“eventTs",这是 Long 数据类型的时间戳.我正在尝试将其转换回时间戳,如下所示: Datasetmodified_df = raw_df.withColumn("eventTimeStamp", ( col("eventTs").divide(1000) ).cast(DataTypes.
..
尝试在 Spark 结构化流方面理解 SparkSql. Spark Session 从 kafka 主题读取事件,将数据聚合到按不同列名分组的计数并将其打印到控制台. 原始输入数据结构如下: +--------------+------------+----------+-----------+-------+-----------+--------------------+-------
..
检查下面的代码.如果存在重复键,它将生成具有歧义的数据帧.我们应该如何修改代码以添加父列名称作为前缀. 添加了另一个包含 json 数据的列. scala>val df = 序列((77, "email1", """{"key1":38,"key3":39}""",""""{"name":";aaa","age":10}"""),(78, "email2", """{"key1":38,"
..
嗨,我有一个场景,传入的消息是一个 Json,它有一个标题说 tablename,数据部分有表列数据.现在我想把它写到 parquet 到单独的文件夹说 /emp 和 /dept.我可以通过基于表名聚合行在常规流中实现这一点.但是在结构化流媒体中,我无法拆分它.我如何才能在结构化流媒体中实现这一点. {"tableName":"employee","data":{"empid":1","em
..