火花加载 CSV 文件作为 DataFrame? ?

我想在火花中读取 CSV 并将其转换为 DataFrame 并将其存储在带有 df.registerTempTable("table_name")的 HDFS 中

我试过了:

scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv")

我得到的错误是:

java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

在 ApacheSpark 中加载 CSV 文件作为 DataFrame 的正确命令是什么?

518422 次浏览

Park-csv 是 Spark 核心功能的一部分,不需要单独的库。 你可以举个例子

df = spark.read.format("csv").option("header", "true").load("csvfile.csv")

在 scala 中,(这适用于任何格式-在分隔符中提及“ ,”用于 csv,“ t”用于 tsv 等)

Val df = sqlContext.read.format (“ com.database ricks.spot.csv”) . option (“分隔符”,“ ,”) . load (“ csvfile.csv”)

使用 Spark 2. x 解析 CSV 并加载为 DataFrame/DataSet

首先,初始化 SparkSession对象 默认情况下,它将作为 spark在 shell 中提供

val spark = org.apache.spark.sql.SparkSession.builder
.master("local") # Change it as per your cluster
.appName("Spark CSV Reader")
.getOrCreate;

使用以下任何一种方法将 CSV 加载为 DataFrame/DataSet

1. 以程序化的方式进行

 val df = spark.read
.format("csv")
.option("header", "true") //first line in file has headers
.option("mode", "DROPMALFORMED")
.load("hdfs:///csv/file/dir/file.csv")

更新: 添加所有选项 从这里,以防链接将在未来中断

  • 路径 : 文件的位置。类似于 Spark 可以接受标准的 Hadoop globbing 表达式。
  • Header : 当设置为 true 时,第一行文件将用于命名列,不会包含在数据中。所有类型都将被假定为字符串。默认值为 false。
  • 分隔符 : 默认情况下,列使用,但分隔符可以设置为任何字符
  • 引号 : 默认情况下,引号字符是“ ,但可以设置为任何字符。引号内的分隔符将被忽略
  • Escape : 默认情况下,转义字符是,但可以设置为任何字符。转义引号字符将被忽略
  • ParserLib : 默认情况下,可以将“ 公共资源”设置为“ 单一性”以使用该库进行 CSV 解析。
  • Mode : 确定解析模式。默认为 PERMISSIVE。可能的值是:
    • PERMISSIVE : 尝试解析所有行: 为缺少的标记插入空值,并忽略额外的标记。
    • DROPMALFORMED : 删除标记数量少于或多于预期的行或与模式不匹配的标记
    • FAILFAST : 如果遇到任何格式不正确的行,则使用 RuntimeException 中止 Charset: 默认为‘ UTF-8’,但可以设置为其他有效的字符集名称
  • 推断模式 : 自动推断列类型。它需要对数据进行一次额外的传递,默认情况下为 false
  • 注释 : 跳过以此字符开头的行。默认为“ #”。通过将此设置为 null 禁用注释。
  • NullValue : 指定一个指示空值的字符串,任何与此字符串匹配的字段都将在 DataFrame 中设置为空值
  • DateFormat : 指定一个字符串,该字符串指示在读取日期或时间戳时使用的日期格式。自定义日期格式遵循 java.text 的格式。SimpleDateFormat.这同时适用于 DateType 和 TimestampType。默认情况下,它是 null,这意味着尝试通过 java.sql 解析 times 和 date。ValueOf ()和 java.sql。Date. valueOf ().

2. 您也可以使用这种 SQL 方式

 val df = spark.sql("SELECT * FROM csv.`hdfs:///csv/file/dir/file.csv`")

依赖关系 :

 "org.apache.spark" % "spark-core_2.11" % 2.0.0,
"org.apache.spark" % "spark-sql_2.11" % 2.0.0,

火花版本 < 2.0

val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("csv/file/path");

依赖性:

"org.apache.spark" % "spark-sql_2.10" % 1.6.0,
"com.databricks" % "spark-csv_2.10" % 1.6.0,
"com.univocity" % "univocity-parsers" % LATEST,

使用 Spark 2.0,以下是如何阅读 CSV

val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
val sc = new SparkContext(conf)
val sparkSession = SparkSession.builder
.config(conf = conf)
.appName("spark session example")
.getOrCreate()


val path = "/Users/xxx/Downloads/usermsg.csv"
val base_df = sparkSession.read.option("header","true").
csv(path)

在 Java 1.8中,这段代码可以很好地读取 CSV 文件

POM.xml

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>2.0.0</version>
</dependency>


<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-csv_2.10</artifactId>
<version>1.4.0</version>
</dependency>

爪哇咖啡

SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
// create Spark Context
SparkContext context = new SparkContext(conf);
// create spark Session
SparkSession sparkSession = new SparkSession(context);


Dataset<Row> df = sparkSession.read().format("com.databricks.spark.csv").option("header", true).option("inferSchema", true).load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");


//("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
System.out.println("========== Print Schema ============");
df.printSchema();
System.out.println("========== Print Data ==============");
df.show();
System.out.println("========== Print title ==============");
df.select("title").show();

佩妮的火花2的例子就是在火花2中做它的方法。还有一个技巧: 通过对数据进行初始扫描,将选项 inferSchema设置为 true,为您生成头文件

这里,假设 spark是您已经设置的一个火花会话,这个操作将加载 S3上亚马逊主机上所有 Landsat 图像的 CSV 索引文件。

  /*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


val csvdata = spark.read.options(Map(
"header" -> "true",
"ignoreLeadingWhiteSpace" -> "true",
"ignoreTrailingWhiteSpace" -> "true",
"timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
"inferSchema" -> "true",
"mode" -> "FAILFAST"))
.csv("s3a://landsat-pds/scene_list.gz")

坏消息是: 这会触发对文件的扫描; 对于像这个20 + MB 压缩的 CSV 文件这样大的文件,在长途连接中可能需要30秒。请记住: 一旦您得到了这个模式,您最好手动编写代码。

(代码片段 Apache许可证2.0授权以避免所有歧义,这是我作为 s3集成的演示/集成测试所做的工作)

它的 Hadoop 是2.6,Spark 是1.6,而且没有“数据块”包。

import org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};
import org.apache.spark.sql.Row;


val csv = sc.textFile("/path/to/file.csv")
val rows = csv.map(line => line.split(",").map(_.trim))
val header = rows.first
val data = rows.filter(_(0) != header(0))
val rdd = data.map(row => Row(row(0),row(1).toInt))


val schema = new StructType()
.add(StructField("id", StringType, true))
.add(StructField("val", IntegerType, true))


val df = sqlContext.createDataFrame(rdd, schema)

默认的文件格式是 Parquet,其中包含 fak.read。.和文件读取 csv,为什么你得到的异常。使用您试图使用的 api 指定 csv 格式

解析一个 CSV 文件有很多挑战,如果文件大小更大,如果列值中有非英语/escape/分隔符/其他字符,它会不断累加,这可能会导致解析错误。

魔力就在于所使用的选项。那些曾经为我工作并且希望能够覆盖大部分边缘案例的代码如下:

### Create a Spark Session
spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()


### Note the options that are used. You may have to tweak these in case of error
html_df = spark.read.csv(html_csv_file_path,
header=True,
multiLine=True,
ignoreLeadingWhiteSpace=True,
ignoreTrailingWhiteSpace=True,
encoding="UTF-8",
sep=',',
quote='"',
escape='"',
maxColumns=2,
inferSchema=True)

希望能有所帮助

注意: 上面的代码来自 Spark 2 API,其中 CSV 文件读取 API 与 Spark 可安装的内置包捆绑在一起。

注意: PySpark 是用于 Spark 的 Python 包装器,它与 Scala/Java 共享相同的 API。

以防您正在构建一个具有 scala 2.11和 Apache 2.0或更高版本的 jar。

不需要创建 sqlContextsparkContext对象。只需一个 SparkSession对象就足以满足所有需求。

以下是运行良好的 mycode:

import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.log4j.{Level, LogManager, Logger}


object driver {


def main(args: Array[String]) {


val log = LogManager.getRootLogger


log.info("**********JAR EXECUTION STARTED**********")


val spark = SparkSession.builder().master("local").appName("ValidationFrameWork").getOrCreate()
val df = spark.read.format("csv")
.option("header", "true")
.option("delimiter","|")
.option("inferSchema","true")
.load("d:/small_projects/spark/test.pos")
df.show()
}
}

如果您在集群中运行,只需在定义 sparkBuilder对象时将 .master("local")更改为 .master("yarn")

《火种博士》的内容包括: Https://spark.apache.org/docs/2.2.0/sql-programming-guide.html

试试这个如果使用火花2.0 +

For non-hdfs file:
df = spark.read.csv("file:///csvfile.csv")




For hdfs file:
df = spark.read.csv("hdfs:///csvfile.csv")


For hdfs file (with different delimiter than comma:
df = spark.read.option("delimiter","|")csv("hdfs:///csvfile.csv")

注意:-这适用于任何分隔文件。只需使用选项(“分隔符”,)来更改值。

希望这对你有帮助。

使用内置的 Spark csv,您可以很容易地用新的 Spark > 2.0的 Spark Session 对象完成它。

val df = spark.
read.
option("inferSchema", "false").
option("header","true").
option("mode","DROPMALFORMED").
option("delimiter", ";").
schema(dataSchema).
csv("/csv/file/dir/file.csv")
df.show()
df.printSchema()

您可以设置多种选项。

  • header: 您的文件是否在顶部包含标题行
  • inferSchema: 是否要自动推断模式。默认值是 true。我总是倾向于提供模式来确保正确的数据类型。
  • mode: 解析模式,允许,下拉或失败
  • delimiter: 要指定分隔符,默认值为逗号(’,’)

要读取系统上的相对路径,可以使用 System.getProperty 方法获取工作目录,然后再使用相对路径加载文件。

scala> val path = System.getProperty("user.dir").concat("/../2015-summary.csv")
scala> val csvDf = spark.read.option("inferSchema","true").option("header", "true").csv(path)
scala> csvDf.take(3)

火花: 2.4.4 Scala: 2.11.12

将以下 Spark 依赖项添加到 POM 文件:

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.0</version>
</dependency>

火花配置:

val spark = SparkSession.builder().master("local").appName("Sample App").getOrCreate()

阅读 csv 文件:

val df = spark.read.option("header", "true").csv("FILE_PATH")

显示输出:

df.show()

使用 Spark 2.4 + ,如果希望从本地目录加载 csv,那么可以使用2个会话并将其加载到 hive 中。第一个会话应该使用 master () config 作为“ local [ * ]”创建,第二个会话使用“纱线”和 Hive 启用。

下面这个对我有用。

import org.apache.log4j.{Level, Logger}
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.sql._


object testCSV {


def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.ERROR)
val spark_local = SparkSession.builder().appName("CSV local files reader").master("local[*]").getOrCreate()


import spark_local.implicits._
spark_local.sql("SET").show(100,false)
val local_path="/tmp/data/spend_diversity.csv"  // Local file
val df_local = spark_local.read.format("csv").option("inferSchema","true").load("file://"+local_path) // "file://" is mandatory
df_local.show(false)


val spark = SparkSession.builder().appName("CSV HDFS").config("spark.sql.warehouse.dir", "/apps/hive/warehouse").enableHiveSupport().getOrCreate()


import spark.implicits._
spark.sql("SET").show(100,false)
val df = df_local
df.createOrReplaceTempView("lcsv")
spark.sql(" drop table if exists work.local_csv ")
spark.sql(" create table work.local_csv as select * from lcsv ")


}

当使用 spark2-submit --master "yarn" --conf spark.ui.enabled=false testCSV.jar运行时,它运行良好,并在 hive 中创建了表。