- If a column is being transformed, wrap it in col(); if it only serves as a reference point (e.g. groupBy, drop_duplicates, partitionBy…), col() is not needed and a plain column-name string is enough (see the sketch after this list)
- subset is used when we pass the column(s) directly to the main DataFrame method as an argument, either as a single column-name string or a list of column-name strings
- aggregate functions are applied either to the whole DataFrame via agg(), or per group together with groupBy()
- .read/.write don't take (); they are properties that return reader/writer objects, which other methods are then chained onto
- some methods are Column-object specific (e.g. .cast(), .alias(), .contains())
- some functions are imported from pyspark.sql.functions (e.g. upper(), lit(), explode(), split())
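A minimal sketch of these conventions, assuming a hypothetical df with a division column (as in the later examples):
from pyspark.sql.functions import col, upper
# the column is transformed, so col() is needed; upper() is one of the imported functions
df = df.withColumn("divisionUpper", upper(col("division")))
# the column only serves as a reference point, so a plain name string is enough
df.groupBy("division").count()
df.orderBy("division")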
Transformation
# createDataFrame
df = spark.createDataFrame(
[(1,2,3,4),(5,6,7,8)], # list of rows
["id1", "id2", "id3","id4"] # list of columns name
)
# column rename (must be done one by one)
df.withColumnRenamed("existing", "new")
# filter conditions need to be wrapped in parentheses (| or, & and, ~ not)
df.filter((col("sqft") <= 25000) | (col("customerSatisfaction") >= 30)) # Alias .where()
# derive a new column with a formula over existing columns
df.withColumn("employeesPerSqft", col("numberOfEmployees") / col("sqft"))
# cast datatype
df.withColumn("storeId, col("storeId").cast(StringType()))
# constant as new column
df.withColumn("modality", lit("PHYSICAL"))
# split column
df.withColumn("storeValueCategory", split(col("storeCategory"), "_")[0])
.withColumn("storeSizeCategory", split(col("storeCategory"), "_")[1]))
# explode array
df.withColumn("productCategories", explode(col("productCategories")))
# replace string
df.withColumn("storeDescription", regexp_replace(col("storeDescription"), "<string_old>", "<string_new>"))
# drop all duplications
- DataFrame.distinct()
- DataFrame.dropDuplicates() # same result when called without arguments
- DataFrame.drop_duplicates() # alias of dropDuplicates()
- DataFrame.drop_duplicates(subset=None) # subset = None or a list of column names; duplicates are judged on the subset columns only, and only the first record in each group is kept (example below)
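For instance (hypothetical data; Spark does not guarantee which row in a group survives, though typically the first encountered is kept):
dedup_df = spark.createDataFrame(
[(1, "HOME"), (2, "HOME"), (3, "GARDEN")], # two rows share division "HOME"
["storeId", "division"]
)
dedup_df.drop_duplicates(subset=["division"]).show() # one row per division remains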
# fill na
df.na.fill("unknown", ["city"]) # second argument: a column-name string or list of strings
df.fillna() # alias
# drop na (how="all" drops rows where every value is null, how="any" drops rows with any null)
df.na.drop(how="all")
# drop column
df.drop(<col1>) # no-op if no col is given
# approx_count_distinct must be used inside an aggregation (agg)
df.agg(approx_count_distinct(col("division"), 0.15).alias("divisionDistinct")) # the number is the maximum allowed relative standard deviation (estimation error); the higher it is, the faster the count
# return a mean
df.agg(mean(col("sqft")).alias("sqftMean"))
# returns a GroupedData object (column format is flexible)
df.groupBy("division", "storeCategory")
df.groupBy().mean()
# sort in reverse alphabetical (descending) order
df.orderBy(col("division").desc())
# sample 15% data with replacement
df.sample(True, fraction=0.15)
# convert from UNIX epoch seconds to a string formatted with Java's SimpleDateFormat pattern
df.withColumn("openDateString", from_unixtime(col("openDate"), "EEE, MMM d, yyyy h:mm a")) # result: "Sun, Dec 4, 2008 1:05 PM"
# get dayofyear/month/quarter... from a timestamp; these functions do not work on raw UNIX epoch values, so cast first
df.withColumn("openTimestamp", col("openDate").cast("Timestamp"))
.withColumn("dayOfYear", dayofyear(col("openTimestamp")))
# join( df, condition, method)
df.join(df2, "name", 'inner')
df.join(df2, [df.name == df2.name], 'outer')
df.join(df2, df["name"] == df2["name"], "inner")
# cross join
df.crossJoin(df2).show()
# union (matches columns by position; unionAll() is an alias)
df1.union(df2)
# union (by name)
df1.unionByName(df2)
# first two characters of a column (substr is 1-indexed)
df.withColumn("division", col("division").substr(1, 2))
# extracts the value of a column from the first Row of df
df.first().column_name
I/O
# read
df = spark.read.format("csv").option("header", "true").load("path/to/file.csv") # generic
df = spark.read.csv("path/to/file.csv", header=True) # format specific
# read with schema object
spark.read.schema(schema).format("json").load(filePath)
# writes df to filePath as JSON
df.write.json(filePath)
# write df to filePath as Parquet, partitioning by col1
df.write.partitionBy('col1').parquet(filePath)
# overwrite mode output as text
df.write.mode("overwrite").text(filePath)
Properties Config
# controls whether partitions below a minimum size threshold are automatically coalesced into larger partitions during a shuffle
spark.sql.adaptive.coalescePartitions.enabled
# maximum size of an automatically broadcasted DataFrame when performing a join
spark.sql.autoBroadcastJoinThreshold
# skewed partitions are automatically detected and subdivided into smaller partitions when joining two DataFrames together
spark.sql.adaptive.skewJoin.enabled
# adjust the number of partitions used in wide transformations like join()
spark.sql.shuffle.partitions
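These properties can be read and set at runtime through spark.conf; a minimal sketch (the values shown are illustrative, not recommendations):
spark.conf.get("spark.sql.shuffle.partitions") # read the current value
spark.conf.set("spark.sql.shuffle.partitions", "64") # number of shuffle partitions
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(10 * 1024 * 1024)) # 10 MB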
Others
# cache the partitions of DataFrame df only in Spark’s memory
df.persist(StorageLevel.MEMORY_ONLY).count()
# sql query
df.createOrReplaceTempView("df_view")
spark.sql("select * from df_view")
# printSchema
df.printSchema()
# register udf
spark.udf.register("<udf_name>", func, "STRING") # UDF with Spark SQL
<udf_name> = udf(func, IntegerType()) # UDF for use within the DataFrame API; pass the return type, e.g. IntegerType()
# apply udf
df = spark.sql("SELECT udf_name(column_name) FROM some_table") #sparksql
df = df.withColumn("performance", udf_name(col("column_name")) )
# summary statistics for all/specific columns
df.describe()
df.describe(["name","age"]).show()
# applies the function assessPerformance() to each row of DataFrame df
[assessPerformance(row) for row in df.collect()]