Connect to Azure Services
Authentication
Using Service Principal
1) Create Service Principal
To register your application and create your service principal:
- Go to Azure Active Directory > App registrations > New registration
- Fill in the application details, e.g. name: cnct-adb-dev
  (the app name should follow the format {app}-{resource-shortname}-{environment})
- Click Register to create the app registration
You will then need to generate a secret:
- Go to App registrations > Certificates & secrets > New client secret
- Save this value to Azure Key Vault
Note
We write both the Client ID and Secret to Key Vault for a number of reasons:
- The Secret is sensitive and, like a Storage Key or Password, we don't want it to be hardcoded or exposed anywhere in our application.
- Normally we have an instance of Databricks and Key Vault per environment, and when referencing the secrets we want the secret names to remain the same, so the code in our Databricks notebooks referencing the Secrets doesn't need to be modified when we deploy to different environments.
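Because the secret names stay identical across environments, notebook code only ever references the secret scope. A minimal sketch, using the scope and key names that appear in the examples below:

# Read the service principal credentials from the Key Vault-backed secret scope.
# Only the scope's Key Vault binding changes per environment; this code does not.
tenant_id = dbutils.secrets.get(scope="defaultScope", key="TenantId")
client_id = dbutils.secrets.get(scope="defaultScope", key="DatabricksSpnId")
client_secret = dbutils.secrets.get(scope="defaultScope", key="DatabricksSpnSecret")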
Abstract
The App Registration is the template used to create the security principal (like a User) which can be authenticated and authorized.
SQL Database
1) Create External User
The app registration still needs permission to log into Azure SQL and access the objects within it. You'll need to create that user (App & Service Principal) in the database and then grant it permissions on the underlying objects, as sketched below.
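A minimal sketch of both steps, assuming pyodbc with the msodbcsql17 driver (installation is shown in the ODBC Connector section below) and an Azure AD admin account for the database; the role grants are an assumption, so adjust them to the permissions your workload actually needs:

import pyodbc

# Connect as an Azure AD admin of the database (server, database, and admin
# credentials are placeholders), then create a contained user for the app
# registration and grant it read/write roles.
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=<server-instance-name>.database.windows.net;"
    "DATABASE=<database-name>;"
    "Authentication=ActiveDirectoryPassword;"
    "UID=<aad-admin-upn>;PWD=<aad-admin-password>;Encrypt=yes;",
    autocommit=True,
)
cursor = conn.cursor()
cursor.execute("CREATE USER [cnct-adb-dev] FROM EXTERNAL PROVIDER;")   # the app registration name
cursor.execute("ALTER ROLE db_datareader ADD MEMBER [cnct-adb-dev];")  # read access
cursor.execute("ALTER ROLE db_datawriter ADD MEMBER [cnct-adb-dev];")  # write access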
2) Connection Code
Method 01: Spark Connector
To connect to Azure SQL, you will need to install the SQL Spark Connector and the Microsoft Azure Active Directory Authentication Library (ADAL) for Python.
- Go to your cluster in Databricks and install the necessary packages:
    - Maven: com.microsoft.azure:spark-mssql-connector_2.12_3.0:1.0.0-alpha
    - PyPI: adal
- Also, if you haven't already, create a Secret Scope for your Azure Key Vault where your Client ID, Secret, and Tenant ID are stored.
- Get an Access Token from a Service Principal authentication request:

import adal

# Authenticate the service principal against Azure AD and request a token
# scoped to Azure SQL Database.
context = adal.AuthenticationContext(
    f"https://login.windows.net/{dbutils.secrets.get(scope='defaultScope', key='TenantId')}"
)
token = context.acquire_token_with_client_credentials(
    "https://database.windows.net/",
    dbutils.secrets.get(scope="defaultScope", key="DatabricksSpnId"),
    dbutils.secrets.get(scope="defaultScope", key="DatabricksSpnSecret"),
)
access_token = token["accessToken"]
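Note that ADAL has been deprecated in favour of MSAL. A roughly equivalent sketch with the msal package (an assumption: msal installed from PyPI instead of adal) looks like this:

import msal

# Same token request via MSAL: authenticate the service principal and ask for a
# token scoped to Azure SQL Database (the ".default" scope).
app = msal.ConfidentialClientApplication(
    client_id=dbutils.secrets.get(scope="defaultScope", key="DatabricksSpnId"),
    client_credential=dbutils.secrets.get(scope="defaultScope", key="DatabricksSpnSecret"),
    authority=f"https://login.microsoftonline.com/{dbutils.secrets.get(scope='defaultScope', key='TenantId')}",
)
result = app.acquire_token_for_client(scopes=["https://database.windows.net/.default"])
access_token = result["access_token"]

Either way, the resulting access_token is what the Spark connector below consumes.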
df = (
spark.read
.format("com.microsoft.sqlserver.jdbc.spark")
.option("url", "jdbc:sqlserver://<server-instance-name>.database.windows.net")
.option("databaseName", "{dev}")
.option("accessToken", access_token)
.option("encrypt", "true")
.option("hostNameInCertificate", "*.database.windows.net")
.option("dbtable", "[dbo].[<table-name>]")
.option("batchsize", 2500)
.option("mssqlIsolationLevel", "READ_UNCOMMITTED")
.load()
)
Note
This connector uses the READ_COMMITTED isolation level by default when performing the bulk insert into the database. If you wish to override the isolation level, use the mssqlIsolationLevel option as shown above.
(
df.write
.format("com.microsoft.sqlserver.jdbc.spark")
.mode("append")
.option("url", "jdbc:sqlserver://<server-instance-name>.database.windows.net")
.option("dbtable", "[dbo].[<table-name>]")
.option("accessToken", access_token)
.option("schemaCheckEnabled", "false")
.save()
)
Note
When schemaCheckEnabled is false, we can write to a destination table that has fewer columns than the DataFrame.
Note
Executing custom SQL through the connector: the previous Azure SQL Connector for Spark provided the ability to execute custom SQL code, such as DML or DDL statements, through the connector. That functionality is out of scope for this connector since it is based on the DataSource APIs. It is readily provided by libraries such as pyodbc, or you can use the standard Java SQL interfaces as well.
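For reference, a hedged sketch of running custom SQL through pyodbc while reusing the service principal access token from above; 1256 is the driver's SQL_COPT_SS_ACCESS_TOKEN connection attribute, the msodbcsql17 driver install is shown in the ODBC Connector section below, and the statement itself is just a placeholder:

import struct
import pyodbc

# Pack the Azure AD access token in the format the ODBC driver expects and pass
# it via the SQL_COPT_SS_ACCESS_TOKEN (1256) connection attribute.
SQL_COPT_SS_ACCESS_TOKEN = 1256
token_bytes = access_token.encode("utf-16-le")
token_struct = struct.pack(f"<I{len(token_bytes)}s", len(token_bytes), token_bytes)

conn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=<server-instance-name>.database.windows.net;"
    "DATABASE=<database-name>;Encrypt=yes;",
    attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct},
)
cursor = conn.cursor()
cursor.execute("TRUNCATE TABLE [dbo].[<table-name>];")  # example DDL the Spark connector cannot run
conn.commit()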
Method 02: JDBC Connector
This method reads or writes the data row by row, resulting in performance issues; it is not recommended.
In Databricks Runtime 11.3 LTS and above, you can use the sqlserver keyword to use the included driver for connecting to SQL Server.
df = (
spark.read
.format("jdbc")
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
.option("url", "jdbc:sqlserver://<host>:1433;")
.option("authentication", "ActiveDirectoryServicePrincipal")
.option("user", dbutils.secrets.get(scope="defaultScope", key="DatabricksSpnId"))
.option("password", dbutils.secrets.get(scope="defaultScope", key="DatabricksSpnSecret"))
.option("database", "<database-name>")
.option("dbtable", "<schema-name>.<table-name>")
.option("encrypt", "true")
.load()
)
df = (
spark.read
.jdbc(
url="jdbc:sqlserver://<host>:1433;database=<database>",
table="<schema-name>.<table-name>",
properties={
"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
"authentication": "ActiveDirectoryServicePrincipal",
"UserName": dbutils.secrets.get(scope="defaultScope", key="DatabricksSpnId"),
"Password": dbutils.secrets.get(scope="defaultScope", key="DatabricksSpnSecret"),
},
)
)
(
df.write
.mode("append")
.format("jdbc")
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
.option("url", "jdbc:sqlserver://<host>:1433;")
.option("authentication", "ActiveDirectoryServicePrincipal")
.option("user", dbutils.secrets.get(scope="defaultScope", key="DatabricksSpnId"))
.option("password", dbutils.secrets.get(scope="defaultScope", key="DatabricksSpnSecret"))
.option("database", "<database-name>")
.option("dbtable", "<schema-name>.<table-name>")
.save()
)
(
df.write
.mode("overwrite")
.format("jdbc")
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
.option("url", "jdbc:sqlserver://<host>:1433;")
.option("authentication", "ActiveDirectoryServicePrincipal")
.option("user", dbutils.secrets.get(scope="defaultScope", key="DatabricksSpnId"))
.option("password", dbutils.secrets.get(scope="defaultScope", key="DatabricksSpnSecret"))
.option("database", "<database-name>")
.option("dbtable", "<schema-name>.<table-name>")
.save()
)
df = (
spark.read
.format("sqlserver")
.option("host", "<host-name>.database.windows.net")
.option("port", "1433")
.option("authentication", "ActiveDirectoryServicePrincipal")
.option("user", dbutils.secrets.get(scope="defaultScope", key="DatabricksSpnId"))
.option("password", dbutils.secrets.get(scope="defaultScope", key="DatabricksSpnSecret"))
.option("database", "<database-name>")
.option("dbtable", "<schema-name>.<table-name>")
.option("encrypt", "true")
.option("hostNameInCertificate", "*.database.windows.net")
.load()
)
(
df.write
.mode("append")
.format("sqlserver")
.option("host", "<host:***.database.windows.net>")
.option("port", "1433")
.option("authentication", "ActiveDirectoryServicePrincipal")
.option("user", dbutils.secrets.get(scope="defaultScope", key="DatabricksSpnId"))
.option("password", dbutils.secrets.get(scope="defaultScope", key="DatabricksSpnSecret"))
.option("database", "<database-name>")
.option("dbtable", "<schema-name>.<table-name>")
.save()
)
(
df.write
.mode("overwrite")
.format("sqlserver")
.option("host", "<host:***.database.windows.net>")
.option("port", "1433")
.option("authentication", "ActiveDirectoryServicePrincipal")
.option("user", dbutils.secrets.get(scope="defaultScope", key="DatabricksSpnId"))
.option("password", dbutils.secrets.get(scope="defaultScope", key="DatabricksSpnSecret"))
.option("database", "<database-name>")
.option("dbtable", "<schema-name>.<table-name>")
.option("truncate", true)
save()
)
When using mode overwrite without the truncate option, the table is dropped and recreated, so existing indexes are lost; a columnstore table, for example, would become a heap. If you want to maintain the existing indexing, also specify the truncate option with value true, e.g. .option("truncate", "true").
References:
SQL User
Method 01: ODBC Connector
Install ODBC Driver on cluster:
%sh
curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
curl https://packages.microsoft.com/config/ubuntu/16.04/prod.list > /etc/apt/sources.list.d/mssql-release.list
sudo apt-get update
sudo ACCEPT_EULA=Y apt-get -q -y install msodbcsql17
import pyodbc
conn = pyodbc.connect(
f'DRIVER={{ODBC Driver 17 for SQL Server}};'
f'SERVER=<host>;DATABASE=<database_name>;UID=[cnct-adb-dev];PWD=P@ssW0rd;'
f'Authentication=SqlPassword;Encrypt=yes;'
)
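Once connected, it behaves like any other pyodbc connection; a small usage sketch (the table name is a placeholder):

# Run an ad-hoc query over the ODBC connection and print the results.
cursor = conn.cursor()
cursor.execute("SELECT TOP (10) * FROM [dbo].[<table-name>]")
for row in cursor.fetchall():
    print(row)
cursor.close()
conn.close()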
Reference:
- StackOverFlow: Using PyODBC in Azure Databricks for Connect to SQL Server
- Microsoft: SQL ODBC - Using Azure AD
Method 02: JDBC Connector
References:
Event Hubs
Warning
Use one consumer group per streaming query; concurrent readers sharing a consumer group will raise ReceiverDisconnectedException. A sketch follows the connection code below.
Read more about multiple readers.
1) Installation Package
2) Connection Code
from pyspark.sql import SparkSession
spark = (
SparkSession
.builder
.appName('App Connect Eventhub')
.config("spark.jars.packages", "com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22")
.config("spark.locality.wait", "15s") # Default: 3s
.getOrCreate()
)
connectionString: str = (
f"Endpoint=sb://{eventhubs_namespace}.servicebus.windows.net/;"
f"SharedAccessKeyName={sharekey_name};"
f"SharedAccessKey={sharekey};"
f"EntityPath={eventhubs_name}"
)
ehConf = {
'eventhubs.connectionString' : spark._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
'eventhubs.consumerGroup' : "$Default",
}
df = (
spark
.readStream
.format("eventhubs")
.options(**ehConf)
.option("maxEventsPerTrigger", 1_000_000) # Default: <partition> * 1_000
.option("useExclusiveReceiver", False) # Default: True
.option("receiverTimeout", "PT000100") # Default: 60 sec
.option("operationTimeout", "PT000100") # Default: 60 sec
.load()
)
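Following the warning above, give each streaming query its own eventhubs.consumerGroup rather than sharing $Default. A minimal sketch reusing the ehConf dictionary defined above (the consumer group names are assumptions and must already exist on the Event Hub):

# One consumer group per streaming query avoids ReceiverDisconnectedException
# caused by competing receivers on the same partition/consumer group pair.
df_alerts = (
    spark.readStream
    .format("eventhubs")
    .options(**{**ehConf, "eventhubs.consumerGroup": "adb-alerts"})
    .load()
)
df_metrics = (
    spark.readStream
    .format("eventhubs")
    .options(**{**ehConf, "eventhubs.consumerGroup": "adb-metrics"})
    .load()
)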
Note
The following Kafka-based approach requires the Kafka endpoint to be enabled on the Azure Event Hubs namespace.
connectionString: str = (
f"Endpoint=sb://{eventhubs_namespace}.servicebus.windows.net/;"
f"SharedAccessKeyName={sharekey_name};"
f"SharedAccessKey={sharekey};"
f"EntityPath={eventhubs_name}"
)
EH_SASL: str = (
f'org.apache.kafka.common.security.plain.PlainLoginModule required '
f'username="$ConnectionString" '
f'password="{connectionString}";'
)
df = (
spark
.readStream
.format("kafka")
.option("subscribe", f"{eventhubs_name}")
.option("kafka.bootstrap.servers", f"{eventhubs_namespace}.servicebus.windows.net:9093")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", EH_SASL)
.option("kafka.request.timeout.ms", "60000")
.option("kafka.session.timeout.ms", "30000")
.option("kafka.group.id", "$Default")
.option("failOnDataLoss", "true")
.option("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
.load()
)
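The Kafka source returns the message payload as bytes in the value column, so a typical follow-up is to cast it to a string before any parsing; a minimal sketch:

from pyspark.sql import functions as F

# Decode the Event Hub message body from the Kafka "value" column and keep a few
# useful metadata columns for debugging and ordering.
events = df.select(
    F.col("value").cast("string").alias("body"),
    "partition",
    "offset",
    "timestamp",
)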
References:
- https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark.md#user-configuration
- https://medium.com/@kaviprakash.2007/structured-streaming-using-azure-databricks-and-event-hub-6b0bcbf029c4
- Connect your Apache Spark application with Azure Event Hubs
- Using Apache Spark with Azure Event Hubs for Apache Kafka Ecosystems
IoT Hub
1) Using Shared Access Key
- Go to your Azure IoT Hub and copy the shared access key (the connection code below also needs the Event Hub-compatible endpoint details)
2) Package Installation
3) Connection Code
from pyspark.sql import SparkSession
spark = (
SparkSession
.builder
.appName('App Connect IoT Hub')
.config("spark.jars.packages", "com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22")
.config("spark.locality.wait", "15s") # Default: 3s
.getOrCreate()
)
connectionString: str = (
f"Endpoint=sb://{eventhubs_compatible_name}.servicebus.windows.net/;"
f"SharedAccessKeyName={sharekey_name};"
f"SharedAccessKey={sharekey};"
f"EntityPath={endpoint_name}"
)
ehConf = {
'eventhubs.connectionString' : spark._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString),
'eventhubs.consumerGroup' : "$Default",
'eventhubs.partition.count': "1",
'ehName': f"{IoTHubs-EventHub-Compatible-Name}",
}
df = (
spark
.readStream
.format("eventhubs")
.options(**ehConf)
.option("maxEventsPerTrigger", 1_000_000) # Default: <partition> * 1_000
.option("useExclusiveReceiver", False) # Default: True
.option("receiverTimeout", "PT000100") # Default: 60 sec
.option("operationTimeout", "PT000100") # Default: 60 sec
.load()
)
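A typical next step is to decode the telemetry from the binary body column and pick up the sending device from systemProperties; a minimal sketch (the JSON schema is an assumption for illustration, and iothub-connection-device-id is the key IoT Hub normally stamps on messages routed to the built-in endpoint):

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, DoubleType

# Placeholder payload schema -- replace with your device telemetry fields.
telemetry_schema = StructType([
    StructField("temperature", DoubleType()),
    StructField("humidity", DoubleType()),
])

telemetry = (
    df
    # Device id as stamped by IoT Hub on messages in the built-in endpoint.
    .withColumn("deviceId", F.col("systemProperties").getItem("iothub-connection-device-id"))
    # Decode the binary body and parse it as JSON with the schema above.
    .withColumn("payload", F.from_json(F.col("body").cast("string"), telemetry_schema))
    .select("deviceId", "enqueuedTime", "payload.*")
)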
References:
- https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/PySpark/structured-streaming-pyspark.md#user-configuration
- https://medium.com/@kaviprakash.2007/structured-streaming-using-azure-databricks-and-event-hub-6b0bcbf029c4