如何在 Spark 中跳过 CSV 文件的头部?

假设我为 Spark 上下文提供了三个文件路径,并且每个文件在第一行都有一个模式。如何跳过标题中的模式行?

val rdd=sc.textFile("file1,file2,file3")

现在,我们如何跳过这个 rdd 的标题行?

154353 次浏览

您可以分别加载每个文件,用 file.zipWithIndex().filter(_._2 > 0)过滤它们,然后联合所有文件 RDD。

如果文件数太大,联合可能抛出 StackOverflowExeption

如果第一条记录中只有一个标题行,那么最有效的过滤方法是:

rdd.mapPartitionsWithIndex {
(idx, iter) => if (idx == 0) iter.drop(1) else iter
}

当然,如果有许多文件内部有许多标题行,这也没有帮助。实际上,您可以通过这种方式联合三个 RD。

您也可以只编写一个 filter,它只匹配可能是标题的一行。这非常简单,但效率较低。

相当于 Python:

from itertools import islice


rdd.mapPartitionsWithIndex(
lambda idx, it: islice(it, 1, None) if idx == 0 else it
)
data = sc.textFile('path_to_data')
header = data.first() #extract header
data = data.filter(row => row != header)   #filter out header

或者,您可以使用 park-CSV 包(或者在 Spark 2.0中,这或多或少可以作为 CSV 本地使用)。注意,这需要每个文件的头部(如您所愿) :

schema = StructType([
StructField('lat',DoubleType(),True),
StructField('lng',DoubleType(),True)])


df = sqlContext.read.format('com.databricks.spark.csv'). \
options(header='true',
delimiter="\t",
treatEmptyValuesAsNulls=True,
mode="DROPMALFORMED").load(input_file,schema=schema)

在 Spark 2.0中,Spark 内置了一个 CSV 阅读器,所以你可以很容易地加载一个 CSV 文件,如下所示:

spark.read.option("header","true").csv("filePath")

使用 PySpark 中的 filter()方法,过滤掉第一个列名以删除标题:

# Read file (change format for other file formats)
contentRDD = sc.textfile(<filepath>)


# Filter out first column of the header
filterDD = contentRDD.filter(lambda l: not l.startswith(<first column name>)


# Check your result
for i in filterDD.take(5) : print (i)

Spark 2.0开始,你能做的就是使用 火花会议来完成这个一行程序:

val spark = SparkSession.builder.config(conf).getOrCreate()

然后@Sandeep Purohit 说:

val dataFrame = spark.read.format("CSV").option("header","true").load(csvfilePath)

我希望它解决了你的问题!

P.S: SparkSession 是 Spark 2.0中引入的新入口点,可以在 Park _ sql 包下找到

//Find header from the files lying in the directory
val fileNameHeader = sc.binaryFiles("E:\\sss\\*.txt",1).map{
case (fileName, stream)=>
val header = new BufferedReader(new InputStreamReader(stream.open())).readLine()
(fileName, header)
}.collect().toMap


val fileNameHeaderBr = sc.broadcast(fileNameHeader)


// Now let's skip the header. mapPartition will ensure the header
// can only be the first line of the partition
sc.textFile("E:\\sss\\*.txt",1).mapPartitions(iter =>
if(iter.hasNext){
val firstLine = iter.next()
println(s"Comparing with firstLine $firstLine")
if(firstLine == fileNameHeaderBr.value.head._2)
new WrappedIterator(null, iter)
else
new WrappedIterator(firstLine, iter)
}
else {
iter
}
).collect().foreach(println)


class WrappedIterator(firstLine:String,iter:Iterator[String]) extends Iterator[String]{
var isFirstIteration = true
override def hasNext: Boolean = {
if (isFirstIteration && firstLine != null){
true
}
else{
iter.hasNext
}
}


override def next(): String = {
if (isFirstIteration){
println(s"For the first time $firstLine")
isFirstIteration = false
if (firstLine != null){
firstLine
}
else{
println(s"Every time $firstLine")
iter.next()
}
}
else {
iter.next()
}
}
}

对于 python 开发人员来说,我已经用火花2.0进行了测试。假设您想删除前14行。

sc = spark.sparkContext
lines = sc.textFile("s3://folder_location_of_csv/")
parts = lines.map(lambda l: l.split(","))
parts.zipWithIndex().filter(lambda tup: tup[1] > 14).map(lambda x:x[0])

WithColumn 是 df 函数,所以下面的代码不能像上面那样使用 RDD 风格。

parts.withColumn("index",monotonically_increasing_id()).filter(index > 14)

在 PySpark 中,您可以使用一个数据框架并将标头设置为 True:

df = spark.read.csv(dataPath, header=True)

这是一个传递给 read()命令的选项:

context = new org.apache.spark.sql.SQLContext(sc)


var data = context.read.option("header","true").csv("<path>")

在2018年工作(星火2.3)

巨蟒

df = spark.read
.option("header", "true")
.format("csv")
.schema(myManualSchema)
.load("mycsv.csv")

斯卡拉

val myDf = spark.read
.option("header", "true")
.format("csv")
.schema(myManualSchema)
.load("mycsv.csv")

MyManualSchema 是我写的一个预定义的模式,你可以跳过这部分代码

更新日期2021 同样的代码也适用于 Spark 3.x

df = spark.read
.option("header", "true")
.option("inferSchema", "true")
.format("csv")
.csv("mycsv.csv")

您可以简单地通过在 Pycharm 中使用 filter()操作(在使用 python 的情况下)过滤掉 Header 行

rdd = sc.textFile('StudentData.csv')
headers=rdd.first()
rdd=rdd.filter(lambda x: x!=headers)
rdd.collect()