Getting Started with Azure Event Hubs: Ingesting and Processing Big Data Streams | by Abhimanyubajaj | Medium

Abhimanyubajaj
5 min read · Nov 7, 2023

Azure Event Hubs is a big data streaming platform and event ingestion service that can receive and process millions of events per second. It acts as the “front door” for an event pipeline, often called an event ingestor in solution architectures. Event Hubs provides a unified streaming platform with a time retention buffer, decoupling event producers from event consumers.

Prerequisites

  • Microsoft Azure subscription. To use Azure services, including Azure Event Hubs, you need a subscription. If you don’t have an existing Azure account, sign up for a free trial.
  • Python 3.7 or later, with pip installed and updated.
  • Visual Studio Code (recommended) or any other integrated development environment (IDE).
  • Create an Event Hubs namespace and an event hub. (Follow along)

Create an Event Hubs namespace and an event hub

You can do this by running the Terraform configuration from the GitHub repository:

git clone https://github.com/Abhimanyu9988/azure-event-hub.git
cd azure-event-hub
terraform init
terraform apply --auto-approve

If you prefer the UI, go to portal.azure.com and search for Event Hubs.

Create Event Hubs namespace -> enter the details as shown below -> Review + Create -> Create

Great. Now let’s create an event hub in this namespace. In the Event Hubs namespace, select Event Hubs (bottom left) -> Event Hub (+) -> choose a name -> Review + Create -> Create

IAM permissions for Azure Event Hubs

By default, when you log in with the account creator's email, you have the Owner role. Owner covers managing the resource, but it does not include the data roles needed to send and receive events. We therefore need to grant those roles to your user so that it can run the Python scripts (which we will create later) from the CLI.

To do that:

  1. On the Azure Event Hubs page -> click Access control (IAM) -> Role assignments -> Add role assignment, and add the roles below.

Add each role one at a time.

In + Select members -> select your user.

The result looks like the above. Now you are ready to go to the next part. :)

Whether you used the Azure portal or Terraform, the result will look like the following (Overview):

The Terraform output provides the values we need to publish data to the event hub:

EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
EVENT_HUB_NAME
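
As a rough sketch of how these two values could be picked up programmatically: `terraform output -json` prints a JSON object that maps each output name to an object with a `value` field. The helper below parses that JSON text. It assumes the repository's outputs are named exactly as listed above; the function name `read_terraform_outputs` and the sample JSON are made up for illustration.

```python
import json


def read_terraform_outputs(json_text, names):
    """Return {name: value} for the requested Terraform outputs.

    json_text is the text printed by `terraform output -json`, which maps
    each output name to an object containing a "value" field.
    """
    outputs = json.loads(json_text)
    missing = [name for name in names if name not in outputs]
    if missing:
        raise KeyError(f"Missing Terraform outputs: {', '.join(missing)}")
    return {name: outputs[name]["value"] for name in names}


# Hypothetical sample of what `terraform output -json` might print.
SAMPLE = """
{
  "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE": {
    "sensitive": false, "type": "string",
    "value": "example-namespace.servicebus.windows.net"
  },
  "EVENT_HUB_NAME": {
    "sensitive": false, "type": "string", "value": "example-hub"
  }
}
"""

settings = read_terraform_outputs(
    SAMPLE, ["EVENT_HUB_FULLY_QUALIFIED_NAMESPACE", "EVENT_HUB_NAME"]
)
print(settings["EVENT_HUB_NAME"])
```

In a real run, the JSON text could come from `subprocess.run(["terraform", "output", "-json"], capture_output=True, text=True, check=True).stdout` instead of the sample string.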

Send Events

To send events to our event hub, we will create a simple Python script.

import asyncio

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient
from azure.identity.aio import DefaultAzureCredential


EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "<azure-event-hub-namespace>.servicebus.windows.net"
EVENT_HUB_NAME = "<azure-event-hub-name>"

credential = DefaultAzureCredential()

async def run():
    # Create a producer client to send messages to the event hub.
    # Specify a credential that has correct role assigned to access
    # event hubs namespace and the event hub name.
    producer = EventHubProducerClient(
        fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
        eventhub_name=EVENT_HUB_NAME,
        credential=credential,
    )
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()

        # Add events to the batch.
        event_data_batch.add(EventData("First event "))
        event_data_batch.add(EventData("Second event"))
        event_data_batch.add(EventData("Third event"))

        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)

    # Close credential when no longer needed.
    await credential.close()

asyncio.run(run())

Please replace EVENT_HUB_FULLY_QUALIFIED_NAMESPACE and EVENT_HUB_NAME with the values from your own deployment.
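
The script sends three small events in a single batch. With larger volumes a batch eventually fills up, and in azure-eventhub `EventDataBatch.add` raises `ValueError` when an event no longer fits. A minimal sketch of one way to handle that is to send the full batch and continue in a fresh one. The helper name `send_in_batches` and the `FakeBatch` / `FakeProducer` classes are hypothetical; the fakes only stand in for the real client so the sketch runs without Azure.

```python
import asyncio


async def send_in_batches(producer, events):
    """Send events with any producer exposing create_batch()/send_batch().

    When batch.add() raises ValueError (batch full), the current batch is
    sent and the event goes into a fresh batch. Returns the number of
    batches sent.
    """
    batches_sent = 0
    batch = await producer.create_batch()
    for event in events:
        try:
            batch.add(event)
        except ValueError:
            if len(batch) == 0:
                raise  # The event does not fit even in an empty batch.
            await producer.send_batch(batch)
            batches_sent += 1
            batch = await producer.create_batch()
            batch.add(event)
    if len(batch) > 0:
        await producer.send_batch(batch)
        batches_sent += 1
    return batches_sent


class FakeBatch:
    """Stand-in for EventDataBatch that holds at most `capacity` events."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.events = []

    def add(self, event):
        if len(self.events) >= self.capacity:
            raise ValueError("batch is full")
        self.events.append(event)

    def __len__(self):
        return len(self.events)


class FakeProducer:
    """Stand-in for EventHubProducerClient that records sent batches."""

    def __init__(self, capacity=2):
        self.capacity = capacity
        self.sent = []

    async def create_batch(self):
        return FakeBatch(self.capacity)

    async def send_batch(self, batch):
        self.sent.append(list(batch.events))


fake = FakeProducer(capacity=2)
count = asyncio.run(send_in_batches(fake, ["e1", "e2", "e3", "e4", "e5"]))
print(count, fake.sent)
```

With the real client, the same helper would be called inside the `async with producer:` block, passing the `EventHubProducerClient` and a list of `EventData` objects.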

Before you run the script, install the dependencies:

pip3 install azure-eventhub
pip3 install azure-identity
pip3 install aiohttp

You can also clone the repository and run the pre-req-python.sh file, which installs all the dependencies for the complete article.
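
If you are not sure whether the installs succeeded, a quick check along the following lines can help. It only looks up the import names that correspond to the three packages (`azure.eventhub`, `azure.identity`, `aiohttp`); the function name `missing_modules` is made up for this sketch.

```python
import importlib.util


def missing_modules(module_names):
    """Return the names from module_names that cannot be found."""
    missing = []
    for name in module_names:
        try:
            found = importlib.util.find_spec(name) is not None
        except ModuleNotFoundError:
            # Raised for dotted names when a parent package is absent.
            found = False
        if not found:
            missing.append(name)
    return missing


REQUIRED = ["azure.eventhub", "azure.identity", "aiohttp"]
print("Missing modules:", missing_modules(REQUIRED) or "none")
```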

Receive Events

Azure Event Hubs doesn’t let you query the data directly. Instead, a consumer reads events in order and records how far it has read in a checkpoint. We will store the checkpoints in Azure Blob storage. Your user account needs the Storage Blob Data Contributor role on the storage account. If you deployed our Terraform, this is done for you automatically.
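
To make the checkpoint idea concrete, here is a toy model in plain Python. It is not the Event Hubs SDK: a partition is modelled as an append-only list, and the checkpoint store is a dictionary that maps (consumer group, partition ID) to the index of the last processed event. All class and function names are hypothetical.

```python
class InMemoryCheckpointStore:
    """Toy checkpoint store: remembers the last processed offset per key."""

    def __init__(self):
        self._offsets = {}

    def load(self, consumer_group, partition_id):
        # -1 means "nothing processed yet", so reading starts at index 0.
        return self._offsets.get((consumer_group, partition_id), -1)

    def save(self, consumer_group, partition_id, offset):
        self._offsets[(consumer_group, partition_id)] = offset


def consume(partition, store, consumer_group, partition_id, max_events=None):
    """Process events after the last checkpoint and return them."""
    start = store.load(consumer_group, partition_id) + 1
    end = len(partition)
    if max_events is not None:
        end = min(end, start + max_events)
    processed = []
    for offset in range(start, end):
        processed.append(partition[offset])
        # Checkpoint: record that everything up to `offset` is done.
        store.save(consumer_group, partition_id, offset)
    return processed


partition_0 = ["First event", "Second event", "Third event"]
store = InMemoryCheckpointStore()

# First run stops early, as if the program had been interrupted.
first_run = consume(partition_0, store, "$Default", "0", max_events=2)
partition_0.append("Fourth event")  # The producer keeps writing.
# Second run resumes after the checkpoint instead of starting over.
second_run = consume(partition_0, store, "$Default", "0")

print(first_run)
print(second_run)
```

In the real setup, the blob container plays the role of the dictionary, so the position survives restarts of the receiving program.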

Otherwise, create an Azure storage account and a blob container in it with the following steps:

  1. Create an Azure Storage account
  2. Create a blob container

The Terraform output should include the Blob storage account URL and the blob container name.

You will need these values in the Python script that receives events from the event hub.

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import (
    BlobCheckpointStore,
)
from azure.identity.aio import DefaultAzureCredential

BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
EVENT_HUB_NAME = "EVENT_HUB_NAME"

credential = DefaultAzureCredential()

async def on_event(partition_context, event):
    # Print the event data.
    print(
        'Received the event: "{}" from the partition with ID: "{}"'.format(
            event.body_as_str(encoding="UTF-8"), partition_context.partition_id
        )
    )

    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    await partition_context.update_checkpoint(event)


async def main():
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore(
        blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
        container_name=BLOB_CONTAINER_NAME,
        credential=credential,
    )

    # Create a consumer client for the event hub.
    client = EventHubConsumerClient(
        fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
        eventhub_name=EVENT_HUB_NAME,
        consumer_group="$Default",
        checkpoint_store=checkpoint_store,
        credential=credential,
    )
    async with client:
        # Call the receive method. Read from the beginning of the partition
        # (starting_position: "-1")
        await client.receive(on_event=on_event, starting_position="-1")

    # Close credential when no longer needed.
    await credential.close()

if __name__ == "__main__":
    # Run the main method.
    asyncio.run(main())

Please make sure to replace BLOB_STORAGE_ACCOUNT_URL, BLOB_CONTAINER_NAME, EVENT_HUB_FULLY_QUALIFIED_NAMESPACE and EVENT_HUB_NAME with your own values.
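
The receiver script updates the checkpoint after every event, which means one write to Blob storage per event. A common variation is to checkpoint only every N events per partition. The trade-off is that after a restart, up to N-1 events may be delivered again. The sketch below shows the idea; `make_on_event` and `FakeContext` are hypothetical names, and the handler relies only on the `partition_id` attribute and the `update_checkpoint` method that the script's `on_event` already uses.

```python
import asyncio
from collections import defaultdict


def make_on_event(checkpoint_every, handle):
    """Build an on_event callback that checkpoints every N events per partition."""
    seen = defaultdict(int)  # partition_id -> events since last checkpoint

    async def on_event(partition_context, event):
        handle(event)
        pid = partition_context.partition_id
        seen[pid] += 1
        if seen[pid] >= checkpoint_every:
            await partition_context.update_checkpoint(event)
            seen[pid] = 0

    return on_event


class FakeContext:
    """Stand-in for the SDK's partition context, for a dry run without Azure."""

    def __init__(self, partition_id):
        self.partition_id = partition_id
        self.checkpoints = []

    async def update_checkpoint(self, event):
        self.checkpoints.append(event)


async def demo():
    handled = []
    ctx = FakeContext("0")
    on_event = make_on_event(checkpoint_every=3, handle=handled.append)
    for n in range(1, 8):  # Simulate events 1..7 on one partition.
        await on_event(ctx, n)
    return handled, ctx.checkpoints


handled, checkpoints = asyncio.run(demo())
print(handled, checkpoints)
```

With the real client, the callback returned by `make_on_event` would be passed as the `on_event` argument of `client.receive`.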

Before running the script, make sure to install the dependency:

pip3 install azure-eventhub-checkpointstoreblob-aio

Save the script as receive.py and run it:

python3 receive.py

Voilà! Great work, everyone.

For more information, or if you get stuck, feel free to connect with me on LinkedIn:

https://www.linkedin.com/in/theabhibajaj/

Abhimanyubajaj

I solve problems. CKAD, CKA, Azure, AWS, GCP, Terraform Certified. Senior Software Engineer at Cisco.