Streaming data architectures are now prevalent across many enterprises. But when source events are JSON payloads and your downstream systems expect relational tables, things get messy fast. You either end up building a complex ETL stack, or you embrace a more elegant in-database pipeline.
We built a warm ingestion pipeline that uses SingleStore's built-in Kafka pipeline support to consume JSON messages from Azure Event Hubs, store them in a raw JSON layer, normalize them into relational tables, and serve them to BI and analytics tools in near real time. The latency? Measured in milliseconds, even during fairly busy periods.
How did we do it? Let's dive into the architecture, key transformations, challenges, and a few knobs we tweaked using official Kafka or SingleStore configs.
Architecture Overview
At the heart of this setup is a warm pipeline that continuously consumes and processes JSON data with minimal latency - usually within milliseconds - using the following components:
- Azure Event Hubs - Acts as the Kafka-compatible external source in an independent Azure tenant
- SingleStore Pipelines - Built-in connectors that consume Event Hub topics natively via Kafka API. The SingleStore setup is built on an AKS cluster in another Azure tenant
- Three Databases in SingleStore (layout sketched after this list):
- raw - Stores unparsed JSON along with extracted business keys
- core - Contains normalized, relational data extracted from the JSON documents
- information_schema (system) - Maintains pipeline metadata and execution logs. This is built into SingleStore
- Private Networking - Communication between Azure Event Hubs and SingleStore is secured via VNet Peering and Private Endpoints across separate Azure tenants
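A minimal sketch of the database layering described above, assuming default database options (the names follow the article; information_schema needs no setup since SingleStore maintains it):

CREATE DATABASE raw;    -- landing zone for unparsed JSON plus extracted business keys
CREATE DATABASE core;   -- normalized, relational tables for BI and analytics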
Data Flow Design
The pipeline is designed around 7 Event Hub topics, each feeding a corresponding SingleStore pipeline.
Here's an illustration of the data flow and the other key technical components involved in the overall architecture.
There are primarily two layers to this integration. The raw layer is where the consumer pipeline is defined; it feeds the JSON payload to stored procedures for processing and storage into tables. Note that there is one table defined per topic in the raw layer.
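For illustration, a raw table for one topic looks roughly like the sketch below; the column names are placeholders, and the actual business keys depend on that topic's payload:

CREATE TABLE <rawTableName> (
  businessKey1 VARCHAR(64),
  businessKey2 VARCHAR(64),
  jsonPayload JSON,             -- full original payload for audit and data science use
  systemLoadedTs DATETIME(6),   -- load timestamp appended by the parent procedure
  SHARD KEY (businessKey1),
  SORT KEY (systemLoadedTs)
);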
Some key benefits of this integration include:
- SingleStore native pipelines offer a powerful way to consume Kafka data with minimal code: no explicit consumer has to be built, and offsets are maintained internally within SingleStore, so developers do not have to worry about Kafka group IDs, offset maintenance, or batch retries.
- JSON data is stored, normalized, and made available for queries all within milliseconds, thanks to SingleStore's efficient handling of JSON-style data.
- The warm pipeline runs continuously, processing small, frequent batches for low latency. Batch sizes we've observed typically range anywhere from 20 to 400 payloads.
- Network security is ensured via private networking, using private endpoints defined in VNets across tenants. In our trials, we also found that defining the endpoint in the same VNet as the SingleStore deployment improves throughput significantly compared to a hub-spoke style VNet architecture.
Data Ingestion: Raw DB Layer
After network hardening, the first step is to define the pipeline that will connect to the Event Hub endpoint for a particular topic:
CREATE PIPELINE <pipelineName>
AS LOAD DATA KAFKA '<endpoint>/<topic>'
CONFIG '{"security.protocol": "sasl_ssl",
"sasl.mechanism": "PLAIN",
"sasl.username":"$ConnectionString"}'
CREDENTIALS '{"sasl.password": "Endpoint=<endpoint>;SharedAccessKeyName=<accesskeyname>;SharedAccessKey=<accesskeyvalue>}'
BATCH_INTERVAL 1000
INTO PROCEDURE <procedureName> (jsonPayload <- %) FORMAT JSON;

Within the procedure, each JSON document received from Event Hub is translated into a single row in the raw database. The parent procedure performs two operations:
- Extract Key Metadata - Business keys are pulled from the JSON
- Store Original JSON - The full payload is stored in a jsonPayload column for audit and data science use cases
Additionally, system-level metadata like loaded timestamps are appended.
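The pipeline does not consume anything until it is explicitly started; we typically dry-run it first. A minimal sketch using SingleStore's standard pipeline commands (the pipeline name is a placeholder):

TEST PIPELINE <pipelineName>;                               -- inspect extracted records without loading them
START PIPELINE <pipelineName> FOREGROUND LIMIT 1 BATCHES;   -- run one batch synchronously to surface errors
START PIPELINE <pipelineName>;                              -- switch to continuous background ingestion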
Procedure definition:
DELIMITER //
CREATE OR REPLACE PROCEDURE `<procedureName>`(jsonBatch query(`jsonPayload` JSON COLLATE utf8mb4_general_ci NULL)) RETURNS void AS
BEGIN
INSERT INTO <rawTableName>(businessKey1, businessKey2,... systemLoadedTs)
select
jsonPayload::parentAttribute::$attribute1,
...
JSON_EXCLUDE_MASK(jsonPayload, '{"someOtherAttribute1":1, "someOtherAttribute2":1}'),
jsonPayload::$someSystemFlagAttribute,
...
current_timestamp(6)
from jsonBatch;
...
END; //
DELIMITER ;

Data Transformation: Normalized DB Layer
The parent procedure triggers child procedures - one for each normalized table. For large topics, there can be 20-30 child procedures, depending on the structure defined by the source system's Swagger YAML schema.
CALL childProcedure1(jsonPayload);

Using SingleStore's native JSON parsing syntax, the pipeline efficiently extracts fields for all relevant attributes, including the ones in array formats:
...
jsonPayload::parentAttribute::$businessKeyAttr as businessKeyColumn,
table_col::%arrayChildAttribute AS arrayAttrColumn,
...
COALESCE(jsonPayload::parentAttribute::$`attribute1`, "") AS column1,
current_timestamp(6) AS systemLoadedTs,
...
FROM jsonPayload
LEFT JOIN table(JSON_TO_ARRAY(jsonPayload::$arrayParentAttribute))
ON (jsonPayload::$arrayParentAttribute != "[]")
where businessKeyColumn is not NULL
ON DUPLICATE KEY UPDATE
businessKeyColumn = VALUES(businessKeyColumn),
...

The normalized data is written to relational tables optimized for joins and analytics, in update mode using SingleStore's built-in ON DUPLICATE KEY UPDATE functionality. Overall, this schema-first transformation makes it seamless for BI tools to query the data.
Note that wherever array attributes exist, one row is generated per array element, repeating the values of all non-array attributes (if mapped).
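For the upserts above to behave as described, each normalized table needs a unique or primary key over its business identifiers (plus an array element identifier where arrays are flattened). A rough sketch with hypothetical names:

CREATE TABLE core.<childTableName> (
  businessKeyColumn VARCHAR(64) NOT NULL,
  arrayElementId VARCHAR(64) NOT NULL,   -- identifies one element of the flattened array
  column1 VARCHAR(256),
  systemLoadedTs DATETIME(6),
  PRIMARY KEY (businessKeyColumn, arrayElementId)   -- key that ON DUPLICATE KEY UPDATE matches on
);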
Since the data from each topic is processed independently, and foreign keys often span across topics, late-arriving events can create referential integrity issues.
To mitigate this, the system runs hourly maintenance routines to reconcile missing relationships. These routines scan candidate tables and attempt to repair foreign key gaps using updated data from the raw and transformed layers.
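Reduced to a hedged sketch with illustrative table and column names, one such routine simply re-resolves parent references that were left empty because the parent event arrived late:

-- Hourly repair: fill missing parent references once the parent rows have landed
UPDATE core.<childTableName> AS c
JOIN core.<parentTableName> AS p
  ON p.businessKeyColumn = c.businessKeyColumn
SET c.parentReferenceId = p.parentReferenceId
WHERE c.parentReferenceId IS NULL;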
System DB: Operational Metadata
All pipeline execution logs, timestamps, and metadata are automatically written to the information_schema database. This enables easy monitoring, alerting, and auditing without mixing operational logs with business data.
Sample offset-maintenance data in the information_schema.PIPELINES_OFFSETS table, taken from SingleStore's public playground, demonstrates how the system tracks consumption progress automatically.
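For day-to-day monitoring we rely on simple queries against these system views; a rough sketch is below, though the exact view and column names should be checked against the SingleStore version in use:

SHOW PIPELINES;                                       -- current state of every pipeline in the database
SELECT * FROM information_schema.PIPELINES_OFFSETS
WHERE PIPELINE_NAME = '<pipelineName>';               -- per-partition offset tracking for one pipeline
SELECT * FROM information_schema.PIPELINES_ERRORS;    -- ingestion errors, if any, with the offending batch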
Performance Characteristics
The pipeline operates in warm mode, meaning it runs continuously and processes data in near real-time. Key performance stats:
- Latency - Typically within milliseconds per payload
- Batch Size - 20-400 rows per execution/batch
- Throughput - Scales with topic activity and availability of resources on the SingleStore leaf nodes
- Concurrency - SingleStore manages all Kafka consumer logic natively, scaling across partitions automatically
- Volume - During peak periods, the daily row count combining all 7 topics could go as high as 18 to 20 million rows. Normal daily range is 6-10 million rows
Operational Simplicity with SingleStore Pipelines
One of the standout features is the use of SingleStore's native pipeline framework. Unlike custom Kafka consumers, these pipelines:
- Automatically manage Kafka offsets (no need to configure group IDs)
- Still allow Kafka consumer config parameters such as max.partition.fetch.bytes to be set easily within the pipeline definition (see the sketch after this list)
- Handle retries, backpressure, and errors internally
- Require only a topic name, endpoint, target DB, and associated SQL procedure to function
- Reduce development overhead by eliminating the need for separate ETL orchestration tools
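For example, tuning the fetch size is just another key in the pipeline's CONFIG JSON. A hedged sketch follows; the value shown is illustrative rather than a recommendation:

CREATE PIPELINE <pipelineName>
AS LOAD DATA KAFKA '<endpoint>/<topic>'
CONFIG '{"security.protocol": "sasl_ssl",
"sasl.mechanism": "PLAIN",
"sasl.username": "$ConnectionString",
"max.partition.fetch.bytes": "1048576"}'
CREDENTIALS '{"sasl.password": "<eventHubConnectionString>"}'
BATCH_INTERVAL 1000
INTO PROCEDURE <procedureName> (jsonPayload <- %) FORMAT JSON;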
Downstream Integration
Once data lands in the transformed layer, it is exposed to various consumers:
- Qlik dashboards for real-time reporting
- BI systems for statistical analysis
- Direct SQL access for power users and analysts
Because the data is already relational, analysts can query complex business logic without dealing with raw JSON structures. But if needed, the raw JSON payloads are also available for data science consumers.
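As a flavor of what that looks like in practice, a hypothetical consumer-facing query joins two normalized tables directly, with the raw payload one lookup away when needed (table and column names are illustrative):

-- Relational access for BI tools and analysts
SELECT p.businessKeyColumn, p.column1, c.arrayAttrColumn
FROM core.<parentTableName> AS p
JOIN core.<childTableName> AS c ON c.businessKeyColumn = p.businessKeyColumn
WHERE p.systemLoadedTs >= NOW() - INTERVAL 1 DAY;

-- Raw payload access for data science consumers
SELECT jsonPayload
FROM <rawTableName>
WHERE businessKey1 = '<businessKeyValue>';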
Conclusion
Building a low-latency, JSON-native ingestion pipeline can be a complex endeavor involving custom Kafka consumers, external ETL tools, and orchestration frameworks. With SingleStore Pipelines and Azure Event Hubs, it's now possible to create production-grade, scalable, and elegant data pipelines that handle everything from ingestion to transformation - all within SQL.




