
With Iceberg

Getting Started

Import the required libraries

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
catalog_nm = "glue_catalog"

# The Glue database that contains the source table
in_database = "<glue-database-input>"

# The input Glue table that we will use as the source for the Iceberg data
in_table_name = "covid_19_data"

# The Glue database that will hold the output Iceberg table
database_op = "database_ib"

# The Glue table that will be created as the Iceberg destination
table_op = "covid_dataset_iceberg"

# The S3 path where the Iceberg data and metadata files will be stored
s3_output_path = "s3://<your-destination-bucket-name>/iceberg-output/"

table = f"{catalog_nm}.`{database_op}`.{table_op}"

print("\nINPUT Database : " + in_database)
print("\nINPUT Table : " + in_table_name)
print("\nOUTPUT Iceberg Database : " + database_op)
print("\nOUTPUT Iceberg Table : " + table)
print("\nOUTPUT Iceberg S3 Path : " + s3_output_path)

Alongside the script, we need to define an important job parameter in Glue that tells the Glue job executor to use the Iceberg table format for the output data. To do this, add a job parameter with the following key and value:

Key:   --datalake-formats
Value: iceberg

Job Parameters
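
If you prefer to set this parameter programmatically instead of through the Glue console, a minimal sketch using boto3 might look like the following. The job name, IAM role, script location, and Glue version shown here are placeholders, not values from the walkthrough:

import boto3

glue = boto3.client("glue")

glue.create_job(
    Name="iceberg-demo-job",                      # hypothetical job name
    Role="<your-glue-service-role>",              # placeholder IAM role
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://<your-script-bucket>/scripts/iceberg_job.py",
        "PythonVersion": "3",
    },
    GlueVersion="4.0",
    DefaultArguments={
        # Tells the Glue job runtime to load the Iceberg libraries for this job
        "--datalake-formats": "iceberg",
    },
)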

Define Spark and Glue context

def create_spark_iceberg(catalog_nm: str = "glue_catalog"):
    """
    Initialize a SparkSession configured to use Iceberg by default.
    :param catalog_nm: name of the Iceberg catalog to register
    :return: the configured SparkSession
    """
    # The Iceberg warehouse location; here we reuse the S3 output path defined above
    warehouse_path = s3_output_path

    spark = (
        SparkSession.builder
            .config(f"spark.sql.catalog.{catalog_nm}", "org.apache.iceberg.spark.SparkCatalog")
            .config(f"spark.sql.catalog.{catalog_nm}.warehouse", warehouse_path)
            .config(f"spark.sql.catalog.{catalog_nm}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
            .config(f"spark.sql.catalog.{catalog_nm}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
            .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
            .getOrCreate()
    )
    return spark

ibspark = create_spark_iceberg(catalog_nm)
ibsc = ibspark.sparkContext
ibglueContext = GlueContext(ibsc)
ibjob = Job(ibglueContext)
ibjob.init(args["JOB_NAME"], args)
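
As a quick sanity check, you can confirm that the session actually picked up the Iceberg catalog configuration before running any queries. A small sketch, assuming the session and variables created above:

# Print the catalog and extension settings the session is using
print(ibspark.conf.get(f"spark.sql.catalog.{catalog_nm}"))
print(ibspark.conf.get("spark.sql.extensions"))
print("Spark version:", ibspark.version)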

Read the source Glue table and write it into a destination Glue table in Iceberg format

#Read the Glue input table from the Catalog using a Glue DynamicFrame
InputDynamicFrameTable = (
    ibglueContext.create_dynamic_frame
        .from_catalog(database=in_database, table_name=in_table_name)
)

#Convert the Glue DynamicFrame into a Spark DataFrame
InputDynamicFrameTable_DF = InputDynamicFrameTable.toDF()

#Register the Spark DataFrame as TempView
InputDynamicFrameTable_DF.createOrReplaceTempView("InputDataFrameTable")
ibspark.sql("select * from InputDataFrameTable LIMIT 10").show()

#Filter the source table to rows where country is 'Australia'
colname_df = ibspark.sql("SELECT * FROM InputDataFrameTable WHERE country='Australia'")
colname_df.createOrReplaceTempView("OutputDataFrameTable")

#Write the filtered data into the destination Glue table in Iceberg format
ib_Write_SQL = f"""
    CREATE OR REPLACE TABLE {catalog_nm}.{database_op}.{table_op}
    USING iceberg
    TBLPROPERTIES ('format-version'='2', 'write.parquet.compression-codec'='gzip')
    AS SELECT * FROM OutputDataFrameTable;
    """

#Run the Spark SQL query
ibspark.sql(ib_Write_SQL)
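
To confirm the write and finalize the run, you can read the new Iceberg table back through the catalog and then commit the Glue job. A minimal sketch, assuming the variables defined earlier in the script:

#Read the newly created Iceberg table back through the Iceberg catalog
ibspark.sql(f"SELECT COUNT(*) AS row_count FROM {table}").show()
ibspark.sql(f"SELECT * FROM {table} LIMIT 10").show()

#Inspect the table's snapshot history via the Iceberg snapshots metadata table
ibspark.sql(f"SELECT snapshot_id, committed_at, operation FROM {table}.snapshots").show(truncate=False)

#Commit the Glue job so the run is recorded as completed
ibjob.commit()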
