如何将多个文本文件读入一个 RDD?

我想从一个 hdfs 位置读取一组文本文件,并在迭代中使用 park 对其执行映射。

JavaRDD<String> records = ctx.textFile(args[1], 1);一次只能读取一个文件。

我想读取多个文件并将它们作为单个 RDD 处理?

169344 次浏览

使用 union如下:

val sc = new SparkContext(...)
val r1 = sc.textFile("xxx1")
val r2 = sc.textFile("xxx2")
...
val rdds = Seq(r1, r2, ...)
val bigRdd = sc.union(rdds)

那么 bigRdd就是包含所有文件的 RDD。

您可以指定整个目录,使用通配符,甚至目录和通配符的 CSV。例如:

sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")

正如 Nick Chammas 指出的那样,这是 Hadoop 的 FileInputFormat的一次曝光,因此这也适用于 Hadoop (和 Scalding)。

你可以使用一个 textFile 调用来读取多个文件:

sc.textFile(','.join(files))

你可以用这个

首先你可以得到一个缓冲区/S3路径列表:

import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest


def listFiles(s3_bucket:String, base_prefix : String) = {
var files = new ArrayList[String]


//S3 Client and List Object Request
var s3Client = new AmazonS3Client();
var objectListing: ObjectListing = null;
var listObjectsRequest = new ListObjectsRequest();


//Your S3 Bucket
listObjectsRequest.setBucketName(s3_bucket)


//Your Folder path or Prefix
listObjectsRequest.setPrefix(base_prefix)


//Adding s3:// to the paths and adding to a list
do {
objectListing = s3Client.listObjects(listObjectsRequest);
for (objectSummary <- objectListing.getObjectSummaries().asScala) {
files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
}
listObjectsRequest.setMarker(objectListing.getNextMarker());
} while (objectListing.isTruncated());


//Removing Base Directory Name
files.remove(0)


//Creating a Scala List for same
files.asScala
}

现在将 List 对象传递给下面的代码,注意: sc 是 SQLContext 的对象

var df: DataFrame = null;
for (file <- files) {
val fileDf= sc.textFile(file)
if (df!= null) {
df= df.unionAll(fileDf)
} else {
df= fileDf
}
}

现在您得到了最终的统一 RDD,即 df

可选,您还可以在单个 BigRDD 中对其进行重新分区

val files = sc.textFile(filename, 1).repartition(1)

重新分区总是有效的: D

在 PySpark 中,我发现了另一种解析文件的有用方法。也许在 Scala 中有一个对应的程序,但是我不太习惯想出一个可以工作的翻译。实际上,它是一个 textFile 调用,添加了标签(在下面的示例中,key = filename,value = 1 line from file)。

“标签”文本文件

输入:

import glob
from pyspark import SparkContext
SparkContext.stop(sc)
sc = SparkContext("local","example") # if running locally
sqlContext = SQLContext(sc)


for filename in glob.glob(Data_File + "/*"):
Spark_Full += sc.textFile(filename).keyBy(lambda x: filename)

Output: 数组,每个条目包含一个元组,使用 filename 作为键,值 = 每行文件。(从技术上讲,使用这种方法,除了实际的文件路径名之外,还可以使用一个不同的键——可能是一个哈希表示以节省内存)。也就是说。

[('/home/folder_with_text_files/file1.txt', 'file1_contents_line1'),
('/home/folder_with_text_files/file1.txt', 'file1_contents_line2'),
('/home/folder_with_text_files/file1.txt', 'file1_contents_line3'),
('/home/folder_with_text_files/file2.txt', 'file2_contents_line1'),
...]

您还可以将其重新组合为一个行列表:

Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()

[('/home/folder_with_text_files/file1.txt', ['file1_contents_line1', 'file1_contents_line2','file1_contents_line3']),
('/home/folder_with_text_files/file2.txt', ['file2_contents_line1'])]

或者将整个文件重新组合为单个字符串(在本例中,结果与从 wholeTextFiles 获得的结果相同,但是从文件路径中去掉了字符串“ file:”):

Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()

有一个直截了当的清洁解决方案。使用 wholeTextFiles ()方法。这将采用一个目录并形成一个键值对。返回的 RDD 将是一对 RDD。 以下是 Spark 医生的描述:

WholeTextFiles 允许读取包含多个小文本文件的目录,并将它们作为(文件名、内容)对返回。这与 textFile 相反,textFile 在每个文件中每行返回一条记录

你可以利用

JavaRDD<String , String> records = sc.wholeTextFiles("path of your directory")

在这里您将得到您的文件的路径和该文件的内容。因此,您可以一次执行整个文件的任何操作,从而节省开销

rdd = textFile('/data/{1.txt,2.txt}')

使用 sc.textFile,所有答案都是正确的

我只是想知道为什么不 wholeTextFiles例如,在这种情况下..。

val minPartitions = 2
val path = "/pathtohdfs"
sc.wholeTextFiles(path,minPartitions)
.flatMap{case (path, text)
...

一个限制是,我们必须加载小文件,否则性能将是糟糕的,并可能导致 OOM。

注:

  • 整个文件应该适合内存
  • 适用于不能按行分割的文件格式... 比如 XML 文件

进一步参考 探访

用于将 DataFrame 写入外部存储系统(如文件系统、键值存储等)的接口。使用 DataFrame.write ()访问此。

版本1.4中新增。

Csv (路径,模式 = 无,压缩 = 无,sep = 无,报价 = 无,转义 = 无,头 = 无,nullValue = 无,转义报价 = 无,quoteAll = 无,dateFormat = 无,timestampFormat = 无) 将 DataFrame 的内容以 CSV 格式保存到指定的路径。

参数: Path-任何 Hadoop 支持的文件系统中的路径 模式- 指定数据已存在时保存操作的行为。

Append: 将此 DataFrame 的内容追加到现有数据。 覆盖: 覆盖现有数据。 如果数据已经存在,则默认忽略此操作。 Error (缺省情况) : 如果数据已经存在,则引发异常。 压缩-保存到文件时使用的压缩编解码器。这可以是已知的不区分大小写的缩写名称之一(none、 bzip2、 gzip、 lz4、 snappy 和 flate)。 Sep-设置单个字符作为每个字段和值的分隔符。如果设置了 Nothing,则它使用默认值,,。 引号-设置用于转义引号值的单个字符,其中分隔符可以是值的一部分。如果没有设置,则使用默认值,”。如果要关闭引号,则需要设置一个空字符串。 Escape-设置用于在已经引用的值中转义引号的单个字符。如果设置了 Nothing,则它使用默认值, 一个标志,指示是否包含引号的值应该始终用引号括起来。如果没有设置,则它使用默认值 true,转义包含引号字符的所有值。 QuoteAll-一个标志,指示是否所有值都应始终用引号括起来。如果设置了 Nothing,则它使用默认值 false,仅转义包含引号字符的值。 Header-将列的名称作为第一行写入。如果没有设置,则使用默认值 false。 NullValue-设置空值的字符串表示形式。如果没有设置,则使用默认值空字符串。 DateFormat-设置指示日期格式的字符串。自定义日期格式遵循 java.text 的格式。SimpleDateFormat.这适用于日期类型。如果设置了 Nothing,它将使用默认值 yyyy-MM-dd。 TimestampFormat-设置指示时间戳格式的字符串。自定义日期格式遵循 java.text 的格式。SimpleDateFormat.这适用于时间戳类型。如果没有设置,则使用默认值 yyyy-MM-dd‘ T’HH: mm: ss.SSSZZ。