spark-dataframe相关内容
我想从字典中创建一个新的数据框.字典包含列名作为键和列数据列表作为值.例如: col_dict = {'col1': [1, 2, 3],'col2': [4, 5, 6]} 我需要它作为一个看起来像这样的数据框: +------+------+|列 1 |col2 |+------+------+|1|4||2|5||3|6|+------+------+ 似乎没有一种简单的方法可以做到
..
简单的请求是我需要帮助将一列添加到数据框中,但是,该列必须为空,其类型来自 ...spark.sql.types,并且必须从字符串中定义类型. > 我可能可以用 ifs 或 case 来做到这一点,但我正在寻找更优雅的东西.不需要为 org.apache.spark.sql.types 中的每种类型编写案例的东西 如果我这样做: df = df.withColumn("col_name
..
我正在尝试将我们的一个 ETL Hive 脚本转换为 Spark,其中 Hive ETL 脚本维护一个表,其中每晚需要在新同步之前删除部分数据.Hive ETL 使用插入覆盖将主表删除超过 3 天的数据.基本上创建一个临时表,其中的数据不超过三天,然后覆盖主表. 使用 Spark(使用 Scala)时,我不断收到此错误,无法写入相同的源.这是我的代码: spark.sql ("Select
..
我有一个 spark 作业(在 spark 1.3.1 中运行)必须迭代多个键(大约 42 个)并处理该作业.这是程序的结构 从地图中获取密钥 从 hive(下面的 hadoop-yarn)中获取与键匹配的数据作为数据框 处理数据 将结果写入 hive 当我一键运行时,一切正常.当我使用 42 个键运行时,在第 12 次迭代时出现内存不足异常.有没有办法可以在每次迭代之间清理内存
..
我正在做一个 POC,我想向 Redshift 写入一些简单的数据集. 我有以下 sbt 文件: name := "Spark_POC"版本:=“1.0"ScalaVersion := "2.10.6"libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "2.0.1"libraryDependencies +=
..
我们正在尝试使用 spark-csv 和 univocity 1.5.0 解析器读取一个 3 gb 文件,该文件的一个列中有多个换行符,但是该文件在某些行的多列中被拆分换行符.这种情况发生在大文件的情况下. 我们使用的是 spark 1.6.1 和 Scala 2.10 以下是我用来读取文件的代码: sqlContext.read.format("com.databricks.s
..
我想把输出的数据导入mysql数据库,但是出现如下错误,我不会把数组转换成想要的字符串类型,能帮帮我吗? val Array(trainingData, testData) = msgDF.randomSplit(Array(0.9, 0.1))val pipeline = new Pipeline().setStages(Array(labelIndexer, word2Vec, mlpc,
..
我在 hive 中有一个名为 test 的表,其中包含 id 和 name 现在我在 hive 中有另一个名为 mysql 的表,其中包含 id、name 和 city. 现在我想比较两个表的模式并将列差异添加到 hive 表 test. hive_df= sqlContext.table("testing.test")mysql_df= sqlContext.table("test
..
我正在寻找一种方法来对保留关系的数据帧的列进行排名.特别是对于这个例子,我有一个 pyspark 数据框,如下所示,我想在其中为 colA & 生成排名.colB(虽然我想支持能够对 N 个列进行排名) +--------+----------+-----+----+|实体|身份证|colA|colB|+--------------------+-----+----+|a|8589934652
..
我有一个 spark SQL 问题 我很欣赏一些关于从嵌套结构数组中进行条件选择的最佳方法的指导. 我在下面有一个示例 json 文档 ``` {"id":"p1",“外部ID":[{"system":"a","id":"1"},{"system":"b","id":"2"},{"system":"c","id":"3"}]} ``` 在 spark SQL 中,我想根据某些
..
在 Spark 1.5.1 中,我已经能够使用 Thrift Server 从 Beeline 访问 spark-shell 临时表.通过阅读 Stackoverflow 上相关问题的答案,我已经能够做到这一点. 但是,升级到 Spark 2.0 后,我无法再从 Beeline 看到临时表,这是我正在遵循的步骤. 我正在使用以下命令启动 spark-shell: ./bin/spar
..
我想从字典中创建一个新的数据框.字典包含列名作为键和列数据列表作为值.例如: col_dict = {'col1': [1, 2, 3],'col2': [4, 5, 6]} 我需要它作为一个看起来像这样的数据框: +------+------+|列 1 |col2 |+------+------+|1|4||2|5||3|6|+------+------+ 似乎没有一种简单的方法可以做到
..
我使用 IntelliJ IDE 在 Microsoft Windows 平台上执行 Spark Scala 代码. 我有四个 Spark 数据帧,每个数据帧大约有 30000 条记录,我尝试从这些数据帧中的每一个中取出一列作为我要求的一部分. 我使用了 Spark SQL 函数来完成它并成功执行.当我执行 DF.show() 或 DF.count() 方法时,我能够在屏幕上看到结果,
..
嗨,我有我的 spark 数据框的输出,它创建了文件夹结构并创建了部分文件.现在我必须合并文件夹内的所有部分文件并将该文件重命名为文件夹路径名. 这就是我做分区的方式 df.write.partitionBy("DataPartition","PartitionYear").format("csv").option("nullValue", "").option("header", "tr
..
在我的应用程序中,我从 Kafka 队列获得了一个帐户流(使用 Spark 流和 kafka) 而且我需要从 S3 获取与这些帐户相关的属性,因此我计划缓存 S3 结果数据帧,因为 S3 数据目前至少不会更新一天,将来可能会更改为 1 小时或 10 分钟.所以问题是如何在不停止进程的情况下定期刷新缓存的数据帧. **更新:我计划在 S3 中有更新时将事件发布到 kafka,使用 SNS
..
嗨,当我执行以下代码行时,我得到以下堆栈跟踪: transactionDF.write.format("jdbc").option("url",SqlServerUri).option("驱动程序", 驱动程序).option("dbtable", fullQualifiedName).option("user", SqlServerUser).option("password",SqlServ
..
伙计们, 我需要使用 Apache Spark DataFrame 执行 jdbc 操作.基本上我有一个名为 Measures 的历史 jdbc 表,我必须在其中执行两个操作: 1.将旧度量记录的endTime 有效性属性设置为当前时间 2.插入新的小节记录设置 endTime 为 9999-12-31 谁能告诉我如何执行(如果可以的话)第一个操作的更新语句和第二个操作的插
..
在 spark 中,我有以下名为“df"的数据框,其中包含一些空条目: +--------------+------------+--------------------+|身份证|特点1|特点2|+-------+--------------------+--------------------+|185|(5,[0,1,4],[0.1,0...| 空||220|(5,[0,2,3],[0.1
..
我正在查询mysql表 val url = "jdbc:mysql://XXX-XX-XXX-XX-XX.compute-1.amazonaws.com:3306/pg_partner"val driver = "com.mysql.jdbc.Driver"val 用户名 = "XXX"val 密码 = "XXX"var connection:Connection = DriverManager
..
我有一个像下面这样的 json 文件. {"name":"method2","name1":"test","parameter1":"C:/Users/test/Desktop/Online.csv","parameter2": 1.0} 我正在加载我的 json 文件. val sqlContext = new org.apache.spark.sql.SQLContext(sc)val
..