PySpark Tutorial

In this PySpark Tutorial, we will understand why PySpark is becoming popular among data engineers and data scientists. This PySpark Tutorial will also highlight the key limitations of PySpark compared to Spark written in Scala (PySpark vs Spark Scala). PySpark is the Python API for Spark; it helps the Python developer community collaborate with Apache Spark using Python. In addition, PySpark lets you interface with Resilient Distributed Datasets (RDDs) and DataFrames (DF) in Apache Spark from the Python programming language. This is achieved by taking advantage of the Py4J library.

Py4J Library

Py4J is a very popular library that is integrated within PySpark and allows a Python program to dynamically interface with JVM objects; the JVM is the underlying technology that runs Spark programs.

What is Spark?

Apache Spark is an open-source distributed computing engine originally developed by Matei Zaharia as part of his PhD work. The first version of Spark was released in 2012. In 2013, Zaharia co-founded Databricks, where he became CTO. Apache Spark is a fast, easy-to-use framework that allows you to solve a wide variety of complex data problems, whether semi-structured, structured, streaming, and/or machine learning / data science workloads. Apache Spark allows the user to read, transform, and aggregate data, as well as train and deploy sophisticated statistical models with ease. The Spark APIs are accessible in Java, Scala, Python, R, and SQL.

Apache Spark can easily run locally on a laptop (including a Windows installation), yet it can just as easily be deployed in standalone mode, over YARN, or on Apache Mesos, either on your local cluster or in the cloud. It can read from and write to a diverse set of data sources including (but not limited to) HDFS, Apache Cassandra, Apache HBase, and S3 (AWS Simple Storage Service).

Apache Spark ships with several already implemented and tuned algorithms, statistical models, and frameworks: MLlib and ML for machine learning, GraphX and GraphFrames for graph processing, and Spark Streaming (DStreams and Structured Streaming). Spark allows the user to combine these libraries seamlessly in the same application.

https://spark.apache.org/docs/latest/api/python/index.html

PySpark – Spark Dataframes (DF)

The key data type used in PySpark is the Spark DataFrame. This object can be thought of as a structured table distributed across a cluster, with functionality similar to Pandas. If you want to do distributed computation using PySpark, you'll need to perform operations on Spark dataframes, not on other Python data types.

It is also possible to use Pandas dataframes, by calling toPandas() on a Spark dataframe, which returns a pandas object. However, this function should generally be avoided except when working with small dataframes, because it pulls the entire object into memory on a single node.

One of the key differences between Pandas and Spark dataframes is eager versus lazy execution. In PySpark, operations are delayed until a result is actually needed in the pipeline.

Reading Data Using PySpark

The first step to learn when working with Spark is loading a data set into a dataframe. Once data has been loaded into a dataframe, you can apply transformations, perform analysis and modeling, create visualizations, and persist the results. In Python, you can load files directly from the local file system using Pandas:

import pandas as pd
pd.read_csv("sample.csv")

In PySpark, reading a CSV file is a little different and comes with additional options. Since Spark is a distributed computing engine, there is no shared local storage, so a distributed file system such as HDFS, the Databricks File System (DBFS), or S3 is typically used to specify the path of the file. When running on a local machine, you can still use the local file system.

sample_file = "/my/file/location/sample.csv"
df = (
    spark.read
    .format("csv")
    .option("inferSchema", True)
    .option("header", True)
    .load(sample_file)
)
display(df)

Writing Data Using PySpark

As with loading data in Spark, it's also not advisable to save data to local storage when using PySpark. Instead, you should use a distributed file system such as S3 or HDFS.

# DBFS (Parquet)
df.write.save('/my/distributed/storage/location',format='parquet')
# S3 (Parquet)
df.write.parquet("s3a://sample_bucket/sample", mode="overwrite")

When saving a dataframe in parquet format, it is often partitioned into multiple files, as shown in the image below.

If you need the results in a CSV file, then a slightly different output step is required.

# DBFS (CSV)
df.write.save('/my/distributed/storage/location/sample.csv', format='csv')

# S3 (CSV)
(
    df.coalesce(1)
    .write
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("s3a://sample_bucket/sample.csv")
)

Transforming Data Using PySpark

Many different types of operations (transformations & actions) can be performed on Spark DataFrames, much like the wide variety of operations that can be applied to Pandas dataframes. One of the ways of performing operations on Spark dataframes is via Spark SQL, which enables dataframes to be queried as if they were tables.

# Create a temporary view from the dataframe.
df.createOrReplaceTempView("orders")

# Now display the top 5 categories by sales count.
display(spark.sql("""
select category, count(1) as category_sales
from orders
group by category
order by category_sales desc
limit 5
"""))