如何循环通过每一行的数据帧在火花

例如

sqlContext = SQLContext(sc)


sample=sqlContext.sql("select Name ,age ,city from user")
sample.show()

上面的语句在终端上打印整个表。但是我想使用 forwhile访问该表中的每一行,以执行进一步的计算。

241225 次浏览

你就是不能。与其他分布式数据结构一样,DataFrames不是 可迭代的,只能使用专用的高阶函数和/或 SQL 方法访问。

你当然可以 collect

for row in df.rdd.collect():
do_something(row)

或转换 toLocalIterator

for row in df.rdd.toLocalIterator():
do_something(row)

并在本地迭代,如上所示,但它击败了使用 Spark 的所有目的。

如果要对 DataFrame 对象中的每一行执行某些操作,请使用 map。这将允许您对每一行执行进一步的计算。它相当于从 0len(dataset)-1的整个数据集的循环。

注意,这将返回一个 PipelinedRDD,而不是 DataFrame。

要“循环”并利用 Spark 的并行计算框架,您可以定义一个自定义函数并使用 map。

def customFunction(row):


return (row.name, row.age, row.city)


sample2 = sample.rdd.map(customFunction)

或者

sample2 = sample.rdd.map(lambda x: (x.name, x.age, x.city))

然后自定义函数将应用于数据框架的每一行。注意 sample2将是一个 RDD,而不是一个数据帧。

如果要执行更复杂的计算,可能需要映射。如果只需要添加一个简单的派生列,可以使用 withColumn,并返回一个数据帧。

sample3 = sample.withColumn('age2', sample.age + 2)

使用 python 中的列表理解,您可以只用两行代码就将整列值收集到一个列表中:

df = sqlContext.sql("show tables in default")
tableList = [x["tableName"] for x in df.rdd.collect()]

在上面的示例中,我们返回数据库“ default”中的表列表,但是可以通过替换 sql ()中使用的查询来调整该列表。

或者更简称:

tableList = [x["tableName"] for x in sqlContext.sql("show tables in default").rdd.collect()]

对于三列的示例,我们可以创建一个字典列表,然后在 for 循环中迭代它们。

sql_text = "select name, age, city from user"
tupleList = [{name:x["name"], age:x["age"], city:x["city"]}
for x in sqlContext.sql(sql_text).rdd.collect()]
for row in tupleList:
print("{} is a {} year old from {}".format(
row["name"],
row["age"],
row["city"]))

以上

tupleList = [{name:x["name"], age:x["age"], city:x["city"]}

应该是

tupleList = [{'name':x["name"], 'age':x["age"], 'city':x["city"]}

对于 nameagecity,它们不是变量,而只是字典的键。

这样试一试

    result = spark.createDataFrame([('SpeciesId','int'), ('SpeciesName','string')],["col_name", "data_type"]);
for f in result.collect():
print (f.col_name)

这可能不是最佳实践,但是您可以简单地使用 collect()定位特定列,将其导出为 Rows 列表,然后循环遍历该列表。

假设这是你的 df:

+----------+----------+-------------------+-----------+-----------+------------------+
|      Date|  New_Date|      New_Timestamp|date_sub_10|date_add_10|time_diff_from_now|
+----------+----------+-------------------+-----------+-----------+------------------+
|2020-09-23|2020-09-23|2020-09-23 00:00:00| 2020-09-13| 2020-10-03| 51148            |
|2020-09-24|2020-09-24|2020-09-24 00:00:00| 2020-09-14| 2020-10-04| -35252           |
|2020-01-25|2020-01-25|2020-01-25 00:00:00| 2020-01-15| 2020-02-04| 20963548         |
|2020-01-11|2020-01-11|2020-01-11 00:00:00| 2020-01-01| 2020-01-21| 22173148         |
+----------+----------+-------------------+-----------+-----------+------------------+

循环遍历“日期”列中的行:

rows = df3.select('Date').collect()


final_list = []
for i in rows:
final_list.append(i[0])


print(final_list)