Getting Started with Azure Event Hubs: Ingesting and Processing Big Data Streams
Azure Event Hubs is a big data streaming platform and event ingestion service. It can receive and process millions of events per second. It represents the “front door” for an event pipeline, often called an event ingestor in solution architectures. Event Hubs provides a unified streaming platform with a time-retention buffer, decoupling event producers from event consumers.
Prerequisites
- Microsoft Azure subscription. To use Azure services, including Azure Event Hubs, you need a subscription. If you don’t have an existing Azure account, sign up for a free trial.
- Python 3.7 or later, with pip installed and updated.
- Visual Studio Code (recommended) or any other integrated development environment (IDE).
- An Event Hubs namespace and an event hub. (Follow along below to create them.)
Create an Event Hubs namespace and an event hub
You can do this by running the Terraform configuration from the GitHub repository:
git clone https://github.com/Abhimanyu9988/azure-event-hub.git
cd azure-event-hub
terraform init
terraform apply --auto-approve
If you prefer the UI, go to portal.azure.com and search for Event Hubs.
Create an Event Hubs namespace -> Enter the required details -> Review + Create -> Create
Great. Now let’s create an event hub in this namespace. In the Event Hubs namespace, select Event Hubs (bottom left) -> + Event Hub -> Choose your desired name -> Review + Create -> Create
IAM permissions for Azure Event Hubs
By default, when you log in with the email that created the account, you have the Owner role. That role lets you manage the resource, but it does not grant access to the event data itself, so you need to assign data roles to your user before it can run the Python script (which we will create later) from the CLI.
To do that:
On the Azure Event Hubs page -> Click Access control (IAM) -> Role assignments -> Add role assignment, and add the roles below:
- Azure Event Hubs Data Owner: Use this role to give complete access to Event Hubs resources.
- Azure Event Hubs Data Sender: Use this role to give send access to Event Hubs resources.
- Azure Event Hubs Data Receiver: Use this role to give receive access to Event Hubs resources.
You need to add these roles one at a time.
Under + Select members -> Select your user.
Now you are ready to go to the next part. :)
Whether you used the Azure portal or Terraform, you should now see the namespace and the event hub on the Overview page.
The Terraform output provides the values we will need to publish data to Azure Event Hubs:
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
EVENT_HUB_NAME
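If you deployed with Terraform, these values can also be read programmatically instead of copied by hand. The sketch below assumes the outputs are named EVENT_HUB_FULLY_QUALIFIED_NAMESPACE and EVENT_HUB_NAME, as listed above, and that the terraform CLI is run from the directory where you applied the configuration. The helper names parse_terraform_outputs and load_terraform_outputs are hypothetical and not part of the repository.

```python
import json
import subprocess


def parse_terraform_outputs(raw_json):
    """Turn the JSON printed by `terraform output -json` into a name -> value dict."""
    outputs = json.loads(raw_json)
    # Each output is an object with "sensitive", "type" and "value" keys.
    return {name: entry["value"] for name, entry in outputs.items()}


def load_terraform_outputs(working_dir="."):
    """Run `terraform output -json` in working_dir and return the parsed values."""
    result = subprocess.run(
        ["terraform", "output", "-json"],
        cwd=working_dir,
        capture_output=True,
        text=True,
        check=True,
    )
    return parse_terraform_outputs(result.stdout)
```

Calling load_terraform_outputs("azure-event-hub") would then return a dictionary from which the two values can be looked up by name, assuming the output names match.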
Send Events
To send events to our event hub, we will create a simple Python script.
import asyncio

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient
from azure.identity.aio import DefaultAzureCredential

EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "<azure-event-hub-namespace>.servicebus.windows.net"
EVENT_HUB_NAME = "<azure-event-hub-name>"

credential = DefaultAzureCredential()


async def run():
    # Create a producer client to send messages to the event hub.
    # Specify a credential that has correct role assigned to access
    # event hubs namespace and the event hub name.
    producer = EventHubProducerClient(
        fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
        eventhub_name=EVENT_HUB_NAME,
        credential=credential,
    )
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()

        # Add events to the batch.
        event_data_batch.add(EventData("First event "))
        event_data_batch.add(EventData("Second event"))
        event_data_batch.add(EventData("Third event"))

        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)

    # Close credential when no longer needed.
    await credential.close()


asyncio.run(run())
Replace EVENT_HUB_FULLY_QUALIFIED_NAMESPACE and EVENT_HUB_NAME with your own values.
Before you run the Python script, install the dependencies:
pip3 install azure-eventhub
pip3 install azure-identity
pip3 install aiohttp
You can also clone the repository and run the pre-req-python.sh file. It installs all the dependencies for this article.
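The send script puts three small events into a single batch. With more data, a batch can fill up, in which case add() raises ValueError. Here is a minimal sketch of one way to handle that: send the full batch and continue in a fresh one. The function name send_in_batches and its make_event parameter are hypothetical. make_event exists only so the helper can be tried out without the Azure SDK and defaults to EventData.

```python
async def send_in_batches(producer, payloads, make_event=None):
    """Send every payload, starting a new batch whenever the current one is full.

    Returns the number of batches that were sent.
    """
    if make_event is None:
        # Imported lazily so the helper can be exercised without the Azure SDK.
        from azure.eventhub import EventData

        make_event = EventData

    batch = await producer.create_batch()
    sent_batches = 0
    for payload in payloads:
        event = make_event(payload)
        try:
            batch.add(event)
        except ValueError:
            # The batch is full: send it, then retry the event in a fresh batch.
            # If the event does not fit even into an empty batch, the second
            # add() raises ValueError again and the error propagates.
            await producer.send_batch(batch)
            sent_batches += 1
            batch = await producer.create_batch()
            batch.add(event)

    # Send whatever is left in the last, partially filled batch.
    if len(batch) > 0:
        await producer.send_batch(batch)
        sent_batches += 1
    return sent_batches
```

Inside the async with producer: block of the send script, this could be called as await send_in_batches(producer, ["First event", "Second event", "Third event"]).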
Receive Events
Azure Event Hubs doesn’t let you query the data directly. Instead, a consumer reads events from each partition and records how far it has read in a checkpoint. We will store the checkpoints in Azure Blob Storage. Your user account needs the Storage Blob Data Contributor role on the storage account. If you deployed our Terraform, this is done for you automatically.
Otherwise, create an Azure storage account and a blob container in it yourself.
In the Terraform output you should see the storage account URL and the blob container name. You will need these in the Python script that receives events from the event hub.
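To make the role of the checkpoint concrete, here is a toy, in-memory illustration of the idea: remember the last position read per partition, and resume from the next one. This is only a conceptual sketch. The class and function names are hypothetical, and it does not reflect how BlobCheckpointStore is implemented or called.

```python
class InMemoryCheckpointStore:
    """Toy checkpoint store: remembers the last position read per partition."""

    def __init__(self):
        self._last_read = {}

    def save(self, partition_id, position):
        self._last_read[partition_id] = position

    def next_position(self, partition_id):
        # -1 stands for "nothing read yet", so reading starts at position 0.
        return self._last_read.get(partition_id, -1) + 1


def read_new_events(partition_id, partition_log, store):
    """Return the events after the checkpoint and move the checkpoint forward."""
    start = store.next_position(partition_id)
    new_events = partition_log[start:]
    if new_events:
        store.save(partition_id, len(partition_log) - 1)
    return new_events
```

Reading the same partition twice returns nothing the second time, and events appended later are picked up on the next read. This is the behavior the blob-backed checkpoint gives the receive script across restarts.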
import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import (
    BlobCheckpointStore,
)
from azure.identity.aio import DefaultAzureCredential

BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
EVENT_HUB_NAME = "EVENT_HUB_NAME"

credential = DefaultAzureCredential()


async def on_event(partition_context, event):
    # Print the event data.
    print(
        'Received the event: "{}" from the partition with ID: "{}"'.format(
            event.body_as_str(encoding="UTF-8"), partition_context.partition_id
        )
    )

    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    await partition_context.update_checkpoint(event)


async def main():
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore(
        blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
        container_name=BLOB_CONTAINER_NAME,
        credential=credential,
    )

    # Create a consumer client for the event hub.
    client = EventHubConsumerClient(
        fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
        eventhub_name=EVENT_HUB_NAME,
        consumer_group="$Default",
        checkpoint_store=checkpoint_store,
        credential=credential,
    )
    async with client:
        # Call the receive method. Read from the beginning of the partition
        # (starting_position: "-1")
        await client.receive(on_event=on_event, starting_position="-1")

    # Close credential when no longer needed.
    await credential.close()


if __name__ == "__main__":
    # Run the main method.
    asyncio.run(main())
Make sure to replace BLOB_STORAGE_ACCOUNT_URL, BLOB_CONTAINER_NAME, EVENT_HUB_FULLY_QUALIFIED_NAMESPACE and EVENT_HUB_NAME with your own values.
Before running the script, make sure to install the dependency:
pip3 install azure-eventhub-checkpointstoreblob-aio
To run the script:
python3 receive.py
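The receive script writes a checkpoint to Blob Storage after every single event. That is simple, but it adds one storage write per event. A possible variation, sketched below, is to checkpoint only every N events per partition. The factory name make_on_event and its checkpoint_every parameter are hypothetical. The handler it returns has the same (partition_context, event) signature as on_event in the script.

```python
from collections import defaultdict


def make_on_event(checkpoint_every=10):
    """Build an on_event handler that checkpoints once every N events per partition."""
    seen = defaultdict(int)  # partition ID -> number of events handled so far

    async def on_event(partition_context, event):
        partition_id = partition_context.partition_id
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_id
            )
        )
        seen[partition_id] += 1
        # Only write a checkpoint on every Nth event of this partition.
        if seen[partition_id] % checkpoint_every == 0:
            await partition_context.update_checkpoint(event)

    return on_event
```

It would be passed to the client as client.receive(on_event=make_on_event(checkpoint_every=10), starting_position="-1"). The trade-off is that after a restart, events received since the last checkpoint are delivered again, so the processing should tolerate duplicates.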
Voilà! Great work, everyone.
For more information, or in case you get stuck, feel free to connect with me on LinkedIn.