Connect to AWS Services

Authentication

Using AWS Access Token

  • Go to IAM

Kinesis

Warning

In Databricks Runtime 11.3 LTS and above, the Trigger.Once setting is deprecated. Databricks recommends you use Trigger.AvailableNow for all incremental batch processing workloads.

1) IAM Policy

By default, the Kinesis connector uses Amazon's default credential provider chain. To read from a stream, attach an IAM policy like the following:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:Get*",
                "kinesis:DescribeStreamSummary"
            ],
            "Resource": [
                "arn:aws:kinesis:us-east-1:111122223333:stream/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:ListStreams"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
To write to a stream, the principal also needs the kinesis:PutRecord action:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord"
            ],
            "Resource": [
                "arn:aws:kinesis:us-east-1:111122223333:stream/*"
            ]
        }
    ]
}
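The policy documents above can also be built programmatically before handing them to the AWS APIs (for example boto3's `iam.create_policy`, which takes the document as a JSON string). A minimal sketch, using the same placeholder account ID and stream ARN as above:

```python
import json

# Placeholder ARN from the policy above -- not a real resource.
STREAM_ARN = "arn:aws:kinesis:us-east-1:111122223333:stream/*"

# Read-side policy: Get* and DescribeStreamSummary on the stream,
# ListStreams on everything (ListStreams cannot be resource-scoped).
read_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["kinesis:Get*", "kinesis:DescribeStreamSummary"],
            "Resource": [STREAM_ARN],
        },
        {
            "Effect": "Allow",
            "Action": ["kinesis:ListStreams"],
            "Resource": ["*"],
        },
    ],
}

# Serializing and re-parsing catches structural mistakes before the
# document ever reaches IAM.
policy_json = json.dumps(read_policy, indent=4)
assert json.loads(policy_json) == read_policy
```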

2) Connection Code

df = (
    spark.readStream
        .format("kinesis")
        .option("streamName", "<aws-kinesis-stream-name>")
        .option("initialPosition", "latest")
        .option("format", "json")
        .option("awsAccessKey", "<aws-access-key>")
        .option("awsSecretKey", "<aws-access-secret-key>")
        .option("region", "<aws-region>")
        .option("inferSchema", "true")
        .load()
)

Note

initialPosition:

  • latest: Start reading from the latest position, i.e. only data that arrives after the stream starts.
  • trim_horizon or earliest: Read all data retained in the shard.
  • at_timestamp: Start from a specific time, e.g. {"at_timestamp": "06/25/2020 10:23:45 PDT", "format": "MM/dd/yyyy HH:mm:ss ZZZ"}.

Read more about StartingPosition
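The at_timestamp position is passed as a JSON string. A minimal sketch of constructing it (the timestamp value is the example from above; the "format" field uses Java date/time patterns):

```python
import json

# Build the initialPosition value for at_timestamp. The timestamp is an
# example; "format" follows Java SimpleDateFormat-style patterns.
initial_position = json.dumps(
    {"at_timestamp": "06/25/2020 10:23:45 PDT", "format": "MM/dd/yyyy HH:mm:ss ZZZ"}
)

# It would then be passed to the reader, e.g.:
# spark.readStream.format("kinesis").option("initialPosition", initial_position)
print(initial_position)
```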

(
    df
        .writeStream
        .format("kinesis")
        .outputMode("update")
        .option("streamName", "<aws-kinesis-stream-name>")
        .option("region", "<aws-region>")
        .option("awsAccessKey", "<aws-access-key>")
        .option("awsSecretKey", "<aws-access-secret-key>")
        .option("checkpointLocation", "/path/to/checkpoint")
        .start()
        .awaitTermination()
)

Note

Kinesis returns records with the following schema:

from pyspark.sql.types import TimestampType, StringType, StructType, StructField, BinaryType

schema: StructType = StructType(
    [
        StructField("partitionKey", StringType(), True),
        StructField("data", BinaryType(), False),
        StructField("stream", StringType(), False),
        StructField("shardId", StringType(), False),
        StructField("sequenceNumber", StringType(), False),
        StructField("approximateArrivalTimestamp", TimestampType(), False),
    ],
)
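Because the data field is BinaryType, the payload must be decoded before use; in PySpark this is typically a cast to string followed by from_json with the payload's schema. A plain-Python sketch of the same decode, using a hand-built stand-in for one Kinesis record (all field values below are illustrative):

```python
import json

# A stand-in for one row matching the schema above. In PySpark you would
# instead do something like:
#   df.select(from_json(col("data").cast("string"), payload_schema))
record = {
    "partitionKey": "key-1",
    "data": b'{"device_id": 42, "temperature": 21.5}',  # example JSON payload
    "stream": "<aws-kinesis-stream-name>",
    "shardId": "shardId-000000000000",
    "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
    "approximateArrivalTimestamp": "2020-06-25T10:23:45",
}

# Decode the binary payload, then parse it as JSON.
payload = json.loads(record["data"].decode("utf-8"))
print(payload["device_id"])  # -> 42
```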
