PySpark / Spark SQL Join DataFrames And DataSets – Let’s Learn The Powerful Stuff In 5 Min!

Spark SQL Join DataFrames And DataSets

In this post I will show you how to join DataFrames and Datasets using Spark SQL.

Introduction

Apache Spark

Apache Spark is an open-source distributed computing system for processing large data sets quickly. It is written in Scala and provides APIs for Java, Python, and R. It was originally developed at UC Berkeley’s AMPLab and is now maintained by the Apache Software Foundation.

The Hadoop Distributed File System (HDFS), Apache Cassandra, and Amazon S3 are just a few of the data storage systems that can be used with Spark to perform a wide range of tasks, including data processing, machine learning, and graph processing.

One of Spark’s key features is its in-memory data processing, which lets it run data operations much faster than conventional disk-based systems. In addition, Spark’s ability to run analytics and data processing across a cluster of machines enables quick and effective processing of very large data sets.

Basic JOIN In SQL

In SQL, a join operation combines rows from two or more tables based on a related column between them. The join condition defines how the rows from the tables are matched and combined. There are several types of joins in SQL, including:

  • The most typical kind of join is an INNER JOIN. Only rows with matching values across both tables are returned.
  • All the rows from the left table and any matching rows from the right table are returned by a LEFT JOIN (also known as a LEFT OUTER JOIN). The columns of the right table will return NULL values if there is no match.
  • Similar to a LEFT JOIN, a RIGHT JOIN returns all rows from the right table along with any matching rows from the left table.
  • When using a FULL JOIN (also known as a FULL OUTER JOIN), all rows from both tables are returned, and any matching rows are combined into a single row. If no match, NULL values are returned.
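The semantics of these standard join types can be sketched in plain Python, without Spark at all. The tables and helper functions below are made up purely for illustration; a RIGHT JOIN is omitted because it is simply a LEFT JOIN with the two tables swapped:

```python
# Plain-Python sketch of INNER / LEFT / FULL join semantics (illustration only).

left = [(1, "John"), (2, "Jane"), (4, "Bob")]    # (id, name)
right = [(1, 100), (2, 200), (3, 300)]           # (id, amount)

def inner_join(l, r):
    # Only rows whose ids match on both sides survive.
    return [(lid, name, amt) for lid, name in l for rid, amt in r if lid == rid]

def left_join(l, r):
    # Every left row survives; unmatched rows get None for the right columns.
    rows = []
    for lid, name in l:
        matches = [amt for rid, amt in r if rid == lid]
        rows += [(lid, name, amt) for amt in matches] or [(lid, name, None)]
    return rows

def full_join(l, r):
    # Left join, plus the right rows that matched nothing on the left.
    rows = left_join(l, r)
    matched = {lid for lid, _ in l}
    rows += [(rid, None, amt) for rid, amt in r if rid not in matched]
    return rows

print(inner_join(left, right))  # [(1, 'John', 100), (2, 'Jane', 200)]
print(left_join(left, right))   # ... plus (4, 'Bob', None), which has no match
print(full_join(left, right))   # ... plus (3, None, 300) from the right side
```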

Less Common JOIN Types In SQL

  • SEMI JOIN: only the rows from the left table that have a matching value in the right table are returned. In other words, it never returns columns from the right table; it only returns the rows from the left table that match a row in the right table.
  • LEFT ANTI JOIN: all the rows from the left table for which there is no match in the right table are returned. Only the non-matching rows from the left table appear in the result.
  • RIGHT ANTI JOIN: all the rows from the right table for which there is no match in the left table are returned. Only the non-matching rows from the right table appear in the result.
  • FULL ANTI JOIN: returns the rows from both tables for which the other table contains no match.
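The key difference between a semi join and an anti join is easy to show with a plain-Python sketch (no Spark; the data below is made up for illustration):

```python
# Plain-Python sketch of SEMI vs ANTI join semantics (illustration only).

left = [(1, "Book1"), (2, "Book2"), (3, "Book3")]  # (id, title)
right_ids = {1, 3}                                  # ids present in the right table

# SEMI JOIN: left rows that HAVE a match -- right columns are never returned.
semi = [row for row in left if row[0] in right_ids]

# ANTI JOIN: left rows that have NO match.
anti = [row for row in left if row[0] not in right_ids]

print(semi)  # [(1, 'Book1'), (3, 'Book3')]
print(anti)  # [(2, 'Book2')]
```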

Apache Spark JOIN Function Source Code

You can perform join operations on DataFrames in Apache Spark using the join() method of the DataFrame API. It accepts the DataFrame to join, the join condition, and the join type as arguments, in several overloads:

1) join(right: Dataset[_]): DataFrame
2) join(right: Dataset[_], usingColumn: String): DataFrame
3) join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
4) join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
5) join(right: Dataset[_], joinExprs: Column): DataFrame
6) join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame

The following table maps each SQL join type to the Spark join string value that you pass to the .join() function as its last argument.

SQL JOIN TYPE    SPARK JOIN STRING VALUE
INNER            inner
FULL OUTER       outer, full, fullouter, full_outer
LEFT OUTER       left, leftouter, left_outer
RIGHT OUTER      right, rightouter, right_outer
CROSS            cross
LEFT ANTI        anti, leftanti, left_anti
LEFT SEMI        semi, leftsemi, left_semi

Spark SQL Join DataFrames And DataSets

Below you will find examples of join operations in Spark (Scala) and PySpark using different data sets:

Spark INNER JOIN

val customers = Seq(
 (1, "John Smith"),
 (2, "Jane Doe"),
 (3, "Bob Johnson")
).toDF("id", "name")

val orders = Seq(
 (1, 100),
 (2, 200),
 (3, 300)
).toDF("customer_id", "amount")

customers.join(orders, customers("id") === orders("customer_id"), "inner").show()

This example joins the “customers” and “orders” DataFrames with an inner join on the “id” and “customer_id” columns, respectively. For each order, the result includes the customer’s id and name along with the order amount.

+---+-----------+-----------+------+
| id|       name|customer_id|amount|
+---+-----------+-----------+------+
|  1| John Smith|          1|   100|
|  2|   Jane Doe|          2|   200|
|  3|Bob Johnson|          3|   300|
+---+-----------+-----------+------+
PySpark INNER JOIN Equivalent
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JoinExample").getOrCreate()

customers = spark.createDataFrame([(1, "John Smith"), (2, "Jane Doe"), (3, "Bob Johnson")], ["id", "name"])
orders = spark.createDataFrame([(1, 100), (2, 200), (3, 300)], ["customer_id", "amount"])

customers.join(orders, customers.id == orders.customer_id, "inner").show()

Spark LEFT JOIN

val departments = Seq(
 (1, "IT"),
 (2, "HR"),
 (3, "Marketing")
).toDF("id", "name")

val employees = Seq(
 (1, "John Smith", 1),
 (2, "Jane Doe", 2),
 (3, "Bob Johnson", 1)
).toDF("id", "name", "department_id")

employees.join(departments, employees("department_id") === departments("id"), "left").show()
PySpark LEFT JOIN Equivalent
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JoinExample").getOrCreate()

departments = spark.createDataFrame([(1, "IT"),(2, "HR"),(3, "Marketing")], ["id", "name"])
employees = spark.createDataFrame([(1, "John Smith", 1),(2, "Jane Doe", 2),(3, "Bob Johnson", 1)], ["id", "name", "department_id"])

employees.join(departments, employees.department_id == departments.id, "left").show()

This example joins the “employees” and “departments” DataFrames with a left join on the “department_id” and “id” columns, respectively. The output lists every employee along with the department they are assigned to; if an employee has no department, the department columns show null.

+---+-----------+-------------+---+----+
| id|       name|department_id| id|name|
+---+-----------+-------------+---+----+
|  1| John Smith|            1|  1|  IT|
|  2|   Jane Doe|            2|  2|  HR|
|  3|Bob Johnson|            1|  1|  IT|
+---+-----------+-------------+---+----+

Spark RIGHT JOIN

val products = Seq(
 (1, "Product1", "Category1"),
 (2, "Product2", "Category2"),
 (3, "Product3", "Category3")
).toDF("id", "name", "category")

val sales = Seq(
 (1, 100, 1),
 (2, 200, 2),
 (3, 300, 1)
).toDF("id", "amount", "product_id")

products.join(sales, products("id") === sales("product_id"), "right").show()

It produces the following result:

+---+--------+---------+---+------+----------+
| id|    name| category| id|amount|product_id|
+---+--------+---------+---+------+----------+
|  1|Product1|Category1|  1|   100|         1|
|  2|Product2|Category2|  2|   200|         2|
|  1|Product1|Category1|  3|   300|         1|
+---+--------+---------+---+------+----------+
PySpark RIGHT JOIN Equivalent
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JoinExample").getOrCreate()

products = spark.createDataFrame([(1, "Product1", "Category1"),(2, "Product2", "Category2"),(3, "Product3", "Category3")], ["id", "name", "category"])
sales = spark.createDataFrame([(1, 100, 1),(2, 200, 2),(3, 300, 1)], ["id", "amount", "product_id"])

products.join(sales, products.id == sales.product_id, "right").show()

Spark FULL JOIN

val stores = Seq(
 (1, "Store1"),
 (2, "Store2"),
 (3, "Store3")
).toDF("id", "name")

val inventory = Seq(
 (1, "Product1", 10),
 (2, "Product2", 20),
 (3, "Product3", 30),
 (4, "Product4", 40)
).toDF("product_id", "name", "quantity")

stores.join(inventory, stores("id") === inventory("product_id"), "full").show()

This example joins the “stores” and “inventory” DataFrames with a full join on the “id” and “product_id” columns, respectively. The result shows all stores and all inventory; null appears wherever a store has no inventory or a product is not available in any store.

+----+------+----------+--------+--------+
|  id|  name|product_id|    name|quantity|
+----+------+----------+--------+--------+
|   1|Store1|         1|Product1|      10|
|   2|Store2|         2|Product2|      20|
|   3|Store3|         3|Product3|      30|
|null|  null|         4|Product4|      40|
+----+------+----------+--------+--------+
PySpark FULL JOIN Equivalent
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JoinExample").getOrCreate()

stores = spark.createDataFrame([(1, "Store1"),(2, "Store2"),(3, "Store3")], ["id", "name"])
inventory = spark.createDataFrame([(1, "Product1", 10),(2, "Product2", 20),(3, "Product3", 30),(4, "Product4", 40)], ["product_id", "name", "quantity"])

stores.join(inventory, stores.id == inventory.product_id, "full").show()

Spark CROSS JOIN

val fruits = Seq("Apple", "Banana", "Grapes", "Strawberries").toDF("name")
val color = Seq("Red", "Yellow", "Green", "Orange").toDF("color")
fruits.crossJoin(color).show()

In this example, a cross join combines the “fruits” and “color” DataFrames. The result contains every possible pairing of a fruit with a color.

+------------+------+
|        name| color|
+------------+------+
|       Apple|   Red|
|       Apple|Yellow|
|       Apple| Green|
|       Apple|Orange|
|      Banana|   Red|
|      Banana|Yellow|
|      Banana| Green|
|      Banana|Orange|
|      Grapes|   Red|
|      Grapes|Yellow|
|      Grapes| Green|
|      Grapes|Orange|
|Strawberries|   Red|
|Strawberries|Yellow|
|Strawberries| Green|
|Strawberries|Orange|
+------------+------+
PySpark CROSS JOIN Equivalent
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JoinExample").getOrCreate()

fruits = spark.createDataFrame([("Apple",), ("Banana",), ("Grapes",), ("Strawberries",)], ["name"])
colors = spark.createDataFrame([("Red",), ("Yellow",), ("Green",), ("Orange",)], ["color"])

fruits.crossJoin(colors).show()
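A cross join is just the Cartesian product of the two inputs, which can be demonstrated without Spark using itertools.product (a plain-Python sketch with smaller made-up lists):

```python
from itertools import product

fruits = ["Apple", "Banana"]
colors = ["Red", "Yellow"]

# CROSS JOIN = Cartesian product: every fruit paired with every color.
pairs = list(product(fruits, colors))
print(pairs)       # [('Apple', 'Red'), ('Apple', 'Yellow'), ('Banana', 'Red'), ('Banana', 'Yellow')]
print(len(pairs))  # 2 * 2 = 4 rows
```

This also shows why cross joins are dangerous on large inputs: the row count is the product of the two table sizes.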

Spark SEMI JOIN

val books = Seq(
  (1, "Book1", "Author1"),
  (2, "Book2", "Author2"),
  (3, "Book3", "Author3"),
  (4, "Book4", "Author4")
).toDF("id", "name", "author")

val sales = Seq(
  (1, 100),
  (3, 200)
).toDF("book_id", "sales")

books.join(sales, books("id") === sales("book_id"), "semi").show()

This example joins the “books” and “sales” DataFrames with a semi join on the “id” and “book_id” columns, respectively. Only the rows of the left table that match a row in the right table are kept, so the result shows the books that have sales, but no columns from the “sales” table.

+---+-----+-------+
| id| name| author|
+---+-----+-------+
|  1|Book1|Author1|
|  3|Book3|Author3|
+---+-----+-------+
PySpark SEMI JOIN Equivalent
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JoinExample").getOrCreate()

books = spark.createDataFrame([(1, "Book1", "Author1"),(2, "Book2", "Author2"),(3, "Book3", "Author3"),(4, "Book4", "Author4")], ["id", "name", "author"])
sales = spark.createDataFrame([(1, 100),(3, 200)], ["book_id", "sales"])

books.join(sales, books.id == sales.book_id, "semi").show()

Spark ANTI JOIN

val countries = Seq(
  (1, "US"),
  (2, "UK"),
  (3, "CN")
).toDF("id", "name")

val languages = Seq(
  (1, "English"),
  (2, "Chinese")
).toDF("country_id", "language")

countries.join(languages, countries("id") === languages("country_id"), "anti").show()

This example performs an anti join between the “countries” and “languages” DataFrames on the “id” and “country_id” columns, respectively. An anti join returns every row from the left DataFrame that has no match in the right DataFrame, so the result lists the countries that do not appear in the languages table:

+---+----+
| id|name|
+---+----+
|  3|  CN|
+---+----+
PySpark ANTI JOIN Equivalent
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JoinExample").getOrCreate()

countries = spark.createDataFrame([(1, "US"),(2, "UK"),(3, "CN")], ["id", "name"])
languages = spark.createDataFrame([(1, "English"),(2, "Chinese")], ["country_id", "language"])

countries.join(languages, countries.id == languages.country_id, "anti").show()

Spark LEFT ANTI JOIN

val students = Seq(
  (1, "John Smith"),
  (2, "Jane Doe"),
  (3, "Bob Johnson")
).toDF("id", "name")

val grades = Seq(
  (1, "A"),
  (2, "B")
).toDF("student_id", "grade")

students.join(grades, students("id") === grades("student_id"), "left_anti").show()

In this example, the “students” and “grades” DataFrames are left-anti joined on the “id” and “student_id” columns. The result contains all students who do not have a grade.

+---+-----------+
| id|       name|
+---+-----------+
|  3|Bob Johnson|
+---+-----------+
PySpark LEFT ANTI JOIN Equivalent
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("JoinExample").getOrCreate()

students = spark.createDataFrame([(1, "John Smith"),(2, "Jane Doe"),(3, "Bob Johnson")], ["id", "name"])
grades = spark.createDataFrame([(1, "A"),(2, "B")], ["student_id", "grade"])

students.join(grades, students.id == grades.student_id, "left_anti").show()

Code On GitLab

You can find the full code in our GitLab repository.

Summary

Each type of join has its own use case and characteristics, so pick the one that matches what you want to achieve. Keep in mind that joins can be expensive operations, particularly on large data sets, so it is crucial to optimize the join conditions and to consider alternative approaches such as broadcast joins or bucketing.
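The broadcast join mentioned above is essentially a hash join: the small table is copied to every worker, turned into a hash map once, and each row of the big table probes it, so the big table never needs to be shuffled. A plain-Python sketch of that idea (made-up data, left-join-style probe that keeps unmatched rows):

```python
# Sketch of why a broadcast (hash) join is cheap when one side is small
# (illustration only -- no Spark).

small = {1: "IT", 2: "HR"}                    # small dim table, "broadcast" as a dict
big = [(1, "John"), (2, "Jane"), (3, "Bob")]  # large fact table, streamed row by row

# One O(1) hash lookup per big-table row; no shuffle, no sort.
joined = [(emp_id, name, small.get(emp_id)) for emp_id, name in big]
print(joined)  # [(1, 'John', 'IT'), (2, 'Jane', 'HR'), (3, 'Bob', None)]
```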

Could You Please Share This Post? 
I appreciate It And Thank YOU! :)
Have A Nice Day!
