比起Scala,我更喜欢Python。但是,由于Spark本身是用Scala编写的,我希望我的代码在Scala中比Python版本运行得更快,原因很明显。
带着这样的假设,我想学习&为1 GB的数据写一些非常常见的Scala版本的预处理代码。数据选自Kaggle上的SpringLeaf比赛。只是简单介绍一下数据(它包含1936个维度和145232行)。数据由各种类型组成,例如int型,浮点型,字符串,布尔型。我使用8个内核中的6个进行Spark处理;这就是为什么我使用minPartitions=6
,这样每个核都有一些东西要处理。
Scala代码
val input = sc.textFile("train.csv", minPartitions=6)
val input2 = input.mapPartitionsWithIndex { (idx, iter) =>
if (idx == 0) iter.drop(1) else iter }
val delim1 = "\001"
def separateCols(line: String): Array[String] = {
val line2 = line.replaceAll("true", "1")
val line3 = line2.replaceAll("false", "0")
val vals: Array[String] = line3.split(",")
for((x,i) <- vals.view.zipWithIndex) {
vals(i) = "VAR_%04d".format(i) + delim1 + x
}
vals
}
val input3 = input2.flatMap(separateCols)
def toKeyVal(line: String): (String, String) = {
val vals = line.split(delim1)
(vals(0), vals(1))
}
val input4 = input3.map(toKeyVal)
def valsConcat(val1: String, val2: String): String = {
val1 + "," + val2
}
val input5 = input4.reduceByKey(valsConcat)
input5.saveAsTextFile("output")
Python代码
input = sc.textFile('train.csv', minPartitions=6)
DELIM_1 = '\001'
def drop_first_line(index, itr):
if index == 0:
return iter(list(itr)[1:])
else:
return itr
input2 = input.mapPartitionsWithIndex(drop_first_line)
def separate_cols(line):
line = line.replace('true', '1').replace('false', '0')
vals = line.split(',')
vals2 = ['VAR_%04d%s%s' %(e, DELIM_1, val.strip('\"'))
for e, val in enumerate(vals)]
return vals2
input3 = input2.flatMap(separate_cols)
def to_key_val(kv):
key, val = kv.split(DELIM_1)
return (key, val)
input4 = input3.map(to_key_val)
def vals_concat(v1, v2):
return v1 + ',' + v2
input5 = input4.reduceByKey(vals_concat)
input5.saveAsTextFile('output')
< >强Scala的性能 第0阶段(38分钟),第1阶段(18秒) < / p >
< >强Python的性能 第0阶段(11分钟),第1阶段(7秒) < / p >
两者都生成了不同的DAG可视化图(由于这两张图显示了Scala (map
)和Python (reduceByKey
)的不同阶段0函数)
但是,本质上,这两个代码都试图将数据转换为(维度_id,值列表字符串)RDD并保存到磁盘。输出将用于计算每个维度的各种统计信息。
性能方面,对于这样的真实数据,Scala代码似乎比Python版本运行慢4倍。 对我来说,好消息是它给了我继续使用Python的良好动力。坏消息是我不太明白为什么?< / p >