pyspark相关内容
我正在使用 spark 2.3 并使用 pyspark 中的数据帧编写器类方法编写了一个数据帧来创建配置单元分区表. newdf.coalesce(1).write.format('orc').partitionBy('veh_country').mode("overwrite").saveAsTable('emp.partition_Load_table') 这是我的表结构和分区信息. h
..
我想在 HiveContext 中使用 PySpark 应用 SCD1 和 SCD2.在我的方法中,我正在读取增量数据和目标表.阅读后,我加入了他们的 upsert 方法.我正在对所有源数据帧进行 registerTempTable.我正在尝试将最终数据集写入目标表,但我面临的问题是无法在读取它的表中插入覆盖. 请为此提出一些解决方案.我不想将中间数据写入物理表并再次读取. 是否有任何
..
我在直接从 Spark shell 读取 ORC 文件时遇到问题.注意:运行Hadoop 1.2和Spark 1.2,使用pyspark shell,可以使用spark-shell(运行scala). 我使用过这个资源 http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.2.4/Apache_Spark_Quickstart_v224/co
..
无法从 Hive 访问通过 Spark (pyspark) 创建的 Hive 表. df.write.format("orc").mode("overwrite").saveAsTable("db.table") 从 Hive 访问时出错: 错误:java.io.IOException:java.lang.IllegalArgumentException:bucketId 超出范围:-1
..
我一直在尝试使用 sqlContext.read.format("jdbc").options(driver="org.apache.hive.jdbc.HiveDriver") 将 Hive 表放入 Spark 而无需任何成功.我已经完成研究并阅读以下内容: 如何从 spark 连接到远程 hive 服务器 Spark 1.5.1 不使用 hive jdbc 1.2.0 htt
..
我有一个输入数据框:df_input(更新的 df_input) |comment|inp_col|inp_val||11 |a |a1 ||12 |a |a2 ||15 |b |b3 ||16 |b |b4 ||17 |c |&b ||17 |c |c5 ||17 |d |&c ||17 |d |d6 ||17 |e |&d ||17 |e |e7 | 我想将 inp_val 列中的变量替换
..
您可以在此处查看实现:https://github.com/apache/spark/blob/ffa05c84fe75663fc33f3d954d1cb1e084ab3280/python/pyspark/rdd.py#L804 它与“普通"reduce 函数有何不同? depth = 2 是什么意思? 我不希望 reducer 函数在分区上线性传递,但首先减少每个可用的对,然后将
..
如何在 Spark DataFrame 中打印特定样本的决策路径? Spark 版本:'2.3.1' 下面的代码打印了整个模型的决策路径,如何让它打印特定样本的决策路径?比如tagvalue ball等于2的那一行的决策路径 import pyspark.sql.functions as F从 pyspark.ml 导入管道、变压器从 pyspark.sql 导入数据帧从 pyspark.m
..
VectorAssembler 的功能有一些很烦人的地方.我目前正在将一组列转换为单列向量,然后使用 StandardScaler 函数应用缩放到包含的功能.然而,似乎有内存的SPARK原因,决定是否应该使用 DenseVector 或 SparseVector 来表示每行特征.但是,当您需要使用 StandardScaler 时,SparseVector(s) 的输入无效,只允许 DenseVe
..
如何将 Pandas 数据帧发送到 hive 表? 我知道如果我有一个 spark 数据框,我可以使用 将它注册到一个临时表 df.registerTempTable("table_name")sqlContext.sql("create table table_name2 as select * from table_name") 但是当我尝试使用 pandas dataFrame
..
我正在处理一项 ETL 作业,该作业会将 JSON 文件提取到 RDS 临时表中.我配置的爬虫可以对 JSON 文件进行分类,只要它们的大小小于 1MB.如果我缩小文件(而不是漂亮的打印件),如果结果小于 1MB,它将毫无问题地对文件进行分类. 我在想出解决方法时遇到了麻烦.我尝试将 JSON 转换为 BSON 或 GZIPing JSON 文件,但它仍然归类为 UNKNOWN. 有没
..
我正在使用 pyspark 来估计逻辑回归模型的参数.我使用 spark 计算似然和梯度,然后使用 scipy 的最小化函数进行优化 (L-BFGS-B). 我使用纱线客户端模式来运行我的应用程序.我的应用程序可以毫无问题地开始运行.但是,过一会就报如下错误: 回溯(最近一次调用最后一次):文件“/home/panc/research/MixedLogistic/software/mixe
..
A 上一个问题推荐sc.applicationId,但不是 存在于 PySpark 中,仅存在于 scala 中. 那么,如何确定 PySpark 进程的应用程序 ID(用于 yarn)? 解决方案 您可以通过 Py4J RPC 网关使用 Java SparkContext 对象: >>>sc._jsc.sc().applicationId()u'application_14338
..
当我对 30 多列进行特征工程以创建大约 200 多列时遇到错误.它没有使工作失败,但显示错误.我想知道如何避免这种情况. Spark - 2.3.1 Python - 3.6 集群配置 - 1 主 - 32 GB RAM,16 核 4 从 - 16 GB RAM,8 核 输入数据 - 8 个分区的 Parquet 文件,使用 snappy 压缩. 我的 Spark-提交
..
我有一个 spark 数据框,我想按比例 0.60、0.20、0.20 将其分为训练、验证和测试. 我使用了以下代码: def data_split(x):全局 data_map_vard_map = data_map_var.valuedata_row = x.asDict()随机导入rand = random.uniform(0.0,1.0)ret_list = ()如果兰特 我的
..
在 Python 3.5 Jupyter 环境中运行以下命令时,出现以下错误.关于导致它的原因有什么想法吗? import findsparkfindspark.init() 错误: IndexError Traceback(最近调用最后) 在 ()1 导入 findspark---->2 findspark.in
..
Pyspark API 提供了许多聚合函数,除了中位数.Spark 2 带有 approxQuantile ,它给出了近似的分位数,但精确的中位数计算起来非常昂贵.是否有更多 Pyspark 方法来计算 Spark Dataframe 中一列值的中位数? 解决方案 这是在 Python (Spark 1.6 +) 中使用 Dataframe API 的示例实现. import pyspa
..
开始使用 pyspark.ml 和管道 API,我发现自己为典型的预处理任务编写了自定义转换器,以便在管道中使用它们.示例: from pyspark.ml import Pipeline, Transformer类 CustomTransformer(变压器):# 懒惰的解决方法 - 转换器需要具有这些属性_defaultParamMap = dict()_paramMap = dict()_
..
使用 PySpark 数据帧,我正在尝试尽可能高效地执行以下操作.我有一个数据框,其中有一列包含文本和我想用来过滤行的单词列表.所以: 数据框看起来像这样 df:col1 col2 col_with_texta b foo 很好吃12 34 呜呜呜是的 0 块百胜 列表将是 list = [foo,bar]因此结果将是: 结果:col1 col2 col_with_texta b fo
..
假设我有以下 RDD: rdd = sc.parallelize([('a', (5,1)), ('d', (8,2)), ('2', (6,3)), ('a', (8,2)), ('d', (9,6)), ('b', (3,4)),('c', (8,3))]) 我如何使用 repartitionAndSortWithinPartitions 并按 x[0] 和 x[1][0] 之后排序.使用
..