Serverless Data Processing Made Easy: Building a Scalable Pipeline on AWS

Serverless computing has fundamentally changed how data engineers build and operate data pipelines. Instead of provisioning servers, managing capacity, and paying for idle resources, you can focus entirely on your data processing logic while AWS handles the infrastructure. In this guide, you'll learn how to build a serverless data processing pipeline using AWS Lambda and Amazon Kinesis, with practical Node.js examples and S3 integration for long-term storage.

Understanding Serverless Computing in Data Engineering

Serverless computing eliminates the need to manage infrastructure while providing automatic scaling and pay-per-use pricing. According to AWS documentation, Lambda is a serverless compute service that runs code without requiring server management, automatically scaling based on demand.

Why Serverless for Data Pipelines?

Traditional data pipelines require capacity planning, server provisioning, and ongoing maintenance. Serverless architectures offer distinct advantages:

Traditional Pipeline                    | Serverless Pipeline
--------------------------------------- | ---------------------------------
Fixed capacity requires manual scaling  | Automatic scaling to match demand
Pay for idle servers                    | Pay only for actual compute time
Infrastructure maintenance overhead     | Managed infrastructure
Complex deployment processes            | Simple function deployment

The Lambda Execution Model

Lambda follows an event-driven model that aligns naturally with data processing workflows:

  1. Event sources (Kinesis, S3, API Gateway) trigger your function
  2. Lambda runtime executes your code in a managed environment
  3. Automatic scaling handles concurrent invocations
  4. Built-in fault tolerance retries failed executions

According to AWS documentation, Lambda supports multiple use cases including file processing with S3 and stream processing with Kinesis Data Streams for real-time analytics and monitoring.
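To make the event-driven model concrete, here is an abridged sketch of the payload Lambda receives from a Kinesis event source. The field names follow the documented Kinesis event structure; the values are illustrative:

{
    "Records": [
        {
            "eventSource": "aws:kinesis",
            "eventID": "shardId-000000000000:49590338271490256608559692538361571095921575989136588898",
            "eventSourceARN": "arn:aws:kinesis:us-east-1:YOUR_ACCOUNT_ID:stream/serverless-events",
            "awsRegion": "us-east-1",
            "kinesis": {
                "partitionKey": "user_1234",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "<base64-encoded JSON payload>",
                "approximateArrivalTimestamp": 1706140800.0
            }
        }
    ]
}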

Prerequisites

Before you begin, ensure you have:

  • An AWS account with appropriate IAM permissions
  • AWS CLI installed and configured
  • Node.js 18.x or later installed locally
  • Basic familiarity with JavaScript/Node.js
  • Understanding of AWS IAM roles and policies

Setting Up a Lambda Function for Data Processing

Let's start by creating a Lambda function that processes incoming data records. We'll use Node.js for its native JSON handling and broad ecosystem of data processing libraries.

Create the IAM Execution Role

Lambda functions need an execution role that grants permission to access AWS services:

# Create the trust policy document
cat > trust-policy.json << 'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF

# Create the IAM role
aws iam create-role \
    --role-name lambda-kinesis-processor-role \
    --assume-role-policy-document file://trust-policy.json

# Attach basic Lambda execution policy (CloudWatch Logs)
aws iam attach-role-policy \
    --role-name lambda-kinesis-processor-role \
    --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

Create the Lambda Function

Create a Node.js function that processes individual records:

// index.mjs - Lambda function for processing data records

/**
 * Process incoming data records from Kinesis.
 * Validates, transforms, and prepares data for downstream storage.
 */
export const handler = async (event) => {
    console.log(`Processing ${event.Records.length} records`);

    const processedRecords = [];
    const failedRecords = [];

    for (const record of event.Records) {
        try {
            // Decode base64-encoded Kinesis data
            const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
            const data = JSON.parse(payload);

            // Validate required fields
            if (!data.event_id || !data.timestamp) {
                throw new Error('Missing required fields: event_id or timestamp');
            }

            // Transform the record
            const transformedRecord = {
                event_id: data.event_id,
                user_id: data.user_id || 'anonymous',
                event_type: data.event_type,
                timestamp: data.timestamp,
                processed_at: new Date().toISOString(),
                // Add derived fields
                event_date: data.timestamp.split('T')[0],
                event_hour: new Date(data.timestamp).getUTCHours(),
                // Preserve original payload for audit
                raw_data: data
            };

            processedRecords.push(transformedRecord);

        } catch (error) {
            console.error(`Error processing record: ${error.message}`);
            failedRecords.push({
                sequenceNumber: record.kinesis.sequenceNumber,
                error: error.message
            });
        }
    }

    console.log(`Successfully processed ${processedRecords.length} records`);
    console.log(`Failed to process ${failedRecords.length} records`);

    return {
        statusCode: 200,
        body: {
            processed: processedRecords.length,
            failed: failedRecords.length,
            records: processedRecords
        }
    };
};

Deploy the Lambda Function

# Create deployment package
zip function.zip index.mjs

# Create the Lambda function
aws lambda create-function \
    --function-name kinesis-data-processor \
    --runtime nodejs18.x \
    --handler index.handler \
    --role arn:aws:iam::YOUR_ACCOUNT_ID:role/lambda-kinesis-processor-role \
    --zip-file fileb://function.zip \
    --timeout 60 \
    --memory-size 256

# Verify the function
aws lambda get-function --function-name kinesis-data-processor

Configure Function Settings

For data processing workloads, configure appropriate timeout and memory settings:

# Update function configuration
aws lambda update-function-configuration \
    --function-name kinesis-data-processor \
    --timeout 120 \
    --memory-size 512 \
    --environment 'Variables={LOG_LEVEL=INFO,OUTPUT_BUCKET=your-output-bucket}'

Creating a Kinesis Stream and Processing Pipeline

Amazon Kinesis Data Streams captures and buffers streaming data for processing. According to AWS documentation, Kinesis provides a typical put-to-get delay of less than one second, making it suitable for real-time data processing.

Create the Kinesis Data Stream

# Create a Kinesis stream with 2 shards
aws kinesis create-stream \
    --stream-name serverless-events \
    --shard-count 2

# Wait for the stream to become active
aws kinesis wait stream-exists --stream-name serverless-events

# Verify stream status
aws kinesis describe-stream-summary --stream-name serverless-events

Understanding Shards and Throughput

Each Kinesis shard provides:

  • Write capacity: Up to 1,000 records per second or 1 MB per second
  • Read capacity: Up to 2 MB per second (or 2 MB/sec per consumer with enhanced fan-out)

For this tutorial, two shards provide up to 2,000 records per second (or 2 MB per second) of write throughput.
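
As a rough planning aid, the per-shard limits above translate directly into a shard-count estimate. The helper below is a minimal sketch (not an AWS API call) that takes expected records per second and average record size and returns whichever constraint dominates:

// estimate-shards.mjs - rough shard-count estimate from the per-shard write limits
// (1,000 records/sec and 1 MB/sec); a planning sketch, not an AWS API call
function estimateShards(recordsPerSecond, avgRecordKB) {
    const byRecordCount = Math.ceil(recordsPerSecond / 1000);
    const byThroughput = Math.ceil((recordsPerSecond * avgRecordKB) / 1024); // MB/sec per shard
    return Math.max(byRecordCount, byThroughput, 1);
}

// Example: 3,000 records/sec at ~0.5 KB each -> limited by record count (3 shards)
console.log(estimateShards(3000, 0.5)); // 3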

Configure Lambda to Process Kinesis Records

According to AWS documentation, Lambda polls each shard in your Kinesis stream and invokes your function with batches of records. You can configure batching behavior to optimize for throughput or latency.

First, grant Lambda permission to read from Kinesis:

# Create the Kinesis access policy
cat > kinesis-policy.json << 'EOF'
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:ListShards",
                "kinesis:ListStreams"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:YOUR_ACCOUNT_ID:stream/serverless-events"
        }
    ]
}
EOF

# Create and attach the policy
aws iam put-role-policy \
    --role-name lambda-kinesis-processor-role \
    --policy-name KinesisReadPolicy \
    --policy-document file://kinesis-policy.json

Create the event source mapping to connect Lambda to Kinesis:

# Create event source mapping
aws lambda create-event-source-mapping \
    --function-name kinesis-data-processor \
    --event-source-arn arn:aws:kinesis:us-east-1:YOUR_ACCOUNT_ID:stream/serverless-events \
    --starting-position LATEST \
    --batch-size 100 \
    --maximum-batching-window-in-seconds 5

# Verify the mapping
aws lambda list-event-source-mappings \
    --function-name kinesis-data-processor

Batching and Parallelization Options

According to AWS documentation, you can tune the event source mapping for your workload:

Batch Size and Batching Window:

  • batch-size: Maximum records per invocation (up to 10,000)
  • maximum-batching-window-in-seconds: Buffer time before invoking (up to 300 seconds)

Parallelization:

  • parallelization-factor: Concurrent batches per shard (1-10)

# Update for higher throughput with parallel processing
aws lambda update-event-source-mapping \
    --uuid YOUR_MAPPING_UUID \
    --batch-size 500 \
    --maximum-batching-window-in-seconds 10 \
    --parallelization-factor 2

Enhanced Lambda Function with Error Handling

AWS documentation emphasizes that Lambda processes each event at least once, so your function should be idempotent. Here's an enhanced version with proper error handling that reports partial batch failures instead of failing the whole batch. Note that Lambda only acts on the returned batchItemFailures when the event source mapping has ReportBatchItemFailures enabled, as shown after the code:

// index.mjs - Enhanced Lambda function with error handling

/**
 * Process Kinesis records with error handling and retry logic.
 * Returns failed record identifiers for partial batch failure reporting.
 */
export const handler = async (event) => {
    const batchItemFailures = [];

    for (const record of event.Records) {
        try {
            await processRecord(record);
        } catch (error) {
            console.error(`Failed to process record ${record.kinesis.sequenceNumber}:`, error);

            // Report this record as failed for retry
            batchItemFailures.push({
                itemIdentifier: record.kinesis.sequenceNumber
            });
        }
    }

    // Return failed items for Lambda to retry
    return { batchItemFailures };
};

async function processRecord(record) {
    // Decode the record
    const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
    const data = JSON.parse(payload);

    // Validate the record
    validateRecord(data);

    // Transform the record
    const transformed = transformRecord(data, record);

    // Process based on event type
    await routeRecord(transformed);

    return transformed;
}

function validateRecord(data) {
    const requiredFields = ['event_id', 'event_type', 'timestamp'];

    for (const field of requiredFields) {
        if (!data[field]) {
            throw new Error(`Missing required field: ${field}`);
        }
    }

    // Validate timestamp format
    const timestamp = new Date(data.timestamp);
    if (isNaN(timestamp.getTime())) {
        throw new Error(`Invalid timestamp format: ${data.timestamp}`);
    }
}

function transformRecord(data, kinesisRecord) {
    return {
        // Core fields
        event_id: data.event_id,
        event_type: data.event_type,
        user_id: data.user_id || 'anonymous',
        timestamp: data.timestamp,

        // Derived fields
        event_date: data.timestamp.split('T')[0],
        hour_of_day: new Date(data.timestamp).getUTCHours(),
        day_of_week: new Date(data.timestamp).getUTCDay(),

        // Kinesis metadata
        partition_key: kinesisRecord.kinesis.partitionKey,
        sequence_number: kinesisRecord.kinesis.sequenceNumber,
        shard_id: kinesisRecord.eventID.split(':')[0],
        approximate_arrival: kinesisRecord.kinesis.approximateArrivalTimestamp,

        // Processing metadata
        processed_at: new Date().toISOString(),
        processor_version: '1.0.0',

        // Original payload
        payload: data
    };
}

async function routeRecord(record) {
    // Route based on event type for downstream processing
    switch (record.event_type) {
        case 'purchase':
        case 'refund':
            // High-priority financial events
            console.log(`Financial event: ${record.event_id}`);
            break;
        case 'page_view':
        case 'click':
            // Standard analytics events
            console.log(`Analytics event: ${record.event_id}`);
            break;
        default:
            console.log(`Unknown event type: ${record.event_type}`);
    }
}
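
For Lambda to act on the batchItemFailures the function returns, partial batch responses must be enabled on the event source mapping. One way to do that with the CLI, using the UUID returned by the earlier list-event-source-mappings call:

# Enable partial batch responses so Lambda retries only the failed records
aws lambda update-event-source-mapping \
    --uuid YOUR_MAPPING_UUID \
    --function-response-types ReportBatchItemFailures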

Send Test Data to Kinesis

Create a simple producer to test your pipeline:

// producer.mjs - Test data producer

import { KinesisClient, PutRecordsCommand } from '@aws-sdk/client-kinesis';

const kinesis = new KinesisClient({ region: 'us-east-1' });
const STREAM_NAME = 'serverless-events';

function generateEvent() {
    const eventTypes = ['page_view', 'click', 'purchase', 'search', 'add_to_cart'];
    const regions = ['us-east', 'us-west', 'eu-west', 'ap-southeast'];

    return {
        event_id: `evt_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`,
        user_id: `user_${Math.floor(Math.random() * 10000)}`,
        event_type: eventTypes[Math.floor(Math.random() * eventTypes.length)],
        page: `/products/${Math.floor(Math.random() * 500)}`,
        region: regions[Math.floor(Math.random() * regions.length)],
        timestamp: new Date().toISOString(),
        value: Math.random() > 0.5 ? Math.round(Math.random() * 500 * 100) / 100 : 0
    };
}

async function sendBatch(batchSize = 50) {
    const records = [];

    for (let i = 0; i < batchSize; i++) {
        const event = generateEvent();
        records.push({
            Data: Buffer.from(JSON.stringify(event)),
            PartitionKey: event.user_id
        });
    }

    const command = new PutRecordsCommand({
        StreamName: STREAM_NAME,
        Records: records
    });

    const response = await kinesis.send(command);
    console.log(`Sent ${batchSize} records, ${response.FailedRecordCount} failed`);

    return response;
}

// Send batches continuously
async function main() {
    while (true) {
        await sendBatch(50);
        await new Promise(resolve => setTimeout(resolve, 1000));
    }
}

main().catch(console.error);

Run the producer:

node producer.mjs
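
With the producer running, you can confirm the function is being invoked by tailing its CloudWatch log group (this assumes AWS CLI v2, which provides the logs tail command):

# Follow the function's logs as records arrive
aws logs tail /aws/lambda/kinesis-data-processor --follow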

Integrating the Pipeline with S3 for Long-Term Storage

Processed data needs durable storage for analytics, compliance, and reprocessing. Amazon S3 provides a cost-effective destination that integrates with analytical tools like Athena, Redshift Spectrum, and EMR.

Option 1: Direct S3 Writes from Lambda

Update the Lambda function to write processed records directly to S3:

// index.mjs - Lambda function with S3 integration

import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';

const s3 = new S3Client({ region: process.env.AWS_REGION });
const OUTPUT_BUCKET = process.env.OUTPUT_BUCKET;

export const handler = async (event) => {
    const batchItemFailures = [];
    const processedRecords = [];

    // Process all records
    for (const record of event.Records) {
        try {
            const processed = await processRecord(record);
            processedRecords.push(processed);
        } catch (error) {
            console.error(`Failed: ${record.kinesis.sequenceNumber}`, error);
            batchItemFailures.push({
                itemIdentifier: record.kinesis.sequenceNumber
            });
        }
    }

    // Write batch to S3 if we have processed records
    if (processedRecords.length > 0) {
        await writeBatchToS3(processedRecords);
    }

    return { batchItemFailures };
};

async function writeBatchToS3(records) {
    // Generate time-based path for partitioning
    const now = new Date();
    const year = now.getUTCFullYear();
    const month = String(now.getUTCMonth() + 1).padStart(2, '0');
    const day = String(now.getUTCDate()).padStart(2, '0');
    const hour = String(now.getUTCHours()).padStart(2, '0');

    // Create unique file name
    const timestamp = now.toISOString().replace(/[:.]/g, '-');
    const key = `processed/year=${year}/month=${month}/day=${day}/hour=${hour}/${timestamp}.json`;

    // Write as newline-delimited JSON (NDJSON)
    const body = records.map(r => JSON.stringify(r)).join('\n');

    const command = new PutObjectCommand({
        Bucket: OUTPUT_BUCKET,
        Key: key,
        Body: body,
        ContentType: 'application/x-ndjson'
    });

    await s3.send(command);
    console.log(`Wrote ${records.length} records to s3://${OUTPUT_BUCKET}/${key}`);
}

async function processRecord(record) {
    const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
    const data = JSON.parse(payload);

    return {
        event_id: data.event_id,
        event_type: data.event_type,
        user_id: data.user_id || 'anonymous',
        timestamp: data.timestamp,
        event_date: data.timestamp.split('T')[0],
        payload: data,
        processed_at: new Date().toISOString()
    };
}

Add S3 permissions to the Lambda role:

# Create S3 write policy
cat > s3-policy.json << 'EOF'
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:PutObjectAcl"
            ],
            "Resource": "arn:aws:s3:::your-output-bucket/processed/*"
        }
    ]
}
EOF

# Attach the policy
aws iam put-role-policy \
    --role-name lambda-kinesis-processor-role \
    --policy-name S3WritePolicy \
    --policy-document file://s3-policy.json

# Update Lambda environment variables (this replaces the existing set, so include all of them)
aws lambda update-function-configuration \
    --function-name kinesis-data-processor \
    --environment 'Variables={LOG_LEVEL=INFO,OUTPUT_BUCKET=your-output-bucket}'
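
Because the handler code changed, repackage and redeploy it as well. The AWS SDK for JavaScript v3 clients used here (such as @aws-sdk/client-s3) are bundled with the nodejs18.x runtime, so the zip only needs your source file:

# Redeploy the updated handler
zip function.zip index.mjs
aws lambda update-function-code \
    --function-name kinesis-data-processor \
    --zip-file fileb://function.zip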

Option 2: Using Amazon Data Firehose for Delivery

For higher throughput or when you need automatic batching and compression, Amazon Data Firehose provides a fully managed delivery pipeline. According to AWS documentation, Firehose automatically buffers data by size or time interval before delivery.

# Create Firehose delivery stream
aws firehose create-delivery-stream \
    --delivery-stream-name processed-events-delivery \
    --delivery-stream-type KinesisStreamAsSource \
    --kinesis-stream-source-configuration '{
        "KinesisStreamARN": "arn:aws:kinesis:us-east-1:YOUR_ACCOUNT_ID:stream/serverless-events",
        "RoleARN": "arn:aws:iam::YOUR_ACCOUNT_ID:role/firehose-kinesis-role"
    }' \
    --extended-s3-destination-configuration '{
        "RoleARN": "arn:aws:iam::YOUR_ACCOUNT_ID:role/firehose-s3-role",
        "BucketARN": "arn:aws:s3:::your-output-bucket",
        "Prefix": "raw/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/",
        "ErrorOutputPrefix": "errors/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/",
        "BufferingHints": {
            "SizeInMBs": 64,
            "IntervalInSeconds": 60
        },
        "CompressionFormat": "GZIP",
        "DataFormatConversionConfiguration": {
            "Enabled": false
        }
    }'

Hybrid Approach: Lambda Processing with Firehose Delivery

For complex transformations with efficient delivery, combine Lambda processing with Firehose:

Kinesis → Lambda (transform) → Output Kinesis Stream → Firehose → S3

Create a second Kinesis stream for processed records:

# Create output stream
aws kinesis create-stream \
    --stream-name processed-events \
    --shard-count 2

Update Lambda to write to the output stream:

// index.mjs - Lambda with Kinesis output

import { KinesisClient, PutRecordsCommand } from '@aws-sdk/client-kinesis';

const kinesis = new KinesisClient({ region: process.env.AWS_REGION });
const OUTPUT_STREAM = process.env.OUTPUT_STREAM || 'processed-events';

export const handler = async (event) => {
    const batchItemFailures = [];
    const outputRecords = [];

    for (const record of event.Records) {
        try {
            const processed = await processRecord(record);
            outputRecords.push({
                Data: Buffer.from(JSON.stringify(processed)),
                // Use a high-cardinality key so output records spread evenly across shards
                PartitionKey: processed.event_id
            });
        } catch (error) {
            batchItemFailures.push({
                itemIdentifier: record.kinesis.sequenceNumber
            });
        }
    }

    // Write to output stream
    if (outputRecords.length > 0) {
        const command = new PutRecordsCommand({
            StreamName: OUTPUT_STREAM,
            Records: outputRecords
        });

        const response = await kinesis.send(command);
        console.log(`Forwarded ${outputRecords.length} records, ${response.FailedRecordCount} failed`);
    }

    return { batchItemFailures };
};
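
To complete the hybrid pipeline, point a Firehose delivery stream at the processed-events stream. The sketch below reuses the pattern from Option 2; the delivery stream name and the two role names are placeholders to replace with your own:

# Deliver the processed stream to S3 (names and roles are placeholders)
aws firehose create-delivery-stream \
    --delivery-stream-name processed-events-to-s3 \
    --delivery-stream-type KinesisStreamAsSource \
    --kinesis-stream-source-configuration '{
        "KinesisStreamARN": "arn:aws:kinesis:us-east-1:YOUR_ACCOUNT_ID:stream/processed-events",
        "RoleARN": "arn:aws:iam::YOUR_ACCOUNT_ID:role/firehose-kinesis-role"
    }' \
    --extended-s3-destination-configuration '{
        "RoleARN": "arn:aws:iam::YOUR_ACCOUNT_ID:role/firehose-s3-role",
        "BucketARN": "arn:aws:s3:::your-output-bucket",
        "Prefix": "processed/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/",
        "BufferingHints": { "SizeInMBs": 64, "IntervalInSeconds": 60 },
        "CompressionFormat": "GZIP"
    }'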

Query Data with Amazon Athena

Once data lands in S3, create an Athena table to query it:

-- Create database
CREATE DATABASE IF NOT EXISTS serverless_events;

-- Create table over S3 data
CREATE EXTERNAL TABLE serverless_events.processed_events (
    event_id STRING,
    event_type STRING,
    user_id STRING,
    `timestamp` STRING,  -- reserved word in Athena DDL, hence the backticks
    event_date STRING,
    payload STRING,
    processed_at STRING
)
PARTITIONED BY (
    year STRING,
    month STRING,
    day STRING,
    hour STRING
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://your-output-bucket/processed/'
TBLPROPERTIES ('has_encrypted_data'='false');

-- Load partitions
MSCK REPAIR TABLE serverless_events.processed_events;

-- Query recent events
SELECT
    event_type,
    COUNT(*) as event_count,
    COUNT(DISTINCT user_id) as unique_users
FROM serverless_events.processed_events
WHERE year = '2026' AND month = '01' AND day = '25'
GROUP BY event_type
ORDER BY event_count DESC;

Monitoring and Operational Excellence

CloudWatch Metrics and Alarms

Lambda automatically publishes metrics to CloudWatch. Create alarms for critical conditions:

# Alert on high error rate
aws cloudwatch put-metric-alarm \
    --alarm-name "LambdaHighErrorRate" \
    --metric-name "Errors" \
    --namespace "AWS/Lambda" \
    --dimensions Name=FunctionName,Value=kinesis-data-processor \
    --statistic Sum \
    --period 300 \
    --threshold 10 \
    --comparison-operator GreaterThanThreshold \
    --evaluation-periods 2 \
    --alarm-actions arn:aws:sns:us-east-1:YOUR_ACCOUNT_ID:alerts

# Alert on iterator age (processing falling behind)
aws cloudwatch put-metric-alarm \
    --alarm-name "KinesisIteratorAge" \
    --metric-name "IteratorAge" \
    --namespace "AWS/Lambda" \
    --dimensions Name=FunctionName,Value=kinesis-data-processor \
    --statistic Maximum \
    --period 60 \
    --threshold 60000 \
    --comparison-operator GreaterThanThreshold \
    --evaluation-periods 3 \
    --alarm-actions arn:aws:sns:us-east-1:YOUR_ACCOUNT_ID:alerts

Lambda Function Logging

Implement structured logging for observability:

// Structured logging utility
function log(level, message, metadata = {}) {
    console.log(JSON.stringify({
        level,
        message,
        timestamp: new Date().toISOString(),
        ...metadata
    }));
}

export const handler = async (event) => {
    const startTime = Date.now();
    const requestId = event.Records[0]?.eventID || 'unknown';
    let processedCount = 0;
    let failedCount = 0;

    log('INFO', 'Starting batch processing', {
        requestId,
        recordCount: event.Records.length
    });

    // ... processing logic that increments processedCount / failedCount ...

    log('INFO', 'Completed batch processing', {
        requestId,
        processed: processedCount,
        failed: failedCount,
        durationMs: Date.now() - startTime
    });
};

Best Practices for Production

Lambda Best Practices

  • Right-size memory: Lambda CPU scales with memory; benchmark to find optimal settings
  • Minimize cold starts: Keep functions warm for latency-sensitive workloads
  • Use environment variables: Store configuration outside code for flexibility
  • Implement idempotency: According to AWS documentation, Lambda processes each event at least once, so your function should handle duplicate processing (one approach is sketched after this list)
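
As a sketch of the idempotency point above, one common pattern is a conditional write keyed on the event ID, so a retried record is detected and skipped. This assumes a hypothetical DynamoDB table named processed-events-dedupe with event_id as its partition key:

// Idempotency sketch: skip records whose event_id has already been processed.
// Assumes a hypothetical DynamoDB table 'processed-events-dedupe' keyed on event_id.
import { DynamoDBClient, PutItemCommand } from '@aws-sdk/client-dynamodb';

const dynamo = new DynamoDBClient({ region: process.env.AWS_REGION });

async function markProcessed(eventId) {
    try {
        await dynamo.send(new PutItemCommand({
            TableName: 'processed-events-dedupe',
            Item: { event_id: { S: eventId } },
            // Fails if this event_id was already recorded
            ConditionExpression: 'attribute_not_exists(event_id)'
        }));
        return true;  // first delivery of this event
    } catch (error) {
        if (error.name === 'ConditionalCheckFailedException') {
            return false;  // duplicate delivery; safe to skip
        }
        throw error;
    }
}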

Kinesis Best Practices

  • Choose partition keys carefully: Distribute load evenly across shards to prevent hot spots
  • Monitor iterator age: High iterator age indicates processing is falling behind
  • Use enhanced fan-out: When multiple consumers need dedicated throughput
  • Enable server-side encryption: Protect data at rest with KMS (see the command after this list)
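
For the encryption item above, server-side encryption can be turned on for an existing stream with a single call (shown here with the AWS-managed key; a customer-managed KMS key ARN also works):

# Enable server-side encryption on the stream with the AWS-managed Kinesis key
aws kinesis start-stream-encryption \
    --stream-name serverless-events \
    --encryption-type KMS \
    --key-id alias/aws/kinesis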

Cost Optimization

  • Batch efficiently: Larger batches reduce per-invocation overhead
  • Use ARM64 architecture: Lambda Graviton2 processors offer better price-performance
  • Compress data: Reduce Kinesis and S3 costs with compression
  • Right-size shards: Don't over-provision Kinesis capacity

# Switch the function to the ARM64 (Graviton2) architecture for better price-performance.
# The architecture is set alongside the code package, so redeploy the zip with update-function-code.
aws lambda update-function-code \
    --function-name kinesis-data-processor \
    --zip-file fileb://function.zip \
    --architectures arm64

Conclusion

You've built a complete serverless data processing pipeline on AWS that ingests streaming data through Kinesis, transforms it with Lambda, and stores results in S3 for long-term analysis. This architecture provides automatic scaling, pay-per-use pricing, and minimal operational overhead.

Key components in this architecture:

  • AWS Lambda provides serverless compute that scales automatically with incoming data
  • Amazon Kinesis Data Streams captures and buffers streaming data with sub-second latency
  • Amazon S3 offers durable, cost-effective storage for processed data
  • Amazon Athena enables SQL queries directly on S3 data

As your pipeline matures, consider extending it with:

  • Amazon Data Firehose for automatic batching and delivery to S3, Redshift, or OpenSearch
  • AWS Step Functions for orchestrating complex multi-step data workflows
  • Amazon EventBridge for routing events to multiple destinations
  • Amazon Redshift for data warehousing and complex analytical queries

Sources

  1. What is AWS Lambda? - AWS official documentation on Lambda architecture, features, and serverless computing model
  2. Using Lambda with Kinesis - AWS documentation on configuring Lambda to process Kinesis streams, including batching, parallelization, and error handling
  3. What is Amazon Kinesis Data Streams? - AWS documentation on Kinesis architecture, shards, throughput, and real-time data processing
  4. What is Amazon Data Firehose? - AWS documentation on Firehose for automatic delivery of streaming data to S3 and other destinations
  5. Using Lambda with S3 - AWS documentation on S3 event notifications and Lambda integration patterns