apache-spark相关内容
我刚刚开始使用Spark,并在Amazon EC2实例上以独立模式运行它。我正在尝试文档中提到的示例,在查看这个名为Simple App的示例时,我不断收到以下错误: NameError:未定义名称‘Numas’ from pyspark import SparkContext logFile = "$YOUR_SPARK_HOME/README.md" # Should be some
..
我正在对电光结构化流进行窗口排序: val filterWindow: WindowSpec = Window .partitionBy("key") .orderBy($"time") controlDataFrame=controlDataFrame.withColumn("Make Coffee", $"value"). withColumn("datetime"
..
下面是消费RabbitMQ消息的电光流媒体代码。 import java.io.{ BufferedReader, InputStreamReader } import java.net.Socket import java.nio.charset.StandardCharsets import org.apache.spark.SparkConf import org.apache.sp
..
我在这方面完全是新手,如果有明显的错误,请原谅。 精确错误: 《在奴隶》: Info TransportClientFactory:69毫秒后成功创建到/10.2.10.128:7077的连接(0毫秒用于引导) 警告工作进程:无法连接到主10.2.10.128:7077 在Master: 资讯大师:我被选为领袖!新状态:活着 错误TransportRequestHandler:在RPC
..
我正在尝试实现一个使用自定义接收器从SQS读取消息的流作业。每封邮件都包含对S3文件的单个引用,然后我希望读取、解析该文件并将其存储为ORC。 以下是我到目前为止拥有的代码: val sc = new SparkContext(conf) val streamContext = new StreamingContext(sc, Seconds(5)) val sqs = stream
..
我正在更新代码,以便从updateStateByKey切换到mapWithState,以便基于2分钟的超时获得用户会话(2分钟仅用于测试目的)。每个会话应在超时之前聚合会话内的所有流数据(JSON字符串)。 这是我的旧代码: val membersSessions = stream.map[(String, (Long, Long, List[String]))](eventRecor
..
在我的方案中,我分解一个数组列,以便每行有一条记录,这样我就可以执行联接,然后将这些分解的列重新组合在一起 +--------------+-------+------------------------+ | body | ID | array_column | +--------------+-------+-------------------
..
我们正在运行一个Spark Stream作业,该作业从一个目录(使用TextFileStream)检索文件。 我们关注的一个问题是,作业已关闭,但文件仍在添加到目录中。 一旦作业重新启动,这些文件就不会被拾取(因为它们不是新的或在作业运行时更改),但我们希望处理它们。 1)有解决方案吗?有没有办法跟踪已处理的文件,以及是否可以“强制”拾取较旧的文件? 2)是否有方法删除已处理的文件?
..
我当前的结构化流应用程序写入一个巨大的Delta表。当我(停止流)并将其指向写入全新的增量表时: 它变得更快-批量持续时间减少了近四分之一 投入率几乎提高了3倍 我知道它可能会变得更快,因为它在旧的/更大的表上执行的任何聚合/写入在新表上都不需要。但投入率的变化我希望有人能解释一下? 源为Azure EventHubs。 谢谢! 推荐答案 回答我自己的问题:
..
我已经编写了一个数据集火花作业(批处理)代码来扁平化数据,运行正常,但当我尝试在火花流作业中使用相同的代码片段时,它抛出以下错误 必须使用WriteStream.start(); 执行具有流来源的查询 那么,有什么方法可以在流作业中展平嵌套的JSON吗? 样本输入嵌套JSON- { "name":" Akash", "age":26, "watches":{
..
无法使用wasbs://...url从检查点恢复到Azure Blob存储 在群集模式下使用独立Spark 2.0.2。 val ssc = StreamingContext.getOrCreate(checkpointPath, () => createSSC(), hadoopConf) 我通过hadoopConf.set中的hadoopConf设置了fs.azure和fs.a
..
我想知道为什么我的火花流工作中有这么多任务编号?它变得越来越大... 运行3.2h后,增长到120020。运行一天后,它将增长到100万...为什么? 推荐答案 此SparkUI功能意味着某些阶段依赖项可能已计算过,也可能没有计算过,但由于其输出已可用而被跳过。因此,它们表现为skipped。 请不要使用might,这意味着在作业完成Spark之前,无法确定是否需要返回并重新计
..
我使用fileStream从Spark(流上下文)读取HDFS目录中的文件。如果我的Spark在一段时间后关闭并启动,我希望读取目录中的新文件。我不想读取Spark已经读取和处理过的目录中的旧文件。我在此尽量避免重复。 val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/File") 是否有需要帮助
..
我正在尝试使用SparkStreaming(Spark-Streaming_2.10,版本:1.5.1)的简单文件流传输示例 public class DStreamExample { public static void main(final String[] args) { final SparkConf sparkConf = new SparkConf()
..
我想从流查询中获取类似triggerExecution, inputRowsPerSecond, numInputRows, processedRowsPerSecond的信息。 我使用rate格式生成10 rows per second,并使用QueryProgressEvent获取所有指标。 但是,在控制台中,当打印QueryProgressEvent.inputRowsPerSe
..
完全错误:Databricks作业超时,错误:[IP]上的Executor 0丢失。远程RPC客户端已解除关联。可能是由于容器超过阈值或网络问题。检查驱动程序日志中的警告消息。 我们正在Azure Databricks订阅上使用Job API 2.0运行作业,并使用Pools接口来缩短派生时间,并使用Worker/Driver作为Standard_DS12_v2。 我们有一个作业(JAR
..
以下是已成功安装的依赖项。 !apt-get install openjdk-8-jre !apt-get install scala !pip install py4j !wget -q https://downloads.apache.org/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz !tar xf spark-2.4.8-bin-h
..
如何通过电光Web UI监控作业进度?在本地运行电光,我可以使用http://localhost:4040. 通过4040端口访问电光UI 推荐答案 按照此colab notebook,您可以执行以下操作。 首先,配置电光UI,启动电光会话: import findspark findspark.init() from pyspark.sql import SparkSess
..
“$BREW安装阿帕奇-电光‘ 给我2.3.x版。 ‘$BREW搜索阿帕奇-电光’ 和 ‘$BREW信息阿帕奇-电光’ 不提供安装不同版本的选项。 是否可以使用自制软件获得不同的版本? 推荐答案 运行这些命令(假设您已经通过Homebrew安装了apache-电光) cd "$(brew --repo homebrew/core)" git log Formula/apache-
..
如何检查电光中的dataframe列是否为空 例如 type IdentifiedDataFrame = {SourceIdentfier, DataFrame} def splitRequestIntoDFsWithAndWithoutTransactionId(df: DataFrame) : Seq[IdentifiedDataFrame] = { seq((DeltaTab
..