
PySpark Dataframe Cheat Sheet


Creating Dataframes from different sources in Pyspark

In PySpark, we can create a DataFrame from different sources such as CSV files, JSON files, Parquet files, Hive tables, and more. Here are examples of creating DataFrames from different sources:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read a CSV file
df = spark.read.csv('path/file.csv', header=True, inferSchema=True)


# Read a JSON file
df = spark.read.json('path/file.json')


# Read a Parquet file
df = spark.read.parquet('path/file.parquet')


# Read a Hive table
df = spark.sql('SELECT * FROM my_table')
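
Note that reading Hive tables requires a SparkSession built with Hive support enabled. A minimal sketch, assuming a Hive metastore is already configured for the cluster:

# Build a session with Hive support (assumes a configured Hive metastore)
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Equivalent to the SQL query above
df = spark.table('my_table')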


# Configure JDBC connection properties
jdbc_url = "jdbc:mysql://localhost:3306/my_database"
connection_properties = {
    "user": "my_username",
    "password": "my_password",
    "driver": "com.mysql.jdbc.Driver"
}

# Read a table from a JDBC connection
df = spark.read.jdbc(url=jdbc_url, table="my_table", properties=connection_properties)
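
For large tables, the JDBC reader can split the read across multiple partitions on a numeric column. A sketch; the column name and bounds below are illustrative:

# Parallel JDBC read partitioned on a numeric column
df = spark.read.jdbc(
    url=jdbc_url,
    table="my_table",
    column="id",
    lowerBound=1,
    upperBound=100000,
    numPartitions=8,
    properties=connection_properties
)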


# Configure Cosmos DB connection properties
# Note: this requires the Azure Cosmos DB Spark connector to be available on the
# cluster (typically added as a Spark package / cluster library rather than via pip)
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CosmosDBIntegration") \
    .config("spark.cosmos.accountEndpoint", "YOUR_COSMOSDB_ENDPOINT") \
    .config("spark.cosmos.accountKey", "YOUR_COSMOSDB_ACCOUNT_KEY") \
    .config("spark.cosmos.database", "YOUR_COSMOSDB_DATABASE") \
    .config("spark.cosmos.container", "YOUR_COSMOSDB_CONTAINER") \
    .getOrCreate()

# Read a container from a Cosmos DB connection
df = spark.read.format("cosmos.oltp") \
    .option("spark.cosmos.read.partitioning.strategy", "Restrictive") \
    .option("spark.cosmos.read.inferSchema.enabled", "true") \
    .load()


Inspect Data in Pyspark Dataframe

To inspect data in a DataFrame in PySpark, you can use various methods and functions to view, analyze, and understand the structure and content of the data. Here are some common techniques for inspecting data in a PySpark DataFrame:

df.show()

df.printSchema()

print(df.columns)

print(df.count())

df.describe().show()

df.head(5)  # Returns the first 5 rows as a list of Row objects
df.tail(10)  # Returns the last 10 rows as a list of Row objects

sample_df = df.sample(fraction=0.1, seed=42)  # Takes 10% random sample of the DataFrame
sample_df.show()

df.select("column_name1", "column_name2").show()

df.groupBy("column_name").agg({"column_name": "count", "other_column": "sum"}).show()
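
A few more inspection helpers that are often useful (column names are illustrative):

# Column names with their data types
print(df.dtypes)

# Extended summary statistics (count, mean, stddev, min, percentiles, max)
df.summary().show()

# Count distinct values in a column
df.select("column_name").distinct().count()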


These are just a few examples of how you can inspect and analyze data in a PySpark DataFrame. You can explore the PySpark documentation for more information on DataFrame methods and functions.

Cleansing in Pyspark Dataframe

In PySpark, we can handle missing values and replace them with specific values or apply various strategies to handle missing data. Here are some techniques for handling missing values in a DataFrame:

# Drop rows with any missing values
df.dropna()


# Drop rows with missing values in specified columns
df.dropna(subset=["column1", "column2"])


# Fill missing values with a specific value
df.fillna("replacement_value")

# Fill missing values with a constant
df.fillna(0)


from pyspark.sql.functions import when

# Replace values based on condition
df.withColumn("new_column", when(df["column"] > 10, "replacement_value").otherwise(df["column"]))



Filter in Pyspark Dataframe

In PySpark, we can use the filter() method or the where() method to filter rows in a DataFrame based on specific conditions:

# Import necessary modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Read data into DataFrame
df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)

# Filter rows using filter() method
filtered_df = df.filter(df["column_name"] > 10)

# Filter rows using where() method
filtered_df = df.where(df["column_name"] > 10)

# Filter rows using SQL-like syntax
filtered_df = df.filter("column_name > 10")

# Filter rows using the col() function
filtered_df = df.filter(col("column_name") > 10)

# Apply multiple conditions using the &, |, and ~ operators (and, or, not)
filtered_df = df.filter((df["column1"] > 10) & (df["column2"] == "value"))

# Can chain multiple filters together
filtered_df = df.filter(df["column1"] > 10).filter(df["column2"] == "value")

Note that the filter() and where() methods are interchangeable and achieve the same filtering functionality.
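
Column objects also expose helpers such as isin(), between(), like(), and isNull() that are convenient inside filters. A sketch with illustrative column names and values:

# Keep rows whose value is in a list
df.filter(col("column_name").isin("A", "B", "C"))

# Keep rows whose value falls within a range (inclusive)
df.filter(col("column_name").between(10, 20))

# Keep rows matching a SQL LIKE pattern
df.filter(col("column_name").like("abc%"))

# Keep rows with null / non-null values
df.filter(col("column_name").isNull())
df.filter(col("column_name").isNotNull())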

Column Operations in Pyspark Dataframe

In PySpark, you can perform various column operations on a DataFrame using built-in functions, expressions, and methods. Here are some common column operations in PySpark:

# Select a single column
df.select("column_name")

# Select multiple columns
df.select("column_name1", "column_name2")

# Select columns using indexing
df["column_name"]


from pyspark.sql.functions import col, lit

# Add a new column with a static value
df.withColumn("new_column", lit("some_value"))

# Add a new column by applying a function on existing columns
df.withColumn("new_column", col("column_name1") + col("column_name2"))

# Assign a new column expression to a DataFrame
df = df.withColumn("new_column", col("column_name1") + col("column_name2"))


# Rename a column using withColumnRenamed()
df.withColumnRenamed("old_column", "new_column")

# Create a new DataFrame with renamed columns
df = df.withColumnRenamed("old_column", "new_column")


# Drop a single column
df.drop("column_name")

# Drop multiple columns
df.drop("column_name1", "column_name2")


from pyspark.sql.functions import col, upper

# Apply a function using select()
df.select(col("column_name").alias("new_column"))

# Apply a function using withColumn()
df.withColumn("new_column", upper(col("column_name")))


from pyspark.sql.functions import sum, avg

# Aggregate columns using agg()
df.agg(sum("column_name").alias("total"), avg("column_name").alias("average"))

# Group by columns and perform aggregations
df.groupBy("column_name").agg(sum("column1"), avg("column2"))


These are some of the common column operations in PySpark.


Joins in Pyspark Dataframe

In PySpark, we can perform joins on DataFrames to combine data from multiple DataFrames based on a common key or condition. PySpark supports various types of joins, including inner join, outer join, left join, right join, and cross join. Here are examples of how to perform joins in PySpark DataFrame:

# Perform inner join
joined_df = df1.join(df2, df1["key_column"] == df2["key_column"], "inner")

# Join on a common column name (shorthand form)
joined_df = df1.join(df2, "key_column", "inner")


# Perform left join
joined_df = df1.join(df2, df1["key_column"] == df2["key_column"], "left")

# Join on a common column name (shorthand form)
joined_df = df1.join(df2, "key_column", "left")


# Perform right join
joined_df = df1.join(df2, df1["key_column"] == df2["key_column"], "right")

# Join on a common column name (shorthand form)
joined_df = df1.join(df2, "key_column", "right")


# Perform full outer join
joined_df = df1.join(df2, df1["key_column"] == df2["key_column"], "outer")

# Join on a common column name (shorthand form)
joined_df = df1.join(df2, "key_column", "outer")


# Perform cross join
joined_df = df1.crossJoin(df2)


These are examples of different join types you can perform in PySpark DataFrame.
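
One practical note: when the join condition is written explicitly and both DataFrames share the key column name, the result contains that column twice; dropping one side afterwards is a common fix. A sketch, assuming df1 and df2 both have key_column:

# The string shorthand (df1.join(df2, "key_column", ...)) keeps a single key column;
# with an explicit condition, drop the duplicate from one side after the join
joined_df = df1.join(df2, df1["key_column"] == df2["key_column"], "inner") \
    .drop(df2["key_column"])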

Set Operation in Pyspark Dataframe

In PySpark, we can perform set operations on DataFrames to combine or compare their rows. PySpark supports set operations such as union, intersect, and subtract. Here are examples of how to perform set operations on PySpark DataFrames:

# Perform union
union_df = df1.union(df2)

# Perform intersection
intersection_df = df1.intersect(df2)

# Perform difference (distinct rows in df1 that are not in df2)
difference_df = df1.subtract(df2)

# Perform difference keeping duplicate rows
difference_df = df1.exceptAll(df2)
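
Note that union() matches columns by position and expects both DataFrames to have the same schema; unionByName() matches columns by name instead. A sketch:

# Union matching columns by name rather than position
union_df = df1.unionByName(df2)

# Tolerate missing columns on either side, filling them with nulls (Spark 3.1+)
union_df = df1.unionByName(df2, allowMissingColumns=True)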


Analytic Functions in Pyspark Dataframe


In PySpark, we can use analytic functions (also known as window functions) to perform calculations and aggregations on groups of rows within a DataFrame. Analytic functions operate on a set of rows called a “window” and can compute values based on the data in the window. Here are some examples of how to use analytic functions in PySpark DataFrames:

from pyspark.sql.window import Window
from pyspark.sql.functions import sum, avg, max, min

# Define a window specification
windowSpec = Window.partitionBy("partition_column").orderBy("column_name")

# Add a column with the sum of a column within the window
df.withColumn("sum_column", sum("column_name").over(windowSpec))

# Add a column with the average of a column within the window
df.withColumn("avg_column", avg("column_name").over(windowSpec))

# Add a column with the maximum value of a column within the window
df.withColumn("max_column", max("column_name").over(windowSpec))

# Add a column with the minimum value of a column within the window
df.withColumn("min_column", min("column_name").over(windowSpec))



# Applying Custom Functions within a Window
from pyspark.sql.window import Window
from pyspark.sql.functions import expr

# Define a window specification
windowSpec = Window.orderBy("column_name").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Add a column with a custom function applied to the data within the window
# (your_custom_function is a placeholder; it must resolve to an aggregate or window
#  function known to Spark, e.g. a registered user-defined aggregate function)
df.withColumn("custom_function", expr("your_custom_function(column_name)").over(windowSpec))

UDF in Pyspark Dataframe

Let’s say we have a DataFrame with a column called “name” that contains strings. We want to create a new column called “name_length” that calculates the length of each name using a UDF:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Sample data (single-element tuples so each row has one column)
data = [("John",),
        ("Alice",),
        ("Michael",)]

# Create the DataFrame
df = spark.createDataFrame(data, ["name"])

# Define the UDF
def calculate_length(name):
    return len(name)

# Wrap the UDF with udf() and specify the return type
calculate_length_udf = udf(calculate_length, IntegerType())

# Apply the UDF to the DataFrame
df = df.withColumn("name_length", calculate_length_udf(df["name"]))

# Show the result
df.show()
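
Python UDFs process rows one at a time, so built-in functions are generally faster when one exists. For this particular example, the built-in length() function gives the same result:

from pyspark.sql.functions import length

# Built-in equivalent of the UDF above
df = df.withColumn("name_length", length(df["name"]))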

Writing a Pyspark Dataframe back to different formats

In PySpark, we can write a DataFrame back to different file formats or storage systems using the write method. Here are examples of how to write a DataFrame to various formats:

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [("John", 25),
        ("Alice", 30),
        ("Michael", 35)]

# Create the DataFrame
df = spark.createDataFrame(data, ["name", "age"])

# Write DataFrame to Parquet format
df.write.parquet("path/to/output.parquet")

# Write DataFrame to CSV format
df.write.csv("path/to/output.csv")

# Write DataFrame to JSON format
df.write.json("path/to/output.json")

# Write DataFrame to ORC format
df.write.orc("path/to/output.orc")

# Write DataFrame to Hive table
df.write.saveAsTable("hive_table_name")
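
The write API also supports save modes, format-specific options, and partitioning. A sketch with illustrative paths and column names:

# Overwrite existing output and include a header row in the CSV
df.write.mode("overwrite").option("header", True).csv("path/to/output.csv")

# Partition the Parquet output by a column
df.write.mode("overwrite").partitionBy("age").parquet("path/to/output.parquet")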


For a more detailed reference, see the official PySpark documentation.

We value your feedback and are always ready to assist you. Please feel free to Contact Us.


FAQs

What is PySpark Dataframe?

A PySpark DataFrame is a distributed collection of data organized into named columns, similar to a table in a relational database. PySpark DataFrames are designed to handle big data processing tasks and provide a high-level API that simplifies data manipulation and analysis. They are widely used in data engineering, data preprocessing, data exploration, and machine learning tasks.

What is Cheat Sheet?

Cheat sheets are valuable resources that save time, improve productivity, and serve as handy references for both beginners and experienced practitioners.
