Extracting column values of a DataFrame as a List in Apache Spark

I want to convert a string column of a DataFrame to a list. What I could find in the DataFrame API was RDD, so I tried converting it back to an RDD first and then applying the toArray function to the RDD. In this case, the length and SQL work just fine. However, the result I got from the RDD has square brackets around every element, like this: [A00001]. I was wondering if there is an appropriate way to convert a column to a list, or a way to remove the square brackets.

Any suggestions would be greatly appreciated. Thank you!


This should return the collection containing a single list:

dataFrame.select("YOUR_COLUMN_NAME").rdd.map(r => r(0)).collect()

Without the mapping, you just get a Row object, which contains every column of the DataFrame.

Keep in mind that this will probably get you a list of type Any. If you want to specify the result type, you can cast with .asInstanceOf[YOUR_TYPE] in the mapping: r => r(0).asInstanceOf[YOUR_TYPE]

P.S. Due to automatic conversion you can skip the .rdd part.
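Putting it together, a minimal sketch of the typed variant (assuming the column holds strings; YOUR_COLUMN_NAME is a placeholder as above):

val values: List[String] = dataFrame
  .select("YOUR_COLUMN_NAME")
  .rdd
  .map(r => r(0).asInstanceOf[String]) // cast each Row's first field to String
  .collect()
  .toList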

I know the answer given and asked for assumes Scala, so I am just providing a little snippet of Python code in case a PySpark user is curious. The syntax is similar to the given answer, but to properly pop the list out I actually have to reference the column name a second time in the mapping function, and I do not need the select statement.

i.e., given a DataFrame containing a column named "Raw":

To get each row value in "Raw" combined into a list, where each entry is a row value from "Raw", I simply use:

MyDataFrame.rdd.map(lambda x: x.Raw).collect()

With Spark 2.x and Scala 2.11

I can think of 3 possible ways to convert the values of a specific column to a List.

Common code snippets for all the approaches

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate
import spark.implicits._ // for .toDF() method

val df = Seq(
  ("first", 2.0),
  ("test", 1.5),
  ("choose", 8.0)
).toDF("id", "val")
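For reference, df.show() on this data should print something like:

+------+---+
|    id|val|
+------+---+
| first|2.0|
|  test|1.5|
|choose|8.0|
+------+---+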

Approach 1

df.select("id").collect().map(_(0)).toList
// res9: List[Any] = List(one, two, three)

What happens here? We are collecting the data to the driver with collect() and picking element zero from each record.

This may not be an excellent way of doing it; let's improve it with the next approach.


Approach 2

df.select("id").rdd.map(r => r(0)).collect.toList
//res10: List[Any] = List(one, two, three)

How is this better? We have distributed the map transformation load among the workers rather than a single driver.

I know rdd.map(r => r(0)) does not seem elegant to you. So, let's address that in the next approach.


Approach 3

df.select("id").map(r => r.getString(0)).collect.toList
//res11: List[String] = List(one, two, three)

Here we are not converting the DataFrame to an RDD. Look at map: it won't accept r => r(0) (or _(0)) as in the previous approach, due to encoder issues in the DataFrame. So we end up using r => r.getString(0); this may be addressed in future versions of Spark.
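If you would rather not depend on spark.implicits._ here, a minimal sketch that passes the String encoder explicitly (same df as above):

import org.apache.spark.sql.Encoders

// supply the encoder by hand instead of importing spark.implicits._
df.select("id").map(r => r.getString(0))(Encoders.STRING).collect.toList
// res: List[String] = List(first, test, choose)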

Conclusion

All the options give the same output, but 2 and 3 are effective; finally, the 3rd one is both effective and elegant (I'd think).

Databricks notebook

In Scala and Spark 2+, try this (assuming your column name is "s"):

df.select('s).as[String].collect
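Note that this returns an Array[String]; a minimal sketch of the List variant (both the 's column syntax and .as[String] require import spark.implicits._ in scope):

import spark.implicits._

// .as[String] turns the single-column DataFrame into a Dataset[String]
val s: List[String] = df.select('s).as[String].collect.toList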
sqlContext.sql("select filename from tempTable").rdd.map(r => r(0)).collect.toList.foreach(out_streamfn.println) // removes the square brackets

It works perfectly.

from pyspark.sql.functions import col


df.select(col("column_name")).collect()

Here collect is the function that in turn converts the result to a list. Beware of using this on a huge data set: it will collect everything to the driver and degrade performance. It is good to check the data first.

This is the Java answer.

df.select("id").collectAsList();
List<String> whatever_list = df.toJavaRDD().map(new Function<Row, String>() {
public String call(Row row) {
return row.getAs("column_name").toString();
}
}).collect();


logger.info(String.format("list is %s",whatever_list)); //verification

Since no one has given any solution in Java (a real programming language), you can thank me later!

An updated solution that gets you a list (note that the map here needs import spark.implicits._ in scope for the String encoder):

dataFrame.select("YOUR_COLUMN_NAME").map(r => r.getString(0)).collect.toList

Below is for Python:

df.select("col_name").rdd.flatMap(lambda x: x).collect()