Loading a CSV file with PySpark

I'm new to Spark and I'm trying to read CSV data from a file with Spark. Here's what I am doing:

sc.textFile('file.csv')
    .map(lambda line: (line.split(',')[0], line.split(',')[1]))
    .collect()

I would expect this call to give me a list of the first two columns of my file, but instead I'm getting this error:

File "...", line 1: IndexError: list index out of range

even though my CSV file has more than one column.


Are you sure that all the lines have at least 2 columns? Can you try something like this, just to check?:

sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)>1) \
.map(lambda line: (line[0],line[1])) \
.collect()

Alternatively, you could print the culprit(s), if any:

sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)<=1) \
.collect()

Now, there is also another option for any general csv file: https://github.com/seahboonsiew/pyspark-csv, as follows:

Assume we have the following context:

sc = SparkContext
sqlCtx = SQLContext or HiveContext

First, distribute pyspark_csv.py to the executors using SparkContext:

import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')

Read the csv data via SparkContext and convert it to a DataFrame:

plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)

Yet another option is to read the CSV file with pandas and then import the pandas DataFrame into Spark.

For example:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd


sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)


pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)

If your csv data happens to not contain newlines in any of the fields, you can load your data with textFile() and parse it:

import csv
import StringIO


def loadRecord(line):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name1", "name2"])
    return reader.next()


input = sc.textFile(inputFile).map(loadRecord)

Spark 2.0.0+:

You can use the built-in csv data source directly:

spark.read.csv(
    "some_input_file.csv",
    header=True,
    mode="DROPMALFORMED",
    schema=schema
)

or

(
    spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv")
)

without including any external dependencies.

Spark < 2.0.0:

Instead of manual parsing, which is far from trivial in the general case, I would recommend spark-csv:

Make sure that Spark CSV is included in the path (--packages, --jars, --driver-class-path).
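For example, one common way to pull the package in from a plain Python script or notebook is to set PYSPARK_SUBMIT_ARGS before the SparkContext is created. This is only a sketch; the package coordinates shown (com.databricks:spark-csv_2.11:1.5.0) are just an example and should match your Spark/Scala build:

import os

# Must be set before the SparkContext is created; the coordinates below are only an example.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages com.databricks:spark-csv_2.11:1.5.0 pyspark-shell"
)

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local", "spark-csv-example")
sqlContext = SQLContext(sc)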

And load your data as follows:

df = (
    sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv")
)

It handles loading, schema inference, and dropping malformed lines, and it doesn't require passing data from Python to the JVM.

Note:

If you know the schema, it is better to avoid schema inference and pass it to the DataFrameReader. Assuming you have three columns: integer, double and string:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType


schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])


(
    sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv")
)

Simply splitting by comma will also split commas that are inside fields (e.g. a,b,"1,2,3",c), so it's not recommended. zero323's answer is good if you want to use the DataFrames API, but if you want to stick to base Spark, you can parse csvs in base Python with the csv module:

# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))

Edit: As @muon mentioned in the comments, this will treat the header like any other line, so you'll need to extract it manually. For example, header = rdd.first(); rdd = rdd.filter(lambda x: x != header) (make sure not to modify header before the filter evaluates). But at this point, you're probably better off using the built-in csv parser.
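A minimal sketch of that header-removal step applied to the mapPartitions RDD above (assuming an existing SparkContext sc and the same file.csv):

import csv

rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))

# Grab the header row first, then filter it out of the data;
# header is fully evaluated here, before the filter runs.
header = rdd.first()
data = rdd.filter(lambda row: row != header)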

This is in line with what JP Mercier initially suggested about using Pandas, but with a major modification: if you read data into Pandas in chunks, it should be more malleable. Meaning, you can parse a much larger file than Pandas could actually handle as a single piece and pass it to Spark in smaller chunks. (This also answers the comment about why one would want to use Spark if they can load everything into Pandas anyway.)

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd


sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)


Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)


for chunky in chunk_100k:
    Spark_Full += sc.parallelize(chunky.values.tolist())


YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()

from pyspark.sql import SparkSession


spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()


df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|")


print(df.collect())

If you want to load csv as a dataframe then you can do the following:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)


df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('sampleFile.csv')  # this is your csv file

It worked fine for me.

This error can arise if one or more rows in the dataset have fewer or more than two columns.
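A quick way to locate such rows is a sketch like the following, assuming an existing SparkContext sc and the comma-separated file.csv from the question:

# Collect any lines that do not split into exactly two columns.
bad_rows = (sc.textFile('file.csv')
            .map(lambda line: line.split(','))
            .filter(lambda cols: len(cols) != 2)
            .collect())
print(bad_rows)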

I am also new to PySpark and was trying to read a CSV file. The following code worked for me:

In this code I am using a dataset from Kaggle; the link is https://www.kaggle.com/carrie1/ecommerce-data

1. Without mentioning the schema:

from pyspark.sql import SparkSession
scSpark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example: Reading CSV file without mentioning schema") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()


sdfData = scSpark.read.csv("data.csv", header=True, sep=",")
sdfData.show()

Now check the columns with sdfData.columns.

The output will be:

['InvoiceNo', 'StockCode','Description','Quantity', 'InvoiceDate', 'CustomerID', 'Country']

Check the data type of each column:

sdfData.schema
StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,StringType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,StringType,true),StructField(CustomerID,StringType,true),StructField(Country,StringType,true)))

This will give you the dataframe with all columns having data type StringType.
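If you then need particular columns in other types, one option is to cast them after the read. A minimal sketch (the column names come from the dataset above; the target types are just an example):

from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType

# Cast selected string columns to more useful types.
sdfData = sdfData \
    .withColumn("Quantity", col("Quantity").cast(IntegerType())) \
    .withColumn("CustomerID", col("CustomerID").cast(DoubleType()))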

2. With schema: If you know the schema, or want to change the data type of any column in the table above, then use this (let's say I have the following columns and want each of them in a particular data type):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([
    StructField("InvoiceNo", IntegerType()),
    StructField("StockCode", StringType()),
    StructField("Description", StringType()),
    StructField("Quantity", IntegerType()),
    StructField("InvoiceDate", StringType()),
    StructField("CustomerID", DoubleType()),
    StructField("Country", StringType())
])


scSpark = SparkSession \
    .builder \
    .appName("Python Spark SQL example: Reading CSV file with schema") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()


sdfData = scSpark.read.csv("data.csv", header=True, sep=",", schema=schema)

Now check the schema for the data type of each column:

sdfData.schema


StructType(List(StructField(InvoiceNo,IntegerType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))

Edit: We can also use the following line of code, without having to mention the schema explicitly:

sdfData = scSpark.read.csv("data.csv", header=True, inferSchema = True)
sdfData.schema

The output is:

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,IntegerType,true),StructField(Country,StringType,true)))

The output looks like this:

sdfData.show()


+---------+---------+--------------------+--------+--------------+----------+-------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|CustomerID|Country|
+---------+---------+--------------------+--------+--------------+----------+-------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|      2.55|  17850|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|      2.75|  17850|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|      7.65|  17850|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|      4.25|  17850|
|   536366|    22633|HAND WARMER UNION...|       6|12/1/2010 8:28|      1.85|  17850|
|   536366|    22632|HAND WARMER RED P...|       6|12/1/2010 8:28|      1.85|  17850|
|   536367|    84879|ASSORTED COLOUR B...|      32|12/1/2010 8:34|      1.69|  13047|
|   536367|    22745|POPPY'S PLAYHOUSE...|       6|12/1/2010 8:34|       2.1|  13047|
|   536367|    22748|POPPY'S PLAYHOUSE...|       6|12/1/2010 8:34|       2.1|  13047|
|   536367|    22749|FELTCRAFT PRINCES...|       8|12/1/2010 8:34|      3.75|  13047|
|   536367|    22310|IVORY KNITTED MUG...|       6|12/1/2010 8:34|      1.65|  13047|
|   536367|    84969|BOX OF 6 ASSORTED...|       6|12/1/2010 8:34|      4.25|  13047|
|   536367|    22623|BOX OF VINTAGE JI...|       3|12/1/2010 8:34|      4.95|  13047|
|   536367|    22622|BOX OF VINTAGE AL...|       2|12/1/2010 8:34|      9.95|  13047|
|   536367|    21754|HOME BUILDING BLO...|       3|12/1/2010 8:34|      5.95|  13047|
|   536367|    21755|LOVE BUILDING BLO...|       3|12/1/2010 8:34|      5.95|  13047|
|   536367|    21777|RECIPE BOX WITH M...|       4|12/1/2010 8:34|      7.95|  13047|
+---------+---------+--------------------+--------+--------------+----------+-------+
only showing top 20 rows

When using spark.read.csv, I find that the options escape='"' and multiLine=True provide the most consistent solution to the CSV standard, and in my experience they work best with CSV files exported from Google Sheets.

That is,

# set inferSchema=False to read everything as string
df = spark.read.csv("myData.csv", escape='"', multiLine=True,
                    inferSchema=False, header=True)

This is in PySpark:

path="Your file path with file name"


df=spark.read.format("csv").option("header","true").option("inferSchema","true").load(path)

Then you can check

df.show(5)
df.count()

Read your csv file in the following way:

df = spark.read.format("csv") \
    .option("multiline", True) \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .option("header", True) \
    .load(df_path)

The Spark version is 3.0.