On this tutorial, we construct an end-to-end streaming voice agent that mirrors how trendy low-latency conversational programs function in actual time. We simulate the entire pipeline, from chunked audio enter and streaming speech recognition to incremental language mannequin reasoning and streamed text-to-speech output, whereas explicitly monitoring latency at each stage. By working with strict latency budgets and observing metrics corresponding to time to first token and time to first audio, we deal with the sensible engineering trade-offs that form responsive voice-based consumer experiences. Take a look at the FULL CODES here.
import time
import asyncio
import numpy as np
from collections import deque
from dataclasses import dataclass
from typing import Checklist, AsyncIterator
from enum import Enum
import matplotlib.pyplot as plt
@dataclass
class LatencyMetrics:
audio_chunk_received: float = 0.0
asr_started: float = 0.0
asr_partial: float = 0.0
asr_complete: float = 0.0
llm_started: float = 0.0
llm_first_token: float = 0.0
llm_complete: float = 0.0
tts_started: float = 0.0
tts_first_chunk: float = 0.0
tts_complete: float = 0.0
def get_time_to_first_audio(self) -> float:
return self.tts_first_chunk - self.asr_complete if self.tts_first_chunk and self.asr_complete else 0.0
def get_total_latency(self) -> float:
return self.tts_complete - self.audio_chunk_received if self.tts_complete else 0.0
@dataclass
class LatencyBudgets:
asr_processing: float = 0.1
asr_finalization: float = 0.3
llm_first_token: float = 0.5
llm_token_generation: float = 0.02
tts_first_chunk: float = 0.2
tts_chunk_generation: float = 0.05
time_to_first_audio: float = 1.0
class AgentState(Enum):
LISTENING = "listening"
PROCESSING_SPEECH = "processing_speech"
THINKING = "considering"
SPEAKING = "talking"
INTERRUPTED = "interrupted"We outline the core knowledge buildings and state representations that permit us to trace latency throughout all the voice pipeline. We formalize timing alerts for ASR, LLM, and TTS to make sure constant measurement throughout all levels. We additionally set up a transparent agent state machine that guides how the system transitions throughout a conversational flip. Take a look at the FULL CODES here.
class AudioInputStream:
def __init__(self, sample_rate: int = 16000, chunk_duration_ms: int = 100):
self.sample_rate = sample_rate
self.chunk_duration_ms = chunk_duration_ms
self.chunk_size = int(sample_rate * chunk_duration_ms / 1000)
async def stream_audio(self, textual content: str) -> AsyncIterator[np.ndarray]:
chars_per_second = (150 * 5) / 60
duration_seconds = len(textual content) / chars_per_second
num_chunks = int(duration_seconds * 1000 / self.chunk_duration_ms)
for _ in vary(num_chunks):
chunk = np.random.randn(self.chunk_size).astype(np.float32) * 0.1
await asyncio.sleep(self.chunk_duration_ms / 1000)
yield chunkWe simulate real-time audio enter by breaking speech into fixed-duration chunks that arrive asynchronously. We mannequin practical talking charges and streaming habits to imitate dwell microphone enter. We use this stream as the inspiration for testing downstream latency-sensitive elements. Take a look at the FULL CODES here.
class StreamingASR:
def __init__(self, latency_budget: float = 0.1):
self.latency_budget = latency_budget
self.silence_threshold = 0.5
async def transcribe_stream(
self,
audio_stream: AsyncIterator[np.ndarray],
ground_truth: str
) -> AsyncIterator[tuple[str, bool]]:
phrases = ground_truth.cut up()
words_transcribed = 0
silence_duration = 0.0
chunk_count = 0
async for chunk in audio_stream:
chunk_count += 1
await asyncio.sleep(self.latency_budget)
if chunk_count % 3 == 0 and words_transcribed < len(phrases):
words_transcribed += 1
yield " ".be part of(phrases[:words_transcribed]), False
audio_power = np.imply(np.abs(chunk))
silence_duration = silence_duration + 0.1 if audio_power < 0.05 else 0.0
if silence_duration >= self.silence_threshold:
await asyncio.sleep(0.2)
yield ground_truth, True
return
yield ground_truth, TrueWe implement a streaming ASR module that produces partial transcriptions earlier than emitting a last outcome. We progressively reveal phrases to mirror how trendy ASR programs function in actual time. We additionally introduce silence-based finalization to approximate end-of-utterance detection. Take a look at the FULL CODES here.
class StreamingLLM:
def __init__(self, time_to_first_token: float = 0.3, tokens_per_second: float = 50):
self.time_to_first_token = time_to_first_token
self.tokens_per_second = tokens_per_second
async def generate_response(self, immediate: str) -> AsyncIterator[str]:
responses = {
"hiya": "Hey! How can I enable you at present?",
"climate": "The climate is sunny with a temperature of 72°F.",
"time": "The present time is 2:30 PM.",
"default": "I perceive. Let me enable you with that."
}
response = responses["default"]
for key in responses:
if key in immediate.decrease():
response = responses[key]
break
await asyncio.sleep(self.time_to_first_token)
for phrase in response.cut up():
yield phrase + " "
await asyncio.sleep(1.0 / self.tokens_per_second)
class StreamingTTS:
def __init__(self, time_to_first_chunk: float = 0.2, chars_per_second: float = 15):
self.time_to_first_chunk = time_to_first_chunk
self.chars_per_second = chars_per_second
async def synthesize_stream(self, text_stream: AsyncIterator[str]) -> AsyncIterator[np.ndarray]:
first_chunk = True
buffer = ""
async for textual content in text_stream:
buffer += textual content
if len(buffer) >= 20 or first_chunk:
if first_chunk:
await asyncio.sleep(self.time_to_first_chunk)
first_chunk = False
length = len(buffer) / self.chars_per_second
yield np.random.randn(int(16000 * length)).astype(np.float32) * 0.1
buffer = ""
await asyncio.sleep(length * 0.5)On this snippet, we mannequin a streaming language mannequin and a streaming text-to-speech engine working collectively. We generate responses token by token to seize time-to-first-token habits. We then convert incremental textual content into audio chunks to simulate early and steady speech synthesis. Take a look at the FULL CODES here.
class StreamingVoiceAgent:
def __init__(self, latency_budgets: LatencyBudgets):
self.budgets = latency_budgets
self.audio_stream = AudioInputStream()
self.asr = StreamingASR(latency_budgets.asr_processing)
self.llm = StreamingLLM(
latency_budgets.llm_first_token,
1.0 / latency_budgets.llm_token_generation
)
self.tts = StreamingTTS(
latency_budgets.tts_first_chunk,
1.0 / latency_budgets.tts_chunk_generation
)
self.state = AgentState.LISTENING
self.metrics_history: Checklist[LatencyMetrics] = []
async def process_turn(self, user_input: str) -> LatencyMetrics:
metrics = LatencyMetrics()
start_time = time.time()
metrics.audio_chunk_received = time.time() - start_time
audio_gen = self.audio_stream.stream_audio(user_input)
metrics.asr_started = time.time() - start_time
async for textual content, last in self.asr.transcribe_stream(audio_gen, user_input):
if last:
metrics.asr_complete = time.time() - start_time
transcription = textual content
metrics.llm_started = time.time() - start_time
response = ""
async for token in self.llm.generate_response(transcription):
if not metrics.llm_first_token:
metrics.llm_first_token = time.time() - start_time
response += token
metrics.llm_complete = time.time() - start_time
metrics.tts_started = time.time() - start_time
async def text_stream():
for phrase in response.cut up():
yield phrase + " "
async for _ in self.tts.synthesize_stream(text_stream()):
if not metrics.tts_first_chunk:
metrics.tts_first_chunk = time.time() - start_time
metrics.tts_complete = time.time() - start_time
self.metrics_history.append(metrics)
return metricsWe orchestrate the total voice agent by wiring audio enter, ASR, LLM, and TTS right into a single asynchronous move. We report exact timestamps at every transition to compute vital latency metrics. We deal with every consumer flip as an remoted experiment to allow systematic efficiency evaluation. Take a look at the FULL CODES here.
async def run_demo():
budgets = LatencyBudgets(
asr_processing=0.08,
llm_first_token=0.3,
llm_token_generation=0.02,
tts_first_chunk=0.15,
time_to_first_audio=0.8
)
agent = StreamingVoiceAgent(budgets)
inputs = [
"Hello, how are you today?",
"What's the weather like?",
"Can you tell me the time?"
]
for textual content in inputs:
await agent.process_turn(textual content)
await asyncio.sleep(1)
if __name__ == "__main__":
asyncio.run(run_demo())We run all the system throughout a number of conversational turns to look at latency consistency and variance. We apply aggressive latency budgets to emphasize the pipeline beneath practical constraints. We use these runs to validate whether or not the system meets responsiveness targets throughout interactions.
In conclusion, we demonstrated how a totally streaming voice agent will be orchestrated as a single asynchronous pipeline with clear stage boundaries and measurable efficiency ensures. We confirmed that combining partial ASR, token-level LLM streaming, and early-start TTS reduces perceived latency, even when whole computation time stays non-trivial. This strategy helps us cause systematically about turn-taking, responsiveness, and optimization levers, and it offers a stable basis for extending the system towards real-world deployments utilizing manufacturing ASR, LLM, and TTS fashions.
Take a look at the FULL CODES here. Additionally, be happy to observe us on Twitter and don’t neglect to hitch our 100k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.
Asif Razzaq is the CEO of Marktechpost Media Inc.. As a visionary entrepreneur and engineer, Asif is dedicated to harnessing the potential of Synthetic Intelligence for social good. His most up-to-date endeavor is the launch of an Synthetic Intelligence Media Platform, Marktechpost, which stands out for its in-depth protection of machine studying and deep studying information that’s each technically sound and simply comprehensible by a large viewers. The platform boasts of over 2 million month-to-month views, illustrating its reputation amongst audiences.

