apache-spark相关内容
我正在开发一个Spark流应用程序,其中我需要使用来自两个服务器的输入流,每个服务器每秒向Spark上下文发送一条JSON消息。 我的问题是,如果我只在一个流上执行操作,一切都运行得很好。但如果我有来自不同服务器的两个流,那么Spark在可以打印任何东西之前冻结,并且只有在两个服务器都发送了它们必须发送的所有JSON消息时(当它检测到socketTextStream没有接收数据时)才开始重新
..
您好,我正在尝试下载以下Build.sbt文件中的spark-core、spark-streaming、twitter4j和spark-streaming-twitter: name := "hello" version := "1.0" scalaVersion := "2.11.8" libraryDependencies += "org.apache.spark" %% "spa
..
我使用OCR工具从截图中提取文本(每个截图大约1-5句)。但是,在手动验证提取的文本时,我注意到不时会出现几个错误。 考虑到文字“你好星火!我真的很喜欢😊❤️!”,我注意到: 1)字母“i”、“!”和“l”被替换为“|”。 2)表情符号未正确提取并被其他字符替换或被省略。 3)不时删除空格。 结果,我可能会得到这样的字符串:“Hello here 7l|Real|y
..
我是Spark的新手,正在尝试使用Spark来读取这样的json文件。在ubuntu18.04、java1.8上使用Spark 2.3和Scala 2.11: cat my.json: { "Name":"A", "No_Of_Emp":1, "No_Of_Supervisors":2} { "Name":"B", "No_Of_Emp":2, "No_Of_Supervisors":
..
我正在运行Spark作业。我有4个内核和设置为5G的工作内存。应用程序主机位于同一网络中的另一台计算机上,不承载任何工作进程。这是我的代码: private void myClass() { // configuration of the spark context SparkConf conf = new SparkConf().setAppName("myWork").s
..
有没有办法将数据帧行转换成列。 我有以下结构作为输入: val inputDF = Seq(("pid1","enc1", "bat"), ("pid1","enc2", ""), ("pid1","enc3", ""), ("pid3","enc1", "cat"),
..
我对ApacheSpark非常陌生,我正在尝试按美国州重新划分数据帧。然后,我希望将每个分区分解为其自己的RDD并保存到特定位置: schema = types.StructType([ types.StructField("details", types.StructType([ types.StructField("state", types.StringType(),
..
是否可以更改Spark在写入前保存其临时文件的_temporary目录? 具体地说,因为我正在写入表的单个分区,所以我希望临时文件夹位于分区文件夹中。 可能吗? 文件输出委员会无法使用默认的${mapred.output.dir}/_temporary 由于其实现方式,文件输出委员会会创建一个推荐答案子目录来写入文件,并在提交后移到${mapred.output.dir}。
..
list_1 = [[6, [3, 8, 7]], [5, [9, 7, 3]], [6, [7, 8, 5]], [5, [6, 7, 2]]] rdd1 = sc.parallelize(list_1) newpairRDD = rdd1.partitionBy(2,lambda k: int(k[0])) print("Partitions structure: {}".format(ne
..
我正在使用AWS Glue运行一些pyspark python代码,它有时成功,但有时失败,出现依赖错误:Resource Setup Error: Exception in thread "main" java.lang.RuntimeException: [unresolved dependency: JohnSnowLabs#spark-nlp;2.5.4: not found],错误日志如
..
技术堆栈详细信息- Scala - 2.11.8 Spark - 2.4.4 Delta - 0.7.0 Running On - AWS EMR 用法- spark.readStream .format("kinesis") .option("streamName", kinesisConfs.streamName) .option("regi
..
Spark 2.4.2能否与Amazon EMR上的HIVE 2.3.4一起用作执行引擎? 我已经通过以下命令将JAR文件链接到hive(scala-库、spark-core、spark-Common-network): cd $HIVE_HOME/lib ln -s $SPARK_HOME/jars/spark-network-common_2.11-2.4.2.jar ln -s
..
我在EMR(版本5.32.0)上的(Py)Spark中遇到了一些问题。大约一年前,我在EMR集群上运行了相同的程序(我认为发行版一定是5.29.0)。然后,我能够使用spark-submit参数正确配置我的PySpark程序。但是,现在我正在运行相同/相似的代码,但是spark-submit参数似乎没有任何效果。 我的集群配置: 主节点:8个VCORE,32 GiB内存,仅EBS存储E
..
Spark数据帧架构: StructType( [StructField("a", StringType(), False), StructField("b", StringType(), True), StructField("c" , BinaryType(), False), StructField("d", Ar
..
我有一个包含数亿行的表,我希望将其存储在Spark的DataFrame中,并作为拼图文件保存在磁盘上。 我的拼图文件大小现在超过2TB,我希望确保已对其进行优化。 这些列中有很大一部分是字符串值,这可能很长,但也通常只有很少的值。例如,我有一列只有两个不同的值(一个是20个字符,一个是30个字符的字符串),还有一个列的字符串平均有400个字符,但所有条目中只有大约400个不同的值。
..
开发人员和API文档都没有任何关于DataFrame.saveAsTable或DataFrameWriter.options可以传递哪些选项的参考,它们会影响配置单元表的保存。 我希望在这个问题的答案中,我们可以汇总一些信息,这些信息将有助于Spark开发人员更好地控制Spark保存表的方式,或许还能为改进Spark的文档提供基础。 推荐答案 您在任何地方都看不到options文
..
我正在使用Kryo编码器将GenericRecords编码为Spark DataFrame,并将该数据帧写入Avro文件。一旦我尝试从配置单元中读取文件,我就得到一个错误,说解析器找到的是toplevelRecords而不是预期的字段。这条记录不在我现有的模式中,我认为它是在我使用Spark-Avro编写时创建的。我想知道是否/如何从Avro文件中删除它。 如下所示: { "t
..
我有一个Spark DataFrame,我想将其另存为PARQUE,然后使用PARQUE-Avro库加载它。 我的数据帧中有一个时间戳列,它在拼图中被转换为INT96时间戳列。但是拼花-Avrodoes not support INT96格式化和抛出。 有没有办法避免呢?在Avro支持的内容中将时间戳写入拼图时,是否可以更改Spark使用的格式? 我当前使用 date_fr
..
我有一个如下所示的DataFrame ID Date Amount 10001 2019-07-01 50 10001 2019-05-01 15 10001 2019-06-25 10 10001 2019-05-27 20 10002 2019-06-29 25 10002 2019-07-18
..
我在纱线集群模式下使用Spark-Submit运行一个Spark作业。为了在运行时提交输入和输出文件路径,我尝试加载一个包含输入和输出路径的属性文件。 属性文件:input.properties spark.myapp.input /input/path spark.myapp.output /output/path 我正在使用以下命令运行我的应用程序。
..