A Coding Implementation to Build a Unified Apache Beam Pipeline Demonstrating Batch and Stream Processing with Event-Time Windowing Using DirectRunner

By Naveed Ahmad | January 8, 2026 | 5 Mins Read


In this tutorial, we demonstrate how to build a unified Apache Beam pipeline that works seamlessly in both batch and stream-like modes using the DirectRunner. We generate synthetic, event-time-aware data and apply fixed windowing with triggers and allowed lateness to show how Apache Beam consistently handles both on-time and late events. By switching only the input source, we keep the core aggregation logic identical, which helps us clearly understand how Beam's event-time model, windows, and panes behave without relying on external streaming infrastructure. Check out the FULL CODES here.
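Before touching the pipeline itself, it helps to see that fixed-window assignment is plain arithmetic: an event with timestamp `t` belongs to the window starting at `t - (t % window_size)`. The following is an illustrative pure-Python sketch of that rule (the helper `fixed_window_bounds` is ours, not part of Beam's API):

```python
# Fixed windows partition event time into equal, non-overlapping intervals.
# An element's window depends only on its event timestamp, never on when
# the element happens to arrive.
WINDOW_SIZE_SECS = 60

def fixed_window_bounds(event_time_s, size=WINDOW_SIZE_SECS):
    """Return (start, end) of the fixed window containing event_time_s."""
    start = event_time_s - (event_time_s % size)
    return start, start + size

print(fixed_window_bounds(5))   # (0, 60)
print(fixed_window_bounds(62))  # (60, 120)
# An out-of-order element still maps to its event-time window:
print(fixed_window_bounds(50))  # (0, 60)
```

This is exactly why a late element can update an already-emitted window: its event timestamp, not its arrival time, decides where it lands.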

!pip -q install -U "grpcio>=1.71.2" "grpcio-status>=1.71.2"
!pip -q install -U apache-beam crcmod


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode
from apache_beam.testing.test_stream import TestStream
import json
from datetime import datetime, timezone

We install the required dependencies and pin gRPC versions so that Apache Beam imports cleanly. We import the core Beam APIs, including windowing, triggers, and the TestStream utilities needed later in the pipeline. We also bring in standard Python modules for time handling and JSON formatting.

MODE = "stream"
WINDOW_SIZE_SECS = 60
ALLOWED_LATENESS_SECS = 120


def make_event(user_id, event_type, amount, event_time_epoch_s):
    return {"user_id": user_id, "event_type": event_type, "amount": float(amount), "event_time": int(event_time_epoch_s)}


base = datetime.now(timezone.utc).replace(microsecond=0)
t0 = int(base.timestamp())


BATCH_EVENTS = [
    make_event("u1", "purchase", 20, t0 + 5),
    make_event("u1", "purchase", 15, t0 + 20),
    make_event("u2", "purchase",  8, t0 + 35),
    make_event("u1", "refund",   -5, t0 + 62),
    make_event("u2", "purchase", 12, t0 + 70),
    make_event("u3", "purchase",  9, t0 + 75),
    make_event("u2", "purchase",  3, t0 + 50),
]
    

We define the global configuration that controls window size, lateness, and execution mode. We create synthetic events with explicit event-time timestamps so that windowing behavior is deterministic and easy to reason about. We prepare a small dataset that deliberately includes out-of-order and late events in order to observe Beam's event-time semantics.
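As a sanity check on the dataset above, we can precompute which 60-second window each event falls into, relative to t0. This is an illustrative plain-Python calculation, not part of the pipeline:

```python
# Offsets (seconds after t0) of the synthetic events defined above,
# in the order they are generated.
offsets = [5, 20, 35, 62, 70, 75, 50]

WINDOW_SIZE_SECS = 60
# Window index 0 covers [t0, t0 + 60); index 1 covers [t0 + 60, t0 + 120).
window_index = [off // WINDOW_SIZE_SECS for off in offsets]
print(window_index)  # [0, 0, 0, 1, 1, 1, 0]
```

Note the final event (offset 50) maps back to window 0 even though it is generated last: this is the deliberately out-of-order element that will arrive after the watermark has passed the end of its window.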

def format_joined_record(kv):
    user_id, d = kv
    return {
        "user_id": user_id,
        "count": int(d["count"][0]) if d["count"] else 0,
        "sum_amount": float(d["sum_amount"][0]) if d["sum_amount"] else 0.0,
    }


class WindowedUserAgg(beam.PTransform):
    def expand(self, pcoll):
        stamped = pcoll | beam.Map(lambda e: beam.window.TimestampedValue(e, e["event_time"]))
        windowed = stamped | beam.WindowInto(
            FixedWindows(WINDOW_SIZE_SECS),
            allowed_lateness=ALLOWED_LATENESS_SECS,
            trigger=AfterWatermark(
                early=AfterProcessingTime(10),
                late=AfterProcessingTime(10),
            ),
            accumulation_mode=AccumulationMode.ACCUMULATING,
        )
        keyed = windowed | beam.Map(lambda e: (e["user_id"], e["amount"]))
        counts = keyed | beam.combiners.Count.PerKey()
        sums = keyed | beam.CombinePerKey(sum)
        return (
            {"count": counts, "sum_amount": sums}
            | beam.CoGroupByKey()
            | beam.Map(format_joined_record)
        )

We build a reusable Beam PTransform that encapsulates all of the windowed aggregation logic. We apply fixed windows, triggers, and accumulation rules, then group events by user and compute counts and sums. We keep this transform independent of the data source, so the same logic applies to both batch and streaming inputs.
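Because the trigger accumulates, the final pane of each window should converge to a simple group-by result. We can predict those final per-user, per-window aggregates with an illustrative plain-Python sketch (the `rows` tuples restate the sample events by window index, and are our own scaffolding, not pipeline output):

```python
from collections import defaultdict

# (user_id, window_index, amount) for the synthetic events defined earlier.
rows = [("u1", 0, 20), ("u1", 0, 15), ("u2", 0, 8),
        ("u1", 1, -5), ("u2", 1, 12), ("u3", 1, 9),
        ("u2", 0, 3)]  # the late, out-of-order element

# Accumulate a count and a running sum per (user, window) key.
agg = defaultdict(lambda: {"count": 0, "sum_amount": 0.0})
for user, win, amount in rows:
    agg[(user, win)]["count"] += 1
    agg[(user, win)]["sum_amount"] += amount

print(agg[("u1", 0)])  # {'count': 2, 'sum_amount': 35.0}
print(agg[("u2", 0)])  # {'count': 2, 'sum_amount': 11.0}
```

With ACCUMULATING mode, u2's late element does not produce a separate delta; the late pane for window 0 re-emits u2's full updated aggregate (count 2, sum 11.0).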

class AddWindowInfo(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam):
        ws = float(window.start)
        we = float(window.end)
        yield {
            **element,
            "window_start_utc": datetime.fromtimestamp(ws, tz=timezone.utc).strftime("%H:%M:%S"),
            "window_end_utc": datetime.fromtimestamp(we, tz=timezone.utc).strftime("%H:%M:%S"),
            "pane_timing": str(pane_info.timing),
            "pane_is_first": pane_info.is_first,
            "pane_is_last": pane_info.is_last,
        }


def build_test_stream():
    return (
        TestStream()
        .advance_watermark_to(t0)
        .add_elements([
            beam.window.TimestampedValue(make_event("u1", "purchase", 20, t0 + 5), t0 + 5),
            beam.window.TimestampedValue(make_event("u1", "purchase", 15, t0 + 20), t0 + 20),
            beam.window.TimestampedValue(make_event("u2", "purchase", 8, t0 + 35), t0 + 35),
        ])
        .advance_processing_time(5)
        .advance_watermark_to(t0 + 61)
        .add_elements([
            beam.window.TimestampedValue(make_event("u1", "refund", -5, t0 + 62), t0 + 62),
            beam.window.TimestampedValue(make_event("u2", "purchase", 12, t0 + 70), t0 + 70),
            beam.window.TimestampedValue(make_event("u3", "purchase", 9, t0 + 75), t0 + 75),
        ])
        .advance_processing_time(5)
        .add_elements([
            beam.window.TimestampedValue(make_event("u2", "purchase", 3, t0 + 50), t0 + 50),
        ])
        .advance_watermark_to(t0 + 121)
        .advance_watermark_to_infinity()
    )

We enrich each aggregated record with window and pane metadata so that we can clearly see when and why results are emitted. We convert Beam's internal timestamps into human-readable UTC times for readability. We also define a TestStream that simulates real streaming behavior using watermarks, processing-time advances, and late data.
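The watermark behavior that the TestStream exercises can be summarized with a simplified model: an element is on-time if the watermark has not yet passed the end of its window, late if the watermark has passed the window end but is still within allowed lateness, and dropped otherwise. The following is an illustrative pure-Python sketch of that rule, not Beam internals (the `classify` helper is our own):

```python
WINDOW_SIZE_SECS = 60
ALLOWED_LATENESS_SECS = 120

def classify(event_time_s, watermark_s, size=WINDOW_SIZE_SECS,
             allowed_lateness=ALLOWED_LATENESS_SECS):
    """Classify an element relative to the watermark (simplified model)."""
    # End of the fixed window that contains this event timestamp.
    window_end = event_time_s - (event_time_s % size) + size
    if watermark_s <= window_end:
        return "on-time"
    if watermark_s <= window_end + allowed_lateness:
        return "late"      # still accepted; fires a late pane
    return "dropped"       # beyond allowed lateness; discarded

# The u2 element at offset 50 arrives after the watermark reached offset 61,
# past the end of its [0, 60) window, so it is late but not dropped:
print(classify(50, 61))   # 'late'
print(classify(50, 200))  # 'dropped'
print(classify(5, 3))     # 'on-time'
```

This is why the TestStream advances the watermark to t0 + 61 before adding the offset-50 element: it forces the late-pane path while staying within the 120-second allowed lateness.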

def run_batch():
    with beam.Pipeline(options=PipelineOptions([])) as p:
        (
            p
            | beam.Create(BATCH_EVENTS)
            | WindowedUserAgg()
            | beam.ParDo(AddWindowInfo())
            | beam.Map(json.dumps)
            | beam.Map(print)
        )


def run_stream():
    opts = PipelineOptions([])
    opts.view_as(StandardOptions).streaming = True
    with beam.Pipeline(options=opts) as p:
        (
            p
            | build_test_stream()
            | WindowedUserAgg()
            | beam.ParDo(AddWindowInfo())
            | beam.Map(json.dumps)
            | beam.Map(print)
        )


run_stream() if MODE == "stream" else run_batch()

We wire everything together into executable batch and stream-like pipelines. We toggle between modes by changing a single flag while reusing the same aggregation transform. We run the pipeline and print the windowed results directly, making the execution flow and outputs easy to inspect.

In conclusion, we demonstrated that the same Beam pipeline can process both bounded batch data and unbounded, stream-like data while preserving identical windowing and aggregation semantics. We observed how watermarks, triggers, and accumulation modes influence when results are emitted and how late data updates previously computed windows. We also focused on the conceptual foundations of Beam's unified model, providing a solid base for later scaling the same design to real streaming runners and production environments.




Michal Sutter is a data science professional with a Master of Science in Data Science from the University of Padova. With a solid foundation in statistical analysis, machine learning, and data engineering, Michal excels at transforming complex datasets into actionable insights.


