Serverless Data Processing Made Easy: Building a Scalable Pipeline on AWS
Serverless computing has fundamentally changed how data engineers build and operate data pipelines. Instead of provisioning servers, managing capacity, and paying for idle resources, you can focus entirely on your data processing logic while AWS handles the infrastructure. In this guide, you'll learn how to build a serverless data processing pipeline using AWS Lambda and Amazon Kinesis, with practical Node.js examples and S3 integration for long-term storage.
Understanding Serverless Computing in Data Engineering
Serverless computing eliminates the need to manage infrastructure while providing automatic scaling and pay-per-use pricing. According to AWS documentation, Lambda is a serverless compute service that runs code without requiring server management, automatically scaling based on demand.
Why Serverless for Data Pipelines?
Traditional data pipelines require capacity planning, server provisioning, and ongoing maintenance. Serverless architectures offer distinct advantages:
| Traditional Pipeline | Serverless Pipeline |
|---|---|
| Fixed capacity requires manual scaling | Automatic scaling to match demand |
| Pay for idle servers | Pay only for actual compute time |
| Infrastructure maintenance overhead | Managed infrastructure |
| Complex deployment processes | Simple function deployment |
The Lambda Execution Model
Lambda follows an event-driven model that aligns naturally with data processing workflows:
- Event sources (Kinesis, S3, API Gateway) trigger your function
- Lambda runtime executes your code in a managed environment
- Automatic scaling handles concurrent invocations
- Built-in fault tolerance retries failed executions
According to AWS documentation, Lambda supports multiple use cases including file processing with S3 and stream processing with Kinesis Data Streams for real-time analytics and monitoring.
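Since the rest of this guide processes Kinesis-triggered invocations, it helps to see the event shape Lambda hands to your function: each invocation carries a batch of records, with the producer's payload base64-encoded under kinesis.data. Here is an abbreviated example with illustrative values (account ID, stream name, and sequence numbers are placeholders):
// Abbreviated shape of the event Lambda receives from a Kinesis event source
const sampleEvent = {
  Records: [
    {
      kinesis: {
        kinesisSchemaVersion: '1.0',
        partitionKey: 'user_4242',
        sequenceNumber: '49590338271490256608559692538361571095921575989136588898',
        // The producer's payload, base64-encoded
        data: Buffer.from(JSON.stringify({ event_id: 'evt_123', event_type: 'page_view' })).toString('base64'),
        approximateArrivalTimestamp: 1735689600.123
      },
      eventSource: 'aws:kinesis',
      eventVersion: '1.0',
      eventID: 'shardId-000000000000:49590338271490256608559692538361571095921575989136588898',
      eventName: 'aws:kinesis:record',
      invokeIdentityArn: 'arn:aws:iam::123456789012:role/lambda-kinesis-processor-role',
      awsRegion: 'us-east-1',
      eventSourceARN: 'arn:aws:kinesis:us-east-1:123456789012:stream/serverless-events'
    }
  ]
};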
Prerequisites
Before you begin, ensure you have:
- An AWS account with appropriate IAM permissions
- AWS CLI installed and configured
- Node.js 18.x or later installed locally
- Basic familiarity with JavaScript/Node.js
- Understanding of AWS IAM roles and policies
Setting Up a Lambda Function for Data Processing
Let's start by creating a Lambda function that processes incoming data records. We'll use Node.js for its excellent performance with JSON data and broad ecosystem of data processing libraries.
Create the IAM Execution Role
Lambda functions need an execution role that grants permission to access AWS services:
# Create the trust policy document
cat > trust-policy.json << 'EOF'
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
EOF
# Create the IAM role
aws iam create-role \
--role-name lambda-kinesis-processor-role \
--assume-role-policy-document file://trust-policy.json
# Attach basic Lambda execution policy (CloudWatch Logs)
aws iam attach-role-policy \
--role-name lambda-kinesis-processor-role \
--policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
Create the Lambda Function
Create a Node.js function that processes individual records:
// index.mjs - Lambda function for processing data records
/**
* Process incoming data records from Kinesis.
* Validates, transforms, and prepares data for downstream storage.
*/
export const handler = async (event) => {
console.log(`Processing ${event.Records.length} records`);
const processedRecords = [];
const failedRecords = [];
for (const record of event.Records) {
try {
// Decode base64-encoded Kinesis data
const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
const data = JSON.parse(payload);
// Validate required fields
if (!data.event_id || !data.timestamp) {
throw new Error('Missing required fields: event_id or timestamp');
}
// Transform the record
const transformedRecord = {
event_id: data.event_id,
user_id: data.user_id || 'anonymous',
event_type: data.event_type,
timestamp: data.timestamp,
processed_at: new Date().toISOString(),
// Add derived fields
event_date: data.timestamp.split('T')[0],
event_hour: new Date(data.timestamp).getUTCHours(),
// Preserve original payload for audit
raw_data: data
};
processedRecords.push(transformedRecord);
} catch (error) {
console.error(`Error processing record: ${error.message}`);
failedRecords.push({
sequenceNumber: record.kinesis.sequenceNumber,
error: error.message
});
}
}
console.log(`Successfully processed ${processedRecords.length} records`);
console.log(`Failed to process ${failedRecords.length} records`);
return {
statusCode: 200,
body: {
processed: processedRecords.length,
failed: failedRecords.length,
records: processedRecords
}
};
};
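Before deploying, you can exercise the handler locally with a synthetic event. The script below (the test-local.mjs name is my own, not part of AWS tooling) builds Kinesis-shaped records, including one invalid record to hit the error path:
// test-local.mjs - invoke the handler locally with a synthetic Kinesis-style event
import { handler } from './index.mjs';

// Wrap a payload the way Kinesis would deliver it: base64-encoded under kinesis.data
function makeRecord(payload) {
  return {
    kinesis: {
      partitionKey: payload.user_id || 'test',
      sequenceNumber: String(Date.now()),
      data: Buffer.from(JSON.stringify(payload)).toString('base64'),
      approximateArrivalTimestamp: Date.now() / 1000
    },
    eventSource: 'aws:kinesis',
    eventID: `shardId-000000000000:${Date.now()}`
  };
}

const event = {
  Records: [
    makeRecord({
      event_id: 'evt_local_1',
      user_id: 'user_42',
      event_type: 'page_view',
      timestamp: new Date().toISOString()
    }),
    // Missing required fields, to exercise the error path
    makeRecord({ event_type: 'click' })
  ]
};

const result = await handler(event);
console.log(JSON.stringify(result, null, 2));
Run it with node test-local.mjs; you should see one processed and one failed record in the returned summary.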
Deploy the Lambda Function
# Create deployment package
zip function.zip index.mjs
# Create the Lambda function
aws lambda create-function \
--function-name kinesis-data-processor \
--runtime nodejs18.x \
--handler index.handler \
--role arn:aws:iam::YOUR_ACCOUNT_ID:role/lambda-kinesis-processor-role \
--zip-file fileb://function.zip \
--timeout 60 \
--memory-size 256
# Verify the function
aws lambda get-function --function-name kinesis-data-processor
Configure Function Settings
For data processing workloads, configure appropriate timeout and memory settings:
# Update function configuration
aws lambda update-function-configuration \
--function-name kinesis-data-processor \
--timeout 120 \
--memory-size 512 \
--environment 'Variables={LOG_LEVEL=INFO,OUTPUT_BUCKET=your-output-bucket}'
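Inside the function, read this configuration once at module load so a misconfigured deployment is obvious immediately. A minimal sketch (the debug helper is illustrative; OUTPUT_BUCKET is used in the S3 integration section later):
// index.mjs (top of file) - read configuration from environment variables
const LOG_LEVEL = process.env.LOG_LEVEL || 'INFO';
const OUTPUT_BUCKET = process.env.OUTPUT_BUCKET;

if (!OUTPUT_BUCKET) {
  console.warn('OUTPUT_BUCKET is not set; S3 writes will fail until it is configured');
}

// Only emit verbose output when LOG_LEVEL=DEBUG
function debug(...args) {
  if (LOG_LEVEL === 'DEBUG') console.log(...args);
}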
Creating a Kinesis Stream and Processing Pipeline
Amazon Kinesis Data Streams captures and buffers streaming data for processing. According to AWS documentation, Kinesis provides a typical put-to-get delay of less than one second, making it suitable for real-time data processing.
Create the Kinesis Data Stream
# Create a Kinesis stream with 2 shards
aws kinesis create-stream \
--stream-name serverless-events \
--shard-count 2
# Wait for the stream to become active
aws kinesis wait stream-exists --stream-name serverless-events
# Verify stream status
aws kinesis describe-stream-summary --stream-name serverless-events
Understanding Shards and Throughput
Each Kinesis shard provides:
- Write capacity: Up to 1,000 records per second or 1 MB per second
- Read capacity: Up to 2 MB per second shared across all consumers (or 2 MB per second per consumer with enhanced fan-out)
For this tutorial, two shards provide up to 2,000 records per second (and 2 MB per second) of write throughput.
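When sizing your own stream, work backwards from the expected record rate and record size; a shard must satisfy both the records-per-second and bytes-per-second limits. A rough estimate (this helper is illustrative, not an AWS API):
// Rough shard-count estimate from expected ingest volume
function estimateShards({ recordsPerSecond, avgRecordKb }) {
  const byRecordLimit = recordsPerSecond / 1000;                      // 1,000 records/sec per shard
  const byThroughputLimit = (recordsPerSecond * avgRecordKb) / 1024;  // 1 MB/sec per shard
  // A shard must satisfy both limits, so size for the larger requirement
  return Math.max(1, Math.ceil(Math.max(byRecordLimit, byThroughputLimit)));
}

// Example: 1,500 events/sec at ~0.5 KB each is bounded by the record-count limit
console.log(estimateShards({ recordsPerSecond: 1500, avgRecordKb: 0.5 })); // 2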
Configure Lambda to Process Kinesis Records
According to AWS documentation, Lambda polls each shard in your Kinesis stream and invokes your function with batches of records. You can configure batching behavior to optimize for throughput or latency.
First, grant Lambda permission to read from Kinesis:
# Create the Kinesis access policy
cat > kinesis-policy.json << 'EOF'
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kinesis:GetShardIterator",
"kinesis:GetRecords",
"kinesis:DescribeStream",
"kinesis:DescribeStreamSummary",
"kinesis:ListShards",
"kinesis:ListStreams"
],
"Resource": "arn:aws:kinesis:us-east-1:YOUR_ACCOUNT_ID:stream/serverless-events"
}
]
}
EOF
# Create and attach the policy
aws iam put-role-policy \
--role-name lambda-kinesis-processor-role \
--policy-name KinesisReadPolicy \
--policy-document file://kinesis-policy.json
Create the event source mapping to connect Lambda to Kinesis:
# Create event source mapping
aws lambda create-event-source-mapping \
--function-name kinesis-data-processor \
--event-source-arn arn:aws:kinesis:us-east-1:YOUR_ACCOUNT_ID:stream/serverless-events \
--starting-position LATEST \
--batch-size 100 \
--maximum-batching-window-in-seconds 5
# Verify the mapping
aws lambda list-event-source-mappings \
--function-name kinesis-data-processor
Batching and Parallelization Options
According to AWS documentation, you can tune the event source mapping for your workload:
Batch Size and Batching Window:
- batch-size: Maximum records per invocation (up to 10,000)
- maximum-batching-window-in-seconds: Buffer time before invoking (up to 300 seconds)
Parallelization:
- parallelization-factor: Concurrent batches per shard (1-10)
# Update for higher throughput with parallel processing
aws lambda update-event-source-mapping \
--uuid YOUR_MAPPING_UUID \
--batch-size 500 \
--maximum-batching-window-in-seconds 10 \
--parallelization-factor 2
Enhanced Lambda Function with Error Handling
AWS documentation emphasizes that Lambda processes each event at least once, so your function should be idempotent. The version below also reports partial batch failures so that only the failed records are retried; for the batchItemFailures return value to take effect, enable the ReportBatchItemFailures function response type on the event source mapping (for example, aws lambda update-event-source-mapping --uuid YOUR_MAPPING_UUID --function-response-types ReportBatchItemFailures). Here's an enhanced version with proper error handling:
// index.mjs - Enhanced Lambda function with error handling
/**
* Process Kinesis records with error handling and retry logic.
* Returns failed record identifiers for partial batch failure reporting.
*/
export const handler = async (event) => {
const batchItemFailures = [];
for (const record of event.Records) {
try {
await processRecord(record);
} catch (error) {
console.error(`Failed to process record ${record.kinesis.sequenceNumber}:`, error);
// Report this record as failed for retry
batchItemFailures.push({
itemIdentifier: record.kinesis.sequenceNumber
});
}
}
// Return failed items for Lambda to retry
return { batchItemFailures };
};
async function processRecord(record) {
// Decode the record
const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
const data = JSON.parse(payload);
// Validate the record
validateRecord(data);
// Transform the record
const transformed = transformRecord(data, record);
// Process based on event type
await routeRecord(transformed);
return transformed;
}
function validateRecord(data) {
const requiredFields = ['event_id', 'event_type', 'timestamp'];
for (const field of requiredFields) {
if (!data[field]) {
throw new Error(`Missing required field: ${field}`);
}
}
// Validate timestamp format
const timestamp = new Date(data.timestamp);
if (isNaN(timestamp.getTime())) {
throw new Error(`Invalid timestamp format: ${data.timestamp}`);
}
}
function transformRecord(data, kinesisRecord) {
return {
// Core fields
event_id: data.event_id,
event_type: data.event_type,
user_id: data.user_id || 'anonymous',
timestamp: data.timestamp,
// Derived fields
event_date: data.timestamp.split('T')[0],
hour_of_day: new Date(data.timestamp).getUTCHours(),
day_of_week: new Date(data.timestamp).getUTCDay(),
// Kinesis metadata
partition_key: kinesisRecord.kinesis.partitionKey,
sequence_number: kinesisRecord.kinesis.sequenceNumber,
shard_id: kinesisRecord.eventID.split(':')[0],
approximate_arrival: kinesisRecord.kinesis.approximateArrivalTimestamp,
// Processing metadata
processed_at: new Date().toISOString(),
processor_version: '1.0.0',
// Original payload
payload: data
};
}
async function routeRecord(record) {
// Route based on event type for downstream processing
switch (record.event_type) {
case 'purchase':
case 'refund':
// High-priority financial events
console.log(`Financial event: ${record.event_id}`);
break;
case 'page_view':
case 'click':
// Standard analytics events
console.log(`Analytics event: ${record.event_id}`);
break;
default:
console.log(`Unknown event type: ${record.event_type}`);
}
}
Send Test Data to Kinesis
Create a simple producer to test your pipeline:
// producer.mjs - Test data producer
import { KinesisClient, PutRecordsCommand } from '@aws-sdk/client-kinesis';
const kinesis = new KinesisClient({ region: 'us-east-1' });
const STREAM_NAME = 'serverless-events';
function generateEvent() {
const eventTypes = ['page_view', 'click', 'purchase', 'search', 'add_to_cart'];
const regions = ['us-east', 'us-west', 'eu-west', 'ap-southeast'];
return {
event_id: `evt_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`,
user_id: `user_${Math.floor(Math.random() * 10000)}`,
event_type: eventTypes[Math.floor(Math.random() * eventTypes.length)],
page: `/products/${Math.floor(Math.random() * 500)}`,
region: regions[Math.floor(Math.random() * regions.length)],
timestamp: new Date().toISOString(),
value: Math.random() > 0.5 ? Math.round(Math.random() * 500 * 100) / 100 : 0
};
}
async function sendBatch(batchSize = 50) {
const records = [];
for (let i = 0; i < batchSize; i++) {
const event = generateEvent();
records.push({
Data: Buffer.from(JSON.stringify(event)),
PartitionKey: event.user_id
});
}
const command = new PutRecordsCommand({
StreamName: STREAM_NAME,
Records: records
});
const response = await kinesis.send(command);
console.log(`Sent ${batchSize} records, ${response.FailedRecordCount} failed`);
return response;
}
// Send batches continuously
async function main() {
while (true) {
await sendBatch(50);
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
main().catch(console.error);
Run the producer:
node producer.mjs
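With the producer running, confirm the function is actually consuming records by reading its recent log output. A small sketch using the CloudWatch Logs SDK (the log group name follows Lambda's default /aws/lambda/<function-name> convention):
// check-logs.mjs - print recent log events from the processor's log group
import { CloudWatchLogsClient, FilterLogEventsCommand } from '@aws-sdk/client-cloudwatch-logs';

const logs = new CloudWatchLogsClient({ region: 'us-east-1' });

const response = await logs.send(new FilterLogEventsCommand({
  logGroupName: '/aws/lambda/kinesis-data-processor',
  startTime: Date.now() - 5 * 60 * 1000,   // last five minutes
  limit: 50
}));

for (const event of response.events ?? []) {
  console.log(new Date(event.timestamp).toISOString(), event.message.trim());
}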
Integrating the Pipeline with S3 for Long-Term Storage
Processed data needs durable storage for analytics, compliance, and reprocessing. Amazon S3 provides a cost-effective destination that integrates with analytical tools like Athena, Redshift Spectrum, and EMR.
Option 1: Direct S3 Writes from Lambda
Update the Lambda function to write processed records directly to S3:
// index.mjs - Lambda function with S3 integration
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';
const s3 = new S3Client({ region: process.env.AWS_REGION });
const OUTPUT_BUCKET = process.env.OUTPUT_BUCKET;
export const handler = async (event) => {
const batchItemFailures = [];
const processedRecords = [];
// Process all records
for (const record of event.Records) {
try {
const processed = await processRecord(record);
processedRecords.push(processed);
} catch (error) {
console.error(`Failed: ${record.kinesis.sequenceNumber}`, error);
batchItemFailures.push({
itemIdentifier: record.kinesis.sequenceNumber
});
}
}
// Write batch to S3 if we have processed records
if (processedRecords.length > 0) {
await writeBatchToS3(processedRecords);
}
return { batchItemFailures };
};
async function writeBatchToS3(records) {
// Generate time-based path for partitioning
const now = new Date();
const year = now.getUTCFullYear();
const month = String(now.getUTCMonth() + 1).padStart(2, '0');
const day = String(now.getUTCDate()).padStart(2, '0');
const hour = String(now.getUTCHours()).padStart(2, '0');
// Create unique file name
const timestamp = now.toISOString().replace(/[:.]/g, '-');
const key = `processed/year=${year}/month=${month}/day=${day}/hour=${hour}/${timestamp}.json`;
// Write as newline-delimited JSON (NDJSON)
const body = records.map(r => JSON.stringify(r)).join('\n');
const command = new PutObjectCommand({
Bucket: OUTPUT_BUCKET,
Key: key,
Body: body,
ContentType: 'application/x-ndjson'
});
await s3.send(command);
console.log(`Wrote ${records.length} records to s3://${OUTPUT_BUCKET}/${key}`);
}
async function processRecord(record) {
const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
const data = JSON.parse(payload);
return {
event_id: data.event_id,
event_type: data.event_type,
user_id: data.user_id || 'anonymous',
timestamp: data.timestamp,
event_date: data.timestamp.split('T')[0],
payload: data,
processed_at: new Date().toISOString()
};
}
Add S3 permissions to the Lambda role:
# Create S3 write policy
cat > s3-policy.json << 'EOF'
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl"
],
"Resource": "arn:aws:s3:::your-output-bucket/processed/*"
}
]
}
EOF
# Attach the policy
aws iam put-role-policy \
--role-name lambda-kinesis-processor-role \
--policy-name S3WritePolicy \
--policy-document file://s3-policy.json
# Update Lambda environment variables (this replaces the full set, so include every variable you need)
aws lambda update-function-configuration \
--function-name kinesis-data-processor \
--environment 'Variables={LOG_LEVEL=INFO,OUTPUT_BUCKET=your-output-bucket}'
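After the updated function has run for a minute or two, spot-check the output by reading the most recent object back from S3 and parsing its NDJSON lines. A minimal verification script (the bucket name matches the placeholder used above):
// verify-output.mjs - read the newest processed NDJSON object back from S3
import { S3Client, ListObjectsV2Command, GetObjectCommand } from '@aws-sdk/client-s3';

const s3 = new S3Client({ region: 'us-east-1' });
const BUCKET = 'your-output-bucket';

// Find the most recently written object under the processed/ prefix
const listing = await s3.send(new ListObjectsV2Command({ Bucket: BUCKET, Prefix: 'processed/' }));
const latest = (listing.Contents ?? []).sort((a, b) => b.LastModified - a.LastModified)[0];
if (!latest) throw new Error('No processed objects found yet');

const object = await s3.send(new GetObjectCommand({ Bucket: BUCKET, Key: latest.Key }));
const body = await object.Body.transformToString();

// Each line is one JSON record (NDJSON)
const records = body.trim().split('\n').map(line => JSON.parse(line));
console.log(`Read ${records.length} records from s3://${BUCKET}/${latest.Key}`);
console.log(records[0]);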
Option 2: Using Amazon Data Firehose for Delivery
For higher throughput or when you need automatic batching and compression, Amazon Data Firehose provides a fully managed delivery pipeline. According to AWS documentation, Firehose automatically buffers data by size or time interval before delivery. The command below assumes two IAM roles already exist: one that lets Firehose read from the source Kinesis stream (firehose-kinesis-role) and one that lets it write to the destination bucket (firehose-s3-role).
# Create Firehose delivery stream
aws firehose create-delivery-stream \
--delivery-stream-name processed-events-delivery \
--delivery-stream-type KinesisStreamAsSource \
--kinesis-stream-source-configuration '{
"KinesisStreamARN": "arn:aws:kinesis:us-east-1:YOUR_ACCOUNT_ID:stream/serverless-events",
"RoleARN": "arn:aws:iam::YOUR_ACCOUNT_ID:role/firehose-kinesis-role"
}' \
--extended-s3-destination-configuration '{
"RoleARN": "arn:aws:iam::YOUR_ACCOUNT_ID:role/firehose-s3-role",
"BucketARN": "arn:aws:s3:::your-output-bucket",
"Prefix": "raw/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/",
"ErrorOutputPrefix": "errors/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/",
"BufferingHints": {
"SizeInMBs": 64,
"IntervalInSeconds": 60
},
"CompressionFormat": "GZIP",
"DataFormatConversionConfiguration": {
"Enabled": false
}
}'
Hybrid Approach: Lambda Processing with Firehose Delivery
For complex transformations with efficient delivery, combine Lambda processing with Firehose:
Kinesis → Lambda (transform) → Output Kinesis Stream → Firehose → S3
Create a second Kinesis stream for processed records:
# Create output stream
aws kinesis create-stream \
--stream-name processed-events \
--shard-count 2
Update Lambda to write to the output stream. Note that the execution role also needs kinesis:PutRecords permission on the processed-events stream; grant it with another aws iam put-role-policy call, mirroring the read policy created earlier.
// index.mjs - Lambda with Kinesis output
import { KinesisClient, PutRecordsCommand } from '@aws-sdk/client-kinesis';
const kinesis = new KinesisClient({ region: process.env.AWS_REGION });
const OUTPUT_STREAM = process.env.OUTPUT_STREAM || 'processed-events';
export const handler = async (event) => {
const batchItemFailures = [];
const outputRecords = [];
for (const record of event.Records) {
try {
const processed = await processRecord(record);
outputRecords.push({
Data: Buffer.from(JSON.stringify(processed)),
// Use a high-cardinality key so records spread evenly across shards
PartitionKey: processed.event_id
});
} catch (error) {
batchItemFailures.push({
itemIdentifier: record.kinesis.sequenceNumber
});
}
}
// Write to output stream
if (outputRecords.length > 0) {
const command = new PutRecordsCommand({
StreamName: OUTPUT_STREAM,
Records: outputRecords
});
const response = await kinesis.send(command);
console.log(`Forwarded ${outputRecords.length} records, ${response.FailedRecordCount} failed`);
}
return { batchItemFailures };
};
Query Data with Amazon Athena
Once data lands in S3, create an Athena table to query it:
-- Create database
CREATE DATABASE IF NOT EXISTS serverless_events;
-- Create table over S3 data
CREATE EXTERNAL TABLE serverless_events.processed_events (
event_id STRING,
event_type STRING,
user_id STRING,
`timestamp` STRING,
event_date STRING,
payload STRING,
processed_at STRING
)
PARTITIONED BY (
year STRING,
month STRING,
day STRING,
hour STRING
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://your-output-bucket/processed/'
TBLPROPERTIES ('has_encrypted_data'='false');
-- Load partitions
MSCK REPAIR TABLE serverless_events.processed_events;
-- Query recent events
SELECT
event_type,
COUNT(*) as event_count,
COUNT(DISTINCT user_id) as unique_users
FROM serverless_events.processed_events
WHERE year = '2026' AND month = '01' AND day = '25'
GROUP BY event_type
ORDER BY event_count DESC;
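You can run the same aggregation from Node.js with the Athena SDK, which is handy for scheduled reporting from another Lambda function. A sketch (the results bucket passed as OutputLocation is an assumption; point it at any bucket you own):
// run-athena-query.mjs - run the daily event-count query from Node.js
import {
  AthenaClient,
  StartQueryExecutionCommand,
  GetQueryExecutionCommand,
  GetQueryResultsCommand
} from '@aws-sdk/client-athena';

const athena = new AthenaClient({ region: 'us-east-1' });

const { QueryExecutionId } = await athena.send(new StartQueryExecutionCommand({
  QueryString: `
    SELECT event_type, COUNT(*) AS event_count
    FROM serverless_events.processed_events
    WHERE year = '2026' AND month = '01' AND day = '25'
    GROUP BY event_type
    ORDER BY event_count DESC`,
  QueryExecutionContext: { Database: 'serverless_events' },
  ResultConfiguration: { OutputLocation: 's3://your-athena-results-bucket/' }
}));

// Poll until the query finishes
let state = 'QUEUED';
while (state === 'QUEUED' || state === 'RUNNING') {
  await new Promise(resolve => setTimeout(resolve, 1000));
  const { QueryExecution } = await athena.send(new GetQueryExecutionCommand({ QueryExecutionId }));
  state = QueryExecution.Status.State;
}
if (state !== 'SUCCEEDED') throw new Error(`Query finished with state ${state}`);

const { ResultSet } = await athena.send(new GetQueryResultsCommand({ QueryExecutionId }));
for (const row of ResultSet.Rows.slice(1)) {   // the first row is the header
  console.log(row.Data.map(d => d.VarCharValue).join('\t'));
}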
Monitoring and Operational Excellence
CloudWatch Metrics and Alarms
Lambda automatically publishes metrics to CloudWatch. Create alarms for critical conditions:
# Alert on high error rate
aws cloudwatch put-metric-alarm \
--alarm-name "LambdaHighErrorRate" \
--metric-name "Errors" \
--namespace "AWS/Lambda" \
--dimensions Name=FunctionName,Value=kinesis-data-processor \
--statistic Sum \
--period 300 \
--threshold 10 \
--comparison-operator GreaterThanThreshold \
--evaluation-periods 2 \
--alarm-actions arn:aws:sns:us-east-1:YOUR_ACCOUNT_ID:alerts
# Alert on iterator age (processing falling behind)
aws cloudwatch put-metric-alarm \
--alarm-name "KinesisIteratorAge" \
--metric-name "IteratorAge" \
--namespace "AWS/Lambda" \
--dimensions Name=FunctionName,Value=kinesis-data-processor \
--statistic Maximum \
--period 60 \
--threshold 60000 \
--comparison-operator GreaterThanThreshold \
--evaluation-periods 3 \
--alarm-actions arn:aws:sns:us-east-1:YOUR_ACCOUNT_ID:alerts
Lambda Function Logging
Implement structured logging for observability:
// Structured logging utility
function log(level, message, metadata = {}) {
console.log(JSON.stringify({
level,
message,
timestamp: new Date().toISOString(),
...metadata
}));
}
export const handler = async (event) => {
  const startTime = Date.now();
  const requestId = event.Records[0]?.eventID || 'unknown';
  let processedCount = 0;
  let failedCount = 0;
  log('INFO', 'Starting batch processing', {
    requestId,
    recordCount: event.Records.length
  });
  // ... processing logic that increments processedCount / failedCount ...
  log('INFO', 'Completed batch processing', {
    requestId,
    processed: processedCount,
    failed: failedCount,
    durationMs: Date.now() - startTime
  });
};
Best Practices for Production
Lambda Best Practices
- Right-size memory: Lambda CPU scales with memory; benchmark to find optimal settings
- Minimize cold starts: Keep functions warm for latency-sensitive workloads
- Use environment variables: Store configuration outside code for flexibility
- Implement idempotency: According to AWS documentation, Lambda processes each event at least once, so your function should handle duplicate processing
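One common way to implement the idempotency recommendation is to deduplicate on a unique key before performing any side effect. The sketch below uses a DynamoDB conditional write; the processed-events-dedupe table (partition key event_id, with TTL enabled on expires_at) is an assumption and is not created elsewhere in this guide:
// dedupe.mjs - skip records whose event_id has already been processed
// (the processed-events-dedupe table is hypothetical; create it with event_id as the partition key)
import { DynamoDBClient, PutItemCommand, ConditionalCheckFailedException } from '@aws-sdk/client-dynamodb';

const dynamo = new DynamoDBClient({ region: process.env.AWS_REGION });

export async function isFirstTimeSeen(eventId) {
  try {
    await dynamo.send(new PutItemCommand({
      TableName: 'processed-events-dedupe',
      Item: {
        event_id: { S: eventId },
        // TTL attribute so old markers expire automatically
        expires_at: { N: String(Math.floor(Date.now() / 1000) + 24 * 60 * 60) }
      },
      ConditionExpression: 'attribute_not_exists(event_id)'
    }));
    return true;                       // first time we see this event_id
  } catch (error) {
    if (error instanceof ConditionalCheckFailedException) {
      return false;                    // duplicate delivery; safe to skip
    }
    throw error;
  }
}
Calling isFirstTimeSeen(record.event_id) before routing a record keeps replayed batches from producing duplicate side effects.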
Kinesis Best Practices
- Choose partition keys carefully: Distribute load evenly across shards to prevent hot spots
- Monitor iterator age: High iterator age indicates processing is falling behind
- Use enhanced fan-out: When multiple consumers need dedicated throughput
- Enable server-side encryption: Protect data at rest with KMS
Cost Optimization
- Batch efficiently: Larger batches reduce per-invocation overhead
- Use ARM64 architecture: Lambda Graviton2 processors offer better price-performance
- Compress data: Reduce Kinesis and S3 costs with compression
- Right-size shards: Don't over-provision Kinesis capacity
# Switch the function to the ARM64 (Graviton) architecture for cost savings
aws lambda update-function-code \
--function-name kinesis-data-processor \
--zip-file fileb://function.zip \
--architectures arm64
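To act on the compression recommendation above, the Option 1 S3 writer can gzip the NDJSON batch before upload using Node's built-in zlib; Athena reads gzip-compressed JSON transparently when objects carry a .gz suffix. A sketch of an alternative writer (the .json.gz naming is my own convention):
// Compressed variant of writeBatchToS3 from Option 1 (gzip via Node's built-in zlib)
import { gzipSync } from 'node:zlib';
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';

const s3 = new S3Client({ region: process.env.AWS_REGION });

async function writeCompressedBatchToS3(records, bucket) {
  const now = new Date();
  const key = `processed/year=${now.getUTCFullYear()}` +
    `/month=${String(now.getUTCMonth() + 1).padStart(2, '0')}` +
    `/day=${String(now.getUTCDate()).padStart(2, '0')}` +
    `/hour=${String(now.getUTCHours()).padStart(2, '0')}` +
    `/${now.toISOString().replace(/[:.]/g, '-')}.json.gz`;

  // Gzip the newline-delimited JSON payload before upload
  const body = gzipSync(records.map(r => JSON.stringify(r)).join('\n'));

  await s3.send(new PutObjectCommand({
    Bucket: bucket,
    Key: key,
    Body: body,
    ContentType: 'application/x-ndjson',
    ContentEncoding: 'gzip'
  }));
  console.log(`Wrote ${records.length} compressed records to s3://${bucket}/${key}`);
}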
Conclusion
You've built a complete serverless data processing pipeline on AWS that ingests streaming data through Kinesis, transforms it with Lambda, and stores results in S3 for long-term analysis. This architecture provides automatic scaling, pay-per-use pricing, and minimal operational overhead.
Key components in this architecture:
- AWS Lambda provides serverless compute that scales automatically with incoming data
- Amazon Kinesis Data Streams captures and buffers streaming data with sub-second latency
- Amazon S3 offers durable, cost-effective storage for processed data
- Amazon Athena enables SQL queries directly on S3 data
As your pipeline matures, consider extending it with:
- Amazon Data Firehose for automatic batching and delivery to S3, Redshift, or OpenSearch
- AWS Step Functions for orchestrating complex multi-step data workflows
- Amazon EventBridge for routing events to multiple destinations
- Amazon Redshift for data warehousing and complex analytical queries
Sources
- What is AWS Lambda? - AWS official documentation on Lambda architecture, features, and serverless computing model
- Using Lambda with Kinesis - AWS documentation on configuring Lambda to process Kinesis streams, including batching, parallelization, and error handling
- What is Amazon Kinesis Data Streams? - AWS documentation on Kinesis architecture, shards, throughput, and real-time data processing
- What is Amazon Data Firehose? - AWS documentation on Firehose for automatic delivery of streaming data to S3 and other destinations
- Using Lambda with S3 - AWS documentation on S3 event notifications and Lambda integration patterns