partitioning相关内容
我有一个 ArrayList,我想把它分成 n 个较小的 List 对象,并对每个对象执行一个操作.我目前这样做的方法是用 Java 中的 ArrayList 对象实现的.任何伪代码都可以. for (int i = 1; i subArray(ArrayList A, int start,打算) {ArrayList toReturn = new ArrayList();for (int i
..
在 Kafka 中,我只想使用单个代理、单个主题和具有一个生产者和多个消费者的单个分区(每个消费者从代理获取自己的数据副本).鉴于此,我不想要使用 Zookeeper 的开销;我不能只使用经纪人吗?为什么必须要有动物园管理员? 解决方案 是的,运行 Kafka 需要 Zookeeper.来自 Kafka 入门文档: 第 2 步:启动服务器 Kafka 使用zookeeper,所
..
我写了一个简单的程序来请求一个巨大的数据库.为了导出我的结果,我写了这个函数: result.coalesce(1).write.options(Map("header" -> "true", "delimiter" > ";")).csv(mycsv.csv) 我使用 coalesce 方法只得到一个文件作为输出.问题是结果文件包含超过一百万行.所以,我无法在 Excel 中打开它...
..
我写了一个简单的程序来请求一个巨大的数据库.为了导出我的结果,我写了这个函数: result.coalesce(1).write.options(Map("header" -> "true", "delimiter" > ";")).csv(mycsv.csv) 我使用 coalesce 方法只得到一个文件作为输出.问题是结果文件包含超过一百万行.所以,我无法在 Excel 中打开它...
..
RDD 包含的元素数量与其理想的分区数量之间有什么关系? 我有一个包含数千个分区的 RDD(因为我从一个由多个小文件组成的源文件加载它,这是一个我无法修复的约束,所以我必须处理它).我想对其重新分区(或使用 coalesce 方法).但我事先不知道 RDD 将包含的事件的确切数量. 所以我想以自动化的方式来做.看起来像: val numberOfElements = rdd.count(
..
我有一个 DataFrame,我正在尝试 partitionBy 一列,按该列对其进行排序并使用以下命令以镶木地板格式保存: df.write().format("parquet").partitionBy("dynamic_col").sortBy("dynamic_col").save("test.parquet"); 我收到以下错误: reason: User class throw
..
我有一个像这样的火花流应用: val message = KafkaUtils.createStream(...).map(_._2)message.foreachRDD( rdd => {如果(!rdd.isEmpty){val kafkaDF = sqlContext.read.json(rdd)kafkaDF.foreachPartition(我 =>{创建连接()i.foreach(行
..
我想在 3 列上对数据框“df1"进行分区.该数据框对于这 3 列恰好有 990 个独特的组合: 在 [17]: df1.createOrReplaceTempView("df1_view")在 [18] 中:spark.sql("select count(*) from (select distinct(col1,col2,col3) from df1_view) as t").show()+
..
我是 Spark 的新手,正在使用 spark.read.jdbc 通过 JDBC 从 Postgres 数据库表创建 DataFrame. 我对分区选项有些困惑,特别是 partitionColumn、lowerBound、upperBound 和 numPartitions>. 文档似乎表明这些字段是可选的.如果我不提供它们会怎样? Spark 如何知道如何对查询进行分区?这会有
..
考虑的方法(Spark 2.2.1): DataFrame.repartition(采用 partitionExprs: Column* 参数的两个实现) DataFrameWriter.partitionBy 注意:本题不问这些方法的区别 来自 docs of partitionBy: 如果指定,输出将在类似于Hive 的分区方案 的文件系统上布置.例如,当我们按年份和
..
我正在使用 PySpark 来执行经典的 ETL 作业(加载数据集、处理它、保存它),并希望将我的 Dataframe 保存为由“虚拟"列分区的文件/目录;我所说的“虚拟"是指我有一列时间戳,它是一个包含 ISO 8601 编码日期的字符串,我想按年/月/日进行分区;但我实际上在 DataFrame 中没有 Year、Month 或 Day 列;我有这个时间戳,我可以从中派生这些列,但我不希望我的
..
我需要根据一些共享键列将许多数据帧join.对于键值 RDD,可以指定一个分区器,以便将具有相同键的数据点混洗到同一个执行器,因此加入更有效(如果在 join 之前有混洗相关操作).可以在 Spark DataFrames 或 DataSets 上做同样的事情吗? 解决方案 如果您知道将多次加入 DataFrame,您可以在加载它后重新分区 val users = spark.read.
..
我正在尝试使用 Spark 将一个大型分区数据集写入磁盘,而 partitionBy 算法在我尝试过的两种方法中都遇到了困难. 分区严重倾斜 - 有些分区很大,有些很小. 问题 1: 当我在 repartitionBy 之前使用 repartition 时,Spark 将所有分区都写成一个文件,即使是大分区也是如此 val df = spark.read.parquet("so
..
一种边缘情况,当在带分区的 Spark SQL 中保存拼花表时, #schema definitioin最终 StructType 架构 = DataTypes.createStructType(Arrays.asList(DataTypes.createStructField("time", DataTypes.StringType, true),DataTypes.createStructF
..
我正在尝试将 Spark 中的数据帧写入 HDFS 位置,我希望如果我添加 partitionBy 符号 Spark 将创建分区(类似于 Parquet 格式的书写) 形式的文件夹 partition_column_name=partition_value (即 partition_date=2016-05-03).为此,我运行了以下命令: (df.write.partitionBy('p
..
我有按 date 分区的镶木地板数据 &hour,文件夹结构: events_v3-- 事件日期=2015-01-01-- event_hour=2015-01-1-- part10000.parquet.gz-- 事件日期=2015-01-02-- event_hour=5-- part10000.parquet.gz 我通过 spark 创建了一个表 raw_events 但是当我尝试查询
..
我想检查我们如何获取有关每个分区的信息,例如总数.当 Spark 作业以部署模式作为纱线集群提交以在控制台上记录或打印时,驱动程序端每个分区中的记录. 解决方案 您可以像这样获取每个分区的记录数: df.rdd.mapPartitionsWithIndex{case (i,rows) =>迭代器((i,rows.size))}.toDF("partition_number","numbe
..
在 Spark-land 中有几个相似但又不同的概念,围绕着如何将工作分派到不同节点并同时执行.具体来说,有: Spark Driver 节点 (sparkDriverCount) 一个 Spark 集群可用的工作节点数量 (numWorkerNodes) Spark 执行器的数量(numExecutors) 所有worker/executors同时操作的DataFrame (data
..
我的问题是由计算 spark 数据帧中连续行之间的差异的用例触发的. 例如,我有: >>>df.show()+-----+------------+|索引|列1|+-----+------------+|0.0|0.58734024||1.0|0.67304325||2.0|0.85154736||3.0|0.5449719|+-----+------------+ 如果我选择使用“
..
我正在尝试将数据从 PostgreSQL 表中的表移动到 HDFS 上的 Hive 表.为此,我想出了以下代码: val conf = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set(
..