Spark Nested Data Types
In the previous article on Higher-Order Functions, we described three complex data types: arrays, maps, and structs, and focused on arrays in particular. In this follow-up article, we will take a look at structs and at two important functions for transforming nested data that were released in Spark 3.1.1. For the code, we will use the Python API.
Struct
The StructType is a very important data type that allows representing nested hierarchical data. It can be used to group fields together. Each element of a StructType is called a StructField, and it has a name and a type. The elements are also usually referred to simply as fields or subfields, and they are accessed by name. The StructType is also used to represent the schema of the entire DataFrame. Let's see a simple example:
from pyspark.sql.types import *

my_schema = StructType([
    StructField('id', LongType()),
    StructField('country', StructType([
        StructField('name', StringType()),
        StructField('capital', StringType())
    ])),
    StructField('currency', StringType())
])

l = [
    (1, {'name': 'Italy', 'capital': 'Rome'}, 'euro'),
    (2, {'name': 'France', 'capital': 'Paris'}, 'euro'),
    (3, {'name': 'Japan', 'capital': 'Tokyo'}, 'yen')
]

df = spark.createDataFrame(l, schema=my_schema)
df.printSchema()
root
|-- id: long (nullable = true)
|-- country: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- capital: string (nullable = true)
|-- currency: string (nullable = true)
df.show()
+---+---------------+--------+
| id| country|currency|
+---+---------------+--------+
| 1| {Italy, Rome}| euro|
| 2|{France, Paris}| euro|
| 3| {Japan, Tokyo}| yen|
+---+---------------+--------+
The created DataFrame has one struct column, country, with two subfields: name and capital.
Creating a struct
There are at least four basic ways to create a StructType in a DataFrame. The first one we have already seen above: creating a DataFrame from a local collection. The second, and very common, way is reading data from a source that supports complex data structures, such as JSON or Parquet (a small sketch of this appears after the window example below). Next, there are some functions that produce a struct as a result. One particular example of such a transformation is grouping by a window, which produces a struct with two subfields, start and end, as you can see here:
from pyspark.sql.functions import col, struct, sum, window

l = [(1, 10, '2021-01-01'), (2, 20, '2021-01-02')]
dx = spark.createDataFrame(l, ['id', 'price', 'date'])
(
    dx
    .groupBy(window('date', '1 day'))
    .agg(sum('price').alias('daily_price'))
).printSchema()
root
|-- window: struct (nullable = false)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- daily_price: long (nullable = true)
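Here is also a small sketch of the second way, reading nested JSON (the file name and its content below are hypothetical):
# hypothetical file countries.json with one JSON object per line, e.g.:
# {"id": 1, "country": {"name": "Italy", "capital": "Rome"}}
dj = spark.read.json('countries.json')
dj.printSchema()  # country is inferred as a struct with subfields name and capital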
The fourth way to create a struct is by using the function struct(). The function creates a StructType from other columns that are passed as arguments, and the StructFields will have the same names as the original columns unless we rename them using alias():
df.withColumn('my_struct', struct('id', 'currency')).printSchema()
root
|-- id: long (nullable = true)
|-- country: struct (nullable = true)
| |-- name: string (nullable = true)
| |-- capital: string (nullable = true)
|-- currency: string (nullable = true)
|-- my_struct: struct (nullable = false)
| |-- id: long (nullable = true)
| |-- currency: string (nullable = true)
Here, we created a column my_struct with two subfields derived from two columns that were already present in the DataFrame.
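As mentioned, the subfields can be renamed with alias(); a minimal sketch (the names identifier and ccy are purely illustrative):
from pyspark.sql.functions import col, struct

# rename the subfields while building the struct:
df.withColumn(
    'my_struct',
    struct(col('id').alias('identifier'), col('currency').alias('ccy'))
).printSchema()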
Accessing the elements
As we mentioned above, the subfields of a struct are accessed by name, which is done with dot notation:
df.select('country.capital').show()
+-------+
|capital|
+-------+
| Rome|
| Paris|
| Tokyo|
+-------+
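Equivalently, you can access a subfield with the getField() method of the Column class, or with square brackets; a minimal sketch:
from pyspark.sql.functions import col

# both expressions are equivalent to the dot notation above:
df.select(col('country').getField('capital')).show()
df.select(col('country')['capital']).show()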
What might not be obvious is that this also works for arrays of structs. Let's assume that we have an array countries in which each element is a struct. If we want to access only the capital subfield of each struct, we do it in exactly the same way, and the resulting column is an array containing all the capitals:
my_new_schema = StructType([
    StructField('id', LongType()),
    StructField('countries', ArrayType(StructType([
        StructField('name', StringType()),
        StructField('capital', StringType())
    ])))
])

l = [(1, [
    {'name': 'Italy', 'capital': 'Rome'},
    {'name': 'Spain', 'capital': 'Madrid'}
])]
dz = spark.createDataFrame(l, schema=my_new_schema)
# we have array of structs:
dz.show(truncate=False)
+---+--------------------------------+
|id |countries |
+---+--------------------------------+
|1 |[{Italy, Rome}, {Spain, Madrid}]|
+---+--------------------------------+
# access all capitals:
dz.select('countries.capital').show(truncate=False)
+--------------+
|capital |
+--------------+
|[Rome, Madrid]|
+--------------+
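You can also combine array indexing with subfield access to pick one particular element; a small sketch using the dz DataFrame above:
from pyspark.sql.functions import col

# take the capital of the first struct in the array:
dz.select(col('countries')[0]['capital']).show()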
For another specific example of accessing elements in nested structs inside an array, see this Stack Overflow question.
Adding new elements
Adding a new subfield to an existing struct is supported since Spark 3.1 using the function withField(). Let's see it on our example, in which we add the column currency to the struct country:
(
    df
    .withColumn(
        'country',
        col('country').withField('currency', col('currency'))
    )
).show(truncate=False)
+---+---------------------+--------+
|id |country |currency|
+---+---------------------+--------+
|1 |{Italy, Rome, euro} |euro |
|2 |{France, Paris, euro}|euro |
|3 |{Japan, Tokyo, yen} |yen |
+---+---------------------+--------+
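Note that withField() also replaces a subfield if the given name already exists in the struct; a minimal sketch:
from pyspark.sql.functions import col, upper

# replace the existing capital subfield with its upper-cased value:
df.withColumn(
    'country',
    col('country').withField('capital', upper(col('country.capital')))
).show(truncate=False)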
Before Spark 3.1, the situation was more complex, and adding a new field to a struct was possible only by redefining the entire struct:
new_df = (
    df.withColumn('country', struct(
        col('country.name'),
        col('country.capital'),
        col('currency')
    ))
)
As you can see, we had to list all the struct subfields and after that add the new one. This can be quite cumbersome, especially for large structs with many subfields. In this case, however, there is a nice trick by which you can handle all subfields at once: the star notation:
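from pyspark.sql.functions import col, struct

# a minimal sketch: expand all existing subfields with the star
# and append the new currency column
new_df = df.withColumn(
    'country',
    struct(col('country.*'), col('currency'))
)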
The asterisk in country.* takes all subfields of the original struct. The situation becomes more complicated in the next example, however, where we want to remove a field.
Removing elements
Dropping subfields from a struct is again a simple task since Spark 3.1, thanks to the newly released function dropFields(). Let's now work with the modified DataFrame new_df, in which the struct contains three subfields: name, capital, and currency. Removing a subfield, for example capital, can be done as follows:
new_df.withColumn('country', col('country').dropFields('capital')) \
    .show(truncate=False)
+---+--------------+--------+
|id |country |currency|
+---+--------------+--------+
|1 |{Italy, euro} |euro |
|2 |{France, euro}|euro |
|3 |{Japan, yen} |yen |
+---+--------------+--------+
As we can see, the subfield capital was dropped.
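Note that dropFields() also accepts multiple field names at once; a minimal sketch:
from pyspark.sql.functions import col

# drop two subfields in a single call:
new_df.withColumn(
    'country',
    col('country').dropFields('capital', 'name')
).show(truncate=False)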
The situation gets complicated again in versions before Spark 3.1, where we have to redefine the entire struct and leave out the subfield that we want to drop:
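from pyspark.sql.functions import col, struct

# a minimal sketch: rebuild the struct, listing every subfield
# except the one we want to drop
(
    new_df.withColumn(
        'country',
        struct(col('country.name'), col('country.currency'))
    )
).show()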
For large structs this is again tedious, so we can make it more manageable by programmatically listing all the subfields as follows:
# list all fields in the struct:
subfields = new_df.schema['country'].dataType.fieldNames()

# remove the subfield from the list:
subfields.remove('capital')

# use the new list to recreate the struct:
(
    new_df.withColumn(
        'country',
        struct(['country.{}'.format(x) for x in subfields])
    )
).show()
+---+--------------+--------+
| id| country|currency|
+---+--------------+--------+
| 1| {Italy, euro}| euro|
| 2|{France, euro}| euro|
| 3| {Japan, yen}| yen|
+---+--------------+--------+
Notice that both functions, withField() and dropFields(), are members of the Column class, and therefore they are called as methods on a column object (to understand more about how methods from the Column class are used, check my recent article, where I discuss it in more detail).
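Because both methods return a new Column, the calls can also be chained in a single expression; a small sketch:
from pyspark.sql.functions import col

# add the currency subfield and drop capital in one expression:
df.withColumn(
    'country',
    col('country').withField('currency', col('currency')).dropFields('capital')
).show(truncate=False)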
Structs in SQL expressions
When you check the SQL documentation, you will find that there are two functions that can be used to create structs, namely struct() and named_struct(). They differ in syntax: named_struct also requires passing a name for each subfield:
(
    df
    .selectExpr("struct(id, currency) as my_struct")
).show(truncate=False)
+---------+
|my_struct|
+---------+
|{1, euro}|
|{2, euro}|
|{3, yen} |
+---------+
(
    df.selectExpr(
        "named_struct('id', id, 'currency', currency) as my_struct")
).show()
+---------+
|my_struct|
+---------+
|{1, euro}|
|{2, euro}|
| {3, yen}|
+---------+
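Dot notation for accessing subfields works in SQL expressions as well; a minimal sketch:
df.selectExpr("country.capital AS capital").show()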
Conclusion
In this article, we continued our description of complex data types in Spark SQL. In the previous article we covered arrays; here we focused on structs, and in a future post we will cover maps. We have seen two important functions, withField() and dropFields(), that were released in the recent version 3.1.1 and that can simplify the code quite a bit when manipulating subfields of an existing struct.