See what columns and rows are in Spark.
Selecting Columns
df.selectExpr(
    "*",  # all original columns
    "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")\
    .show(2)
Literals:
- provide a means to supply a constant value for comparison/transformation logic
from pyspark.sql.functions import expr, lit
df.select(expr("*"), lit(1).alias("One")).show(2)
+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
| United States| Romania| 15| 1|
| United States| Croatia| 1| 1|
+-----------------+-------------------+-----+---+
Adding/Renaming Columns:
df.withColumn("numberOne", lit(1)).show(2)
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns
By default Spark is case insensitive; you can make it case sensitive by setting the configuration spark.sql.caseSensitive to true.
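In PySpark the same setting can be changed at runtime on the session; a minimal sketch, assuming spark is the active SparkSession:
# make column name resolution case sensitive for this session
spark.conf.set("spark.sql.caseSensitive", "true")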
Removing Columns
# dedicated method called drop:
df.drop("ORIGIN_COUNTRY_NAME").columns
#We can drop multiple columns by passing in multiple columns as arguments:
df.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME")
Changing a Column’s Type (cast)
# in Python
from pyspark.sql.functions import col
df.withColumn("count2", col("count").cast("long"))
-- in SQL
SELECT *, cast(count as long) AS count2 FROM dfTable
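To confirm the cast took effect, a quick schema check works (a sketch on the same df):
# count2 should now appear as long in the printed schema
df.withColumn("count2", col("count").cast("long")).printSchema()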
filter (filter and where are interchangeable)
df.filter(col("count") < 2).show(2)
df.where("count < 2").show(2)
# multiple conditions (all of the following are equivalent; Spark performs the filtering at once)
df.filter((col("act_date") >= "2016-10-01") & (col("act_date") <= "2017-04-01"))
df.filter("act_date >='2016-10-01' AND act_date <='2017-04-01'")
df.filter("act_date <='2017-04-01'").filter("act_date >='2016-10-01'")
-- in SQL
SELECT * FROM dfTable WHERE count < 2 LIMIT 2
Getting Unique Rows
# .distinct() returns a DataFrame
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()
df.select("ORIGIN_COUNTRY_NAME").distinct()
-- in SQL
SELECT COUNT(DISTINCT(ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME)) FROM dfTable
Random Samples
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()
Random Splits
dataFrames = df.randomSplit([0.25, 0.75], seed)
dataFrames[0].count() > dataFrames[1].count() # False
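The weights are normalized if they do not sum to 1, so the sketch below behaves the same as [0.25, 0.75] (trainDF/testDF are just illustrative names):
# unpack the splits directly; [1.0, 3.0] is normalized to [0.25, 0.75]
trainDF, testDF = df.randomSplit([1.0, 3.0], seed)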
Union
- To union two DataFrames, you must be sure that they have the same schema and number of columns; otherwise, the union will fail.
df.union(newDF)
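A sketch of building a compatible newDF to union (the rows below are made up; the schema is reused from df):
from pyspark.sql import Row
schema = df.schema
newRows = [
    Row("New Country", "Other Country", 5),
    Row("New Country 2", "Other Country 3", 1)
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)
df.union(newDF).where("count = 1").show()
Note that union matches columns by position, not by name, so newDF's column order must line up with df's.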
Sorting Rows:
- The default is to sort in ascending order
- use the desc/asc methods if you want to specify the order explicitly
- asc_nulls_first, desc_nulls_first, asc_nulls_last, or desc_nulls_last control where nulls appear in the sort order (see the example below)
from pyspark.sql.functions import desc, asc
df.orderBy(expr("count desc")).show(2)
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)
limit:
# in Python
df.orderBy(expr("count desc")).limit(6).show()
-- in SQL
SELECT * FROM dfTable ORDER BY count desc LIMIT 6
Repartition and Coalesce:
- Repartition will incur a full shuffle of the data
- Coalesce, on the other hand, will not incur a full shuffle and will try to combine partitions.
df.rdd.getNumPartitions()
df.repartition(5)
# useful if you know which column is frequently used for filtering
df.repartition(col("DEST_COUNTRY_NAME"))
df.repartition(5, col("DEST_COUNTRY_NAME")) # specify the partition number as well
# coalesce the partitioned data
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)
Collecting Rows:
- running take or head requires moving data into the application's driver process; doing so on a very large DataFrame can crash the driver
# in Python
collectDF = df.limit(10) # a transformation that returns a new DataFrame (nothing is computed yet)
collectDF.take(5) # returns the first N rows as a list of Row objects
collectDF.show() # this prints it out nicely
collectDF.show(5, False) # show 5 rows without truncating long values
collectDF.collect() # brings every row back to the driver as a list of Rows