如何在 sc.textFile 中加载本地文件,而不是 HDFS

我在追随伟大的 火花教程

所以我尝试在46m: 00s 加载 README.md,但是失败了,我正在做的是:

$ sudo docker run -i -t -h sandbox sequenceiq/spark:1.1.0 /etc/bootstrap.sh -bash
bash-4.1# cd /usr/local/spark-1.1.0-bin-hadoop2.4
bash-4.1# ls README.md
README.md
bash-4.1# ./bin/spark-shell
scala> val f = sc.textFile("README.md")
14/12/04 12:11:14 INFO storage.MemoryStore: ensureFreeSpace(164073) called with curMem=0, maxMem=278302556
14/12/04 12:11:14 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 160.2 KB, free 265.3 MB)
f: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[1] at textFile at <console>:12
scala> val wc = f.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://sandbox:9000/user/root/README.md
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)

我怎样才能装载那个 README.md

233606 次浏览

试试看

val f = sc.textFile("./README.md")

尝试显式指定 sc.textFile("file:///path to the file/")。错误发生在设置 Hadoop 环境时。

SparkContext.textFile internally calls org.apache.hadoop.mapred.FileInputFormat.getSplits, which in turn uses org.apache.hadoop.fs.getDefaultUri if schema is absent. This method reads "fs.defaultFS" parameter of Hadoop conf. If you set HADOOP_CONF_DIR environment variable, the parameter is usually set as "hdfs://..."; otherwise "file://".

这已经讨论到火花邮件列表,并请参考此 mail

You should use hadoop fs -put <localsrc> ... <dst> copy the file into hdfs:

${HADOOP_COMMON_HOME}/bin/hadoop fs -put /path/to/README.md README.md

贡贝的回答非常好。但我还是想提到 file:/// = ~/../../,而不是 $SPARK_HOME。希望这能为像我这样的新手节省一些时间。

这就是我在 Spark 集群上遇到的错误的解决方案,这个错误是由 Azure 托管在一个 Windows 集群上的:

加载原始 HVAC.csv 文件,使用函数解析它

data = sc.textFile("wasb:///HdiSamples/SensorSampleData/hvac/HVAC.csv")

我们使用(wasb://)来允许 Hadoop 访问蔚蓝色的 blog 存储文件,这三个斜杠是对正在运行的节点容器文件夹的相对引用。

例如: 如果 Spark 集群仪表板中文件资源管理器中文件的路径是:

样品传感器样品数据

因此,描述路径如下: sflcc1: 是存储帐户的名称。Sflccpark: 是集群节点名。

因此,我们使用相对的三个斜杠来引用当前集群节点的名称。

希望这个能帮上忙。

您只需要将文件的路径指定为 “ file:///目录/file”

例如:

val textFile = sc.textFile("file:///usr/local/spark/README.md")

我的桌面上有一个名为 NewsArticle.txt 的文件。

在 Spark 中,我输入:

val textFile= sc.textFile(“file:///C:/Users/582767/Desktop/NewsArticle.txt”)

I needed to change all the \ to / character for the filepath.

为了测试它是否有效,我输入:

textFile.foreach(println)

我正在运行 Windows7,但没有安装 Hadoop。

如果文件位于 Spark 主节点中(例如,在使用 AWS EMR 的情况下) ,那么首先以本地模式启动星火 shell。

$ spark-shell --master=local
scala> val df = spark.read.json("file:///usr/lib/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]


scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

或者,您可以首先从本地文件系统将文件复制到 HDFS,然后以其默认模式启动 Spark (例如,在使用 AWS EMR 的情况下启动 YARN)来直接读取文件。

$ hdfs dfs -mkdir -p /hdfs/spark/examples
$ hadoop fs -put /usr/lib/spark/examples/src/main/resources/people.json /hdfs/spark/examples
$ hadoop fs -ls /hdfs/spark/examples
Found 1 items
-rw-r--r--   1 hadoop hadoop         73 2017-05-01 00:49 /hdfs/spark/examples/people.json


$ spark-shell
scala> val df = spark.read.json("/hdfs/spark/examples/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]


scala> df.show()
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

注意:

确保在从本地(sc.textFile("file:///path to the file/"))加载数据时以本地模式运行火花,否则将得到类似于 Caused by: java.io.FileNotFoundException: File file:/data/sparkjob/config2.properties does not exist的错误。 因为运行在不同工作者上的执行器不会在本地路径中找到这个文件。

如果您尝试读取文件形式的 HDFS.trying 设置路径在 SparkConf 中

 val conf = new SparkConf().setMaster("local[*]").setAppName("HDFSFileReader")
conf.set("fs.defaultFS", "hdfs://hostname:9000")

While Spark supports loading files from the local filesystem, it requires that the files are available at the same path on all nodes in your cluster.

有些网络文件系统,如 NFS、 AFS 和 MapR 的 NFS 层,作为常规文件系统向用户公开。

如果您的数据已经在其中一个系统中,那么您可以通过指定 档案://路径将其用作输入; 只要文件系统挂载在每个节点上的相同路径上,Spark 就会处理它。每个节点都需要有相同的路径

 rdd = sc.textFile("file:///path/to/file")

如果您的文件尚未在集群中的所有节点上,那么您可以在本地将其加载到驱动程序上,而无需经过 Spark,然后调用并行化将内容分发给 worker

Take care to put file:// in front and the use of "/" or "\" according to OS.

我在 Spark 2.3中也遇到过这种情况,Hadoop 也安装在通用的“ Hadoop”用户主目录下。由于 Spark 和 Hadoop 都安装在同一个公共目录下,因此 Spark 默认将该方案视为 hdfs,并开始查找 Hadoop 的 core-site.xmlfs.defaultFS指定的 hdfs 下的输入文件。在这种情况下,我们需要显式地将方案指定为 file:///<absoloute path to file>

不必使用 sc.textFile (...)将本地文件转换为数据框架。其中一个选项是,逐行读取本地文件,然后将其转换为 Spark 数据集。下面是 Java 中 Windows 机器的一个例子:

StructType schemata = DataTypes.createStructType(
new StructField[]{
createStructField("COL1", StringType, false),
createStructField("COL2", StringType, false),
...
}
);


String separator = ";";
String filePath = "C:\\work\\myProj\\myFile.csv";
SparkContext sparkContext = new SparkContext(new SparkConf().setAppName("MyApp").setMaster("local"));
JavaSparkContext jsc = new JavaSparkContext (sparkContext );
SQLContext sqlContext = SQLContext.getOrCreate(sparkContext );


List<String[]> result = new ArrayList<>();
try (BufferedReader br = new BufferedReader(new FileReader(filePath))) {
String line;
while ((line = br.readLine()) != null) {
String[] vals = line.split(separator);
result.add(vals);
}
} catch (Exception ex) {
System.out.println(ex.getMessage());
throw new RuntimeException(ex);
}
JavaRDD<String[]> jRdd = jsc.parallelize(result);
JavaRDD<Row> jRowRdd = jRdd .map(RowFactory::create);
Dataset<Row> data = sqlContext.createDataFrame(jRowRdd, schemata);

现在可以在代码中使用数据帧 data

我尝试了以下方法,它在我的本地文件系统中工作。.基本上火花可以从本地,HDFS 和 AWS S3路径读取

listrdd=sc.textFile("file:////home/cloudera/Downloads/master-data/retail_db/products")