在 ApacheSpark 数据框架中连接列

如何在 ApacheSpark 数据框架中连接两列? Spark SQL 中有什么我们可以使用的函数吗?

450877 次浏览

使用原始 SQL,您可以使用 CONCAT:

  • 用巨蟒

    df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v"))
    df.registerTempTable("df")
    sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")
    
  • In Scala

    import sqlContext.implicits._
    
    
    val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v")
    df.registerTempTable("df")
    sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")
    

Since Spark 1.5.0 you can use concat function with DataFrame API:

  • In Python :

    from pyspark.sql.functions import concat, col, lit
    
    
    df.select(concat(col("k"), lit(" "), col("v")))
    
  • In Scala :

    import org.apache.spark.sql.functions.{concat, lit}
    
    
    df.select(concat($"k", lit(" "), $"v"))
    

There is also concat_ws function which takes a string separator as the first argument.

如果希望使用 DF 完成此操作,可以使用 udf 基于现有列添加新列。

val sqlContext = new SQLContext(sc)
case class MyDf(col1: String, col2: String)


//here is our dataframe
val df = sqlContext.createDataFrame(sc.parallelize(
Array(MyDf("A", "B"), MyDf("C", "D"), MyDf("E", "F"))
))


//Define a udf to concatenate two passed in string values
val getConcatenated = udf( (first: String, second: String) => { first + " " + second } )


//use withColumn method to add a new column called newColName
df.withColumn("newColName", getConcatenated($"col1", $"col2")).select("newColName", "col1", "col2").show()

这里有另一种方法来帮助火花:

#import concat and lit functions from pyspark.sql.functions
from pyspark.sql.functions import concat, lit


#Create your data frame
countryDF = sqlContext.createDataFrame([('Ethiopia',), ('Kenya',), ('Uganda',), ('Rwanda',)], ['East Africa'])


#Use select, concat, and lit functions to do the concatenation
personDF = countryDF.select(concat(countryDF['East Africa'], lit('n')).alias('East African'))


#Show the new data frame
personDF.show()


----------RESULT-------------------------


84
+------------+
|East African|
+------------+
|   Ethiopian|
|      Kenyan|
|     Ugandan|
|     Rwandan|
+------------+

在 pySpark 中使用 sqlContext 的另一种方法是..。

#Suppose we have a dataframe:
df = sqlContext.createDataFrame([('row1_1','row1_2')], ['colname1', 'colname2'])


# Now we can concatenate columns and assign the new column a name
df = df.select(concat(df.colname1, df.colname2).alias('joined_colname'))

下面介绍如何进行自定义命名

import pyspark
from pyspark.sql import functions as sf
sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)
df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2'])
df.show()

给予,

+--------+--------+
|colname1|colname2|
+--------+--------+
|   row11|   row12|
|   row21|   row22|
+--------+--------+

通过连接创建新列:

df = df.withColumn('joined_column',
sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2')))
df.show()


+--------+--------+-------------+
|colname1|colname2|joined_column|
+--------+--------+-------------+
|   row11|   row12|  row11_row12|
|   row21|   row22|  row21_row22|
+--------+--------+-------------+

当您不知道数据框中列的数量或名称时,这里有一个建议。

val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))

在 Spark 2.3.0中,您可以:

spark.sql( """ select '1' || column_a from table_a """)

在 Spark Scala 中连接字符串列的一个选项是使用 concat

这是必要的 检查空值。因为如果其中一列为空,则即使其他列之一确实具有信息,结果也将为空。

使用 concatwithColumn:

val newDf =
df.withColumn(
"NEW_COLUMN",
concat(
when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))

使用 concatselect:

val newDf = df.selectExpr("concat(nvl(COL1, ''), nvl(COL2, '')) as NEW_COLUMN")

使用这两种方法,您将得到一个 NEW _ COLUMN,它的值是来自原始 df 的 COL1和 COL2列的串联。

从 Spark 2.3(SARK-22771) Spark SQL 支持串联操作符 ||

例如:

val df = spark.sql("select _c1 || _c2 as concat_column from <table_name>")

在 Java 中,您可以这样做来连接多个列。示例代码将为您提供一个场景以及如何使用它来更好地理解。

SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
Dataset<Row> reducedInventory = spark.sql("select * from table_name")
.withColumn("concatenatedCol",
concat(col("col1"), lit("_"), col("col2"), lit("_"), col("col3")));




class JavaSparkSessionSingleton {
private static transient SparkSession instance = null;


public static SparkSession getInstance(SparkConf sparkConf) {
if (instance == null) {
instance = SparkSession.builder().config(sparkConf)
.getOrCreate();
}
return instance;
}
}

上面的代码将 col1、 col2、 col3与“ _”分隔,以创建一个名为“ concatenatedCol”的列。

实际上,有一些漂亮的内置抽象可供您完成连接,而无需实现自定义函数。由于您提到了 Spark SQL,因此我猜测您正试图通过 parks.SQL ()将其作为声明性命令传递。如果是这样,您可以直接传递 SQL 命令,如: SELECT CONCAT(col1, '<delimiter>', col2, ...) AS concat_column_name FROM <table_name>;

此外,从 Spark 2.3.0开始,您可以在以下行中使用命令: SELECT col1 || col2 AS concat_column_name FROM <table_name>;

其中,是您首选的分隔符(也可以是空格) ,并且是您试图从中读取的临时或永久表。

我们有对应于下面进程的 Java 语法吗

val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))

Concat (* cols)

V1.5或更高

将多个输入列连接到一个列中。该函数用于处理字符串、二进制和兼容的数组列。

例句: new_df = df.select(concat(df.a, df.b, df.c))


Concat _ ws (sep,* 备忘录)

V1.5或更高

concat类似,但使用指定的分隔符。

例句: new_df = df.select(concat_ws('-', df.col1, df.col2))


Map _ concat (* ls)

V2.4或更高版本

用于连接映射,返回所有给定映射的并集。

例句: new_df = df.select(map_concat("map1", "map2"))


使用 连接操作符(||) :

V2.3或更高版本

例句: df = spark.sql("select col_a || col_b || col_c as abc from table_x")

参考资料: Spark sql 文档

我们也可以简单地使用 SelectExpr

df1.selectExpr("*","upper(_2||_3) as new")

val newDf =
df.withColumn(
"NEW_COLUMN",
concat(
when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))

注意: 要使这段代码工作,您需要在“ isNotNull”函数中放置括号“()”。- > 正确的是“ isNotNull ()”。

val newDf =
df.withColumn(
"NEW_COLUMN",
concat(
when(col("COL1").isNotNull(), col("COL1")).otherwise(lit("null")),
when(col("COL2").isNotNull(), col("COL2")).otherwise(lit("null"))))

在我的例子中,我需要一个 < strong > Pipe-‘ I’分隔行。

from pyspark.sql import functions as F
df.select(F.concat_ws('|','_c1','_c2','_c3','_c4')).show()

这就像一把热刀在黄油上切开一样,效果很好。

像这样使用 concat 方法:

Dataset<Row> DF2 = DF1
.withColumn("NEW_COLUMN",concat(col("ADDR1"),col("ADDR2"),col("ADDR3"))).as("NEW_COLUMN")

在数据帧的选择方法中可以使用 concat()

val fullName = nameDF.select(concat(col("FirstName"), lit(" "), col("LastName")).as("FullName"))

使用 withColumnconcat

val fullName1 = nameDF.withColumn("FullName", concat(col("FirstName"), lit(" "), col("LastName")))

使用 spark.sql concat 函数

val fullNameSql = spark.sql("select Concat(FirstName, LastName) as FullName from names")

取自 https://www.sparkcodehub.com/spark-dataframe-concat-column