Connect to AWS Services
Authentication
Using AWS Access Keys
- Go to IAM, create or select a user, and generate an access key (an access key ID and a secret access key); these are the values used in the connection code below. Rather than hard-coding them, they can be pulled from a secret scope, as sketched below.
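A minimal sketch of reading the keys from a Databricks secret scope instead of pasting them into the notebook; the scope name aws and the key names access-key-id / secret-access-key are hypothetical placeholders.

# Hypothetical secret scope and key names; adjust to your workspace.
aws_access_key = dbutils.secrets.get(scope="aws", key="access-key-id")
aws_secret_key = dbutils.secrets.get(scope="aws", key="secret-access-key")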
Kinesis
Warning
In Databricks Runtime 11.3 LTS and above, the Trigger.Once setting is deprecated. Databricks recommends you use Trigger.AvailableNow for all incremental batch processing workloads.
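As a minimal sketch, switching an incremental batch query to Trigger.AvailableNow looks like the following in PySpark; df is the streaming DataFrame defined in the connection code below, and the paths are placeholders.

# Trigger.AvailableNow processes everything currently available, then stops;
# it replaces the deprecated .trigger(once=True).
(
    df.writeStream
    .format("delta")
    .option("checkpointLocation", "/path/to/checkpoint")
    .trigger(availableNow=True)
    .start("/path/to/output")
)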
1) IAM Policy
By default, the Kinesis connector uses Amazon’s default credential provider chain, so credentials can also come from an attached IAM role or instance profile instead of the explicit keys shown below.
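Whichever route you choose, the identity needs read access to the stream. A rough sketch of a minimal read-only policy, written here as a Python dict for reference; the account ID and stream name in the ARN are placeholders.

import json

# Minimal Kinesis read permissions for a single stream (placeholders in the ARN).
kinesis_read_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
            ],
            "Resource": "arn:aws:kinesis:<aws-region>:<account-id>:stream/<aws-kinesis-stream-name>",
        }
    ],
}
print(json.dumps(kinesis_read_policy, indent=2))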
2) Connection Code
# Read from the Kinesis stream. Records arrive with the fixed schema shown
# at the end of this section, with the payload in the binary `data` column.
df = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "<aws-kinesis-stream-name>")
    .option("initialPosition", "latest")
    .option("awsAccessKey", "<aws-access-key>")
    .option("awsSecretKey", "<aws-access-secret-key>")
    .option("region", "<aws-region>")
    .load()
)
Note
initialPosition controls where reading starts:
- latest: read only data that arrives after the query starts.
- trim_horizon or earliest: read all data still retained in the shard.
- at_timestamp: start from a specific point in time, given as a JSON value such as {"at_timestamp": "06/25/2020 10:23:45 PDT", "format": "MM/dd/yyyy HH:mm:ss ZZZ"} (see the sketch below).
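A minimal sketch of the at_timestamp variant, assuming the same stream and credentials as above; the JSON string is passed directly as the initialPosition option.

# Start reading from a specific point in time instead of "latest".
df_from_ts = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "<aws-kinesis-stream-name>")
    .option(
        "initialPosition",
        '{"at_timestamp": "06/25/2020 10:23:45 PDT", "format": "MM/dd/yyyy HH:mm:ss ZZZ"}',
    )
    .option("awsAccessKey", "<aws-access-key>")
    .option("awsSecretKey", "<aws-access-secret-key>")
    .option("region", "<aws-region>")
    .load()
)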
# Write the stream back out to a Kinesis stream.
(
    df
    .writeStream
    .format("kinesis")
    .outputMode("update")
    .option("streamName", "<aws-kinesis-stream-name>")
    .option("region", "<aws-region>")
    .option("awsAccessKeyId", "<aws-access-key>")
    .option("awsSecretKey", "<aws-access-secret-key>")
    .option("checkpointLocation", "/path/to/checkpoint")
    .start()
    .awaitTermination()
)
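Kinesis sinks generally expect each outgoing row to carry the serialized record payload in a data column along with a partitionKey column; the column names here and the source DataFrame events_df with its device_id field are assumptions, so check your connector's documentation. A hedged sketch of shaping the output before the writeStream call above:

from pyspark.sql import functions as F

# Hypothetical transformed DataFrame `events_df`; serialize each row to JSON
# as the record payload and use device_id as the partition key.
out_df = events_df.select(
    F.col("device_id").cast("string").alias("partitionKey"),
    F.to_json(F.struct(*events_df.columns)).alias("data"),
)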
Note
Kinesis returns records with the following schema:
from pyspark.sql.types import TimestampType, StringType, StructType, StructField, BinaryType

schema: StructType = StructType(
    [
        StructField("partitionKey", StringType(), True),
        StructField("data", BinaryType(), False),
        StructField("stream", StringType(), False),
        StructField("shardId", StringType(), False),
        StructField("sequenceNumber", StringType(), False),
        StructField("approximateArrivalTimestamp", TimestampType(), False),
    ],
)
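Because the payload arrives as bytes in the data column, a typical next step is to cast it to a string and parse it with an explicit schema. The payload_schema below is a hypothetical example for JSON records.

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Hypothetical payload schema; replace with the structure of your own records.
payload_schema = StructType([
    StructField("device_id", StringType(), True),
    StructField("temperature", DoubleType(), True),
])

parsed_df = (
    df
    .select(F.col("data").cast("string").alias("json"))      # binary -> string
    .select(F.from_json("json", payload_schema).alias("payload"))
    .select("payload.*")
)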