Basic Streaming

import asyncio
from observee_agents import chat_with_tools_stream

async def stream_example():
    async for chunk in chat_with_tools_stream(
        message="What's the weather like today?",
        provider="anthropic",
        observee_api_key="obs_your_key_here"
    ):
        if chunk["type"] == "content":
            print(chunk["content"], end="", flush=True)
        elif chunk["type"] == "tool_result":
            print(f"\n🔧 [Tool: {chunk['tool_name']}]")

asyncio.run(stream_example())

Chunk Types

Content Chunks

{
    "type": "content",
    "content": "streaming text content"
}

Tool Result Chunks

{
    "type": "tool_result", 
    "tool_name": "tool_name",
    "result": "tool output"
}
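
The result field carries the tool's raw output, which the examples below only hint at. A minimal sketch that logs it, truncated (the 500-character cap is an arbitrary choice, not a library setting):

import asyncio
from observee_agents import chat_with_tools_stream

async def show_tool_output():
    async for chunk in chat_with_tools_stream(
        message="Search for recent AI news",
        provider="anthropic"
    ):
        if chunk["type"] == "tool_result":
            # Truncate raw tool output so it doesn't flood the terminal
            output = str(chunk.get("result", ""))
            print(f"🔧 {chunk['tool_name']}: {output[:500]}")

asyncio.run(show_tool_output())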

Error Chunks

{
    "type": "error",
    "error": "error message"
}

Complete Example

import asyncio
from observee_agents import chat_with_tools_stream

async def complete_stream():
    print("🤖 AI: ", end="")
    
    async for chunk in chat_with_tools_stream(
        message="Search for recent AI news and summarize",
        provider="anthropic"
    ):
        if chunk["type"] == "content":
            print(chunk["content"], end="", flush=True)
        elif chunk["type"] == "tool_result":
            print(f"\n🔧 Used tool: {chunk['tool_name']}")
            print("🤖 AI: ", end="")
        elif chunk["type"] == "error":
            print(f"\n❌ Error: {chunk['error']}")
    
    print()  # New line at end

asyncio.run(complete_stream())

Different Providers

import asyncio
from observee_agents import chat_with_tools_stream

async def multi_provider_stream():
    providers = ["anthropic", "openai", "gemini"]
    
    for provider in providers:
        print(f"\n--- {provider.upper()} ---")
        
        async for chunk in chat_with_tools_stream(
            message="Tell me about Python",
            provider=provider
        ):
            if chunk["type"] == "content":
                print(chunk["content"], end="")

asyncio.run(multi_provider_stream())

Error Handling

import asyncio
from observee_agents import chat_with_tools_stream

async def safe_stream():
    try:
        async for chunk in chat_with_tools_stream(
            message="Search for news",
            provider="anthropic"
        ):
            if chunk["type"] == "content":
                print(chunk["content"], end="")
            elif chunk["type"] == "error":
                print(f"Error: {chunk['error']}")
                break
    except Exception as e:
        print(f"Stream error: {e}")

asyncio.run(safe_stream())
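
Slow tools or a stalled connection can hang the loop above indefinitely. A minimal sketch that bounds the whole stream with asyncio.timeout (Python 3.11+; on older versions, wrap the loop in a coroutine and use asyncio.wait_for):

import asyncio
from observee_agents import chat_with_tools_stream

async def stream_with_timeout():
    try:
        # Abort the entire stream if it runs longer than 60 seconds
        async with asyncio.timeout(60):
            async for chunk in chat_with_tools_stream(
                message="Search for news",
                provider="anthropic"
            ):
                if chunk["type"] == "content":
                    print(chunk["content"], end="", flush=True)
    except TimeoutError:
        print("\nStream timed out")

asyncio.run(stream_with_timeout())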

Configuration

All chat_with_tools parameters also work with chat_with_tools_stream:

import asyncio
from observee_agents import chat_with_tools_stream

async def configured_stream():
    async for chunk in chat_with_tools_stream(
        message="Creative writing task",
        provider="openai",
        temperature=0.9,
        max_tokens=2000,
        filter_type="local_embedding",
        max_tools=10
    ):
        if chunk["type"] == "content":
            print(chunk["content"], end="")

asyncio.run(configured_stream())

Building a Chat Interface

import asyncio
from observee_agents import chat_with_tools_stream

async def chat_interface():
    print("Chat with AI (type 'quit' to exit)")
    
    while True:
        user_input = input("\n👤 You: ")
        if user_input.lower() == 'quit':
            break
            
        print("🤖 AI: ", end="")
        
        async for chunk in chat_with_tools_stream(
            message=user_input,
            provider="anthropic"
        ):
            if chunk["type"] == "content":
                print(chunk["content"], end="", flush=True)
            elif chunk["type"] == "tool_result":
                print(f"\n🔧 [Used: {chunk['tool_name']}]")
                print("🤖 AI: ", end="")
        
        print()  # New line after response

asyncio.run(chat_interface())

Performance Tips

  1. Buffer content: Collect chunks and flush them in batches for smoother output (see the sketch after this list)
  2. Handle tool updates: Surface tool usage so users know why the stream paused
  3. Timeout handling: Set reasonable timeouts for long operations (see the timeout sketch under Error Handling)
  4. Error recovery: Handle stream interruptions gracefully instead of letting them crash the loop
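
For tip 1, a minimal buffering sketch: content chunks are collected and flushed in batches rather than printed one by one. The flush_every threshold is an illustrative knob, not a library parameter:

import asyncio
from observee_agents import chat_with_tools_stream

async def buffered_stream(flush_every: int = 10):
    buffer = []
    async for chunk in chat_with_tools_stream(
        message="Tell me about Python",
        provider="anthropic"
    ):
        if chunk["type"] == "content":
            buffer.append(chunk["content"])
            # Flush in batches for smoother terminal output
            if len(buffer) >= flush_every:
                print("".join(buffer), end="", flush=True)
                buffer.clear()
    # Flush whatever is left when the stream ends
    print("".join(buffer), flush=True)

asyncio.run(buffered_stream())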

Comparison with Non-Streaming

| Feature         | chat_with_tools            | chat_with_tools_stream |
|-----------------|----------------------------|------------------------|
| Speed           | Wait for complete response | Real-time updates      |
| User Experience | All at once                | Progressive            |
| Tool Feedback   | Final results only         | Live tool execution    |
| Error Handling  | Single try/catch           | Per-chunk handling     |
| Memory Usage    | Full response in memory    | Chunk-by-chunk         |
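
Since the stream delivers the same content chunk by chunk, the full response can always be reassembled when needed. A short sketch, assuming the content chunks concatenate to the complete reply:

import asyncio
from observee_agents import chat_with_tools_stream

async def collect_response(message: str) -> str:
    # Accumulate content chunks into one string, mimicking chat_with_tools
    parts = []
    async for chunk in chat_with_tools_stream(
        message=message,
        provider="anthropic"
    ):
        if chunk["type"] == "content":
            parts.append(chunk["content"])
    return "".join(parts)

print(asyncio.run(collect_response("Tell me about Python")))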

Next Steps