How do I convert an array (i.e. list) column to Vector


Problem description

Consider the following snippet (assuming spark is already set to some SparkSession):

from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), 
]
df = spark.createDataFrame(source_data)

Notice that the temperatures field is a list of floats. I would like to convert these lists of floats to the MLlib type Vector, and I'd like this conversion to be expressed using the basic DataFrame API rather than going via RDDs (which is inefficient because it sends all data from the JVM to Python, the processing is done in Python, we don't get the benefits of Spark's Catalyst optimizer, yada yada). How do I do this? Specifically:

  1. Is there a way to get the direct cast to work? See below for details (and a failed attempt at a workaround). Or is there any other operation that has the effect I was after?
  2. Which of the two alternative solutions I suggest below is more efficient (UDF vs. exploding/reassembling the items in the list)? Or is there some other almost-but-not-quite-right alternative that is better than either of them?

A straight cast doesn't work

This is what I would expect to be the "proper" solution. I want to convert the type of a column from one type to another, so I should use a cast. As a bit of context, let me remind you of the normal way to cast it to another type:

from pyspark.sql import types
df_with_strings = df.select(
    df["city"], 
    df["temperatures"].cast(types.ArrayType(types.StringType()))),
)

Now e.g. df_with_strings.collect()[0]["temperatures"][1] is '-7.0'. But if I cast to an ml Vector then things do not go so well:

from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))

This gives an error:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"

Yikes! Any ideas how to fix this?

There is a Transformer that seems almost ideal for this job: the VectorAssembler. It takes one or more columns and concatenates them into a single vector. Unfortunately it only takes Vector and Float columns, not Array columns, so the following doesn't work:

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)

It gives this error:

pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'

The best work around I can think of is to explode the list into multiple columns and then use the VectorAssembler to collect them all back up again:

from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
    inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)], 
    outputCol="temperature_vector"
)
df_exploded = df.select(
    df["city"], 
    *[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")

This seems like it would be ideal, except that my TEMPERATURE_COUNT is more than 100, and sometimes more than 1000. (Another problem is that the code would be more complicated if you don't know the size of the array in advance, although that is not the case for my data.) Does Spark actually generate an intermediate data set with that many columns, or does it just consider this an intermediate step that individual items pass through transiently (or indeed does it optimise this step away entirely when it sees that the only use of these columns is to be assembled into a vector)?
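
As an aside, if the size of the array is not known in advance, one rough way to determine it (a sketch, assuming every row's temperatures list has the same length) is to read it off the first row:

# a minimal sketch, assuming all temperature arrays have the same length
TEMPERATURE_COUNT = len(df.first()["temperatures"])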

A rather simpler alternative is to use a UDF to do the conversion. This lets me express quite directly what I want to do in one line of code, and doesn't require making a data set with a crazy number of columns. But all that data has to be exchanged between Python and the JVM, and every individual number has to be handled by Python (which is notoriously slow for iterating over individual data items). Here is how that looks:

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
    df["city"], 
    list_to_vector_udf(df["temperatures"]).alias("temperatures")
)
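
As a quick sanity check (a sketch, assuming the UDF snippet above has been run), the schema confirms that the column is now a vector:

df_with_vectors.printSchema()
# expected output, approximately:
# root
#  |-- city: string (nullable = true)
#  |-- temperatures: vector (nullable = true)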

Ignorable remarks

The remaining sections of this rambling question are some extra things I came up with while trying to find an answer. They can probably be skipped by most people reading this.

In this trivial example it's possible to create the data using the vector type to begin with, but of course my data isn't really a Python list that I'm parallelizing, but instead is being read from a data source. But for the record, here is how that would look:

from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
    Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)

Inefficient solution: use map()

One possibility is to use the RDD map() method to transform the list to a Vector. This is similar to the UDF idea, except that it's even worse because the cost of serialisation etc. is incurred for all the fields in each row, not just the one being operated on. For the record, here's what that solution would look like:

df_with_vectors = df.rdd.map(lambda row: Row(
    city=row["city"], 
    temperatures=Vectors.dense(row["temperatures"])
)).toDF()

Failed attempt at a workaround for cast

In desperation, I noticed that Vector is represented internally by a struct with four fields, but using a traditional cast from that type of struct doesn't work either. Here is an illustration (where I built the struct using a udf but the udf isn't the important part):

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
    df["city"], 
    list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
    df_almost_vector["city"], 
    df_almost_vector["temperatures"].cast(VectorUDT())
)

This gives the error:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"

Recommended answer

Personally I would go with Python UDF and wouldn't bother with anything else:

  • Vectors are not native SQL types so there will be performance overhead one way or another. In particular this process requires two steps where data is first converted from external type to row, and then from row to internal representation using generic RowEncoder.
  • Any downstream ML Pipeline will be much more expensive than a simple conversion. Moreover it requires a process which is opposite to the one described above.

But if you really want other options here you are:

  • Scala UDF with Python wrapper:

Install sbt following the instructions on the project site.

Create a Scala package with the following structure:

.
├── build.sbt
└── udfs.scala

Edit build.sbt (adjust to reflect Scala and Spark version):

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.4",
  "org.apache.spark" %% "spark-mllib" % "2.4.4"
)

Edit udfs.scala:

package com.example.spark.udfs

import org.apache.spark.sql.functions.udf
import org.apache.spark.ml.linalg.DenseVector

object udfs {
  val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
}

Package:

sbt package

and include (or equivalent depending on Scala version):

$PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar

as an argument for --driver-class-path when starting the shell / submitting the application.
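
For example (a sketch; the exact jar name and path depend on your build output):

pyspark --driver-class-path $PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar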

In PySpark define a wrapper:

from pyspark.sql.column import _to_java_column, _to_seq, Column
from pyspark import SparkContext

def as_vector(col):
    sc = SparkContext.getOrCreate()
    f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
    return Column(f.apply(_to_seq(sc, [col], _to_java_column)))

Test:

with_vec = df.withColumn("vector", as_vector("temperatures"))
with_vec.show()

+--------+------------------+----------------+
|    city|      temperatures|          vector|
+--------+------------------+----------------+
| Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
|New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
+--------+------------------+----------------+

with_vec.printSchema()

root
 |-- city: string (nullable = true)
 |-- temperatures: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- vector: vector (nullable = true)

  • Dump data to a JSON format reflecting DenseVector schema and read it back:

    from pyspark.sql.functions import to_json, from_json, col, struct, lit
    from pyspark.sql.types import StructType, StructField
    from pyspark.ml.linalg import VectorUDT
    
    json_vec = to_json(struct(struct(
        lit(1).alias("type"),  # type 1 is dense, type 0 is sparse
        col("temperatures").alias("values")
    ).alias("v")))
    
    schema = StructType([StructField("v", VectorUDT())])
    
    with_parsed_vector = df.withColumn(
        "parsed_vector", from_json(json_vec, schema).getItem("v")
    )
    
    with_parsed_vector.show()
    

    +--------+------------------+----------------+
    |    city|      temperatures|   parsed_vector|
    +--------+------------------+----------------+
    | Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
    |New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
    +--------+------------------+----------------+
    

    with_parsed_vector.printSchema()
    

    root
     |-- city: string (nullable = true)
     |-- temperatures: array (nullable = true)
     |    |-- element: double (containsNull = true)
     |-- parsed_vector: vector (nullable = true)
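
As a final aside, not part of the original answer: on Spark 3.1 and later, pyspark.ml.functions provides an array_to_vector helper that performs this conversion directly, which may be the simplest option if that version is available. A minimal sketch, assuming Spark 3.1+:

from pyspark.sql.functions import col
from pyspark.ml.functions import array_to_vector  # available from Spark 3.1 onwards

# converts the array<double> column into a DenseVector column in one call
df_with_vectors = df.withColumn("temperatures_vec", array_to_vector(col("temperatures")))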
    
