Azure AI Agents EventHandler | Missing most events #41419

hayescode opened this issue Jun 4, 2025 · 6 comments
Labels: AI Client, customer-reported, needs-team-attention, question, Service Attention

@hayescode

  • Package Name: azure-ai-agents
  • Package Version: 1.1.0b1
  • Operating System: Windows 11
  • Python Version: 3.13.2

Describe the bug
I am migrating from OpenAI Assistants API to Azure AI Agents and I use AsyncEventHandler to handle the different streaming events.

The sample and the SDK only show the following methods to override (Source)

The Azure OpenAI Assistants API had many more methods (Source), with separate on_xxx_created(), on_xxx_delta(), and on_xxx_done() events to distinguish each stage, whereas this SDK only has on_xxx(). This makes it very difficult to migrate.

The following methods are missing:

def on_run_step_created(self, run_step: RunStep)
def on_run_step_delta(self, delta: RunStepDelta, snapshot: RunStep) # on_run_step() contains deltas
def on_run_step_done(self, run_step: RunStep)
def on_message_created(self, message: Message)
def on_message_done(self, message: Message)
def on_text_created(self, text: Text)
def on_text_delta(self, delta: TextDelta, snapshot: Text)
def on_text_done(self, text: Text)
def on_image_file_done(self, image_file: ImageFile)
def on_tool_call_created(self, tool_call: ToolCall)
def on_tool_call_delta(self, delta: ToolCallDelta, snapshot: ToolCall)
def on_tool_call_done(self, tool_call: ToolCall)

The following methods don't exist, but should.

def on_reasoning_summary_created(self)
def on_reasoning_summary_delta(self)
def on_reasoning_summary_done(self)
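
To make the created/delta/done distinction concrete, here is a minimal, SDK-independent sketch of how a handler could derive all three callbacks from a stream of text fragments. `TextLifecycle`, `feed`, and `finish` are hypothetical names that only mirror the OpenAI-style signatures listed above; nothing here is part of azure-ai-agents.

```python
from typing import List, Optional


class TextLifecycle:
    """Hypothetical helper: turns raw text fragments into created/delta/done calls."""

    def __init__(self) -> None:
        self.snapshot: Optional[str] = None  # None until the first fragment arrives
        self.calls: List[str] = []           # record of fired callbacks, for inspection

    def on_text_created(self) -> None:
        self.calls.append("created")

    def on_text_delta(self, delta: str, snapshot: str) -> None:
        self.calls.append(f"delta:{delta}|{snapshot}")

    def on_text_done(self, text: str) -> None:
        self.calls.append(f"done:{text}")

    def feed(self, fragment: str) -> None:
        """Handle one streamed fragment, firing 'created' before the first delta."""
        if self.snapshot is None:
            self.snapshot = ""
            self.on_text_created()
        self.snapshot += fragment
        self.on_text_delta(fragment, self.snapshot)

    def finish(self) -> None:
        """Signal end of stream; 'done' fires only if any text was seen."""
        if self.snapshot is not None:
            self.on_text_done(self.snapshot)
            self.snapshot = None


lifecycle = TextLifecycle()
for piece in ["Hel", "lo"]:
    lifecycle.feed(piece)
lifecycle.finish()
print(lifecycle.calls)
```

The delta callback receives both the new fragment and the accumulated snapshot, which is the pairing the `on_xxx_delta(delta, snapshot)` signatures above rely on.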

github-actions bot commented Jun 4, 2025

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @luigiw @needuv @paulshealy1 @singankit.


github-actions bot commented Jun 4, 2025

Hi @hayescode, I'm an AI Support assistant here to help with your issue. While the Azure SDK team reviews your request, I wanted to provide some possible tips and documentation that might help you in the meantime.

  • The current design exposes a single method per event type (for example, on_run_step) and internally aggregates deltas rather than exposing individual events (like created, delta, and done). This means you may need to implement custom parsing logic in your overridden methods.
  • For scenarios where you need more granular distinction between events, consider overriding the base event handler directly. See the override base event handler sample for guidance.
  • The sample for asynchronous streaming event handling outlines the current API usage and how events are parsed.
  • If you require further customization in your implementation, you can parse the event types and data returned by the default handler and call your own functions accordingly. Refer to the Azure AI Projects README for more details on how event handling is structured.

The Azure SDK team will respond to your issue shortly. I hope these suggestions are helpful in the meantime. If this comment helped you, please give it a 👍. If the suggestion was not helpful or incorrect, please give it a 👎. Your feedback helps us improve!
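
The suggestion to parse event types and route them to your own functions can be sketched without the SDK. The event-name strings below are assumed to match the values behind `AgentStreamEvent` and should be checked against your SDK version; `ROUTES`, `route_event`, and `RecordingHandler` are hypothetical names used only for illustration.

```python
from typing import Any, Dict, List, Tuple

# Assumed event-name strings; check them against AgentStreamEvent in your SDK version.
ROUTES: Dict[str, str] = {
    "thread.run.step.created": "on_run_step_created",
    "thread.run.step.delta": "on_run_step_delta",
    "thread.run.step.completed": "on_run_step_done",
    "thread.message.created": "on_message_created",
    "thread.message.delta": "on_message_delta",
    "thread.message.completed": "on_message_done",
}


def route_event(handler: Any, event_type: str, data: Any) -> Any:
    """Call the handler method mapped to event_type, or on_unhandled_event as a fallback."""
    method_name = ROUTES.get(event_type)
    method = getattr(handler, method_name, None) if method_name else None
    if method is None:
        return handler.on_unhandled_event(event_type, data)
    return method(data)


class RecordingHandler:
    """Hypothetical handler that just records what it was asked to handle."""

    def __init__(self) -> None:
        self.seen: List[Tuple[str, Any]] = []

    def on_message_delta(self, data: Any) -> None:
        self.seen.append(("message_delta", data))

    def on_unhandled_event(self, event_type: str, data: Any) -> None:
        self.seen.append(("unhandled", event_type))


handler = RecordingHandler()
route_event(handler, "thread.message.delta", {"text": "hi"})
route_event(handler, "thread.run.queued", {})            # not in ROUTES
route_event(handler, "thread.run.step.created", {})      # mapped, but not implemented here
print(handler.seen)
```

A lookup table keeps the created/delta/done split in one place, so adding a new callback means adding one entry rather than another `elif` branch.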

@hayescode
Author

I solved this today, can this be added to the SDK?

import json
from typing import Any, Dict, Optional, Tuple, Type, Union, cast

from azure.ai.agents.models import (
    AgentStreamEvent,
    AgentThread,
    BaseAsyncAgentEventHandler,
    MessageDeltaChunk,
    MessageStatus,
    RunStatus,
    RunStep,
    RunStepDeltaChunk,
    RunStepDeltaDetail,
    RunStepDeltaToolCall,
    RunStepDeltaToolCallObject,
    RunStepStatus,
    StreamEventData,
    SubmitToolOutputsDetails,
    ThreadMessage,
    ThreadRun,
)
from azure.ai.agents.models._models import MessageDeltaChunk as MessageDeltaChunkGenerated


class AsyncAzureAgentEventHandler(BaseAsyncAgentEventHandler[Optional[str]]):
    def __init__(self) -> None:
        super().__init__()
        self._current_tool_call_index: Optional[int] = None
        self._current_tool_call: Optional[RunStepDeltaToolCall] = None
        self.__run_step_snapshots: dict[str, RunStep] = {}

    def _filter_parameters(self, model_class: Type, parameters: Dict[str, Any]) -> Dict[str, Any]:
        new_params = {}
        valid_parameters = set(
            filter(
                lambda x: not x.startswith("_") and hasattr(model_class.__dict__[x], "_type"),
                model_class.__dict__.keys(),
            )
        )
        for k in filter(lambda x: x in valid_parameters, parameters.keys()):
            new_params[k] = parameters[k]
        return new_params

    def _safe_instantiate(
        self, model_class: Type, parameters: Union[str, Dict[str, Any]], *, generated_class: Optional[Type] = None
    ):
        if not generated_class:
            generated_class = model_class
        if not isinstance(parameters, dict):
            return parameters
        return cast(StreamEventData, model_class(**self._filter_parameters(generated_class, parameters)))

    def _parse_event(self, raw: str) -> Tuple[str, dict]:
        lines = raw.strip().splitlines()
        et = next((l.split(":", 1)[1].strip() for l in lines if l.startswith("event:")), None)
        if not et:
            raise ValueError("no event type")
        data = next((l.split(":", 1)[1].strip() for l in lines if l.startswith("data:")), "")
        if not data or data == "[DONE]":
            return et, {}
        return et, json.loads(data)

    async def _process_event(self, raw: str) -> Any:
        event_type, data = self._parse_event(raw)

        # Threads
        if event_type == AgentStreamEvent.THREAD_CREATED:
            new_data = self._safe_instantiate(AgentThread, data)
            return await self.on_thread_created(new_data)
        # Runs
        elif event_type == AgentStreamEvent.THREAD_RUN_CREATED:
            new_data = self._safe_instantiate(ThreadRun, data)
            return await self.on_run_created(new_data)
        elif event_type == AgentStreamEvent.THREAD_RUN_REQUIRES_ACTION:
            new_data = self._safe_instantiate(SubmitToolOutputsDetails, data["required_action"]["submit_tool_outputs"])
            return await self.on_run_requires_action(new_data)
        elif event_type in (
            AgentStreamEvent.THREAD_RUN_QUEUED,
            AgentStreamEvent.THREAD_RUN_IN_PROGRESS,
            AgentStreamEvent.THREAD_RUN_COMPLETED,
            AgentStreamEvent.THREAD_RUN_CANCELLING,
            AgentStreamEvent.THREAD_RUN_CANCELLED,
            AgentStreamEvent.THREAD_RUN_EXPIRED,
        ):
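            if self._current_tool_call is not None:
                # Close any open tool call and reset state so it is not reported twice.
                await self.on_tool_call_done(self._current_tool_call)
                self._current_tool_call_index = None
                self._current_tool_call = None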
            new_data = self._safe_instantiate(ThreadRun, data)
            run_status = new_data.status if hasattr(new_data, "status") else None
            return await self.on_run_update(new_data, run_status)
        # Run Steps
        elif event_type == AgentStreamEvent.THREAD_RUN_STEP_CREATED:
            new_data = self._safe_instantiate(RunStep, data)
            self.__run_step_snapshots[new_data.id] = new_data
            return await self.on_run_step_created(new_data)
        elif event_type == AgentStreamEvent.THREAD_RUN_STEP_FAILED:
            new_data = self._safe_instantiate(RunStep, data)
            return await self.on_run_step_failed(new_data)
        elif event_type in (
            AgentStreamEvent.THREAD_RUN_STEP_IN_PROGRESS,
            AgentStreamEvent.THREAD_RUN_STEP_COMPLETED,
            AgentStreamEvent.THREAD_RUN_STEP_CANCELLED,
            AgentStreamEvent.THREAD_RUN_STEP_EXPIRED,
        ):
            new_data = self._safe_instantiate(RunStep, data)
            run_step_status = new_data.status if hasattr(new_data, "status") else None
            if self._current_tool_call is not None:
                await self.on_tool_call_done(self._current_tool_call)
                self._current_tool_call_index = None
                self._current_tool_call = None
            return await self.on_run_step_update(new_data, run_step_status)
        # Run Steps - Tool Calls
        elif event_type == AgentStreamEvent.THREAD_RUN_STEP_DELTA:
            run_step_data = self._safe_instantiate(RunStepDeltaChunk, data)  # TODO HERE!
            step_details = self._safe_instantiate(RunStepDeltaDetail, run_step_data.delta.step_details)
            if isinstance(step_details, RunStepDeltaToolCallObject):
                step_snapshot = self.__run_step_snapshots[run_step_data.id]
                tool_call = cast(RunStepDeltaToolCall, step_details.tool_calls[0])
                snapshot_calls = step_snapshot.step_details.tool_calls
                if tool_call.index == self._current_tool_call_index:
                    # Same tool call: append the streamed argument fragment to the snapshot.
                    prev = snapshot_calls[tool_call.index].function.arguments or ""
                    inc = tool_call.function.arguments or ""
                    snapshot_calls[tool_call.index].function.arguments = prev + inc
                    # Track the accumulated snapshot so on_tool_call_done sees the full arguments.
                    self._current_tool_call = snapshot_calls[tool_call.index]
                    return await self.on_tool_call_delta(tool_call, snapshot_calls[tool_call.index])
                # New index: close the previous tool call (without returning early),
                # then start accumulating the new one.
                if self._current_tool_call is not None:
                    await self.on_tool_call_done(self._current_tool_call)
                if tool_call.index >= len(snapshot_calls):
                    snapshot_calls.append(tool_call)
                self._current_tool_call_index = tool_call.index
                self._current_tool_call = tool_call
                return await self.on_tool_call_created(self._current_tool_call)
            # Deltas that are not tool calls go to the generic run-step delta hook.
            return await self.on_run_step_delta(run_step_data)

        # Messages
        elif event_type == AgentStreamEvent.THREAD_MESSAGE_CREATED:
            new_data = self._safe_instantiate(ThreadMessage, data)
            return await self.on_message_created(new_data)
        elif event_type == AgentStreamEvent.THREAD_MESSAGE_DELTA:
            new_data = self._safe_instantiate(MessageDeltaChunk, data, generated_class=MessageDeltaChunkGenerated)
            return await self.on_message_delta(new_data)
        elif event_type in (
            AgentStreamEvent.THREAD_MESSAGE_IN_PROGRESS,
            AgentStreamEvent.THREAD_MESSAGE_COMPLETED,
            AgentStreamEvent.THREAD_MESSAGE_INCOMPLETE,
        ):
            new_data = self._safe_instantiate(ThreadMessage, data)
            message_update_status = new_data.status if hasattr(new_data, "status") else None
            return await self.on_message_update(new_data, message_update_status)
        else:
            # Handle unhandled events
            return await self.on_unhandled_event(event_type, data)

    async def on_unhandled_event(self, event_type: str, event: Any):
        """Fires on unhandled events."""
        pass

    # Threads
    async def on_thread_created(self, event: AgentThread):
        """Fires when a new thread is created."""
        pass

    # Runs
    async def on_run_created(self, event: ThreadRun):
        """Fires when a new run is created."""
        pass

    async def on_run_update(self, event: ThreadRun, status: RunStatus):
        """Fires when a run is updated."""
        pass

    async def on_run_requires_action(self, required_action: SubmitToolOutputsDetails):
        """Fires when a run requires action."""
        pass

    # Run Steps
    async def on_run_step_created(self, event: RunStep):
        """Fires when a new run step is created."""
        pass

    async def on_run_step_delta(self, event: RunStepDeltaChunk):
        """Fires when a run step delta is received."""
        pass

    async def on_run_step_update(self, event: RunStep, status: RunStepStatus):
        """Fires when a run step is updated."""
        pass

    async def on_run_step_failed(self, event: RunStep):
        """Fires when a run step fails."""
        pass

    async def on_tool_call_created(self, event: RunStepDeltaToolCall):
        """Fires when a tool call is created."""
        pass

    async def on_tool_call_delta(self, event: RunStepDeltaToolCall, snapshot: Any):
        """Fires when a tool call delta is received; snapshot is the accumulated tool call."""
        pass

    async def on_tool_call_done(self, event: Any):
        """Fires when a tool call is completed."""
        pass

    # Messages
    async def on_message_created(self, event: ThreadMessage):
        """Fires when a new message is created."""
        pass

    async def on_message_delta(self, event: MessageDeltaChunk):
        """Fires when a message delta is received."""
        pass

    async def on_message_update(self, event: ThreadMessage, status: MessageStatus):
        """Fires when a message is updated."""
        pass

    async def on_error(self, event: Any):
        """Fires when an error occurs."""
        pass

    async def on_done(self, event: Any):
        """Fires when the event handler is done processing."""
        pass

    async def until_done(self) -> None:
        try:
            async for _ in self:
                pass
        except StopAsyncIteration:
            pass
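
For readers who want to check the parsing step in isolation, the following standalone sketch mirrors the logic of `_parse_event` above on hand-written payloads. The sample event text is made up for illustration and was not captured from the service.

```python
import json
from typing import Tuple


def parse_event(raw: str) -> Tuple[str, dict]:
    """Split one server-sent event chunk into (event type, decoded JSON data)."""
    lines = raw.strip().splitlines()
    event_type = next(
        (line.split(":", 1)[1].strip() for line in lines if line.startswith("event:")), None
    )
    if not event_type:
        raise ValueError("no event type")
    data = next(
        (line.split(":", 1)[1].strip() for line in lines if line.startswith("data:")), ""
    )
    # An empty payload or the "[DONE]" sentinel carries no JSON body.
    if not data or data == "[DONE]":
        return event_type, {}
    return event_type, json.loads(data)


# Illustrative payloads only.
sample = 'event: thread.message.delta\ndata: {"id": "msg_1", "delta": {"value": "Hi"}}\n'
print(parse_event(sample))
print(parse_event("event: done\ndata: [DONE]\n"))
```

Note that this approach reads only the first `data:` line of a chunk, so a payload split across several `data:` lines would need extra handling.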

@hayescode
Author

Actually the SDK is bugged. You cannot pass in a custom event handler because only the default is allowed.

See here and follow the inheritance and overloads. https://github.com/Azure/azure-sdk-for-python/blob/main/sdk/ai/azure-ai-agents/azure/ai/agents/operations/_patch.py#L893

If you try to pass the above handler to stream(), it is ignored and the SDK's own event handler is used instead. This is pretty major; please advise. @howieleung @jhakulin

@howieleung
Member

Hello @hayescode, thanks for your feedback. I just retested the sample that overrides BaseAsyncAgentEventHandler, and it works. Please take a look:
https://github.com/Azure/azure-sdk-for-python/blob/release/azure-ai-agents/1.0.0/sdk/ai/azure-ai-agents/samples/agents_async/sample_agents_stream_with_base_override_eventhandler_async.py

I tried to replace the class in that sample with your code, but got many errors about parameters and variables. If you need more help, could you replace the class in this sample yourself and attach the result here?

@howieleung
Member

Regarding the missing methods: we invoke one event function per event, so when you migrate from OpenAI you should be able to find one of our event functions that is similar to the OpenAI one but has a different name. Because this SDK supports non-OpenAI models, we do not plan to create methods that exactly match OpenAI's. However, because many users use OpenAI models, it would make sense to offer an AsyncOpenAIEventHandler class that overrides BaseAsyncAgentEventHandler, as a non-default event handler class with similar method signatures.
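
One way such an OpenAI-style handler could be layered on top of status-carrying update callbacks is sketched below. Everything here is hypothetical: `OpenAIStyleAdapter`, its method names, and the status strings are stand-ins for illustration, not the SDK's API or a planned design.

```python
from typing import Any, List


class OpenAIStyleAdapter:
    """Hypothetical adapter: fans status-carrying updates out to *_done callbacks."""

    # Assumed terminal status strings; verify against MessageStatus / RunStepStatus.
    TERMINAL = {"completed", "incomplete", "failed", "cancelled", "expired"}

    def __init__(self) -> None:
        self.log: List[str] = []

    # Single-callback style entry points (one call per update).
    def on_message_update(self, message: Any, status: str) -> None:
        if status in self.TERMINAL:
            self.on_message_done(message)

    def on_run_step_update(self, step: Any, status: str) -> None:
        if status in self.TERMINAL:
            self.on_run_step_done(step)

    # OpenAI-style callbacks that subclasses would override.
    def on_message_done(self, message: Any) -> None:
        self.log.append(f"message_done:{message}")

    def on_run_step_done(self, step: Any) -> None:
        self.log.append(f"run_step_done:{step}")


adapter = OpenAIStyleAdapter()
adapter.on_message_update("msg_1", "in_progress")  # not terminal, nothing fires
adapter.on_message_update("msg_1", "completed")
adapter.on_run_step_update("step_1", "completed")
print(adapter.log)
```

With this shape, code written against `on_xxx_done()` would only need to subclass the adapter, while the underlying handler keeps its single update callback per event.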
