Updated on January 29, 2025

Streaming with LangChain

AI Engineering

Streaming is a common pattern in AI applications. We've all seen AI interfaces where answers from AI chatbots appear on the screen as a word-by-word stream of information.

This word-by-word stream looks nice, but it offers more than aesthetics: streamed text feels more natural to the user, and it means the user can begin reading a response sooner.

The Time-to-First-Token (TTFT) of models like GPT-4o is very low, often just 1-2 seconds. However, the full generation time, or Time-to-Last-Token (TTLT), can vary significantly: when generating long responses from GPT-4o, a TTLT of 10-20 seconds is typical.
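As a rough illustration, we can measure both metrics ourselves by timing a streamed response. Below is a minimal sketch (assuming langchain-openai is installed and an OpenAI API key is already set in the environment; we cover setup shortly):

python
import time
from langchain_openai import ChatOpenAI

async def measure(prompt: str):
    llm = ChatOpenAI(model_name="gpt-4o-mini", streaming=True)
    start = time.perf_counter()
    ttft = None
    async for _ in llm.astream(prompt):
        if ttft is None:
            ttft = time.perf_counter() - start  # Time-to-First-Token
    ttlt = time.perf_counter() - start  # Time-to-Last-Token
    print(f"TTFT: {ttft:.2f}s, TTLT: {ttlt:.2f}s")

await measure("Write a long essay about NLP.")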

There is a big difference between having users wait 1-2 seconds and 10-20 seconds. Beyond this, streaming also allows us to send intermediate steps to our interfaces. Suppose an agent uses various tools and/or takes multiple steps to generate a final response. In that case, we can use streaming to send this information to our application, allowing us to render UI components that tell the user what the agent is doing.

Using these intermediate step components, we give the user continual feedback rather than leaving them staring at a blank screen. The same components also give us a place to surface additional information, such as research sources or results from intermediate calculations.

In this chapter, we will introduce LangChain's async streaming. Async streaming is an essential feature for APIs that want to support real-time information streaming and enable the enhanced user experience described above.


We'll start by initializing our connection to the OpenAI API for the LLM. We need an OpenAI API key, which you can get from the OpenAI platform.

We will use the gpt-4o model with a temperature of 0.0:

python
import os
from getpass import getpass
from langchain_openai import ChatOpenAI

os.environ["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY") \
    or getpass("Enter your OpenAI API key: ")

llm = ChatOpenAI(
    model_name="gpt-4o",
    temperature=0.0,
    streaming=True
)
We can invoke the LLM as usual to confirm everything works:

python
llm_out = llm.invoke("Hello there")
llm_out
python
AIMessage(
    content='Hello! How can I assist you today?',
    additional_kwargs={},
    response_metadata={
        'finish_reason': 'stop',
        'model_name': 'gpt-4o-2024-08-06',
        'system_fingerprint': 'fp_50cad350e4'
    },
    id='run-fec5b393-e928-431c-b82c-9bb48ae2aec7-0'
)

Streaming with astream

We will start by creating an async stream from our LLM. We do this within an async for loop, allowing us to iterate through and use each chunk of data as soon as the async astream method yields it. By printing a pipe character | after each chunk, we can see the individual tokens generated by the LLM. We set flush=True to force immediate output to the console, resulting in smoother streaming.

python
tokens = []
async for token in llm.astream("What is NLP?"):
    tokens.append(token)
    print(token.content, end="|", flush=True)
text
|N|LP| stands| for| Natural| Language| Processing|.| It| is| a| field| of| artificial|
intelligence| and| computational| lingu|istics| that| focuses| on| the| interaction|
between| computers| and| humans| through| natural| language|.| The| goal| of| NLP| is|
to| enable| computers| to| understand|,| interpret|,| and| generate| human| language|
in| a| way| that| is| both| meaningful| and| useful|.

|N|LP| encompasses| a| variety| of| tasks|,| including|:

|1|.| **|Text| Analysis|**|:| Understanding| the| structure| and| meaning| of| text|,|
including| syntax|,| semantics|,| and| context|.
|2|.| **|Sent|iment| Analysis|**|:| Determ|ining| the| sentiment| or| emotional| tone|
behind| a| body| of| text|.
|3|.| **|Machine| Translation|**|:| Automatically| translating| text| from| one|
language| to| another|.
|4|.| **|Speech| Recognition|**|:| Con|verting| spoken| language| into| text|.
|5|.| **|Text|-to|-S|peech|**|:| Con|verting| text| into| spoken| language|.
|6|.| **|Named| Entity| Recognition| (|NER|)**|:| Ident|ifying| and| class|ifying| key|
elements| in| text|,| such| as| names| of| people|,| organizations|,| locations|,| etc|.
|7|.| **|Part|-of|-S|peech| Tag|ging|**|:| Ident|ifying| the| grammatical| parts| of|
speech| in| a| sentence|.
|8|.| **|Question| Answer|ing|**|:| Building| systems| that| can| answer| questions|
posed| in| natural| language|.
|9|.| **|Chat|bots| and| Convers|ational| Agents|**|:| Creating| systems| that| can|
engage| in| dialogue| with| humans|.

|N|LP| combines| techniques| from| computer| science|,| lingu|istics|,| and| machine|
learning| to| process| and| analyze| large| amounts| of| natural| language| data|.| It|
is| widely| used| in| applications| such| as| virtual| assistants|,| search| engines|,|
translation| services|,| and| more|.||

Since we appended each token to the tokens list, we can also inspect what is inside each chunk.

python
tokens[0]
python
AIMessageChunk(
    content='',  # in the first chunk there is no content, this is normal
    additional_kwargs={},
    response_metadata={},
    id='run-294f7102-8834-47d0-9152-219efb8319eb'
)

Let's see the second chunk:

python
tokens[1]
python
AIMessageChunk(
    content='N',  # next we have the 'N' from 'NLP'
    additional_kwargs={},
    response_metadata={},
    id='run-294f7102-8834-47d0-9152-219efb8319eb'
)

We can also merge multiple AIMessageChunk objects with the + operator, producing a single larger chunk:

python
tokens[0] + tokens[1] + tokens[2] + tokens[3] + tokens[4]
python
AIMessageChunk(
    content='NLP stands for',  # this is merged `content` from chunks 0 to 4
    additional_kwargs={},
    response_metadata={},
    id='run-294f7102-8834-47d0-9152-219efb8319eb'
)
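Rather than adding chunks one by one, we can merge the entire stream in a single step. Here is a minimal sketch using functools.reduce, which folds the same + operator across the full list:

python
from functools import reduce

# merge every streamed chunk into one AIMessageChunk via the + operator
full_message = reduce(lambda a, b: a + b, tokens)
print(full_message.content)

# or, if we only need the text, join the content strings directly
full_text = "".join(token.content for token in tokens)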

A word of caution: nothing prevents you from merging tokens in the incorrect order, so be careful not to output any token omelettes:

python
tokens[4] + tokens[3] + tokens[2] + tokens[1] + tokens[0]
python
AIMessageChunk(
    content=' for standsLPN',  # merged but scrambled `content` from chunks 4 to 0
    additional_kwargs={},
    response_metadata={},
    id='run-294f7102-8834-47d0-9152-219efb8319eb'
)

Streaming with Agents

Streaming with agents, particularly the custom agent executor, is a little more complex. Let's begin by creating a simple agent executor that matches what we built in the Agent Executor chapter.

To construct the agent executor, we need:

  • Tools

  • ChatPromptTemplate

  • Our LLM (already defined with llm)

  • An agent

  • Finally, the agent executor

Let's start defining each.

Tools

Now, we will define a few tools for an async agent executor to use. Our goals for tool use with regard to streaming are:

  • The callback handler will stream tool-use steps, beginning with the tool being used and then following with the tool input parameters token-by-token.

  • The final LLM output will be streamed token-by-token as we saw above.

  • We must be able to distinguish between tool-use tokens and final LLM output tokens, which will allow us to handle them differently in downstream processes.

We need to define a few math tools and our final answer tool.

python
from langchain_core.tools import tool

@tool
def add(x: float, y: float) -> float:
    """Add 'x' and 'y'."""
    return x + y

@tool
def multiply(x: float, y: float) -> float:
    """Multiply 'x' and 'y'."""
    return x * y

@tool
def exponentiate(x: float, y: float) -> float:
    """Raise 'x' to the power of 'y'."""
    return x ** y

@tool
def subtract(x: float, y: float) -> float:
    """Subtract 'x' from 'y'."""
    return y - x

@tool
def final_answer(answer: str, tools_used: list[str]) -> dict:
    """Use this tool to provide a final answer to the user.
    The answer should be in natural language as this will be provided
    to the user directly. The tools_used must include a list of tool
    names that were used within the `scratchpad`. You MUST use this tool
    to conclude the interaction.
    """
    return {"answer": answer, "tools_used": tools_used}

When defining our agent and agent_executor, we must list all our tools.

python
tools = [add, multiply, exponentiate, subtract, final_answer]
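As a quick sanity check (a minimal sketch, not required for the rest of the chapter), each @tool-decorated function can be exercised on its own, either through the tool interface with invoke or via the wrapped function on .func:

python
# call through the structured tool interface
add.invoke({"x": 10.0, "y": 10.0})  # returns 20.0

# or call the underlying function directly, as our executor will do later
add.func(x=10.0, y=10.0)  # returns 20.0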

ChatPromptTemplate

We will create our ChatPromptTemplate, using a system message, chat history, user input, and a scratchpad for intermediate steps.

python
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

prompt = ChatPromptTemplate.from_messages([
    ("system", (
        "You're a helpful assistant. When answering a user's question "
        "you should first use one of the tools provided. After using a "
        "tool the tool output will be provided back to you. You MUST "
        "then use the final_answer tool to provide a final answer to the user. "
        "DO NOT use the same tool more than once."
    )),
    MessagesPlaceholder(variable_name="chat_history"),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad"),
])

Agent

As before, we will define our agent with LCEL.

python
from langchain_core.runnables.base import RunnableSerializable

# define the agent runnable
agent: RunnableSerializable = (
    {
        "input": lambda x: x["input"],
        "chat_history": lambda x: x["chat_history"],
        "agent_scratchpad": lambda x: x.get("agent_scratchpad", [])
    }
    | prompt
    | llm.bind_tools(tools, tool_choice="any")
)
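Because we set tool_choice="any", every invocation of the agent should return a tool call rather than plain text. We can confirm this with a quick check (the exact tool call will vary):

python
out = agent.invoke({"input": "What is 10 + 10", "chat_history": []})
# the content field will be empty; the action lives in the tool calls
out.tool_calls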

Agent Executor

Finally, we will create the agent executor.

python
import json
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage

# create tool name to function mapping
name2tool = {tool.name: tool.func for tool in tools}

class CustomAgentExecutor:
    chat_history: list[BaseMessage]

    def __init__(self, max_iterations: int = 3):
        self.chat_history = []
        self.max_iterations = max_iterations
        self.agent: RunnableSerializable = (
            {
                "input": lambda x: x["input"],
                "chat_history": lambda x: x["chat_history"],
                "agent_scratchpad": lambda x: x.get("agent_scratchpad", [])
            }
            | prompt
            | llm.bind_tools(tools, tool_choice="any")  # we're forcing tool use again
        )

    def invoke(self, input: str) -> dict:
        # invoke the agent iteratively in a loop until reaching a final answer
        count = 0
        agent_scratchpad = []
        while count < self.max_iterations:
            # invoke a step for the agent to generate a tool call
            out = self.agent.invoke({
                "input": input,
                "chat_history": self.chat_history,
                "agent_scratchpad": agent_scratchpad
            })
            # if the tool call is the final answer tool, we stop
            if out.tool_calls[0]["name"] == "final_answer":
                break
            agent_scratchpad.append(out)  # add tool call to scratchpad
            # otherwise we execute the tool and add its output to the agent scratchpad
            tool_out = name2tool[out.tool_calls[0]["name"]](**out.tool_calls[0]["args"])
            action_str = f"The {out.tool_calls[0]['name']} tool returned {tool_out}"
            agent_scratchpad.append({
                "role": "tool",
                "content": action_str,
                "tool_call_id": out.tool_calls[0]["id"]
            })
            # add a print so we can see intermediate steps
            print(f"{count}: {action_str}")
            count += 1
        # add the final output to the chat history
        final_answer = out.tool_calls[0]["args"]
        # this is a dictionary, so we convert it to a string for compatibility with
        # the chat history
        final_answer_str = json.dumps(final_answer)
        self.chat_history.extend([
            HumanMessage(content=input),
            AIMessage(content=final_answer_str)
        ])
        # return the final answer in dict form
        return final_answer

agent_executor = CustomAgentExecutor()

Our agent_executor is now ready to use. Let's quickly test it before adding streaming.

python
agent_executor.invoke(input="What is 10 + 10")
text
0: The add tool returned 20

{'answer': '10 + 10 equals 20.', 'tools_used': ['functions.add']}

Let's modify our agent_executor to use streaming and parse the streamed output into a format that we can more easily work with.

First, when streaming with our custom agent executor, we need to pass our callback handler to the agent on every new invocation. To make this simpler, we can turn the callbacks field into a configurable field, which lets us attach the callback handler via the with_config method each time we invoke the agent.

python
from langchain_core.runnables import ConfigurableField

llm = ChatOpenAI(
    model_name="gpt-4o-mini",
    temperature=0.0,
    streaming=True
).configurable_fields(
    callbacks=ConfigurableField(
        id="callbacks",
        name="callbacks",
        description="A list of callbacks to use for streaming",
    )
)

We reinitialize our agent; nothing changes here:

python
# define the agent runnable
agent: RunnableSerializable = (
{
"input": lambda x: x["input"],
"chat_history": lambda x: x["chat_history"],
"agent_scratchpad": lambda x: x.get("agent_scratchpad", [])
}
| prompt
| llm.bind_tools(tools, tool_choice="any")
)

Now, we will define our custom callback handler. Our callback handler will use an asyncio.Queue object to stream the agent's output and yield the tokens as the LLM generates them.

python
import asyncio
from langchain.callbacks.base import AsyncCallbackHandler


class QueueCallbackHandler(AsyncCallbackHandler):
    """Callback handler that puts tokens into a queue."""

    def __init__(self, queue: asyncio.Queue):
        self.queue = queue
        self.final_answer_seen = False

    async def __aiter__(self):
        while True:
            if self.queue.empty():
                await asyncio.sleep(0.1)
                continue
            token_or_done = await self.queue.get()
            if token_or_done == "<<DONE>>":
                # this means we're done
                return
            if token_or_done:
                yield token_or_done

    async def on_llm_new_token(self, *args, **kwargs) -> None:
        """Put the new token in the queue."""
        chunk = kwargs.get("chunk")
        if chunk:
            # check for the final_answer tool call
            if tool_calls := chunk.message.additional_kwargs.get("tool_calls"):
                if tool_calls[0]["function"]["name"] == "final_answer":
                    # this will allow the stream to end on the next `on_llm_end` call
                    self.final_answer_seen = True
        self.queue.put_nowait(chunk)
        return

    async def on_llm_end(self, *args, **kwargs) -> None:
        """Put a sentinel in the queue to signal a step end or completion."""
        # this should only signal completion at the end of our agent execution,
        # however LangChain calls this at the end of every LLM call, not just the
        # final one, so we only send the "done" signal if we have already seen
        # the final_answer tool call
        if self.final_answer_seen:
            self.queue.put_nowait("<<DONE>>")
        else:
            self.queue.put_nowait("<<STEP_END>>")
        return
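To see the queue protocol in isolation, here is a toy sketch that feeds plain strings (standing in for real token chunks) through the handler; iteration stops as soon as the <<DONE>> sentinel is consumed:

python
async def queue_demo():
    queue = asyncio.Queue()
    handler = QueueCallbackHandler(queue)
    # simulate the callbacks pushing tokens, then the done sentinel
    for item in ["Hello", " world", "<<DONE>>"]:
        queue.put_nowait(item)
    async for token in handler:
        print(token, end="")

await queue_demo()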

We can see how this works together in our agent invocation:

python
queue = asyncio.Queue()
streamer = QueueCallbackHandler(queue)

async def stream(query: str):
    response = agent.with_config(
        callbacks=[streamer]
    )
    async for token in response.astream({
        "input": query,
        "chat_history": [],
        "agent_scratchpad": []
    }):
        print(token, flush=True)

await stream("What is 10 + 10")
text
content='' additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_EiasIFX9FBDrUzjBVKEq4QnA', 'function': {'arguments': '', 'name': 'add'}, 'type': 'function'}]} response_metadata={} id='run-e63bd8b0-7b02-41ae-bdf5-ecdfc86b2f18' tool_calls=[{'name': 'add', 'args': {}, 'id': 'call_EiasIFX9FBDrUzjBVKEq4QnA', 'type': 'tool_call'}] tool_call_chunks=[{'name': 'add', 'args': '', 'id': 'call_EiasIFX9FBDrUzjBVKEq4QnA', 'index': 0, 'type': 'tool_call_chunk'}]
content='' additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '{"', 'name': None}, 'type': None}]} response_metadata={} id='run-e63bd8b0-7b02-41ae-bdf5-ecdfc86b2f18' tool_calls=[{'name': '', 'args': {}, 'id': None, 'type': 'tool_call'}] tool_call_chunks=[{'name': None, 'args': '{"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
content='' additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': 'x', 'name': None}, 'type': None}]} response_metadata={} id='run-e63bd8b0-7b02-41ae-bdf5-ecdfc86b2f18' invalid_tool_calls=[{'name': None, 'args': 'x', 'id': None, 'error': None, 'type': 'invalid_tool_call'}] tool_call_chunks=[{'name': None, 'args': 'x', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
content='' additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '":', 'name': None}, 'type': None}]} response_metadata={} id='run-e63bd8b0-7b02-41ae-bdf5-ecdfc86b2f18' invalid_tool_calls=[{'name': None, 'args': '":', 'id': None, 'error': None, 'type': 'invalid_tool_call'}] tool_call_chunks=[{'name': None, 'args': '":', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
content='' additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '10', 'name': None}, 'type': None}]} response_metadata={} id='run-e63bd8b0-7b02-41ae-bdf5-ecdfc86b2f18' invalid_tool_calls=[{'name': None, 'args': '10', 'id': None, 'error': None, 'type': 'invalid_tool_call'}] tool_call_chunks=[{'name': None, 'args': '10', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
content='' additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': ',"', 'name': None}, 'type': None}]} response_metadata={} id='run-e63bd8b0-7b02-41ae-bdf5-ecdfc86b2f18' invalid_tool_calls=[{'name': None, 'args': ',"', 'id': None, 'error': None, 'type': 'invalid_tool_call'}] tool_call_chunks=[{'name': None, 'args': ',"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
content='' additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': 'y', 'name': None}, 'type': None}]} response_metadata={} id='run-e63bd8b0-7b02-41ae-bdf5-ecdfc86b2f18' invalid_tool_calls=[{'name': None, 'args': 'y', 'id': None, 'error': None, 'type': 'invalid_tool_call'}] tool_call_chunks=[{'name': None, 'args': 'y', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
content='' additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '":', 'name': None}, 'type': None}]} response_metadata={} id='run-e63bd8b0-7b02-41ae-bdf5-ecdfc86b2f18' invalid_tool_calls=[{'name': None, 'args': '":', 'id': None, 'error': None, 'type': 'invalid_tool_call'}] tool_call_chunks=[{'name': None, 'args': '":', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
content='' additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '10', 'name': None}, 'type': None}]} response_metadata={} id='run-e63bd8b0-7b02-41ae-bdf5-ecdfc86b2f18' invalid_tool_calls=[{'name': None, 'args': '10', 'id': None, 'error': None, 'type': 'invalid_tool_call'}] tool_call_chunks=[{'name': None, 'args': '10', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
content='' additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '}', 'name': None}, 'type': None}]} response_metadata={} id='run-e63bd8b0-7b02-41ae-bdf5-ecdfc86b2f18' invalid_tool_calls=[{'name': None, 'args': '}', 'id': None, 'error': None, 'type': 'invalid_tool_call'}] tool_call_chunks=[{'name': None, 'args': '}', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}]
content='' additional_kwargs={} response_metadata={'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_bd83329f63'} id='run-e63bd8b0-7b02-41ae-bdf5-ecdfc86b2f18'

Now we can see the output being streamed token by token. Because we are streaming a tool call, the content field is empty; instead, LangChain places the generated tokens inside the tool_calls field, within id, function.name, and function.arguments.
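Because the arguments arrive as raw JSON fragments, a common pattern is to buffer them as they stream and parse the result once the stream ends. Here is a minimal sketch of that idea, run directly against our agent runnable (no callback handler needed):

python
import json

tool_name = None
args_buffer = ""

async for token in agent.astream({
    "input": "What is 10 + 10",
    "chat_history": [],
    "agent_scratchpad": []
}):
    if tool_calls := token.additional_kwargs.get("tool_calls"):
        # the first chunk carries the tool name; later chunks carry argument fragments
        tool_name = tool_name or tool_calls[0]["function"]["name"]
        args_buffer += tool_calls[0]["function"]["arguments"] or ""

print(tool_name, json.loads(args_buffer))  # e.g. add {'x': 10, 'y': 10}

With this pattern in mind, let's rewrite our custom agent executor to stream asynchronously: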

python
from langchain_core.messages import ToolMessage

class CustomAgentExecutor:
    chat_history: list[BaseMessage]

    def __init__(self, max_iterations: int = 3):
        self.chat_history = []
        self.max_iterations = max_iterations
        self.agent: RunnableSerializable = (
            {
                "input": lambda x: x["input"],
                "chat_history": lambda x: x["chat_history"],
                "agent_scratchpad": lambda x: x.get("agent_scratchpad", [])
            }
            | prompt
            | llm.bind_tools(tools, tool_choice="any")  # we're forcing tool use again
        )

    async def invoke(self, input: str, streamer: QueueCallbackHandler, verbose: bool = False) -> dict:
        # invoke the agent iteratively in a loop until reaching a final answer
        count = 0
        agent_scratchpad = []
        while count < self.max_iterations:
            # invoke a step for the agent to generate a tool call
            async def stream(query: str):
                response = self.agent.with_config(
                    callbacks=[streamer]
                )
                # we initialize the output object that we will populate with
                # our streamed output
                output = None
                # now we begin streaming
                async for token in response.astream({
                    "input": query,
                    "chat_history": self.chat_history,
                    "agent_scratchpad": agent_scratchpad
                }):
                    if output is None:
                        output = token
                    else:
                        # we can just add the tokens together as they are streamed and
                        # we'll have the full response object at the end
                        output += token
                    if token.content != "":
                        # we can capture various parts of the response object
                        if verbose: print(f"content: {token.content}", flush=True)
                    tool_calls = token.additional_kwargs.get("tool_calls")
                    if tool_calls:
                        if verbose: print(f"tool_calls: {tool_calls}", flush=True)
                        tool_name = tool_calls[0]["function"]["name"]
                        if tool_name:
                            if verbose: print(f"tool_name: {tool_name}", flush=True)
                        arg = tool_calls[0]["function"]["arguments"]
                        if arg != "":
                            if verbose: print(f"arg: {arg}", flush=True)
                return AIMessage(
                    content=output.content,
                    tool_calls=output.tool_calls,
                    tool_call_id=output.tool_calls[0]["id"]
                )

            tool_call = await stream(query=input)
            # add the initial tool call to the scratchpad
            agent_scratchpad.append(tool_call)
            # execute the tool and add its output to the agent scratchpad
            tool_name = tool_call.tool_calls[0]["name"]
            tool_args = tool_call.tool_calls[0]["args"]
            tool_call_id = tool_call.tool_call_id
            tool_out = name2tool[tool_name](**tool_args)
            # add the tool output to the agent scratchpad
            tool_exec = ToolMessage(
                content=f"{tool_out}",
                tool_call_id=tool_call_id
            )
            agent_scratchpad.append(tool_exec)
            count += 1
            # if the tool call is the final answer tool, we stop
            if tool_name == "final_answer":
                break
        # add the final output to the chat history; we only add the "answer" field
        final_answer = tool_out["answer"]
        self.chat_history.extend([
            HumanMessage(content=input),
            AIMessage(content=final_answer)
        ])
        # return the final answer in dict form
        return tool_args

agent_executor = CustomAgentExecutor()

We've added a few print statements to help us see what is being output; we activate them by setting verbose=True. Let's see what our executor returns:

python
queue = asyncio.Queue()
streamer = QueueCallbackHandler(queue)

out = await agent_executor.invoke("What is 10 + 10", streamer, verbose=True)
text
tool_calls: [{'index': 0, 'id': 'call_gLVlhf7x7lXbTfrkgpbH1Feb', 'function': {'arguments': '', 'name': 'add'}, 'type': 'function'}]
tool_name: add
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '{"', 'name': None}, 'type': None}]
arg: {"
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': 'x', 'name': None}, 'type': None}]
arg: x
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '":', 'name': None}, 'type': None}]
arg: ":
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '10', 'name': None}, 'type': None}]
arg: 10
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': ',"', 'name': None}, 'type': None}]
arg: ,"
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': 'y', 'name': None}, 'type': None}]
arg: y
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '":', 'name': None}, 'type': None}]
arg: ":
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '10', 'name': None}, 'type': None}]
arg: 10
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '}', 'name': None}, 'type': None}]
arg: }
tool_calls: [{'index': 0, 'id': 'call_nMVudKf9Q4vhWqxPgcZvKyj7', 'function': {'arguments': '', 'name': 'final_answer'}, 'type': 'function'}]
tool_name: final_answer
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '{"', 'name': None}, 'type': None}]
arg: {"
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': 'answer', 'name': None}, 'type': None}]
arg: answer
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '":"', 'name': None}, 'type': None}]
arg: ":"
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '10', 'name': None}, 'type': None}]
arg: 10
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': ' +', 'name': None}, 'type': None}]
arg: +
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': ' ', 'name': None}, 'type': None}]
arg:
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '10', 'name': None}, 'type': None}]
arg: 10
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': ' equals', 'name': None}, 'type': None}]
arg: equals
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': ' ', 'name': None}, 'type': None}]
arg:
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '20', 'name': None}, 'type': None}]
arg: 20
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '.","', 'name': None}, 'type': None}]
arg: .","
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': 'tools', 'name': None}, 'type': None}]
arg: tools
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '_used', 'name': None}, 'type': None}]
arg: _used
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '":["', 'name': None}, 'type': None}]
arg: ":["
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': 'functions', 'name': None}, 'type': None}]
arg: functions
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '.add', 'name': None}, 'type': None}]
arg: .add
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '"]', 'name': None}, 'type': None}]
arg: "]
tool_calls: [{'index': 0, 'id': None, 'function': {'arguments': '}', 'name': None}, 'type': None}]
arg: }

Through the verbose=True flag, we can see everything that is being output. Without it, however, we see nothing:

python
queue = asyncio.Queue()
streamer = QueueCallbackHandler(queue)

out = await agent_executor.invoke("What is 10 + 10", streamer)

Although we see nothing, it does not mean our executor is processing nothing; we're simply not consuming anything from our callback handler and its asyncio.Queue. To do so, we create an asyncio task for the invocation, iterate over the streamer object (via its __aiter__ method), and finally await the task like so:

python
queue = asyncio.Queue()
streamer = QueueCallbackHandler(queue)

task = asyncio.create_task(agent_executor.invoke("What is 10 + 10", streamer))

async for token in streamer:
    print(token, flush=True)

await task
text
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_MTzry6jOWDaPeY64l03tfcDE', 'function': {'arguments': '', 'name': 'add'}, 'type': 'function'}]}, response_metadata={}, id='run-e4a442ef-9c3c-4d0f-b2f7-d3834eb739a6', tool_calls=[{'name': 'add', 'args': {}, 'id': 'call_MTzry6jOWDaPeY64l03tfcDE', 'type': 'tool_call'}], tool_call_chunks=[{'name': 'add', 'args': '', 'id': 'call_MTzry6jOWDaPeY64l03tfcDE', 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '{"', 'name': None}, 'type': None}]}, response_metadata={}, id='run-e4a442ef-9c3c-4d0f-b2f7-d3834eb739a6', tool_calls=[{'name': '', 'args': {}, 'id': None, 'type': 'tool_call'}], tool_call_chunks=[{'name': None, 'args': '{"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': 'x', 'name': None}, 'type': None}]}, response_metadata={}, id='run-e4a442ef-9c3c-4d0f-b2f7-d3834eb739a6', invalid_tool_calls=[{'name': None, 'args': 'x', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': 'x', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '":', 'name': None}, 'type': None}]}, response_metadata={}, id='run-e4a442ef-9c3c-4d0f-b2f7-d3834eb739a6', invalid_tool_calls=[{'name': None, 'args': '":', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': '":', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '10', 'name': None}, 'type': None}]}, response_metadata={}, id='run-e4a442ef-9c3c-4d0f-b2f7-d3834eb739a6', invalid_tool_calls=[{'name': None, 'args': '10', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': '10', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': ',"', 'name': None}, 'type': None}]}, response_metadata={}, id='run-e4a442ef-9c3c-4d0f-b2f7-d3834eb739a6', invalid_tool_calls=[{'name': None, 'args': ',"', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': ',"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': 'y', 'name': None}, 'type': None}]}, response_metadata={}, id='run-e4a442ef-9c3c-4d0f-b2f7-d3834eb739a6', invalid_tool_calls=[{'name': None, 'args': 'y', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': 'y', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '":', 'name': None}, 'type': None}]}, response_metadata={}, id='run-e4a442ef-9c3c-4d0f-b2f7-d3834eb739a6', invalid_tool_calls=[{'name': None, 'args': '":', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': '":', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '10', 'name': None}, 'type': None}]}, response_metadata={}, id='run-e4a442ef-9c3c-4d0f-b2f7-d3834eb739a6', invalid_tool_calls=[{'name': None, 'args': '10', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': '10', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '}', 'name': None}, 'type': None}]}, response_metadata={}, id='run-e4a442ef-9c3c-4d0f-b2f7-d3834eb739a6', invalid_tool_calls=[{'name': None, 'args': '}', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': '}', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
generation_info={'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_bd83329f63'} message=AIMessageChunk(content='', additional_kwargs={}, response_metadata={'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_bd83329f63'}, id='run-e4a442ef-9c3c-4d0f-b2f7-d3834eb739a6')
<<STEP_END>>
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': 'call_jIc6ki3gvfjWHUgDSuTXU7e7', 'function': {'arguments': '', 'name': 'final_answer'}, 'type': 'function'}]}, response_metadata={}, id='run-92c0199f-9526-427e-86ba-a2981b8f3ca6', tool_calls=[{'name': 'final_answer', 'args': {}, 'id': 'call_jIc6ki3gvfjWHUgDSuTXU7e7', 'type': 'tool_call'}], tool_call_chunks=[{'name': 'final_answer', 'args': '', 'id': 'call_jIc6ki3gvfjWHUgDSuTXU7e7', 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '{"', 'name': None}, 'type': None}]}, response_metadata={}, id='run-92c0199f-9526-427e-86ba-a2981b8f3ca6', tool_calls=[{'name': '', 'args': {}, 'id': None, 'type': 'tool_call'}], tool_call_chunks=[{'name': None, 'args': '{"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': 'answer', 'name': None}, 'type': None}]}, response_metadata={}, id='run-92c0199f-9526-427e-86ba-a2981b8f3ca6', invalid_tool_calls=[{'name': None, 'args': 'answer', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': 'answer', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '":"', 'name': None}, 'type': None}]}, response_metadata={}, id='run-92c0199f-9526-427e-86ba-a2981b8f3ca6', invalid_tool_calls=[{'name': None, 'args': '":"', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': '":"', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '10', 'name': None}, 'type': None}]}, response_metadata={}, id='run-92c0199f-9526-427e-86ba-a2981b8f3ca6', invalid_tool_calls=[{'name': None, 'args': '10', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': '10', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': ' +', 'name': None}, 'type': None}]}, response_metadata={}, id='run-92c0199f-9526-427e-86ba-a2981b8f3ca6', invalid_tool_calls=[{'name': None, 'args': ' +', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': ' +', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': ' ', 'name': None}, 'type': None}]}, response_metadata={}, id='run-92c0199f-9526-427e-86ba-a2981b8f3ca6', invalid_tool_calls=[{'name': None, 'args': ' ', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': ' ', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '10', 'name': None}, 'type': None}]}, response_metadata={}, id='run-92c0199f-9526-427e-86ba-a2981b8f3ca6', invalid_tool_calls=[{'name': None, 'args': '10', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': '10', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': ' equals', 'name': None}, 'type': None}]}, response_metadata={}, id='run-92c0199f-9526-427e-86ba-a2981b8f3ca6', invalid_tool_calls=[{'name': None, 'args': ' equals', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': ' equals', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': ' ', 'name': None}, 'type': None}]}, response_metadata={}, id='run-92c0199f-9526-427e-86ba-a2981b8f3ca6', invalid_tool_calls=[{'name': None, 'args': ' ', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': ' ', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '20', 'name': None}, 'type': None}]}, response_metadata={}, id='run-92c0199f-9526-427e-86ba-a2981b8f3ca6', invalid_tool_calls=[{'name': None, 'args': '20', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': '20', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '.","', 'name': None}, 'type': None}]}, response_metadata={}, id='run-92c0199f-9526-427e-86ba-a2981b8f3ca6', invalid_tool_calls=[{'name': None, 'args': '.","', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': '.","', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': 'tools', 'name': None}, 'type': None}]}, response_metadata={}, id='run-92c0199f-9526-427e-86ba-a2981b8f3ca6', invalid_tool_calls=[{'name': None, 'args': 'tools', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': 'tools', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '_used', 'name': None}, 'type': None}]}, response_metadata={}, id='run-92c0199f-9526-427e-86ba-a2981b8f3ca6', invalid_tool_calls=[{'name': None, 'args': '_used', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': '_used', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '":["', 'name': None}, 'type': None}]}, response_metadata={}, id='run-92c0199f-9526-427e-86ba-a2981b8f3ca6', invalid_tool_calls=[{'name': None, 'args': '":["', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': '":["', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': 'functions', 'name': None}, 'type': None}]}, response_metadata={}, id='run-92c0199f-9526-427e-86ba-a2981b8f3ca6', invalid_tool_calls=[{'name': None, 'args': 'functions', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': 'functions', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '.add', 'name': None}, 'type': None}]}, response_metadata={}, id='run-92c0199f-9526-427e-86ba-a2981b8f3ca6', invalid_tool_calls=[{'name': None, 'args': '.add', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': '.add', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '"]', 'name': None}, 'type': None}]}, response_metadata={}, id='run-92c0199f-9526-427e-86ba-a2981b8f3ca6', invalid_tool_calls=[{'name': None, 'args': '"]', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': '"]', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
message=AIMessageChunk(content='', additional_kwargs={'tool_calls': [{'index': 0, 'id': None, 'function': {'arguments': '}', 'name': None}, 'type': None}]}, response_metadata={}, id='run-92c0199f-9526-427e-86ba-a2981b8f3ca6', invalid_tool_calls=[{'name': None, 'args': '}', 'id': None, 'error': None, 'type': 'invalid_tool_call'}], tool_call_chunks=[{'name': None, 'args': '}', 'id': None, 'index': 0, 'type': 'tool_call_chunk'}])
generation_info={'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_bd83329f63'} message=AIMessageChunk(content='', additional_kwargs={}, response_metadata={'finish_reason': 'tool_calls', 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_bd83329f63'}, id='run-92c0199f-9526-427e-86ba-a2981b8f3ca6')
python
{'answer': '10 + 10 equals 20.', 'tools_used': ['functions.add']}

Although this seems like a lot of work, we're now streaming tokens in a way that allows us to pass them on to other parts of our code, such as a websocket, a streamed API response, or some downstream processing.

Let's try this out. We'll put together some simple post-processing to allow us to format the streamed output from our agent more nicely.

python
queue = asyncio.Queue()
streamer = QueueCallbackHandler(queue)

task = asyncio.create_task(agent_executor.invoke("What is 10 + 10", streamer))

async for token in streamer:
    # first identify if we have a <<STEP_END>> token
    if token == "<<STEP_END>>":
        print("\n", flush=True)
    # next, identify if the token is a tool call
    elif tool_calls := token.message.additional_kwargs.get("tool_calls"):
        # if the tool call carries a tool name, we print it
        if tool_name := tool_calls[0]["function"]["name"]:
            print(f"Calling {tool_name}...", flush=True)
        # if the tool call carries arguments, we print them as they stream
        if tool_args := tool_calls[0]["function"]["arguments"]:
            print(f"{tool_args}", end="", flush=True)

_ = await task
text
Calling add...
{"x":10,"y":10}

Calling final_answer...
{"answer":"10 + 10 equals 20.","tools_used":["functions.add"]}

With that, we've produced a nice streaming output within our notebook. Of course, very similar logic can be applied elsewhere, such as within a more polished web app.
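As a final illustration, here is a minimal sketch of how the same pattern could be exposed over HTTP with FastAPI (the /chat endpoint name and the plain-text formatting are illustrative assumptions, not part of this chapter's code):

python
import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/chat")
async def chat(query: str):
    queue = asyncio.Queue()
    streamer = QueueCallbackHandler(queue)
    # run the agent in the background while we stream tokens to the client
    task = asyncio.create_task(agent_executor.invoke(query, streamer))

    async def token_generator():
        async for token in streamer:
            if token == "<<STEP_END>>":
                yield "\n"
            elif tool_calls := token.message.additional_kwargs.get("tool_calls"):
                if tool_name := tool_calls[0]["function"]["name"]:
                    yield f"Calling {tool_name}...\n"
                if tool_args := tool_calls[0]["function"]["arguments"]:
                    yield tool_args
        await task

    return StreamingResponse(token_generator(), media_type="text/plain")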