Building a Scalable Event-Driven Data Pipeline on AWS

Event-driven architecture has become a foundational pattern for modern data engineering. Unlike traditional batch processing where you schedule jobs to run at fixed intervals, event-driven systems react to data as it arrives, enabling real-time insights and responsive applications. In this guide, you'll learn how to build a scalable event-driven data pipeline on AWS using Kinesis for event ingestion, Lambda for processing, and S3 for durable storage.

Understanding Event-Driven Architecture in Data Engineering

Event-driven architecture (EDA) is a design pattern where system components communicate by producing and consuming events. An event represents a significant change in state—a user clicking a button, a sensor recording a temperature reading, or a transaction completing. Rather than polling for updates or running scheduled jobs, event-driven systems respond immediately when something happens.

Why Event-Driven for Data Pipelines?

Traditional batch-oriented data pipelines collect data over a period (hourly, daily) and process it in bulk. This approach works well for many use cases but introduces inherent latency. Event-driven pipelines process data as it arrives, offering several advantages for data engineering workloads:

Batch Processing                  | Event-Driven Processing
----------------------------------|------------------------------------
Fixed processing schedules        | Continuous, real-time processing
High latency (minutes to hours)   | Low latency (sub-second to seconds)
Simpler failure recovery          | Requires idempotent processing
Predictable resource usage        | Dynamic scaling based on load

According to AWS documentation, Amazon Kinesis Data Streams provides a typical put-to-get delay of less than one second, making it suitable for real-time data intake and aggregation.

Core Components of an Event-Driven Pipeline

An event-driven data pipeline on AWS typically consists of:

  1. Event producers: Applications, devices, or services that generate events
  2. Event stream: A durable, ordered log of events (Kinesis Data Streams)
  3. Event processors: Functions that transform and route events (Lambda)
  4. Event sinks: Destinations for processed data (S3, databases, other streams)

This decoupled architecture allows each component to scale independently and evolve without affecting others.

Setting Up a Kinesis Stream for Event Ingestion

Amazon Kinesis Data Streams serves as the backbone of your event-driven pipeline. It captures streaming data and makes it available for processing by multiple consumers simultaneously.

Understanding Kinesis Architecture

According to AWS documentation, a Kinesis data stream consists of one or more shards, each providing a fixed unit of capacity:

  • Write capacity: Up to 1,000 records per second, with a maximum total write rate of 1 MB per second per shard
  • Read capacity: Up to 5 read transactions per second, with a maximum total read rate of 2 MB per second per shard

Each data record contains a sequence number (assigned by Kinesis), a partition key (specified by the producer), and a data blob up to 1 MB. The partition key determines which shard receives each record using an MD5 hash function.
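
To make the routing concrete, the sketch below (illustrative only, and assuming the event-pipeline stream created in the next step already exists) hashes a partition key with MD5 and matches the resulting 128-bit value against each shard's hash key range, which is how Kinesis picks a shard:

// partition-key-to-shard.mjs - illustrative sketch of Kinesis shard routing
import { createHash } from 'node:crypto';
import { KinesisClient, ListShardsCommand } from '@aws-sdk/client-kinesis';

const kinesis = new KinesisClient({ region: 'us-east-1' });

// MD5 digest of the partition key, read as an unsigned 128-bit integer
function hashKeyFor(partitionKey) {
    const digest = createHash('md5').update(partitionKey).digest('hex');
    return BigInt('0x' + digest);
}

// Find the shard whose hash key range contains the partition key's hash
// (ignores ListShards pagination for brevity)
async function shardForPartitionKey(streamName, partitionKey) {
    const hashKey = hashKeyFor(partitionKey);
    const { Shards } = await kinesis.send(new ListShardsCommand({ StreamName: streamName }));

    return Shards.find(shard =>
        hashKey >= BigInt(shard.HashKeyRange.StartingHashKey) &&
        hashKey <= BigInt(shard.HashKeyRange.EndingHashKey)
    );
}

const shard = await shardForPartitionKey('event-pipeline', 'user_123');
console.log(`Partition key "user_123" maps to ${shard?.ShardId ?? 'unknown shard'}`);

Because the hash is deterministic, records that share a partition key land on the same shard, which is what preserves their relative ordering.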

Create the Event Stream

Create a Kinesis stream to ingest your events:

# Create a stream with on-demand capacity mode
aws kinesis create-stream \
    --stream-name event-pipeline \
    --stream-mode-details StreamMode=ON_DEMAND

# Wait for the stream to become active
aws kinesis wait stream-exists --stream-name event-pipeline

# Verify stream status
aws kinesis describe-stream-summary --stream-name event-pipeline

The on-demand capacity mode automatically scales to handle your throughput needs, charging for the data you actually write and read (plus an hourly per-stream charge) rather than for provisioned shard capacity.

For predictable workloads, provisioned mode offers more control:

# Create a stream with provisioned capacity (2 shards)
aws kinesis create-stream \
    --stream-name event-pipeline-provisioned \
    --shard-count 2
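
Provisioned mode also means you manage resharding yourself. If throughput grows, you can scale the shard count explicitly; here is a minimal sketch using the JavaScript SDK (the target of 4 shards is arbitrary):

// scale-stream.mjs - resharding a provisioned stream (sketch)
import { KinesisClient, UpdateShardCountCommand } from '@aws-sdk/client-kinesis';

const kinesis = new KinesisClient({ region: 'us-east-1' });

// Double capacity from 2 to 4 shards; UNIFORM_SCALING splits shards evenly
await kinesis.send(new UpdateShardCountCommand({
    StreamName: 'event-pipeline-provisioned',
    TargetShardCount: 4,
    ScalingType: 'UNIFORM_SCALING'
}));

On-demand streams handle this scaling automatically, which is why the main walkthrough uses ON_DEMAND.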

Configure Data Retention

According to AWS documentation, Kinesis streams retain data for a configurable period ranging from 24 hours (default) to 365 days. Longer retention enables reprocessing historical data but incurs additional costs:

# Extend retention to 7 days for reprocessing capability
aws kinesis increase-stream-retention-period \
    --stream-name event-pipeline \
    --retention-period-hours 168

Producing Events to the Stream

Create a simple event producer to send data to your stream:

// event-producer.mjs
import { KinesisClient, PutRecordsCommand } from '@aws-sdk/client-kinesis';

const kinesis = new KinesisClient({ region: 'us-east-1' });
const STREAM_NAME = 'event-pipeline';

// Generate a sample event
function createEvent(eventType, userId) {
    return {
        event_id: `evt_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`,
        event_type: eventType,
        user_id: userId,
        timestamp: new Date().toISOString(),
        properties: {
            source: 'web',
            session_id: `sess_${Math.random().toString(36).slice(2, 11)}`
        }
    };
}

// Send a batch of events
async function publishEvents(events) {
    const records = events.map(event => ({
        Data: Buffer.from(JSON.stringify(event)),
        PartitionKey: event.user_id // Route by user for ordering
    }));

    const command = new PutRecordsCommand({
        StreamName: STREAM_NAME,
        Records: records
    });

    const response = await kinesis.send(command);

    if (response.FailedRecordCount > 0) {
        // Failed entries are flagged per record in response.Records; retry them in production code
        console.error(`Failed to publish ${response.FailedRecordCount} records`);
    }

    return response;
}

// Example: publish 10 events
const events = [
    createEvent('page_view', 'user_123'),
    createEvent('click', 'user_123'),
    createEvent('purchase', 'user_456'),
    createEvent('signup', 'user_789'),
    createEvent('page_view', 'user_456'),
    createEvent('search', 'user_123'),
    createEvent('add_to_cart', 'user_456'),
    createEvent('page_view', 'user_789'),
    createEvent('checkout_start', 'user_456'),
    createEvent('purchase', 'user_456')
];

await publishEvents(events);
console.log('Events published successfully');

The partition key determines event ordering within a shard. Events with the same partition key maintain their relative order, which is important for user session analysis or transaction processing.

Creating a Lambda Function for Event Processing

AWS Lambda provides serverless compute that integrates directly with Kinesis. According to AWS documentation, Lambda polls each shard in your stream and invokes your function with batches of records, handling the complexity of shard iteration and checkpointing.
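
Before writing the function, it helps to know the shape of the payload Lambda passes in. The object below is an illustrative single-record batch (all values fabricated); note that the record body arrives base64-encoded in kinesis.data:

// Illustrative Kinesis batch event as received by the Lambda handler (values fabricated)
const sampleEvent = {
    Records: [
        {
            kinesis: {
                kinesisSchemaVersion: '1.0',
                partitionKey: 'user_123',
                sequenceNumber: '49590338271490256608559692538361571095921575989136588898',
                // Base64-encoded JSON event body, built here for readability
                data: Buffer.from(JSON.stringify({
                    event_id: 'evt_1',
                    event_type: 'page_view',
                    user_id: 'user_123',
                    timestamp: '2024-01-01T00:00:00.000Z'
                })).toString('base64'),
                approximateArrivalTimestamp: 1704067200.123
            },
            eventSource: 'aws:kinesis',
            eventName: 'aws:kinesis:record',
            eventID: 'shardId-000000000000:49590338271490256608559692538361571095921575989136588898',
            eventSourceARN: 'arn:aws:kinesis:us-east-1:123456789012:stream/event-pipeline'
        }
    ]
};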

Set Up the Lambda Execution Role

Create an IAM role that grants Lambda permission to read from Kinesis and write logs:

# Create trust policy for Lambda
cat > lambda-trust-policy.json << 'EOF'
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "lambda.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
        }
    ]
}
EOF

# Create the execution role
aws iam create-role \
    --role-name event-processor-role \
    --assume-role-policy-document file://lambda-trust-policy.json

# Create policy for Kinesis access and CloudWatch Logs
cat > event-processor-policy.json << 'EOF'
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:*:*:stream/event-pipeline"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}
EOF

# Attach the policy
aws iam put-role-policy \
    --role-name event-processor-role \
    --policy-name EventProcessorPolicy \
    --policy-document file://event-processor-policy.json

Create the Event Processor Function

Build a Lambda function that processes Kinesis events. According to AWS documentation, Kinesis records arrive base64-encoded, and your function receives batches of records from a single shard:

// index.mjs - Event-driven processor for Kinesis records

/**
 * Process a batch of events from Kinesis.
 * Implements idempotent processing to handle at-least-once delivery.
 */
export const handler = async (event) => {
    console.log(`Processing batch of ${event.Records.length} records`);

    const batchItemFailures = [];
    const processedEvents = [];

    for (const record of event.Records) {
        try {
            const processedEvent = await processEvent(record);
            processedEvents.push(processedEvent);
        } catch (error) {
            console.error(`Failed to process record ${record.kinesis.sequenceNumber}:`, error.message);

            // Report failed record for retry
            batchItemFailures.push({
                itemIdentifier: record.kinesis.sequenceNumber
            });
        }
    }

    console.log(`Successfully processed ${processedEvents.length} events`);

    // Return failed items so Lambda can retry them
    return { batchItemFailures };
};

/**
 * Process a single Kinesis record.
 */
async function processEvent(record) {
    // Decode base64 data
    const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
    const event = JSON.parse(payload);

    // Validate required fields
    if (!event.event_id || !event.event_type || !event.timestamp) {
        throw new Error('Missing required fields: event_id, event_type, or timestamp');
    }

    // Enrich the event with processing metadata
    const enrichedEvent = {
        ...event,
        processing: {
            processed_at: new Date().toISOString(),
            shard_id: extractShardId(record.eventID),
            sequence_number: record.kinesis.sequenceNumber,
            approximate_arrival: record.kinesis.approximateArrivalTimestamp
        },
        derived: {
            event_date: event.timestamp.split('T')[0],
            hour_of_day: new Date(event.timestamp).getUTCHours(),
            day_of_week: new Date(event.timestamp).getUTCDay()
        }
    };

    // Apply business logic based on event type
    await handleEventType(enrichedEvent);

    return enrichedEvent;
}

/**
 * Extract shard ID from Kinesis event ID.
 */
function extractShardId(eventId) {
    return eventId.split(':')[0];
}

/**
 * Route events based on type for specialized processing.
 */
async function handleEventType(event) {
    switch (event.event_type) {
        case 'purchase':
        case 'refund':
            // Financial events - could trigger notifications or update metrics
            console.log(`Financial event: ${event.event_id}, type: ${event.event_type}`);
            break;

        case 'signup':
            // User acquisition events
            console.log(`New user signup: ${event.user_id}`);
            break;

        case 'page_view':
        case 'click':
        case 'search':
            // Analytics events
            console.log(`Analytics event: ${event.event_type} from ${event.user_id}`);
            break;

        default:
            console.log(`Unhandled event type: ${event.event_type}`);
    }
}
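
You can smoke-test this handler locally before deploying it by constructing a synthetic record in the same shape Lambda uses. A sketch, run with node from the directory containing index.mjs:

// local-test.mjs - invoke the handler with a synthetic Kinesis record (sketch)
import { handler } from './index.mjs';

const record = {
    kinesis: {
        partitionKey: 'user_456',
        sequenceNumber: '1',
        // Base64-encode a fake event body, just as Kinesis delivers it
        data: Buffer.from(JSON.stringify({
            event_id: 'evt_local_1',
            event_type: 'purchase',
            user_id: 'user_456',
            timestamp: new Date().toISOString()
        })).toString('base64'),
        approximateArrivalTimestamp: Date.now() / 1000
    },
    eventID: 'shardId-000000000000:1'
};

const result = await handler({ Records: [record] });
console.log(result); // { batchItemFailures: [] } when every record succeeds

An empty batchItemFailures array means every record in the batch was processed successfully.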

Deploy the Lambda Function

Package and deploy your function:

# Create deployment package
zip function.zip index.mjs

# Create the Lambda function
aws lambda create-function \
    --function-name event-processor \
    --runtime nodejs20.x \
    --handler index.handler \
    --role arn:aws:iam::YOUR_ACCOUNT_ID:role/event-processor-role \
    --zip-file fileb://function.zip \
    --timeout 60 \
    --memory-size 256

# Verify deployment
aws lambda get-function --function-name event-processor

Configure the Kinesis Event Source

Connect Lambda to your Kinesis stream with an event source mapping. According to AWS documentation, you can configure batching behavior to balance latency and throughput:

# Create event source mapping
aws lambda create-event-source-mapping \
    --function-name event-processor \
    --event-source-arn arn:aws:kinesis:us-east-1:YOUR_ACCOUNT_ID:stream/event-pipeline \
    --starting-position LATEST \
    --batch-size 100 \
    --maximum-batching-window-in-seconds 5 \
    --parallelization-factor 2 \
    --function-response-types ReportBatchItemFailures

Key configuration options:

  • batch-size: Maximum records per invocation (1-10,000)
  • maximum-batching-window-in-seconds: Wait time for batch accumulation (0-300 seconds)
  • parallelization-factor: Concurrent Lambda invocations per shard (1-10)
  • function-response-types: Enable partial batch failure reporting

According to AWS documentation, using parallelization-factor allows processing one shard with multiple concurrent Lambda invocations while maintaining in-order processing at the partition-key level.

Implementing Idempotent Processing

AWS documentation emphasizes that Lambda event source mappings process each event at least once, which can result in duplicate processing. Your function must be idempotent—producing the same result regardless of how many times it processes the same event.

Strategies for idempotency:

// Using DynamoDB for deduplication
import { DynamoDBClient, PutItemCommand, ConditionalCheckFailedException } from '@aws-sdk/client-dynamodb';

const dynamodb = new DynamoDBClient({ region: process.env.AWS_REGION });
const DEDUP_TABLE = process.env.DEDUP_TABLE;

async function processEventIdempotently(event) {
    const eventId = event.event_id;

    try {
        // Attempt to record this event ID
        await dynamodb.send(new PutItemCommand({
            TableName: DEDUP_TABLE,
            Item: {
                event_id: { S: eventId },
                processed_at: { S: new Date().toISOString() },
                ttl: { N: String(Math.floor(Date.now() / 1000) + 86400) } // 24-hour TTL
            },
            ConditionExpression: 'attribute_not_exists(event_id)'
        }));

        // First time seeing this event - process it
        return await processEvent(event);

    } catch (error) {
        if (error instanceof ConditionalCheckFailedException) {
            // Already processed - skip
            console.log(`Skipping duplicate event: ${eventId}`);
            return null;
        }
        throw error;
    }
}
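
The snippet above assumes the deduplication table already exists. A minimal one-time setup sketch follows; the table name event-dedup is an assumption, so set the DEDUP_TABLE environment variable to whatever name you choose, and remember the Lambda role also needs dynamodb:PutItem on the table:

// create-dedup-table.mjs - one-time setup for the deduplication table (sketch)
import {
    DynamoDBClient,
    CreateTableCommand,
    UpdateTimeToLiveCommand,
    waitUntilTableExists
} from '@aws-sdk/client-dynamodb';

const dynamodb = new DynamoDBClient({ region: 'us-east-1' });
const TABLE_NAME = 'event-dedup'; // assumed name; set DEDUP_TABLE to match

// Key on event_id, billed on demand so idle cost stays low
await dynamodb.send(new CreateTableCommand({
    TableName: TABLE_NAME,
    AttributeDefinitions: [{ AttributeName: 'event_id', AttributeType: 'S' }],
    KeySchema: [{ AttributeName: 'event_id', KeyType: 'HASH' }],
    BillingMode: 'PAY_PER_REQUEST'
}));

// Wait until the table is active before enabling TTL
await waitUntilTableExists({ client: dynamodb, maxWaitTime: 120 }, { TableName: TABLE_NAME });

// Expire dedup entries via the ttl attribute written by the processor
await dynamodb.send(new UpdateTimeToLiveCommand({
    TableName: TABLE_NAME,
    TimeToLiveSpecification: { AttributeName: 'ttl', Enabled: true }
}));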

Integrating the Pipeline with S3 for Long-Term Storage

Processed events need durable storage for analytics, compliance, and historical analysis. Amazon S3 provides cost-effective, scalable storage that integrates with analytical tools like Athena and Redshift Spectrum.

Add S3 Permissions to Lambda

Extend the Lambda execution role with S3 write permissions:

# Create S3 access policy
cat > s3-write-policy.json << 'EOF'
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:PutObjectAcl"
            ],
            "Resource": "arn:aws:s3:::your-data-lake-bucket/events/*"
        }
    ]
}
EOF

# Attach to Lambda role
aws iam put-role-policy \
    --role-name event-processor-role \
    --policy-name S3WritePolicy \
    --policy-document file://s3-write-policy.json

Update Lambda to Write to S3

Modify your function to persist processed events to S3 using a time-partitioned structure:

// index.mjs - Event processor with S3 integration
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';

const s3 = new S3Client({ region: process.env.AWS_REGION });
const OUTPUT_BUCKET = process.env.OUTPUT_BUCKET;

export const handler = async (event) => {
    const batchItemFailures = [];
    const processedEvents = [];

    // Process each record
    for (const record of event.Records) {
        try {
            const processed = await processEvent(record);
            processedEvents.push(processed);
        } catch (error) {
            console.error(`Failed: ${record.kinesis.sequenceNumber}`, error.message);
            batchItemFailures.push({
                itemIdentifier: record.kinesis.sequenceNumber
            });
        }
    }

    // Write batch to S3
    if (processedEvents.length > 0) {
        await writeEventsToS3(processedEvents);
    }

    return { batchItemFailures };
};

/**
 * Write events to S3 with time-based partitioning.
 */
async function writeEventsToS3(events) {
    const now = new Date();

    // Generate partition path: events/year=YYYY/month=MM/day=DD/hour=HH/
    const partitionPath = [
        'events',
        `year=${now.getUTCFullYear()}`,
        `month=${String(now.getUTCMonth() + 1).padStart(2, '0')}`,
        `day=${String(now.getUTCDate()).padStart(2, '0')}`,
        `hour=${String(now.getUTCHours()).padStart(2, '0')}`
    ].join('/');

    // Create unique filename with timestamp and random suffix
    const filename = `${now.toISOString().replace(/[:.]/g, '-')}-${randomId()}.ndjson`;
    const key = `${partitionPath}/${filename}`;

    // Format as newline-delimited JSON for Athena compatibility
    const body = events.map(e => JSON.stringify(e)).join('\n');

    await s3.send(new PutObjectCommand({
        Bucket: OUTPUT_BUCKET,
        Key: key,
        Body: body,
        ContentType: 'application/x-ndjson'
    }));

    console.log(`Wrote ${events.length} events to s3://${OUTPUT_BUCKET}/${key}`);
}

function randomId() {
    return Math.random().toString(36).slice(2, 10);
}

async function processEvent(record) {
    const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
    const event = JSON.parse(payload);

    return {
        event_id: event.event_id,
        event_type: event.event_type,
        user_id: event.user_id,
        timestamp: event.timestamp,
        properties: event.properties,
        event_date: event.timestamp.split('T')[0],
        processed_at: new Date().toISOString()
    };
}

Update the Lambda configuration:

aws lambda update-function-configuration \
    --function-name event-processor \
    --environment 'Variables={OUTPUT_BUCKET=your-data-lake-bucket}'

Query Events with Amazon Athena

Create an Athena table to query your event data directly in S3:

-- Create database for event analytics
CREATE DATABASE IF NOT EXISTS event_analytics;

-- Create table with Hive-style partitions
CREATE EXTERNAL TABLE event_analytics.events (
    event_id STRING,
    event_type STRING,
    user_id STRING,
    `timestamp` STRING,
    properties MAP<STRING, STRING>,
    processed_at STRING
)
PARTITIONED BY (
    year STRING,
    month STRING,
    day STRING,
    hour STRING
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://your-data-lake-bucket/events/'
TBLPROPERTIES ('has_encrypted_data'='false');

-- Load partitions
MSCK REPAIR TABLE event_analytics.events;

-- Example: Analyze events by type for today
SELECT
    event_type,
    COUNT(*) as event_count,
    COUNT(DISTINCT user_id) as unique_users
FROM event_analytics.events
WHERE year = '2026' AND month = '01' AND day = '25'
GROUP BY event_type
ORDER BY event_count DESC;

-- Example: Track user journey
SELECT
    user_id,
    event_type,
    timestamp
FROM event_analytics.events
WHERE user_id = 'user_456'
    AND year = '2026' AND month = '01'
ORDER BY timestamp;
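
To run these queries from an automated job rather than the console, you can submit the same SQL through the Athena API. A hedged sketch (the results bucket s3://your-athena-results-bucket/ is an assumption):

// run-athena-query.mjs - submit a query and poll for completion (sketch)
import {
    AthenaClient,
    StartQueryExecutionCommand,
    GetQueryExecutionCommand,
    GetQueryResultsCommand
} from '@aws-sdk/client-athena';

const athena = new AthenaClient({ region: 'us-east-1' });

const { QueryExecutionId } = await athena.send(new StartQueryExecutionCommand({
    QueryString: `
        SELECT event_type, COUNT(*) AS event_count
        FROM event_analytics.events
        GROUP BY event_type
        ORDER BY event_count DESC`,
    QueryExecutionContext: { Database: 'event_analytics' },
    ResultConfiguration: { OutputLocation: 's3://your-athena-results-bucket/' } // assumed bucket
}));

// Poll until the query leaves the QUEUED/RUNNING states
let state = 'QUEUED';
while (state === 'QUEUED' || state === 'RUNNING') {
    await new Promise(resolve => setTimeout(resolve, 1000));
    const { QueryExecution } = await athena.send(
        new GetQueryExecutionCommand({ QueryExecutionId })
    );
    state = QueryExecution.Status.State;
}

if (state === 'SUCCEEDED') {
    const { ResultSet } = await athena.send(new GetQueryResultsCommand({ QueryExecutionId }));
    // First row is the header; remaining rows hold the data values
    for (const row of ResultSet.Rows.slice(1)) {
        console.log(row.Data.map(d => d.VarCharValue).join('\t'));
    }
} else {
    console.error(`Query finished with state ${state}`);
}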

Alternative: Amazon Data Firehose for Automatic Delivery

For simpler pipelines where you don't need custom transformation logic, Amazon Data Firehose provides automatic delivery from Kinesis to S3 with built-in batching and compression:

# Create Firehose delivery stream
aws firehose create-delivery-stream \
    --delivery-stream-name events-to-s3 \
    --delivery-stream-type KinesisStreamAsSource \
    --kinesis-stream-source-configuration '{
        "KinesisStreamARN": "arn:aws:kinesis:us-east-1:YOUR_ACCOUNT_ID:stream/event-pipeline",
        "RoleARN": "arn:aws:iam::YOUR_ACCOUNT_ID:role/firehose-kinesis-role"
    }' \
    --extended-s3-destination-configuration '{
        "RoleARN": "arn:aws:iam::YOUR_ACCOUNT_ID:role/firehose-s3-role",
        "BucketARN": "arn:aws:s3:::your-data-lake-bucket",
        "Prefix": "raw-events/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/",
        "BufferingHints": {
            "SizeInMBs": 64,
            "IntervalInSeconds": 60
        },
        "CompressionFormat": "GZIP"
    }'

This approach delivers raw events to S3 while your Lambda function can process and route them for real-time use cases.

Monitoring Your Event-Driven Pipeline

Effective monitoring is essential for event-driven systems where problems can cascade quickly.

Key Metrics to Monitor

Lambda publishes several metrics to CloudWatch automatically:

  • Invocations: Total function executions
  • Errors: Failed executions
  • Duration: Processing time per invocation
  • IteratorAge: Time lag between event arrival and processing

For Kinesis:

  • IncomingRecords: Records written to the stream
  • GetRecords.IteratorAgeMilliseconds: Processing backlog
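
Before wiring up alarms, you can spot-check these metrics programmatically. A small sketch that pulls the worst-case Lambda IteratorAge over the past hour:

// check-iterator-age.mjs - spot-check processing lag (sketch)
import { CloudWatchClient, GetMetricStatisticsCommand } from '@aws-sdk/client-cloudwatch';

const cloudwatch = new CloudWatchClient({ region: 'us-east-1' });

const { Datapoints } = await cloudwatch.send(new GetMetricStatisticsCommand({
    Namespace: 'AWS/Lambda',
    MetricName: 'IteratorAge',
    Dimensions: [{ Name: 'FunctionName', Value: 'event-processor' }],
    StartTime: new Date(Date.now() - 60 * 60 * 1000), // last hour
    EndTime: new Date(),
    Period: 300,                // 5-minute buckets
    Statistics: ['Maximum']     // worst-case lag in each bucket
}));

for (const point of Datapoints.sort((a, b) => a.Timestamp - b.Timestamp)) {
    console.log(`${point.Timestamp.toISOString()}  max iterator age: ${point.Maximum} ms`);
}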

Create Monitoring Alarms

# Alert if processing falls behind (iterator age > 1 minute)
aws cloudwatch put-metric-alarm \
    --alarm-name "EventPipeline-ProcessingLag" \
    --metric-name "IteratorAge" \
    --namespace "AWS/Lambda" \
    --dimensions Name=FunctionName,Value=event-processor \
    --statistic Maximum \
    --period 60 \
    --threshold 60000 \
    --comparison-operator GreaterThanThreshold \
    --evaluation-periods 3 \
    --alarm-actions arn:aws:sns:us-east-1:YOUR_ACCOUNT_ID:alerts

# Alert on high error rate
aws cloudwatch put-metric-alarm \
    --alarm-name "EventPipeline-HighErrorRate" \
    --metric-name "Errors" \
    --namespace "AWS/Lambda" \
    --dimensions Name=FunctionName,Value=event-processor \
    --statistic Sum \
    --period 300 \
    --threshold 10 \
    --comparison-operator GreaterThanThreshold \
    --evaluation-periods 1 \
    --alarm-actions arn:aws:sns:us-east-1:YOUR_ACCOUNT_ID:alerts

Conclusion

You've built a complete event-driven data pipeline that ingests events through Kinesis, processes them with Lambda, and stores results in S3 for analysis. This architecture provides real-time processing with automatic scaling and minimal operational overhead.

Key takeaways for building event-driven pipelines:

  • Design for idempotency: Lambda processes events at least once, so your function must handle duplicates gracefully
  • Choose partition keys carefully: They determine event ordering and shard distribution
  • Monitor iterator age: High values indicate your pipeline is falling behind
  • Use batching windows: Balance latency requirements against processing efficiency
  • Partition data in S3: Time-based partitions enable efficient queries with Athena

Event-driven architecture excels when you need real-time responsiveness, decoupled components, and elastic scaling. As your pipeline evolves, consider adding EventBridge for routing events to multiple consumers, Step Functions for orchestrating complex workflows, or DynamoDB for maintaining real-time aggregations.
