In this tutorial, we show how to build a unified Apache Beam pipeline that works seamlessly in both batch and stream-like modes using the DirectRunner. We generate synthetic, event-time-aware data and apply fixed windowing with triggers and allowed lateness to demonstrate how Apache Beam consistently handles both on-time and late events. By switching only the input source, we keep the core aggregation logic identical, which helps us clearly understand how Beam's event-time model, windows, and panes behave without relying on external streaming infrastructure. Check out the FULL CODES here.
!pip -q install -U "grpcio>=1.71.2" "grpcio-status>=1.71.2"
!pip -q install -U apache-beam crcmod
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode
from apache_beam.testing.test_stream import TestStream
import json
from datetime import datetime, timezone

We install the required dependencies and pin compatible gRPC versions so that Apache Beam runs without conflicts. We import the core Beam APIs, including the windowing, trigger, and TestStream utilities needed later in the pipeline. We also bring in standard Python modules for time handling and JSON formatting.
MODE = "stream"
WINDOW_SIZE_SECS = 60
ALLOWED_LATENESS_SECS = 120
def make_event(user_id, event_type, amount, event_time_epoch_s):
    return {"user_id": user_id, "event_type": event_type, "amount": float(amount), "event_time": int(event_time_epoch_s)}
base = datetime.now(timezone.utc).replace(microsecond=0)
t0 = int(base.timestamp())
BATCH_EVENTS = [
    make_event("u1", "purchase", 20, t0 + 5),
    make_event("u1", "purchase", 15, t0 + 20),
    make_event("u2", "purchase", 8, t0 + 35),
    make_event("u1", "refund", -5, t0 + 62),
    make_event("u2", "purchase", 12, t0 + 70),
    make_event("u3", "purchase", 9, t0 + 75),
    make_event("u2", "purchase", 3, t0 + 50),
]
We define the global configuration that controls window size, lateness, and execution mode. We create synthetic events with explicit event-time timestamps so that windowing behavior is deterministic and easy to reason about. We prepare a small dataset that deliberately includes out-of-order and late events in order to observe Beam's event-time semantics.
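Before wiring these events into Beam, it helps to see the window-assignment arithmetic on its own. A minimal sketch, no Beam required, assuming for readability that the base timestamp is aligned to a window boundary (the tutorial itself uses the current UTC time, so its windows need not align to `t0`):

```python
# Minimal sketch: FixedWindows(W) assigns a timestamp ts to the
# epoch-aligned window [ts - ts % W, ts - ts % W + W).

WINDOW_SIZE_SECS = 60

def fixed_window(ts, size=WINDOW_SIZE_SECS):
    start = ts - (ts % size)
    return (start, start + size)

t0 = 0  # illustrative, window-aligned base
for offset in (5, 20, 35, 62, 70, 75, 50):
    print(t0 + offset, "->", fixed_window(t0 + offset))
```

With this alignment, the events at offsets 5, 20, 35, and 50 fall into the first window and those at 62, 70, and 75 into the second, which is exactly the split the aggregation below produces.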
def format_joined_record(kv):
    user_id, d = kv
    return {
        "user_id": user_id,
        "count": int(d["count"][0]) if d["count"] else 0,
        "sum_amount": float(d["sum_amount"][0]) if d["sum_amount"] else 0.0,
    }
class WindowedUserAgg(beam.PTransform):
    def expand(self, pcoll):
        stamped = pcoll | beam.Map(lambda e: beam.window.TimestampedValue(e, e["event_time"]))
        windowed = stamped | beam.WindowInto(
            FixedWindows(WINDOW_SIZE_SECS),
            allowed_lateness=ALLOWED_LATENESS_SECS,
            trigger=AfterWatermark(
                early=AfterProcessingTime(10),
                late=AfterProcessingTime(10),
            ),
            accumulation_mode=AccumulationMode.ACCUMULATING,
        )
        keyed = windowed | beam.Map(lambda e: (e["user_id"], e["amount"]))
        counts = keyed | beam.combiners.Count.PerKey()
        sums = keyed | beam.CombinePerKey(sum)
        return (
            {"count": counts, "sum_amount": sums}
            | beam.CoGroupByKey()
            | beam.Map(format_joined_record)
        )

We build a reusable Beam PTransform that encapsulates all of the windowed aggregation logic. We apply fixed windows, triggers, and accumulation rules, then group events by user and compute counts and sums. We keep this transform independent of the data source, so the same logic applies to both batch and streaming inputs.
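The dict-of-lists shape that CoGroupByKey hands to `format_joined_record` can be mimicked in plain Python. A hedged sketch with made-up per-user values (in the real pipeline both inputs derive from the same keyed events, so their key sets always match):

```python
# Sketch of CoGroupByKey's output shape: for each key, a dict with
# one list per tagged input collection.
counts = [("u1", 2), ("u2", 1)]
sums = [("u1", 35.0), ("u2", 8.0), ("u3", 9.0)]

def cogroup(**tagged):
    keys = {k for pcoll in tagged.values() for k, _ in pcoll}
    for key in sorted(keys):
        yield key, {tag: [v for k, v in pcoll if k == key]
                    for tag, pcoll in tagged.items()}

for kv in cogroup(count=counts, sum_amount=sums):
    print(kv)
```

A key absent from one input yields an empty list for that tag (here `u3` has no count), which is why `format_joined_record` guards each lookup with `if d["count"] else 0`.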
class AddWindowInfo(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam):
        ws = float(window.start)
        we = float(window.end)
        yield {
            **element,
            "window_start_utc": datetime.fromtimestamp(ws, tz=timezone.utc).strftime("%H:%M:%S"),
            "window_end_utc": datetime.fromtimestamp(we, tz=timezone.utc).strftime("%H:%M:%S"),
            "pane_timing": str(pane_info.timing),
            "pane_is_first": pane_info.is_first,
            "pane_is_last": pane_info.is_last,
        }
def build_test_stream():
    return (
        TestStream()
        .advance_watermark_to(t0)
        .add_elements([
            beam.window.TimestampedValue(make_event("u1", "purchase", 20, t0 + 5), t0 + 5),
            beam.window.TimestampedValue(make_event("u1", "purchase", 15, t0 + 20), t0 + 20),
            beam.window.TimestampedValue(make_event("u2", "purchase", 8, t0 + 35), t0 + 35),
        ])
        .advance_processing_time(5)
        .advance_watermark_to(t0 + 61)
        .add_elements([
            beam.window.TimestampedValue(make_event("u1", "refund", -5, t0 + 62), t0 + 62),
            beam.window.TimestampedValue(make_event("u2", "purchase", 12, t0 + 70), t0 + 70),
            beam.window.TimestampedValue(make_event("u3", "purchase", 9, t0 + 75), t0 + 75),
        ])
        .advance_processing_time(5)
        .add_elements([
            beam.window.TimestampedValue(make_event("u2", "purchase", 3, t0 + 50), t0 + 50),
        ])
        .advance_watermark_to(t0 + 121)
        .advance_watermark_to_infinity()
    )

We enrich each aggregated record with window and pane metadata so that we can clearly see when and why results are emitted. We convert Beam's internal timestamps into human-readable UTC times for readability. We also define a TestStream that simulates real streaming behavior using watermarks, processing-time advances, and late data.
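The TestStream advances the watermark to t0 + 61 before emitting the element stamped t0 + 50, which makes that element late for its window but still within the 120-second allowed lateness. A Beam-free sketch of that classification, assuming epoch-aligned windows as above:

```python
# Sketch: classify an element as on-time, late-but-allowed, or dropped,
# given its event time, the current watermark, and allowed lateness.
WINDOW_SIZE_SECS = 60
ALLOWED_LATENESS_SECS = 120

def classify(event_ts, watermark, size=WINDOW_SIZE_SECS,
             allowed=ALLOWED_LATENESS_SECS):
    window_end = event_ts - (event_ts % size) + size
    if watermark < window_end:
        return "on_time"  # the watermark has not yet closed the window
    elif watermark < window_end + allowed:
        return "late"     # fires a late pane; the window is updated
    return "dropped"      # past allowed lateness; the element is discarded

print(classify(50, watermark=61))   # watermark already passed this window's end
print(classify(70, watermark=61))   # still inside its open window
print(classify(50, watermark=200))  # beyond end + allowed lateness
```

This mirrors what the DirectRunner does with the stream above: the t0 + 50 element triggers a late pane for the first window, while a watermark far past end-of-window plus lateness would have silently dropped it.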
def run_batch():
    with beam.Pipeline(options=PipelineOptions([])) as p:
        (
            p
            | beam.Create(BATCH_EVENTS)
            | WindowedUserAgg()
            | beam.ParDo(AddWindowInfo())
            | beam.Map(json.dumps)
            | beam.Map(print)
        )
def run_stream():
    opts = PipelineOptions([])
    opts.view_as(StandardOptions).streaming = True
    with beam.Pipeline(options=opts) as p:
        (
            p
            | build_test_stream()
            | WindowedUserAgg()
            | beam.ParDo(AddWindowInfo())
            | beam.Map(json.dumps)
            | beam.Map(print)
        )

run_stream() if MODE == "stream" else run_batch()

We wire everything together into executable batch and stream-like pipelines. We toggle between modes by changing a single flag while reusing the same aggregation transform. We run the pipeline and print the windowed results directly, making the execution flow and the output easy to inspect.
In conclusion, we demonstrated that the same Beam pipeline can process both bounded batch data and unbounded, stream-like data while preserving identical windowing and aggregation semantics. We observed how watermarks, triggers, and accumulation modes influence when results are emitted and how late data updates previously computed windows. Also, we focused on the conceptual foundations of Beam's unified model, providing a solid base for later scaling the same design to real streaming runners and production environments.
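The accumulating behavior mentioned above can be illustrated without Beam at all. A simplified model, assuming the trigger fires once after every arriving element of a single key and window (real pane firings typically batch several elements):

```python
# Sketch: the sequence of pane values a running sum emits under the two
# accumulation modes as elements arrive for one window.
def panes(values, mode):
    total, emitted = 0.0, []
    for v in values:
        total += v
        if mode == "ACCUMULATING":
            emitted.append(total)  # each pane restates all input so far
        else:  # DISCARDING
            emitted.append(v)      # each pane carries only the new input
    return emitted

arrivals = [20.0, 15.0, 3.0]  # the 3.0 stands in for a late element
print(panes(arrivals, "ACCUMULATING"))
print(panes(arrivals, "DISCARDING"))
```

Under ACCUMULATING, the late pane reports the refined total for the whole window, which is why downstream consumers in this tutorial can simply take the latest pane per window; under DISCARDING they would have to sum the panes themselves.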
Michal Sutter is a data science professional with a Master of Science in Data Science from the University of Padova. With a solid foundation in statistical analysis, machine learning, and data engineering, Michal excels at transforming complex datasets into actionable insights.

