I want to create a DataFrame with a specified schema in Scala. I have tried using a JSON read (I mean reading an empty file), but I don't think that is the best practice.
Let's assume you want a data frame with the following schema:
```
root
 |-- k: string (nullable = true)
 |-- v: integer (nullable = false)
```
You simply define the schema for the data frame and use an empty RDD[Row]:
```scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

val schema = StructType(
  StructField("k", StringType, true) ::
  StructField("v", IntegerType, false) :: Nil)

// Spark < 2.0
// sqlContext.createDataFrame(sc.emptyRDD[Row], schema)
spark.createDataFrame(sc.emptyRDD[Row], schema)
```
The PySpark equivalent is almost identical:
```python
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("k", StringType(), True),
    StructField("v", IntegerType(), False)
])

# or df = sc.parallelize([]).toDF(schema)

# Spark < 2.0
# sqlContext.createDataFrame([], schema)
df = spark.createDataFrame([], schema)
```
Using implicit encoders (Scala only) with Product types like Tuple:
```scala
import spark.implicits._

Seq.empty[(String, Int)].toDF("k", "v")
```
or a case class:
```scala
case class KV(k: String, v: Int)

Seq.empty[KV].toDF
```
or
```scala
spark.emptyDataset[KV].toDF
```
As of Spark 2.0.0, you can do the following.
Let's define a Person case class:
```scala
scala> case class Person(id: Int, name: String)
defined class Person
```
Import the SparkSession's implicit Encoders (here the session is named spark):
```scala
scala> import spark.implicits._
import spark.implicits._
```
And use SparkSession to create an empty Dataset[Person]:
```scala
scala> spark.emptyDataset[Person]
res0: org.apache.spark.sql.Dataset[Person] = [id: int, name: string]
```
You could also use a Schema "DSL" (see Support functions for DataFrames in org.apache.spark.sql.ColumnName).
```scala
scala> val id = $"id".int
id: org.apache.spark.sql.types.StructField = StructField(id,IntegerType,true)

scala> val name = $"name".string
name: org.apache.spark.sql.types.StructField = StructField(name,StringType,true)

scala> import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructType

scala> val mySchema = StructType(id :: name :: Nil)
mySchema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true))

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val emptyDF = spark.createDataFrame(sc.emptyRDD[Row], mySchema)
emptyDF: org.apache.spark.sql.DataFrame = [id: int, name: string]

scala> emptyDF.printSchema
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
```
```scala
import scala.reflect.runtime.{universe => ru}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.ScalaReflection

def createEmptyDataFrame[T: ru.TypeTag] =
  hiveContext.createDataFrame(
    sc.emptyRDD[Row],
    ScalaReflection.schemaFor(ru.typeTag[T].tpe).dataType.asInstanceOf[StructType]
  )

case class RawData(id: String, firstname: String, lastname: String, age: Int)
val sourceDF = createEmptyDataFrame[RawData]
```
Here is a solution that creates an empty DataFrame in PySpark 2.0.0 or later.
```python
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

sc = spark.sparkContext
sqlContext = SQLContext(sc)

schema = StructType([
    StructField('col1', StringType(), False),
    StructField('col2', IntegerType(), True)
])

sqlContext.createDataFrame(sc.emptyRDD(), schema)
```
Here you can create a schema using StructType in Scala and pass an empty RDD, so you will be able to create an empty table. The following code does exactly that:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, LongType, StringType}

object EmptyTable extends App {
  val conf = new SparkConf
  val sc = new SparkContext(conf)

  // Create a SparkSession
  val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()

  // Schema for three columns
  val schema = StructType(
    StructField("Emp_ID", LongType, true) ::
    StructField("Emp_Name", StringType, false) ::
    StructField("Emp_Salary", LongType, false) :: Nil)

  // Empty RDD
  val dataRDD = sc.emptyRDD[Row]

  // Pass the RDD and schema to create the DataFrame
  val newDFSchema = sparkSession.createDataFrame(dataRDD, schema)

  newDFSchema.createOrReplaceTempView("tempSchema")
  sparkSession.sql("create table Finaltable AS select * from tempSchema")
}
```
Java version to create an empty Dataset:
```java
public Dataset<Row> emptyDataSet() {
    SparkSession spark = SparkSession.builder()
        .appName("Simple Application")
        .config("spark.master", "local")
        .getOrCreate();
    return spark.createDataFrame(new ArrayList<>(), getSchema());
}

public StructType getSchema() {
    String schemaString = "column1 column2 column3 column4 column5";
    List<StructField> fields = new ArrayList<>();

    StructField indexField = DataTypes.createStructField("column0", DataTypes.LongType, true);
    fields.add(indexField);

    for (String fieldName : schemaString.split(" ")) {
        StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
        fields.add(field);
    }

    return DataTypes.createStructType(fields);
}
```
As of Spark 2.4.3:
```scala
val df = SparkSession.builder().getOrCreate().emptyDataFrame
```
This is helpful for testing purposes.
```scala
Seq.empty[String].toDF()
```
I had a special requirement: I already had a DataFrame, but under a certain condition I had to return an empty DataFrame, so I returned df.limit(0) instead.
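That pattern can be sketched as follows (the helper name and flag are illustrative, not from the original answer; it assumes a live SparkSession):

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: returns the input DataFrame unchanged, or an empty
// DataFrame when the condition holds. limit(0) keeps the schema intact
// but drops all rows, so downstream code sees the same columns either way.
def emptyIfNeeded(df: DataFrame, makeEmpty: Boolean): DataFrame =
  if (makeEmpty) df.limit(0) else df
```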
I'd like to add the following syntax which was not yet mentioned:
```scala
Seq[(String, Integer)]().toDF("k", "v")
```
It makes clear that the () part is for values. It's empty, so the DataFrame is empty.
This syntax is also beneficial for adding null values manually. It just works, while other options either don't or are overly verbose.
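A sketch illustrating the null point (note the boxed java.lang.Integer rather than Scala's Int, which is primitive-backed and cannot hold null):

```scala
// Integer (java.lang.Integer) is a nullable reference type, so null can
// appear among the values; with Int the same literal would not compile.
Seq[(String, Integer)](("a", 1), ("b", null)).toDF("k", "v")
```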