Streaming Data into Action: Building a Scalable Real-Time Analytics Platform on AWS

Real-time analytics has become essential for organizations that need to make decisions based on data as it arrives rather than waiting for batch processing cycles. Whether you're monitoring application performance, detecting fraud, or personalizing user experiences, the ability to process streaming data at scale is a competitive advantage. In this guide, you'll learn how to build a real-time analytics platform on AWS using Amazon Kinesis, Amazon EMR with Apache Spark, and Amazon Redshift.

Understanding Real-Time Analytics Architecture

Real-time analytics requires a fundamentally different architecture than batch processing. Instead of processing data in scheduled intervals, streaming systems continuously ingest, process, and deliver insights with latency measured in seconds rather than hours.

Key Components

A production real-time analytics platform on AWS typically consists of:

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Data      │────▶│   Amazon    │────▶│   Amazon    │────▶│   Amazon    │
│   Sources   │     │   Kinesis   │     │   EMR       │     │   Redshift  │
└─────────────┘     └─────────────┘     │   (Spark)   │     └─────────────┘
                          │             └─────────────┘            │
                          │                    │                   │
                     ┌────▼────┐          ┌────▼────┐         ┌────▼────┐
                     │ Real-   │          │ Stream  │         │ BI &    │
                     │ time    │          │ Process │         │ Report  │
                     │ Buffer  │          │ & Enrich│         │ Layer   │
                     └─────────┘          └─────────┘         └─────────┘
  1. Amazon Kinesis Data Streams captures and buffers streaming data
  2. Amazon EMR with Apache Spark processes streams with complex transformations
  3. Amazon Redshift stores processed data for long-term analytics and reporting

Amazon Kinesis: The Foundation of Real-Time Data Ingestion

Amazon Kinesis Data Streams is a managed service for collecting and processing large streams of data records in real time. According to AWS documentation, Kinesis provides a typical put-to-get delay of less than one second, meaning records are available for processing almost immediately after they're added to the stream.

Why Kinesis for Real-Time Analytics?

Kinesis addresses several challenges that arise when building streaming data pipelines:

  • Durability: Data is replicated across three Availability Zones within a region
  • Elasticity: Streams can scale up or down based on throughput requirements without data loss
  • Multiple consumers: Multiple applications can independently read from the same stream
  • Integration: Native integration with EMR, Lambda, Firehose, and other AWS services

Common Kinesis Use Cases

According to AWS documentation, Kinesis excels at:

  • Accelerated log intake: Push application and system logs directly to Kinesis for processing within seconds
  • Real-time metrics: Analyze data as it streams rather than waiting for batch intervals
  • Clickstream analytics: Process website user interactions in parallel for real-time personalization
  • Complex stream processing: Build directed acyclic graphs (DAGs) of applications for multi-stage processing

Prerequisites

Before you begin, ensure you have:

  • An AWS account with appropriate IAM permissions
  • AWS CLI installed and configured
  • Basic knowledge of Python and SQL
  • Understanding of AWS networking (VPC, subnets, security groups)
  • Apache Spark fundamentals

Step 1: Creating a Kinesis Data Stream

Start by creating a Kinesis stream to capture your real-time data.

Create the Stream

# Create a Kinesis data stream with 4 shards
aws kinesis create-stream \
    --stream-name realtime-events \
    --shard-count 4 \
    --region us-east-1

# Wait for stream to become active
aws kinesis wait stream-exists --stream-name realtime-events

# Verify stream status
aws kinesis describe-stream-summary --stream-name realtime-events

Understanding Shards

Shards are the base throughput unit of a Kinesis stream. Each shard provides:

  • Write capacity: Up to 1,000 records per second or 1 MB per second
  • Read capacity: Up to 2 MB per second (or 2 MB per second per consumer with enhanced fan-out)

For this tutorial, four shards provide up to 4,000 records per second of write throughput.
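
To translate expected traffic into a shard count, apply the per-shard limits above. Here is a minimal sizing sketch (the traffic figures in the example are hypothetical):

import math

# Per-shard write limits from the Kinesis documentation
WRITE_RECORDS_PER_SHARD = 1_000      # records per second
WRITE_BYTES_PER_SHARD = 1_000_000    # 1 MB per second

def required_shards(records_per_sec, avg_record_bytes):
    """Return the minimum shard count for the expected write throughput."""
    by_records = math.ceil(records_per_sec / WRITE_RECORDS_PER_SHARD)
    by_bytes = math.ceil(records_per_sec * avg_record_bytes / WRITE_BYTES_PER_SHARD)
    return max(by_records, by_bytes, 1)

# Example: 3,500 events/sec at ~1 KB each needs 4 shards
print(required_shards(3_500, 1_024))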

Create a Producer Application

The Kinesis Producer Library (KPL) simplifies writing high-throughput producers. According to AWS documentation, the KPL handles batching, aggregation, and automatic retries, eliminating the need for complicated producer logic. The KPL is distributed as a Java library, so this tutorial instead uses the Boto3 SDK's PutRecords API and batches records explicitly; a retry sketch for failed records follows the listing.

Create a Python producer using the Boto3 SDK:

import json
import random
import time
from datetime import datetime
import boto3

# Initialize Kinesis client
kinesis = boto3.client('kinesis', region_name='us-east-1')

STREAM_NAME = 'realtime-events'

def generate_event():
    """Generate a simulated user event."""
    event_types = ['page_view', 'click', 'purchase', 'add_to_cart', 'search']
    regions = ['us-east', 'us-west', 'eu-west', 'ap-southeast']

    return {
        'event_id': f'evt_{int(time.time() * 1000)}_{random.randint(1000, 9999)}',
        'user_id': f'user_{random.randint(1, 10000)}',
        'event_type': random.choice(event_types),
        'page': f'/products/{random.randint(1, 500)}',
        'region': random.choice(regions),
        'timestamp': datetime.utcnow().isoformat(),
        'value': round(random.uniform(0, 500), 2) if random.random() > 0.5 else 0
    }

def send_events(batch_size=100, interval=1):
    """Send batches of events to Kinesis."""
    while True:
        records = []
        for _ in range(batch_size):
            event = generate_event()
            records.append({
                'Data': json.dumps(event).encode('utf-8'),
                'PartitionKey': event['user_id']  # Partition by user for ordering
            })

        response = kinesis.put_records(
            StreamName=STREAM_NAME,
            Records=records
        )

        # Records counted in FailedRecordCount were rejected (for example, throttled)
        # and should be resent; see the retry sketch after this listing.
        failed = response.get('FailedRecordCount', 0)
        print(f"Sent {batch_size - failed} of {batch_size} records, {failed} failed")

        time.sleep(interval)

if __name__ == '__main__':
    send_events()
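
The producer above reports FailedRecordCount but does not resend throttled records. PutRecords is not atomic: individual records can fail while the rest of the batch succeeds. A minimal retry helper that send_events could call instead of put_records directly (it reuses the kinesis client and STREAM_NAME defined above; the backoff values are arbitrary):

def put_with_retry(records, max_attempts=3, backoff_seconds=0.5):
    """Send a batch and resend only the records Kinesis reports as failed."""
    attempt = 0
    while records and attempt < max_attempts:
        response = kinesis.put_records(StreamName=STREAM_NAME, Records=records)
        if response.get('FailedRecordCount', 0) == 0:
            return []
        # Per-record results line up with the request; failed ones carry an ErrorCode
        records = [
            record for record, result in zip(records, response['Records'])
            if 'ErrorCode' in result
        ]
        attempt += 1
        time.sleep(backoff_seconds * (2 ** attempt))
    return records  # anything still here was not delivered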

Partition Key Strategy

The partition key determines which shard receives a record. Choose your partition key based on your use case (a short sketch follows the list):

  • User ID: Groups all events for a user in the same shard, preserving order
  • Event type: Groups similar events together for specialized processing
  • Random: Distributes load evenly but doesn't preserve ordering
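
To make these trade-offs concrete, here is a small sketch showing how each strategy would change the PartitionKey assignment in the producer above (the uuid4-based random key is just one reasonable choice):

import uuid

def partition_key(event, strategy='user'):
    """Pick a partition key according to one of the strategies above."""
    if strategy == 'user':
        return event['user_id']        # preserves per-user ordering; hot users can skew shards
    if strategy == 'event_type':
        return event['event_type']     # few distinct keys, so only a few shards stay active
    return str(uuid.uuid4())           # spreads load evenly, but no ordering guarantees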

Step 2: Setting Up Amazon EMR for Stream Processing

Amazon EMR is a managed big data processing service that uses clusters of EC2 instances. According to AWS documentation, EMR supports Apache Spark with built-in integrations for Kinesis streaming.

EMR Cluster Architecture

An EMR cluster consists of three node types:

Node Type   Role                                 Requirements
Primary     Manages cluster, coordinates tasks   Required (one per cluster)
Core        Runs tasks and stores HDFS data      Required for multi-node clusters
Task        Runs tasks only (no HDFS storage)    Optional, for scaling compute

Create an EMR Cluster

# Create EMR cluster with Spark and Kinesis connector
aws emr create-cluster \
    --name "realtime-analytics-cluster" \
    --release-label emr-7.0.0 \
    --applications Name=Spark Name=Hadoop Name=Hive \
    --instance-groups '[
        {
            "Name": "Primary",
            "InstanceGroupType": "MASTER",
            "InstanceType": "m5.xlarge",
            "InstanceCount": 1
        },
        {
            "Name": "Core",
            "InstanceGroupType": "CORE",
            "InstanceType": "m5.2xlarge",
            "InstanceCount": 3
        }
    ]' \
    --ec2-attributes '{
        "KeyName": "your-key-pair",
        "SubnetId": "subnet-xxxxxxxx",
        "InstanceProfile": "EMR_EC2_DefaultRole",
        "EmrManagedMasterSecurityGroup": "sg-xxxxxxxx",
        "EmrManagedSlaveSecurityGroup": "sg-yyyyyyyy"
    }' \
    --service-role EMR_DefaultRole \
    --log-uri s3://your-bucket/emr-logs/ \
    --configurations '[
        {
            "Classification": "spark-defaults",
            "Properties": {
                "spark.streaming.stopGracefullyOnShutdown": "true",
                "spark.streaming.backpressure.enabled": "true",
                "spark.dynamicAllocation.enabled": "false"
            }
        }
    ]'
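
Cluster provisioning usually takes several minutes. If you script the setup, a minimal boto3 sketch can block until the cluster is ready to accept steps (the cluster ID below is the placeholder returned by create-cluster):

import boto3

emr = boto3.client('emr', region_name='us-east-1')

# Block until the cluster reaches a running/waiting state
cluster_id = 'j-XXXXXXXXXXXXX'
emr.get_waiter('cluster_running').wait(ClusterId=cluster_id)

state = emr.describe_cluster(ClusterId=cluster_id)['Cluster']['Status']['State']
print(f"Cluster {cluster_id} is {state}")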

Create IAM Role for EMR

EMR needs permissions to access Kinesis and S3:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream",
                "kinesis:DescribeStreamSummary",
                "kinesis:ListShards"
            ],
            "Resource": "arn:aws:kinesis:us-east-1:YOUR_ACCOUNT:stream/realtime-events"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::your-analytics-bucket",
                "arn:aws:s3:::your-analytics-bucket/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": "*"
        }
    ]
}
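
These permissions need to be attached to the role behind the EMR EC2 instance profile (EMR_EC2_DefaultRole in the cluster command above). A minimal boto3 sketch, assuming the JSON above has been saved locally as emr-kinesis-policy.json and using a hypothetical inline policy name:

import boto3

iam = boto3.client('iam')

# Attach the policy above as an inline policy on the instance-profile role
with open('emr-kinesis-policy.json') as f:
    policy_document = f.read()

iam.put_role_policy(
    RoleName='EMR_EC2_DefaultRole',
    PolicyName='realtime-analytics-kinesis-s3',
    PolicyDocument=policy_document
)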

Step 3: Building the Spark Streaming Application

Apache Spark on EMR provides both Spark Streaming (DStreams) and Spark Structured Streaming APIs for processing real-time data. According to AWS documentation, EMR includes the Kinesis connector for Spark Structured Streaming.

Spark Structured Streaming Application

Create a PySpark application that reads from Kinesis and processes events:

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, from_json, window, count, sum as spark_sum,
    avg, current_timestamp, to_timestamp
)
from pyspark.sql.types import (
    StructType, StructField, StringType, DoubleType, TimestampType
)

# Initialize Spark session
spark = SparkSession.builder \
    .appName("RealtimeEventsProcessor") \
    .config("spark.sql.streaming.checkpointLocation", "s3://your-bucket/checkpoints/") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# Define schema for incoming events
event_schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("page", StringType(), True),
    StructField("region", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("value", DoubleType(), True)
])

# Read from Kinesis stream
kinesis_stream = spark.readStream \
    .format("kinesis") \
    .option("streamName", "realtime-events") \
    .option("region", "us-east-1") \
    .option("initialPosition", "TRIM_HORIZON") \
    .option("awsUseInstanceProfile", "true") \
    .load()

# Parse JSON data
parsed_events = kinesis_stream \
    .selectExpr("CAST(data AS STRING) as json_data") \
    .select(from_json(col("json_data"), event_schema).alias("event")) \
    .select("event.*") \
    .withColumn("event_timestamp", to_timestamp(col("timestamp")))

# Aggregate events in 1-minute windows
windowed_aggregates = parsed_events \
    .withWatermark("event_timestamp", "2 minutes") \
    .groupBy(
        window(col("event_timestamp"), "1 minute"),
        col("event_type"),
        col("region")
    ) \
    .agg(
        count("*").alias("event_count"),
        spark_sum("value").alias("total_value"),
        avg("value").alias("avg_value")
    )

# Write aggregates to S3 in Parquet format
aggregate_query = windowed_aggregates \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "s3://your-bucket/aggregates/") \
    .option("checkpointLocation", "s3://your-bucket/checkpoints/aggregates/") \
    .partitionBy("region") \
    .trigger(processingTime="1 minute") \
    .start()

# Write raw events to S3 for archival
raw_events_query = parsed_events \
    .writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "s3://your-bucket/raw-events/") \
    .option("checkpointLocation", "s3://your-bucket/checkpoints/raw/") \
    .partitionBy("region", "event_type") \
    .trigger(processingTime="1 minute") \
    .start()

# Wait for termination
spark.streams.awaitAnyTermination()

Submit the Spark Job to EMR

# Upload the application to S3
aws s3 cp streaming_app.py s3://your-bucket/scripts/

# Submit as an EMR step
aws emr add-steps \
    --cluster-id j-XXXXXXXXXXXXX \
    --steps '[
        {
            "Name": "Realtime Events Processor",
            "ActionOnFailure": "CONTINUE",
            "Type": "Spark",
            "Properties": {},
            "Args": [
                "spark-submit",
                "--deploy-mode", "cluster",
                "--master", "yarn",
                "--num-executors", "6",
                "--executor-cores", "4",
                "--executor-memory", "8g",
                "--driver-memory", "4g",
                "--packages", "org.apache.spark:spark-streaming-kinesis-asl_2.12:3.5.0",
                "s3://your-bucket/scripts/streaming_app.py"
            ]
        }
    ]'

Advanced Processing: Sessionization

For more sophisticated analytics, group user events into sessions. Note that lag and other non-time-based window functions are not supported on streaming DataFrames, so run this step as a batch job over the raw events archived in S3 (a native streaming alternative is sketched after the session metrics):

from pyspark.sql.functions import (
    col, concat, count, lag, lit, when,
    sum as spark_sum, min as spark_min, max as spark_max
)
from pyspark.sql.window import Window

def sessionize_events(events_df, timeout_minutes=30):
    """
    Group events into sessions based on inactivity timeout.
    """
    # Define window specification
    user_window = Window.partitionBy("user_id").orderBy("event_timestamp")

    # Calculate time gap between events
    sessionized = events_df \
        .withColumn("prev_timestamp", lag("event_timestamp").over(user_window)) \
        .withColumn(
            "time_gap_minutes",
            (col("event_timestamp").cast("long") - col("prev_timestamp").cast("long")) / 60
        ) \
        .withColumn(
            "new_session",
            when(
                (col("time_gap_minutes") > timeout_minutes) |
                col("prev_timestamp").isNull(),
                1
            ).otherwise(0)
        ) \
        .withColumn(
            "session_id",
            spark_sum("new_session").over(user_window)
        ) \
        .withColumn(
            "session_key",
            concat(col("user_id"), lit("_"), col("session_id").cast("string"))
        )

    return sessionized

# Apply sessionization to the archived raw events (batch read from S3)
raw_events = spark.read.parquet("s3://your-bucket/raw-events/")
sessionized_events = sessionize_events(raw_events)

# Calculate session-level metrics
session_metrics = sessionized_events \
    .groupBy("session_key", "user_id", "region") \
    .agg(
        count("*").alias("events_in_session"),
        spark_sum("value").alias("session_value"),
        min("event_timestamp").alias("session_start"),
        max("event_timestamp").alias("session_end")
    ) \
    .withColumn(
        "session_duration_seconds",
        col("session_end").cast("long") - col("session_start").cast("long")
    )
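
If you want sessions computed on the live stream rather than in a batch pass, Spark 3.2 and later provide a session_window grouping that works with watermarks. A minimal sketch under that assumption, reusing parsed_events from the streaming application:

from pyspark.sql.functions import session_window

# Events from the same user that arrive within a 30-minute gap share one session window
streaming_sessions = parsed_events \
    .withWatermark("event_timestamp", "30 minutes") \
    .groupBy(
        session_window(col("event_timestamp"), "30 minutes"),
        col("user_id")
    ) \
    .agg(
        count("*").alias("events_in_session"),
        spark_sum("value").alias("session_value")
    )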

Step 4: Integrating with Amazon Redshift

For long-term storage and complex analytical queries, load processed data into Amazon Redshift. According to AWS documentation, Amazon Data Firehose can deliver data directly to Redshift using a two-step process: staging in S3 followed by a COPY command.

Option A: Direct Load via Firehose to Redshift

Create a Firehose delivery stream that loads directly to Redshift:

# Create Firehose delivery stream to Redshift
aws firehose create-delivery-stream \
    --delivery-stream-name events-to-redshift \
    --delivery-stream-type KinesisStreamAsSource \
    --kinesis-stream-source-configuration '{
        "KinesisStreamARN": "arn:aws:kinesis:us-east-1:YOUR_ACCOUNT:stream/realtime-events",
        "RoleARN": "arn:aws:iam::YOUR_ACCOUNT:role/FirehoseRole"
    }' \
    --redshift-destination-configuration '{
        "RoleARN": "arn:aws:iam::YOUR_ACCOUNT:role/FirehoseRole",
        "ClusterJDBCURL": "jdbc:redshift://your-cluster.xxxxx.us-east-1.redshift.amazonaws.com:5439/analytics",
        "CopyCommand": {
            "DataTableName": "events.raw_events",
            "CopyOptions": "FORMAT AS JSON '\''auto'\'' GZIP TIMEFORMAT '\''auto'\''"
        },
        "Username": "admin",
        "Password": "YourSecurePassword123!",
        "S3Configuration": {
            "RoleARN": "arn:aws:iam::YOUR_ACCOUNT:role/FirehoseRole",
            "BucketARN": "arn:aws:s3:::your-bucket",
            "Prefix": "firehose/events/",
            "BufferingHints": {
                "SizeInMBs": 128,
                "IntervalInSeconds": 300
            },
            "CompressionFormat": "GZIP"
        }
    }'

Option B: Load Spark Output from S3

For more control over transformations, load the Spark-processed Parquet files:

-- Create schema for streaming data
CREATE SCHEMA IF NOT EXISTS events;

-- Create raw events table
CREATE TABLE events.raw_events (
    event_id VARCHAR(100),
    user_id VARCHAR(50),
    event_type VARCHAR(50),
    page VARCHAR(200),
    region VARCHAR(50),
    event_timestamp TIMESTAMP,
    value DECIMAL(10,2),
    loaded_at TIMESTAMP DEFAULT GETDATE()
)
DISTKEY(event_timestamp)
SORTKEY(event_timestamp, region);

-- Create aggregated metrics table
CREATE TABLE events.hourly_metrics (
    window_start TIMESTAMP,
    window_end TIMESTAMP,
    event_type VARCHAR(50),
    region VARCHAR(50),
    event_count BIGINT,
    total_value DECIMAL(12,2),
    avg_value DECIMAL(10,2),
    loaded_at TIMESTAMP DEFAULT GETDATE()
)
DISTSTYLE AUTO
SORTKEY(window_start, region);

-- Load aggregates from S3
COPY events.hourly_metrics (
    window_start, window_end, event_type, region,
    event_count, total_value, avg_value
)
FROM 's3://your-bucket/aggregates/'
IAM_ROLE 'arn:aws:iam::YOUR_ACCOUNT:role/RedshiftS3Role'
FORMAT AS PARQUET;
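
If you would rather trigger this COPY from code than from a SQL client, the Redshift Data API runs it without managing a JDBC connection. A minimal boto3 sketch; the cluster identifier, database name, and Secrets Manager ARN are placeholders:

import time
import boto3

redshift_data = boto3.client('redshift-data', region_name='us-east-1')

copy_sql = """
COPY events.hourly_metrics (window_start, window_end, event_type, region,
                            event_count, total_value, avg_value)
FROM 's3://your-bucket/aggregates/'
IAM_ROLE 'arn:aws:iam::YOUR_ACCOUNT:role/RedshiftS3Role'
FORMAT AS PARQUET;
"""

# Submit the COPY, then poll until it finishes
statement = redshift_data.execute_statement(
    ClusterIdentifier='your-cluster',
    Database='analytics',
    SecretArn='arn:aws:secretsmanager:us-east-1:YOUR_ACCOUNT:secret:redshift-admin',
    Sql=copy_sql
)
while True:
    status = redshift_data.describe_statement(Id=statement['Id'])['Status']
    if status in ('FINISHED', 'FAILED', 'ABORTED'):
        break
    time.sleep(5)
print(f"COPY status: {status}")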

Create Analytical Views

Build views for common analytical queries:

-- Real-time dashboard view (refreshes from latest loaded data)
CREATE OR REPLACE VIEW events.realtime_dashboard AS
SELECT
    DATE_TRUNC('hour', window_start) AS hour,
    region,
    SUM(event_count) AS total_events,
    SUM(total_value) AS revenue,
    AVG(avg_value) AS avg_order_value
FROM events.hourly_metrics
WHERE window_start >= DATEADD(day, -7, GETDATE())
GROUP BY 1, 2
ORDER BY 1 DESC, 2;

-- Materialized view for frequently accessed aggregations
CREATE MATERIALIZED VIEW events.daily_summary AS
SELECT
    DATE(window_start) AS event_date,
    event_type,
    region,
    SUM(event_count) AS events,
    SUM(total_value) AS value
FROM events.hourly_metrics
GROUP BY 1, 2, 3;

-- Refresh materialized view
REFRESH MATERIALIZED VIEW events.daily_summary;

Automate Data Loading

Create a stored procedure to incrementally load new data:

CREATE OR REPLACE PROCEDURE events.load_new_aggregates()
AS $$
DECLARE
    last_loaded TIMESTAMP;
BEGIN
    -- Get the last loaded window
    SELECT COALESCE(MAX(window_end), '1970-01-01'::TIMESTAMP)
    INTO last_loaded
    FROM events.hourly_metrics;

    -- Stage the S3 data, then keep only windows newer than the last load
    -- (a plain COPY into the target table would re-insert rows already loaded)
    CREATE TEMP TABLE staged_metrics (LIKE events.hourly_metrics);

    COPY staged_metrics (
        window_start, window_end, event_type, region,
        event_count, total_value, avg_value
    )
    FROM 's3://your-bucket/aggregates/'
    IAM_ROLE 'arn:aws:iam::YOUR_ACCOUNT:role/RedshiftS3Role'
    FORMAT AS PARQUET;

    INSERT INTO events.hourly_metrics (
        window_start, window_end, event_type, region,
        event_count, total_value, avg_value
    )
    SELECT window_start, window_end, event_type, region,
           event_count, total_value, avg_value
    FROM staged_metrics
    WHERE window_end > last_loaded;

    DROP TABLE staged_metrics;

    -- Refresh materialized views
    REFRESH MATERIALIZED VIEW events.daily_summary;
END;
$$ LANGUAGE plpgsql;

-- Redshift has no CREATE SCHEDULE statement. Schedule the call
--     CALL events.load_new_aggregates();
-- with the query editor v2 scheduler or an EventBridge rule that invokes the
-- Redshift Data API, as sketched below.
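
One lightweight way to run the procedure every 15 minutes is an EventBridge schedule that invokes a small Lambda function, which in turn calls the procedure through the Redshift Data API. A minimal sketch of such a handler (identifiers are placeholders):

import boto3

redshift_data = boto3.client('redshift-data')

def handler(event, context):
    """Triggered by an EventBridge rule on a 15-minute schedule."""
    response = redshift_data.execute_statement(
        ClusterIdentifier='your-cluster',
        Database='analytics',
        SecretArn='arn:aws:secretsmanager:us-east-1:YOUR_ACCOUNT:secret:redshift-admin',
        Sql='CALL events.load_new_aggregates();'
    )
    return {'statement_id': response['Id']}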

Step 5: Monitoring and Operational Excellence

CloudWatch Metrics for Kinesis

Monitor stream health with key metrics:

# Create dashboard for Kinesis metrics
aws cloudwatch put-dashboard \
    --dashboard-name "RealtimeAnalytics" \
    --dashboard-body '{
        "widgets": [
            {
                "type": "metric",
                "properties": {
                    "title": "Kinesis Throughput",
                    "metrics": [
                        ["AWS/Kinesis", "IncomingRecords", "StreamName", "realtime-events"],
                        [".", "IncomingBytes", ".", "."]
                    ],
                    "period": 60,
                    "stat": "Sum"
                }
            },
            {
                "type": "metric",
                "properties": {
                    "title": "Iterator Age",
                    "metrics": [
                        ["AWS/Kinesis", "GetRecords.IteratorAgeMilliseconds", "StreamName", "realtime-events"]
                    ],
                    "period": 60,
                    "stat": "Maximum"
                }
            }
        ]
    }'

Alert on Processing Lag

# Alert when iterator age exceeds threshold (processing falling behind)
aws cloudwatch put-metric-alarm \
    --alarm-name "KinesisProcessingLag" \
    --metric-name "GetRecords.IteratorAgeMilliseconds" \
    --namespace "AWS/Kinesis" \
    --dimensions Name=StreamName,Value=realtime-events \
    --statistic Maximum \
    --period 300 \
    --threshold 300000 \
    --comparison-operator GreaterThanThreshold \
    --evaluation-periods 2 \
    --alarm-actions arn:aws:sns:us-east-1:YOUR_ACCOUNT:alerts

EMR Cluster Monitoring

Monitor Spark Streaming jobs:

# Alert on failed EMR steps
aws cloudwatch put-metric-alarm \
    --alarm-name "EMRStepFailure" \
    --metric-name "StepsFailed" \
    --namespace "AWS/ElasticMapReduce" \
    --dimensions Name=ClusterId,Value=j-XXXXXXXXXXXXX \
    --statistic Sum \
    --period 300 \
    --threshold 1 \
    --comparison-operator GreaterThanOrEqualToThreshold \
    --evaluation-periods 1 \
    --alarm-actions arn:aws:sns:us-east-1:YOUR_ACCOUNT:alerts
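
The alarms above cover infrastructure metrics, but application-level counters (for example, events processed per micro-batch) have to be published by the job itself; the IAM policy in Step 2 already allows cloudwatch:PutMetricData. A minimal foreachBatch sketch that could be added to the streaming application from Step 3 (the namespace and metric name are hypothetical):

import boto3

cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')

def publish_batch_metrics(batch_df, batch_id):
    """Publish the record count of each micro-batch as a custom CloudWatch metric."""
    cloudwatch.put_metric_data(
        Namespace='RealtimeAnalytics',
        MetricData=[{
            'MetricName': 'EventsProcessed',
            'Value': float(batch_df.count()),
            'Unit': 'Count'
        }]
    )

# Runs alongside the Parquet sinks; foreachBatch executes on the driver
metrics_query = parsed_events.writeStream \
    .foreachBatch(publish_batch_metrics) \
    .option("checkpointLocation", "s3://your-bucket/checkpoints/metrics/") \
    .trigger(processingTime="1 minute") \
    .start()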

Best Practices for Production

Kinesis Best Practices

  • Right-size shards: Monitor WriteProvisionedThroughputExceeded to identify when you need more shards
  • Use enhanced fan-out: When multiple consumers need dedicated throughput, enhanced fan-out provides 2 MB/sec per consumer
  • Choose partition keys carefully: Even distribution prevents hot shards; skewed keys cause bottlenecks
  • Enable server-side encryption: Protect data at rest with KMS encryption (see the sketch after this list)
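
For example, encryption at rest can be turned on for an existing stream with a single API call. A minimal boto3 sketch using the AWS managed key for Kinesis (substitute a customer managed key if you need your own key policy):

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Enable KMS server-side encryption on the existing stream
kinesis.start_stream_encryption(
    StreamName='realtime-events',
    EncryptionType='KMS',
    KeyId='alias/aws/kinesis'   # AWS managed key; use your own alias/ARN for a CMK
)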

Spark Streaming Best Practices

  • Enable checkpointing: Store checkpoints in S3 for fault tolerance and exactly-once processing
  • Use watermarks: Handle late-arriving data gracefully with appropriate watermark delays
  • Monitor backpressure: Enable backpressure to prevent overwhelming executors during traffic spikes
  • Tune batch intervals: Balance latency requirements against processing overhead

Redshift Best Practices

  • Use COPY for bulk loads: According to AWS documentation, COPY is the most efficient way to load data
  • Choose appropriate distribution keys: Distribute on frequently joined columns for optimal query performance
  • Leverage materialized views: Pre-compute expensive aggregations for dashboard queries
  • Schedule VACUUM and ANALYZE: Maintain table statistics and reclaim space regularly

Cost Optimization

  • Use Spot Instances for EMR task nodes: Reduce compute costs for fault-tolerant streaming workloads
  • Right-size Kinesis retention: Default 24-hour retention is often sufficient; longer retention costs more
  • Consider Redshift Serverless: For variable workloads, pay only when the warehouse is active
  • Archive to S3: Move older data to S3 and query with Redshift Spectrum when needed

Conclusion

You've built a complete real-time analytics platform on AWS that ingests streaming data through Kinesis, processes it with Spark on EMR, and stores results in Redshift for long-term analysis. This architecture handles the key challenges of streaming systems: durability, scalability, and low-latency processing.

Key components in this architecture:

  • Amazon Kinesis Data Streams provides durable, scalable data ingestion with sub-second latency
  • Amazon EMR with Apache Spark delivers distributed stream processing with complex transformations
  • Amazon Redshift offers a data warehouse for historical analytics and reporting

As your real-time platform matures, consider extending it with:

  • Amazon Managed Service for Apache Flink for stateful stream processing without managing clusters
  • Amazon OpenSearch Service for real-time search and log analytics
  • Amazon QuickSight for real-time dashboards connected to Redshift
  • AWS Lambda for lightweight event-driven processing

Sources

  1. What is Amazon Kinesis Data Streams? - AWS official documentation on Kinesis architecture, features, and use cases
  2. Apache Spark on Amazon EMR - AWS documentation on Spark integration with EMR including Kinesis connector support
  3. Amazon EMR Overview - AWS documentation on EMR cluster architecture, node types, and cluster lifecycle
  4. Developing Producers Using the Kinesis Producer Library - AWS documentation on KPL features including batching, aggregation, and automatic retries
  5. Building Kinesis Data Streams Consumers - AWS documentation on consumer options including enhanced fan-out and third-party integrations
  6. Amazon Data Firehose Redshift Destination - AWS documentation on configuring Firehose to load data into Redshift