Spark 2.0 introduced the DataFrame API as the primary way to work with structured data (earlier Spark code used the lower-level RDD syntax). The examples below use the DataFrame API; operations on a DataFrame are automatically distributed across the cluster on top of RDDs (Resilient Distributed Datasets).
The API documentation (available @ http://spark.apache.org/docs/latest/api/python/index.html for Python) is exhaustive. In this blog post, however, I will try to cover some of the basic operations that I learnt from one of the courses I took on Udemy.
We start by importing SparkSession (the entry point to DataFrame functionality) from the pyspark.sql library:
from pyspark.sql import SparkSession
Then we build an instance of SparkSession and use it to read a CSV file into the DataFrame variable df:
spark = SparkSession.builder.appName("Operations").getOrCreate()
df = spark.read.csv('appl_stock.csv',inferSchema=True,header=True)
df.printSchema()
df.columns
df.show()
df.describe().show() --> Shows summary stats such as count, mean, stddev, min and max for the numeric columns in the DataFrame.
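As a small aside (a sketch, assuming the Open and Close columns from appl_stock.csv), describe() also accepts specific column names, and its string-typed output can be formatted for readability:

from pyspark.sql.functions import format_number

# Summary statistics for selected columns only
stats = df.describe(['Open', 'Close'])

# describe() returns string-typed columns, so cast before formatting to 2 decimals
stats.select(stats['summary'],
             format_number(stats['Open'].cast('float'), 2).alias('Open'),
             format_number(stats['Close'].cast('float'), 2).alias('Close')).show()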
Defining Schema on a DataFrame:
from pyspark.sql.types import StructField,StringType,IntegerType,StructType
data_schema = [StructField("age", IntegerType(), True), StructField("name", StringType(), True)] ==> Parameters are column name, data type, nullable
final_struc = StructType(fields=data_schema)
df = spark.read.json('people.json', schema=final_struc)
df.printSchema()
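The same StructType can also be used when building a DataFrame from in-memory data. A minimal sketch (the sample rows below are made up for illustration):

# Tuples must match the schema order: (age, name); None is allowed because the fields are nullable
people = [(30, 'Andy'), (None, 'Michael'), (19, 'Justin')]
df2 = spark.createDataFrame(people, schema=final_struc)
df2.printSchema()
df2.show()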
Filtering Data
A large part of working with DataFrames is the ability to quickly filter out data based on conditions. Spark DataFrames are built on top of the Spark SQL platform, which means that if you already know SQL, you can quickly and easily grab that data using SQL commands, or using the DataFrame methods.
df.filter("Close<500").show()
df.filter("Close<500").select('Open').show()
df.filter("Close<500").select(['Open','Close']).show()
df.filter(df["Close"] < 200).show()
df.filter( (df["Close"] < 200) & (df['Open'] > 200) ).show() ==> The parentheses around the inner comparisons are important
df.filter( (df["Close"] < 200) & ~(df['Open'] < 200) ).show()
result = df.filter(df["Low"] == 197.16).collect() ==> Collects the matching rows as a list of Row objects (plain Python objects)
row = result[0]
row.asDict() ==> Converts the Row into a dictionary object
for item in result[0]:
    print(item)
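Row objects support several access styles; a quick sketch using the row collected above (Low is a column in appl_stock.csv):

row['Low']    # access by column name
row.Low       # access by attribute
row[0]        # access by position
list(row)     # all values as a plain Python list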
Getting the DataFrame and column types from the DataFrame instance df, and using the select and head operations:
df['age']
type(df['age'])
df.select('age')
type(df.select('age'))
df.select('age').show()
df.head(2) ==> Returns list of Row objects
df.select(['age','name']).show()
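The key difference between these calls is the type returned: df['age'] is a Column object, df.select('age') is a new DataFrame, and df.head(2) is a plain Python list of Row objects. A small sketch printing the types:

print(type(df['age']))            # <class 'pyspark.sql.column.Column'>
print(type(df.select('age')))     # <class 'pyspark.sql.dataframe.DataFrame'>
print(type(df.head(2)))           # <class 'list'>
print(type(df.head(2)[0]))        # <class 'pyspark.sql.types.Row'>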
Creating new columns in the DataFrame:
df.withColumnRenamed('age','supernewage').show() ==> Simple Rename
df.withColumn('half_age',df['age']/2).show()
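withColumn can also derive new columns from other expressions or constants. A brief sketch (the new column names here are only for illustration):

from pyspark.sql.functions import lit

df.withColumn('double_age', df['age'] * 2).show()   # arithmetic on an existing column
df.withColumn('country', lit('unknown')).show()     # constant value via lit()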
Using SQL on the DataFrame instead of .select / .head / df["colName"] and/or comparison operations inside .select statements:
df.createOrReplaceTempView("people") ==> Register the DataFrame as a SQL temporary view
spark.sql("SELECT * FROM people WHERE age=30").show()
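Any SQL that works against a table works against the temporary view. For example (a sketch, assuming the age and name columns from people.json):

spark.sql("SELECT name FROM people WHERE age IS NOT NULL ORDER BY age DESC").show()
spark.sql("SELECT COUNT(*) AS cnt FROM people").show()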
Working with DateTime columns in a DataFrame:
from pyspark.sql import SparkSession
df = SparkSession.builder.appName("dates").getOrCreate().read.csv("appl_stock.csv",header=True,inferSchema=True)
from pyspark.sql.functions import format_number,dayofmonth,hour,dayofyear,month,year,weekofyear,date_format
df.select(dayofmonth(df['Date'])).show()
df.select(month(df['Date'])).show()
df.select(dayofyear(df['Date'])).show()
df.select(year(df['Date'])).show()
df.select(hour(df['Date'])).show()
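These functions combine nicely with alias() to produce readable column names; a small sketch using the functions imported above:

df.select(year(df['Date']).alias('Year'),
          month(df['Date']).alias('Month'),
          dayofmonth(df['Date']).alias('Day'),
          df['Close']).show()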
You can also perform GroupBy and ordering operations on a DataFrame (these examples use a different dataset containing Company and Sales columns).
df.groupBy("Company").mean().show() -> Shows the mean of all numeric columns, grouped by Company
df.agg({'Sales':'max'}).show() -> A dictionary mapping column names to aggregate functions can be passed to perform aggregations with this syntax
from pyspark.sql.functions import countDistinct, avg,stddev
df.select(countDistinct("Sales").alias("Distinct Sales")).show()
sales_std = df.select(stddev("Sales").alias('std'))
sales_std.select(format_number('std',2)).show()
df.orderBy("Sales").show() ==> Ascending Sales
df.orderBy(df["Sales"].desc()).show() ==> Descending Sales
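Named aggregation functions can also be passed through agg() and chained with orderBy(); a sketch using the avg and stddev functions imported above, again assuming Company and Sales columns:

df.groupBy("Company").agg(avg("Sales").alias("Average Sales"),
                          stddev("Sales").alias("Std Dev Sales")).orderBy("Company").show()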
Computing the average Close per Year from the dataset, formatting it to a precision of 2, and aliasing the columns, like below:
newdf = df.withColumn("Year",year(df['Date']))
newdf.groupBy("Year").mean()[['avg(Year)','avg(Close)']].show()
result = newdf.groupBy("Year").mean()[['avg(Year)','avg(Close)']]
result = result.withColumnRenamed("avg(Year)","Year")
result.select('Year',format_number('avg(Close)',2).alias("Mean Close")).show()
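The same result can be produced as a single chained expression; a sketch using the year, avg and format_number functions already imported above:

(df.withColumn("Year", year(df['Date']))
   .groupBy("Year")
   .agg(avg("Close").alias("avg_close"))
   .select("Year", format_number("avg_close", 2).alias("Mean Close"))
   .orderBy("Year")
   .show())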
Dropping rows that contain null column values, or replacing them with some value:
Param how:
    If 'any', drop a row if it contains any nulls.
    If 'all', drop a row only if all its values are null.
Param thresh: int, default None
    If specified, drop rows that have less than `thresh` non-null values. This overwrites the `how` parameter.
Usage:
df.na.drop().show() ==> Drop any row that contains missing data
df.na.drop(thresh=2).show() ==> Has to have at least 2 NON-null values. Else, that row will be dropped.
df.na.drop(subset=["Sales"]).show() ==> Drops rows where the Sales column is null
df.na.drop(how='any').show() ==> 'any' is the default, so this is equivalent to df.na.drop()
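Before dropping, it can help to see how many rows are affected; a short sketch (assuming the Sales column):

df.filter(df['Sales'].isNull()).count()   # number of rows where Sales is null
df.count()                                # total number of rows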
Filling rows with null values:
df.na.fill('NEW VALUE').show() -> All string typed columns get filled with 'NEW VALUE'
df.na.fill(0).show() -> All numeric typed columns get filled with 0
A very common practice is to fill values with the mean value for the column, for example:
from pyspark.sql.functions import mean
mean_val = df.select(mean(df['Sales'])).collect()
mean_sales = mean_val[0][0]
df.na.fill(mean_sales,["Sales"]).show()
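The same mean-fill can be written as a single (less readable) one-liner; a quick sketch:

# Compute the mean and fill it in, all in one expression
df.na.fill(df.select(mean(df['Sales'])).collect()[0][0], subset=['Sales']).show()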
NOTE: This isn’t the end of the road. Exhaustive API documentation is available @ http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
Thanks for reading this through.