DATA ENGINEERING GUIDE

PySpark
Complete Guide

From transformations and actions to Spark architecture. Master PySpark with interactive before-and-after demos, interview prep, and real ETL pipeline walkthroughs.

6 Sections
35 min read
Intermediate
Interactive Demos
Contents
Top 20 Transformations · Top PySpark Actions · Interview Topics · Real ETL Pipeline · Spark Architecture · Advanced Concepts
01

Top 20 PySpark Transformations

Transformations are lazy operations that define a computation but don't execute until an action is called.

What Are Transformations?

In PySpark, transformations create a new DataFrame from an existing one without modifying the original. They are lazy — Spark builds a logical plan (DAG) but delays execution until an action like show() or collect() triggers it. This allows Spark's Catalyst optimizer to rearrange and optimize the entire chain before running a single line.

Lazy Evaluation

No computation happens when you call df.filter() or df.select(). Spark only records what you want to do. The actual processing happens when you call an action like .show(), .count(), or .write.


02

Top PySpark Actions

Actions trigger execution of the transformation chain and return results to the driver or write data to storage.

Transformations vs Actions

While transformations are lazy and build a DAG, actions are what trigger actual computation. When you call an action, Spark's DAG scheduler converts the logical plan into physical stages and tasks, distributes them across executors, and returns the result.

Transformations (Lazy)

  • Return a new DataFrame
  • Build the DAG plan
  • No computation happens
  • Narrow: select, filter, map
  • Wide: groupBy, join, orderBy

Actions (Eager)

  • Return values or write data
  • Trigger DAG execution
  • Send results to driver
  • Terminal: show, count, collect
  • Output: write, save, saveAsTable

03

Common PySpark Interview Topics

These are the most frequently asked Spark & PySpark questions in data engineering interviews; each question is followed by a model answer.

Question 01
What is the difference between Spark and Hadoop MapReduce?
Spark processes data in-memory (up to 100x faster) while Hadoop MapReduce writes intermediate results to disk after every map and reduce step. Spark also supports iterative algorithms, real-time streaming, and has a unified API for batch, SQL, ML, and graph processing. Hadoop MR only does batch.
Question 02
What is the difference between RDD, DataFrame, and Dataset?
RDD: Low-level, unstructured, no schema, no optimizer. DataFrame: High-level, schema-based, uses Catalyst optimizer, columnar. Dataset: Type-safe DataFrame (Scala/Java only). In PySpark, DataFrame is the primary API. DataFrames are ~10x faster than RDDs due to Catalyst + Tungsten optimizations.
Question 03
What is Lazy Evaluation and why does Spark use it?
Spark doesn't execute transformations immediately. It builds a DAG (Directed Acyclic Graph) of all transformations. When an action is called, the Catalyst optimizer analyzes the entire DAG to: (1) eliminate unnecessary steps, (2) push filters early (predicate pushdown), (3) combine operations, (4) optimize join strategies. This makes execution far more efficient.
Question 04
What is a DAG in Spark?
A Directed Acyclic Graph is Spark's execution plan. Each node represents an RDD/DataFrame and edges represent transformations. The DAG Scheduler divides the graph into stages (separated by shuffle boundaries). Each stage contains tasks that run in parallel on executors. No cycles allowed — data flows one direction.
Question 05
What is a Broadcast Join and when should you use it?
When joining a large table with a small table (<10MB default), Spark broadcasts the small table to all executors, avoiding an expensive shuffle of the large table. Use broadcast(df_small) hint. Ideal for dimension table lookups. Avoids data movement across the cluster.
Question 06
What is the Catalyst Optimizer?
Catalyst is Spark SQL's query optimizer. It takes a logical plan, applies rule-based and cost-based optimizations (predicate pushdown, column pruning, join reordering), then generates an optimized physical plan. It also uses Tungsten for memory management and code generation, bypassing JVM garbage collection.
Question 07
What is the difference between narrow and wide transformations?
Narrow (map, filter, select): each input partition maps to one output partition — no data shuffle needed. Wide (groupBy, join, orderBy): data from multiple partitions must be shuffled across the network. Wide transformations create stage boundaries and are expensive. Minimize shuffles for better performance.
Question 08
How do you handle data skew in PySpark?
Data skew means some partitions have much more data than others, causing slow tasks. Solutions: (1) Salting — add random prefix to skewed keys, (2) Broadcast join for small tables, (3) Repartition on a different column, (4) AQE (Adaptive Query Execution) in Spark 3.x auto-handles skew by splitting large partitions.
Question 09
What is partitioning and why is it important?
Partitioning divides data into chunks distributed across executors. In-memory partitioning (repartition/coalesce) controls parallelism. Disk partitioning (partitionBy) organizes output files by column (e.g., date). Proper partitioning = parallel processing, partition pruning (skip irrelevant data), and balanced workload across the cluster.
Question 10
cache() vs persist() — what is the difference?
cache() is simply persist() with the default storage level: MEMORY_AND_DISK for DataFrames (MEMORY_ONLY for RDDs). persist() lets you choose the storage level explicitly: MEMORY_ONLY, MEMORY_AND_DISK, DISK_ONLY, etc. Use caching when a DataFrame is reused multiple times in the same job. Always unpersist() when done to free resources.
04

Real Data Engineering ETL Pipeline

Walk through a complete production ETL pipeline built with PySpark, advancing through six stages with the code and data at every step.

Production PySpark ETL Pipeline

This pipeline reads raw CSV data, cleans it, transforms it, and writes the final output as partitioned Parquet — exactly as you would in a real data engineering job.

Pipeline stages:

  • Spark: SparkSession
  • Extract: Read CSV
  • Inspect: Profile Data
  • Clean: Dedupe & Nulls
  • Transform: Enrich & Agg
  • Load: Write Parquet
🚀 Step 1: Create SparkSession
Every PySpark application starts by creating a SparkSession — the entry point to all Spark functionality.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ETL_Pipeline") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

print(f"Spark version: {spark.version}")
# Spark version: 3.5.0

Full Pipeline Code Summary

Here is the complete pipeline in one view. Each step builds on the previous, following the Extract → Clean → Transform → Load pattern.

# ===== COMPLETE ETL PIPELINE =====

# 1. SparkSession
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder.appName("ETL").getOrCreate()

# 2. Extract
df = spark.read.csv("s3://data/raw/orders.csv", header=True, inferSchema=True)

# 3. Inspect
df.printSchema()
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

# 4. Clean
cleaned = df.dropDuplicates(["order_id"]) \
    .fillna({"city": "Unknown", "amount": 0}) \
    .filter(F.col("amount") >= 0)

# 5. Transform
result = cleaned \
    .withColumn("order_date", F.to_date("timestamp")) \
    .withColumn("amount_usd", F.col("amount") * 0.0036) \
    .groupBy("city", "order_date").agg(
        F.sum("amount_usd").alias("total_revenue"),
        F.count("*").alias("order_count")
    )

# 6. Load
result.write.mode("overwrite") \
    .partitionBy("order_date") \
    .parquet("s3://data/gold/daily_city_revenue/")
05

Spark Architecture

Understanding how Spark distributes and executes your PySpark code across a cluster.

Spark Cluster Architecture

Spark follows a master-worker architecture: the Driver program orchestrates work, and Executors run tasks in parallel across the cluster.

The architecture consists of: the Driver Program (SparkSession + SparkContext, with the DAG Scheduler, Task Scheduler, and Catalyst Optimizer as internal components), the Cluster Manager (YARN / Mesos / K8s / Standalone), and the Executors, each running multiple tasks in parallel with its own cache (memory/disk).


How Spark Executes Your Code

When you write PySpark code, here is exactly what happens under the hood:

1. Logical Plan: Catalyst parses your code into an unresolved logical plan
2. Analysis: resolves column names, types, and table references
3. Optimization: Catalyst applies 100+ optimization rules (predicate pushdown, etc.)
4. Physical Plan: generates multiple physical plans and picks the best via a cost model
5. DAG Creation: the DAG Scheduler splits the plan into stages at shuffle boundaries
6. Task Launch: the Task Scheduler sends tasks to executors via the Cluster Manager
7. Execution: executors run tasks in parallel, reading and writing data partitions
8. Result: results are collected back to the driver (for actions) or written to storage

Key Takeaways

Transformations are Lazy, Actions are Eager

Always chain transformations and let Catalyst optimize them. Only trigger actions when you need results. This is the single most important concept in PySpark.

Minimize Shuffles

Wide transformations (groupBy, join) cause data shuffling across the network. Use broadcast joins for small tables, partition your data wisely, and avoid unnecessary orderBy operations.

DataFrames Over RDDs

Always use the DataFrame API. It benefits from Catalyst optimization and Tungsten execution, making it 10x+ faster than raw RDDs. RDDs bypass the optimizer entirely.

06

Advanced PySpark Concepts

Essential advanced topics every PySpark developer must master — from UDFs and partitioning strategies to Spark SQL and broadcast variables.

Repartition vs Coalesce

Both control the number of partitions, but they work very differently under the hood.

🔄 repartition(n)

  • Creates exactly n partitions
  • Full shuffle across the network
  • Can increase or decrease partitions
  • Evenly distributes data
  • Use when increasing parallelism

📦 coalesce(n)

  • Reduces to n partitions only
  • No shuffle — merges locally
  • Cannot increase partitions
  • May create uneven partitions
  • Use before writing to reduce small files
# Before writing, reduce 200 partitions to 10 (no shuffle)
df.coalesce(10).write.parquet("output/")

# Need more parallelism for heavy computation (full shuffle)
df.repartition(100).filter(heavy_condition)

User Defined Functions (UDFs)

UDFs let you extend PySpark with custom Python logic when built-in functions aren't enough. But use them wisely — they bypass Catalyst optimization.

Built-in Functions

  • Optimized by Catalyst
  • Run in JVM (fast)
  • Vectorized execution
  • F.upper(), F.when(), etc.
  • Always prefer these first

Python UDFs

  • Bypass Catalyst optimizer
  • Serialize data to Python (slow)
  • Row-by-row execution
  • @udf decorator
  • Use Pandas UDF for better perf
# Regular UDF (slow - avoid if possible)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def clean_name(name):
    return name.strip().title() if name else "Unknown"

df.withColumn("clean_name", clean_name("name"))

# Pandas UDF (vectorized - 10-100x faster than regular UDF)
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf(StringType())
def clean_name_fast(s: pd.Series) -> pd.Series:
    return s.str.strip().str.title().fillna("Unknown")

df.withColumn("clean_name", clean_name_fast("name"))

Spark SQL

You can write SQL queries directly on DataFrames by registering them as temporary views. This is powerful for analysts who prefer SQL syntax.

# Register DataFrame as a temporary SQL view
df.createOrReplaceTempView("orders")

# Run SQL queries
result = spark.sql("""
    SELECT city,
           COUNT(*) AS total_orders,
           SUM(amount) AS total_revenue,
           AVG(amount) AS avg_order
    FROM orders
    WHERE amount > 0
    GROUP BY city
    ORDER BY total_revenue DESC
""")
result.show()
# +---------+------------+-------------+---------+
# |     city|total_orders|total_revenue|avg_order|
# +---------+------------+-------------+---------+
# |  Karachi|           2|         5700|   2850.0|
# |   Lahore|           1|         1800|   1800.0|
# |Islamabad|           1|          950|    950.0|
# +---------+------------+-------------+---------+

SQL vs DataFrame API

Both produce the same execution plan — Catalyst optimizes them identically. Use whichever your team prefers. Many production pipelines mix both: SQL for complex aggregations, DataFrame API for programmatic transformations.

Broadcast Variables & Accumulators

Two special shared variables for distributed computing:

📡 Broadcast Variables

  • Read-only data sent to all executors
  • Cached once per executor (not per task)
  • Avoid sending large data repeatedly
  • Used for lookup tables, configs
  • Also powers broadcast joins

🔢 Accumulators

  • Write-only counters from executors
  • Only driver can read the value
  • Used for counting errors, metrics
  • Not guaranteed in transformations
  • Reliable only inside actions
# Broadcast variable - send lookup dict to all executors
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

city_lookup = {"KHI": "Karachi", "LHR": "Lahore", "ISB": "Islamabad"}
bc_cities = spark.sparkContext.broadcast(city_lookup)

@udf(StringType())
def map_city(code):
    return bc_cities.value.get(code, "Unknown")

# Accumulator - count bad records across all executors
bad_records = spark.sparkContext.accumulator(0)

def process_row(row):
    if row.amount is None:
        bad_records.add(1)

df.foreach(process_row)
print(f"Bad records: {bad_records.value}")
# Bad records: 1

More Essential Concepts

Checkpointing

Breaks the DAG lineage by materializing data to disk. Prevents stack overflow on deep lineage chains. Use df.checkpoint() after 10+ chained transformations or in iterative algorithms.

Adaptive Query Execution (AQE)

Spark 3.x feature that re-optimizes queries at runtime based on actual data statistics. Handles data skew, coalesces small partitions, and switches join strategies dynamically. Enable with spark.sql.adaptive.enabled=true.

Delta Lake

Open-source storage layer that adds ACID transactions, schema enforcement, and time travel to Parquet files. The foundation of modern Lakehouse architecture. Write with df.write.format("delta").

Shuffle Partitions

Default is 200 partitions after any shuffle (groupBy, join). Too many = overhead from small tasks. Too few = memory pressure. Tune with spark.sql.shuffle.partitions based on data size (~128MB per partition).

Data Skew Handling

When one partition has 100x more data than others, it bottlenecks the entire job. Solutions: salt keys, use broadcast joins, enable AQE skew join optimization, or repartition on a different column.

Spark Streaming

Process real-time data streams with the same DataFrame API using Structured Streaming. Supports Kafka, files, sockets as sources. Triggers: micro-batch or continuous processing mode.
