from observee_agents import chat_with_tools_stream
import asyncio
# Define custom tool handler
async def custom_tool_handler(tool_name: str, tool_input: dict) -> str:
"""Handle custom tool executions"""
if tool_name == "add_numbers":
return str(tool_input.get("a", 0) + tool_input.get("b", 0))
elif tool_name == "get_time":
from datetime import datetime
return datetime.now().strftime("%I:%M %p")
else:
return f"Unknown tool: {tool_name}"
# Define custom tools in OpenAI format
custom_tools = [
{
"type": "function",
"function": {
"name": "add_numbers",
"description": "Add two numbers together",
"parameters": {
"type": "object",
"properties": {
"a": {"type": "number", "description": "First number"},
"b": {"type": "number", "description": "Second number"}
},
"required": ["a", "b"]
}
}
},
{
"type": "function",
"function": {
"name": "get_time",
"description": "Get the current time",
"parameters": {
"type": "object",
"properties": {}
}
}
}
]
# Use custom tools
async def example():
async for chunk in chat_with_tools_stream(
message="What's 5 + 3? Also, what time is it?",
provider="anthropic",
custom_tools=custom_tools,
custom_tool_handler=custom_tool_handler,
observee_api_key="obs_your_key_here"
):
if chunk["type"] == "content":
print(chunk["content"], end="", flush=True)
elif chunk["type"] == "tool_result":
print(f"\n🔧 [Tool: {chunk['tool_name']} = {chunk['result']}]")
asyncio.run(example())