Azure AI Agents EventHandler | Missing most events #41419
Comments
Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @luigiw @needuv @paulshealy1 @singankit.
Hi @hayescode, I'm an AI Support assistant here to help with your issue. While the Azure SDK team reviews your request, I wanted to provide some possible tips and documentation that might help you in the meantime.
The Azure SDK team will respond to your issue shortly. I hope these suggestions are helpful in the meantime. If this comment helped you, please give it a 👍. If the suggestion was not helpful or incorrect, please give it a 👎. Your feedback helps us improve!
I solved this today, can this be added to the SDK?

import json
from typing import Any, Dict, Optional, Tuple, Type, Union, cast

from azure.ai.agents.models import (
    AgentStreamEvent,
    AgentThread,
    BaseAsyncAgentEventHandler,
    MessageDeltaChunk,
    MessageStatus,
    RunStatus,
    RunStep,
    RunStepDeltaChunk,
    RunStepDeltaDetail,
    RunStepDeltaToolCall,
    RunStepDeltaToolCallObject,
    RunStepStatus,
    StreamEventData,
    SubmitToolOutputsDetails,
    ThreadMessage,
    ThreadRun,
)
from azure.ai.agents.models._models import MessageDeltaChunk as MessageDeltaChunkGenerated

class AsyncAzureAgentEventHandler(BaseAsyncAgentEventHandler[Optional[str]]):
    def __init__(self) -> None:
        super().__init__()
        self._current_tool_call_index: Optional[int] = None
        self._current_tool_call: Optional[RunStepDeltaToolCall] = None
        self.__run_step_snapshots: dict[str, RunStep] = {}

    def _filter_parameters(self, model_class: Type, parameters: Dict[str, Any]) -> Dict[str, Any]:
        new_params = {}
        valid_parameters = set(
            filter(
                lambda x: not x.startswith("_") and hasattr(model_class.__dict__[x], "_type"),
                model_class.__dict__.keys(),
            )
        )
        for k in filter(lambda x: x in valid_parameters, parameters.keys()):
            new_params[k] = parameters[k]
        return new_params

    def _safe_instantiate(
        self, model_class: Type, parameters: Union[str, Dict[str, Any]], *, generated_class: Optional[Type] = None
    ):
        if not generated_class:
            generated_class = model_class
        if not isinstance(parameters, dict):
            return parameters
        return cast(StreamEventData, model_class(**self._filter_parameters(generated_class, parameters)))

    def _parse_event(self, raw: str) -> Tuple[str, dict]:
        lines = raw.strip().splitlines()
        et = next((l.split(":", 1)[1].strip() for l in lines if l.startswith("event:")), None)
        if not et:
            raise ValueError("no event type")
        data = next((l.split(":", 1)[1].strip() for l in lines if l.startswith("data:")), "")
        if not data or data == "[DONE]":
            return et, {}
        return et, json.loads(data)

    async def _process_event(self, raw: str) -> Any:
        event_type, data = self._parse_event(raw)
        # Threads
        if event_type == AgentStreamEvent.THREAD_CREATED:
            new_data = self._safe_instantiate(AgentThread, data)
            return await self.on_thread_created(new_data)
        # Runs
        elif event_type == AgentStreamEvent.THREAD_RUN_CREATED:
            new_data = self._safe_instantiate(ThreadRun, data)
            return await self.on_run_created(new_data)
        elif event_type == AgentStreamEvent.THREAD_RUN_REQUIRES_ACTION:
            new_data = self._safe_instantiate(SubmitToolOutputsDetails, data["required_action"]["submit_tool_outputs"])
            return await self.on_run_requires_action(new_data)
        elif event_type in (
            AgentStreamEvent.THREAD_RUN_QUEUED,
            AgentStreamEvent.THREAD_RUN_IN_PROGRESS,
            AgentStreamEvent.THREAD_RUN_COMPLETED,
            AgentStreamEvent.THREAD_RUN_CANCELLING,
            AgentStreamEvent.THREAD_RUN_CANCELLED,
            AgentStreamEvent.THREAD_RUN_EXPIRED,
        ):
            if self._current_tool_call:
                await self.on_tool_call_done(self._current_tool_call)
            new_data = self._safe_instantiate(ThreadRun, data)
            run_status = new_data.status if hasattr(new_data, "status") else None
            return await self.on_run_update(new_data, run_status)
        # Run Steps
        elif event_type == AgentStreamEvent.THREAD_RUN_STEP_CREATED:
            new_data = self._safe_instantiate(RunStep, data)
            self.__run_step_snapshots[new_data.id] = new_data
            return await self.on_run_step_created(new_data)
        elif event_type == AgentStreamEvent.THREAD_RUN_STEP_FAILED:
            new_data = self._safe_instantiate(RunStep, data)
            return await self.on_run_step_failed(new_data)
        elif event_type in (
            AgentStreamEvent.THREAD_RUN_STEP_IN_PROGRESS,
            AgentStreamEvent.THREAD_RUN_STEP_COMPLETED,
            AgentStreamEvent.THREAD_RUN_STEP_CANCELLED,
            AgentStreamEvent.THREAD_RUN_STEP_EXPIRED,
        ):
            new_data = self._safe_instantiate(RunStep, data)
            run_step_status = new_data.status if hasattr(new_data, "status") else None
            if self._current_tool_call is not None:
                await self.on_tool_call_done(self._current_tool_call)
                self._current_tool_call_index = None
                self._current_tool_call = None
            return await self.on_run_step_update(new_data, run_step_status)
        # Run Steps - Tool Calls
        elif event_type == AgentStreamEvent.THREAD_RUN_STEP_DELTA:
            run_step_data = self._safe_instantiate(RunStepDeltaChunk, data)  # TODO HERE!
            step_details = self._safe_instantiate(RunStepDeltaDetail, run_step_data.delta.step_details)
            if isinstance(step_details, RunStepDeltaToolCallObject):
                step_snapshot = self.__run_step_snapshots[run_step_data.id]
                tool_call = cast(RunStepDeltaToolCall, step_details.tool_calls[0])
                if tool_call.index == self._current_tool_call_index:
                    # Same tool call as before: append the new argument fragment to the snapshot.
                    prev = step_snapshot.step_details.tool_calls[tool_call.index].function.arguments or ""
                    inc = tool_call.function.arguments or ""
                    step_snapshot.step_details.tool_calls[tool_call.index].function.arguments = prev + inc
                    return await self.on_tool_call_delta(
                        tool_call, step_snapshot.step_details.tool_calls[tool_call.index]
                    )
                elif tool_call.index != self._current_tool_call_index:
                    if self._current_tool_call is not None:
                        return await self.on_tool_call_done(self._current_tool_call)
                    if not step_snapshot.step_details.tool_calls:
                        step_snapshot.step_details.tool_calls.append(
                            tool_call
                        )  # Append the new tool call to the snapshot and begin accumulating
                    self._current_tool_call_index = tool_call.index
                    self._current_tool_call = tool_call
                    return await self.on_tool_call_created(self._current_tool_call)
                else:
                    # Not reached in practice: the two branches above cover every index value.
                    print(
                        f"Unexpected tool call index: {tool_call.index}, current index: {self._current_tool_call_index}"
                    )
                    if not step_snapshot.step_details.tool_calls:
                        step_snapshot.step_details.tool_calls.append(tool_call)
                    else:
                        self._current_tool_call = step_snapshot.step_details.tool_calls[tool_call.index]
        # Messages
        elif event_type == AgentStreamEvent.THREAD_MESSAGE_CREATED:
            new_data = self._safe_instantiate(ThreadMessage, data)
            return await self.on_message_created(new_data)
        elif event_type == AgentStreamEvent.THREAD_MESSAGE_DELTA:
            new_data = self._safe_instantiate(MessageDeltaChunk, data, generated_class=MessageDeltaChunkGenerated)
            return await self.on_message_delta(new_data)
        elif event_type in (
            AgentStreamEvent.THREAD_MESSAGE_IN_PROGRESS,
            AgentStreamEvent.THREAD_MESSAGE_COMPLETED,
            AgentStreamEvent.THREAD_MESSAGE_INCOMPLETE,
        ):
            new_data = self._safe_instantiate(ThreadMessage, data)
            message_update_status = new_data.status if hasattr(new_data, "status") else None
            return await self.on_message_update(new_data, message_update_status)
        else:
            # Handle unhandled events
            return await self.on_unhandled_event(event_type, data)

    async def on_unhandled_event(self, event_type: str, event: Any):
        """Fires on unhandled events."""
        pass

    # Threads
    async def on_thread_created(self, event: AgentThread):
        """Fires when a new thread is created."""
        pass

    # Runs
    async def on_run_created(self, event: ThreadRun):
        """Fires when a new run is created."""
        pass

    async def on_run_update(self, event: ThreadRun, status: RunStatus):
        """Fires when a run is updated."""
        pass

    async def on_run_requires_action(self, required_action: SubmitToolOutputsDetails):
        """Fires when a run requires action."""
        pass

    # Run Steps
    async def on_run_step_created(self, event: RunStep):
        """Fires when a new run step is created."""
        pass

    async def on_run_step_delta(self, event: RunStepDeltaChunk):
        """Fires when a run step delta is received."""
        pass

    async def on_run_step_update(self, event: RunStep, status: RunStepStatus):
        """Fires when a run step is updated."""
        pass

    async def on_run_step_failed(self, event: RunStep):
        """Fires when a run step fails."""
        pass

    async def on_tool_call_created(self, event: RunStepDeltaToolCallObject):
        """Fires when a tool call is created."""
        pass

    async def on_tool_call_delta(self, event: RunStepDeltaToolCallObject, snapshot: RunStep):
        """Fires when a tool call delta is received."""
        pass

    async def on_tool_call_done(self, event: RunStepDeltaToolCallObject):
        """Fires when a tool call is completed."""
        pass

    # Messages
    async def on_message_created(self, event: ThreadMessage):
        """Fires when a new message is created."""
        pass

    async def on_message_delta(self, event: MessageDeltaChunk):
        """Fires when a message delta is received."""
        pass

    async def on_message_update(self, event: ThreadMessage, status: MessageStatus):
        """Fires when a message is updated."""
        pass

    async def on_error(self, event: Any):
        """Fires when an error occurs."""
        pass

    async def on_done(self, event: Any):
        """Fires when the event handler is done processing."""
        pass

    async def until_done(self) -> None:
        try:
            async for _ in self:
                pass
        except StopAsyncIteration:
            pass
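The tool-call bookkeeping in the handler above (tracking the current index, accumulating argument fragments, and deciding when a call is "created" or "done") can be hard to follow inside the dispatch chain. Below is a minimal standalone sketch of that idea. It does not use the SDK: `ToolCallDelta` and `ToolCallTracker` are hypothetical stand-ins invented for illustration, and unlike the handler above this sketch emits a "delta" event for the first fragment too and closes the previous call before opening the next one.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

Event = Tuple[str, int, str]  # (kind, tool call index, accumulated arguments)


@dataclass
class ToolCallDelta:
    """Hypothetical stand-in for one streamed tool-call fragment (not an SDK type)."""
    index: int
    arguments: str = ""


@dataclass
class ToolCallTracker:
    """Turns a flat stream of deltas into created/delta/done events."""
    current_index: Optional[int] = None
    snapshots: Dict[int, str] = field(default_factory=dict)

    def feed(self, delta: ToolCallDelta) -> List[Event]:
        events: List[Event] = []
        if delta.index != self.current_index:
            # A new index means the previous call (if any) has finished.
            if self.current_index is not None:
                events.append(("done", self.current_index, self.snapshots[self.current_index]))
            self.current_index = delta.index
            self.snapshots[delta.index] = ""
            events.append(("created", delta.index, ""))
        # Accumulate the argument fragment into the running snapshot.
        self.snapshots[delta.index] += delta.arguments
        events.append(("delta", delta.index, self.snapshots[delta.index]))
        return events

    def finish(self) -> List[Event]:
        """Call when the run step ends to close the last open tool call."""
        if self.current_index is None:
            return []
        index, self.current_index = self.current_index, None
        return [("done", index, self.snapshots[index])]


if __name__ == "__main__":
    tracker = ToolCallTracker()
    stream = [ToolCallDelta(0, '{"city":'), ToolCallDelta(0, ' "Oslo"}'), ToolCallDelta(1, "{}")]
    for fragment in stream:
        for event in tracker.feed(fragment):
            print(event)
    print(tracker.finish())
```

In a real handler, each emitted tuple would correspond to a call to `on_tool_call_created`, `on_tool_call_delta`, or `on_tool_call_done`, and `finish()` would be invoked from the run-step completion branch.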
Actually, the SDK is bugged: you cannot pass in a custom event handler because only the default is allowed. See here and follow the inheritance and overloads: https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/ai/azure-ai-agents/azure/ai/agents/operations/_patch.py#L893 If you try to pass in the above code to
Hello @hayescode, thanks for your feedback. I just retested the sample that overrides BaseAsyncAgentEventHandler, and it works. Please take a look: I tried to replace the class with your code, but got many errors on parameters or variables. If you need more help, could you replace the class in this sample and attach it here?
Regarding the missing methods, we do invoke one event function per event. So when you migrate from OpenAI, you should be able to find one of our event functions that is similar to the OpenAI one but has a different name. Because this SDK supports non-OpenAI models, we do not plan to create methods exactly the same as OpenAI's. However, because many users use OpenAI models, it would make sense to offer an AsyncOpenAIEventHandler class that overrides BaseAsyncAgentEventHandler as a non-default event handler class with similar method signatures.
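To make the suggestion above more concrete, here is a rough sketch of how stream event names could be routed to OpenAI-style created/delta/done callbacks. It is deliberately SDK-free so it runs on its own: `OpenAIStyleHandler`, `RecordingHandler`, and the `EVENT_TO_CALLBACK` table are hypothetical names for illustration, not the proposed AsyncOpenAIEventHandler itself, and the table covers only a few events.

```python
import asyncio
from typing import Any, Dict, List, Tuple

# Left: stream event names as they appear on the wire.
# Right: hypothetical OpenAI-style callback names (an assumption, not SDK API).
EVENT_TO_CALLBACK: Dict[str, str] = {
    "thread.message.created": "on_message_created",
    "thread.message.delta": "on_message_delta",
    "thread.message.completed": "on_message_done",
    "thread.run.step.created": "on_run_step_created",
    "thread.run.step.delta": "on_run_step_delta",
    "thread.run.step.completed": "on_run_step_done",
}


class OpenAIStyleHandler:
    """Hypothetical base class: routes each event to a created/delta/done callback."""

    async def dispatch(self, event_type: str, data: Any) -> Any:
        name = EVENT_TO_CALLBACK.get(event_type)
        callback = getattr(self, name, None) if name else None
        if callback is None:
            # Unknown event, or the subclass did not implement this callback.
            return await self.on_unhandled_event(event_type, data)
        return await callback(data)

    async def on_unhandled_event(self, event_type: str, data: Any) -> None:
        """Fallback for events without a mapped or implemented callback."""
        return None


class RecordingHandler(OpenAIStyleHandler):
    """Example subclass that only cares about message text."""

    def __init__(self) -> None:
        self.seen: List[Tuple[str, Any]] = []

    async def on_message_delta(self, data: Any) -> None:
        self.seen.append(("delta", data))

    async def on_message_done(self, data: Any) -> None:
        self.seen.append(("done", data))


async def demo() -> List[Tuple[str, Any]]:
    handler = RecordingHandler()
    await handler.dispatch("thread.message.delta", "Hel")
    await handler.dispatch("thread.message.delta", "lo")
    await handler.dispatch("thread.message.completed", "Hello")
    await handler.dispatch("thread.run.queued", None)  # falls through to on_unhandled_event
    return handler.seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

A table-driven dispatch like this keeps the event-name mapping in one place, so a subclass only overrides the callbacks it needs and everything else falls through to `on_unhandled_event`.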
Describe the bug
I am migrating from the OpenAI Assistants API to Azure AI Agents, and I use AsyncEventHandler to handle the different streaming events.
The sample and the SDK only show the following methods to override (Source)
The Azure OpenAI Assistants API had many more methods (Source), and there are on_xxx_created(), on_xxx_delta(), and on_xxx_done() events to distinguish, but this SDK only has on_xxx(). This is making it very difficult to migrate.
The following methods are missing:
The following methods don't exist, but should.