diff --git a/contributing/samples/concurrent_tool_calls/__init__.py b/contributing/samples/concurrent_tool_calls/__init__.py
new file mode 100755
index 000000000..c48963cdc
--- /dev/null
+++ b/contributing/samples/concurrent_tool_calls/__init__.py
@@ -0,0 +1,15 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from . import agent
diff --git a/contributing/samples/concurrent_tool_calls/agent.py b/contributing/samples/concurrent_tool_calls/agent.py
new file mode 100755
index 000000000..d7fbe9775
--- /dev/null
+++ b/contributing/samples/concurrent_tool_calls/agent.py
@@ -0,0 +1,80 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+from datetime import datetime
+import random
+
+from google.adk import Agent
+from google.adk.tools.tool_context import ToolContext
+
+
+async def get_weather(city: str, tool_context: ToolContext) -> str:
+  """Gets weather information for a specified city.
+
+  Args:
+    city: The name of the city to get weather information for.
+    tool_context: The tool context, used to record past weather queries.
+
+  Returns:
+    A string containing weather information for the city.
+  """
+  print(
+      '@get_weather is starting',
+      datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),
+  )
+  # Use an async sleep for a non-blocking delay that simulates a real-world
+  # weather API call.
+  await asyncio.sleep(3)
+
+  # Mock weather data for demonstration.
+  weather_conditions = [
+      'sunny',
+      'cloudy',
+      'rainy',
+      'snowy',
+      'partly cloudy',
+      'stormy',
+  ]
+  temperature = random.randint(-10, 35)  # Temperature in Celsius.
+  condition = random.choice(weather_conditions)
+  humidity = random.randint(30, 90)
+
+  weather_info = (
+      f'Weather in {city}: {condition}, {temperature}°C, humidity {humidity}%'
+  )
+
+  if 'weather_queries' not in tool_context.state:
+    tool_context.state['weather_queries'] = []
+
+  # Reassign (rather than mutate in place) so the state change is recorded.
+  tool_context.state['weather_queries'] = tool_context.state[
+      'weather_queries'
+  ] + [{'city': city, 'weather': weather_info}]
+  return weather_info
+
+
+def before_tool_cb(tool, args, tool_context):
+  print('@before_tool_cb1', datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
+
+
+def after_tool_cb(tool, args, tool_context, tool_response):
+  print('@after_tool_cb1', datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
+
+
+root_agent = Agent(
+    model='gemini-2.0-flash',
+    name='weather_agent',
+    description=(
+        'Weather information agent that can provide current weather'
+        ' conditions for different cities around the world.'
+    ),
+    instruction="""
+      You provide weather information for cities and answer questions about
+      weather conditions. You can check the weather for different cities
+      around the world, and you can use multiple tools in parallel by
+      calling them in one request and one round. It is fine to discuss
+      previous weather queries and to compare conditions between cities.
+
+      When you are asked to check the weather for a city, you must call the
+      get_weather tool with the city name, passed as a string.
+
+      IMPORTANT: When you are asked to check the weather for multiple cities
+      (e.g., "check weather in New York, London, and Tokyo"), you MUST make
+      ALL the get_weather function calls in parallel (simultaneously, in one
+      turn) to provide a faster response. Do NOT wait between weather
+      queries.
+    """,
+    tools=[
+        get_weather,
+    ],
+    before_tool_callback=[before_tool_cb],
+    after_tool_callback=[after_tool_cb],
+)
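The tool above sleeps for three seconds per call, so the sample only shows a speedup if parallel function calls are awaited concurrently. As a point of reference, the following ADK-free sketch reproduces the timing argument; all names in it are illustrative.

import asyncio
import time


async def fake_weather_call(city: str) -> str:
  # Stands in for get_weather's simulated 3-second API latency.
  await asyncio.sleep(3)
  return f'weather for {city}'


async def demo() -> None:
  start = time.monotonic()
  results = await asyncio.gather(
      fake_weather_call('New York'),
      fake_weather_call('London'),
      fake_weather_call('Tokyo'),
  )
  # Finishes in ~3 seconds, not ~9: the three sleeps overlap on the event loop.
  print(results, f'{time.monotonic() - start:.1f}s')


asyncio.run(demo())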
diff --git a/contributing/samples/concurrent_tool_calls/main.py b/contributing/samples/concurrent_tool_calls/main.py
new file mode 100755
index 000000000..bada25655
--- /dev/null
+++ b/contributing/samples/concurrent_tool_calls/main.py
@@ -0,0 +1,78 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import time
+import warnings
+
+import agent
+from dotenv import load_dotenv
+from google.adk import Runner
+from google.adk.artifacts import InMemoryArtifactService
+from google.adk.cli.utils import logs
+from google.adk.sessions import InMemorySessionService
+from google.adk.sessions import Session
+from google.genai import types
+
+load_dotenv(override=True)
+warnings.filterwarnings('ignore', category=UserWarning)
+logs.log_to_tmp_folder()
+
+
+async def main():
+  app_name = 'my_app'
+  user_id_1 = 'user1'
+  session_service = InMemorySessionService()
+  artifact_service = InMemoryArtifactService()
+  runner = Runner(
+      app_name=app_name,
+      agent=agent.root_agent,
+      artifact_service=artifact_service,
+      session_service=session_service,
+  )
+  session_11 = await session_service.create_session(
+      app_name=app_name, user_id=user_id_1
+  )
+
+  async def run_prompt(session: Session, new_message: str):
+    content = types.Content(
+        role='user', parts=[types.Part.from_text(text=new_message)]
+    )
+    print('** User says:', content.model_dump(exclude_none=True))
+    async for event in runner.run_async(
+        user_id=user_id_1,
+        session_id=session.id,
+        new_message=content,
+    ):
+      # Guard against events that carry no content.
+      if event.content and event.content.parts and event.content.parts[0].text:
+        print(f'** {event.author}: {event.content.parts[0].text}')
+
+  start_time = time.time()
+  print('Start time:', start_time)
+  print('------------------------------------')
+  await run_prompt(session_11, 'Hi')
+  await run_prompt(
+      session_11,
+      'I have an urgent meeting. I need to check weather in New York,'
+      ' London, and Tokyo as soon as possible.',
+  )
+  print(
+      await artifact_service.list_artifact_keys(
+          app_name=app_name, user_id=user_id_1, session_id=session_11.id
+      )
+  )
+  end_time = time.time()
+  print('------------------------------------')
+  print('End time:', end_time)
+  print('Total time:', end_time - start_time)
+
+
+if __name__ == '__main__':
+  asyncio.run(main())
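main.py prints the artifact keys but not the weather_queries list that get_weather accumulates in session state. If that state is of interest, a helper along these lines could read it back after the prompts. This is a sketch, not part of the change, and it assumes the session service's async get_session(app_name=..., user_id=..., session_id=...) API.

from google.adk.sessions import BaseSessionService


async def print_recorded_queries(
    session_service: BaseSessionService,
    app_name: str,
    user_id: str,
    session_id: str,
) -> None:
  # Hypothetical helper: fetch the session and dump what get_weather stored.
  session = await session_service.get_session(
      app_name=app_name, user_id=user_id, session_id=session_id
  )
  if session is None:
    return
  for query in session.state.get('weather_queries', []):
    print('Recorded query:', query)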
diff --git a/src/google/adk/flows/llm_flows/functions.py b/src/google/adk/flows/llm_flows/functions.py
index 2541ac664..43d88f117 100644
--- a/src/google/adk/flows/llm_flows/functions.py
+++ b/src/google/adk/flows/llm_flows/functions.py
@@ -123,88 +123,130 @@ def generate_auth_event(
   )
 
 
+async def _execute_single_function_call_async(
+    invocation_context: InvocationContext,
+    function_call_event: Event,
+    function_call: types.FunctionCall,
+    tools_dict: dict[str, BaseTool],
+) -> Optional[Event]:
+  """Executes a single function call and returns the function response event."""
+  from ...agents.llm_agent import LlmAgent
+
+  agent = invocation_context.agent
+  if not isinstance(agent, LlmAgent):
+    return None
+
+  tool, tool_context = _get_tool_and_context(
+      invocation_context,
+      function_call_event,
+      function_call,
+      tools_dict,
+  )
+
+  with tracer.start_as_current_span(f'execute_tool {tool.name}'):
+    # Do not name this variable "args": that is a reserved command in the
+    # Python debugger (pdb).
+    function_args = function_call.args or {}
+    function_response: Optional[dict] = None
+
+    # The first before-tool callback that returns a value short-circuits the
+    # actual tool call.
+    for callback in agent.canonical_before_tool_callbacks:
+      function_response = callback(
+          tool=tool, args=function_args, tool_context=tool_context
+      )
+      if inspect.isawaitable(function_response):
+        function_response = await function_response
+      if function_response:
+        break
+
+    if not function_response:
+      function_response = await __call_tool_async(
+          tool, args=function_args, tool_context=tool_context
+      )
+
+    # The first after-tool callback that returns a value replaces the tool's
+    # response.
+    for callback in agent.canonical_after_tool_callbacks:
+      altered_function_response = callback(
+          tool=tool,
+          args=function_args,
+          tool_context=tool_context,
+          tool_response=function_response,
+      )
+      if inspect.isawaitable(altered_function_response):
+        altered_function_response = await altered_function_response
+      if altered_function_response is not None:
+        function_response = altered_function_response
+        break
+
+    if tool.is_long_running:
+      # Allow a long-running function to return None to skip the function
+      # response.
+      if not function_response:
+        return None
+
+    # Builds the function response event.
+    function_response_event = __build_response_event(
+        tool, function_response, tool_context, invocation_context
+    )
+    trace_tool_call(
+        tool=tool,
+        args=function_args,
+        function_response_event=function_response_event,
+    )
+    return function_response_event
+
+
 async def handle_function_calls_async(
     invocation_context: InvocationContext,
     function_call_event: Event,
     tools_dict: dict[str, BaseTool],
     filters: Optional[set[str]] = None,
 ) -> Optional[Event]:
-  """Calls the functions and returns the function response event."""
+  """Calls the functions concurrently and returns the function response event."""
   from ...agents.llm_agent import LlmAgent
 
   agent = invocation_context.agent
   if not isinstance(agent, LlmAgent):
-    return
+    return None
 
   function_calls = function_call_event.get_function_calls()
 
-  function_response_events: list[Event] = []
+  # Filter the function calls down to the ids in `filters`, if provided.
+  filtered_function_calls = []
   for function_call in function_calls:
     if filters and function_call.id not in filters:
       continue
-    tool, tool_context = _get_tool_and_context(
-        invocation_context,
-        function_call_event,
-        function_call,
-        tools_dict,
-    )
-
-    with tracer.start_as_current_span(f'execute_tool {tool.name}'):
-      # do not use "args" as the variable name, because it is a reserved keyword
-      # in python debugger.
-      function_args = function_call.args or {}
-      function_response: Optional[dict] = None
-
-      for callback in agent.canonical_before_tool_callbacks:
-        function_response = callback(
-            tool=tool, args=function_args, tool_context=tool_context
-        )
-        if inspect.isawaitable(function_response):
-          function_response = await function_response
-        if function_response:
-          break
-
-      if not function_response:
-        function_response = await __call_tool_async(
-            tool, args=function_args, tool_context=tool_context
-        )
-
-      for callback in agent.canonical_after_tool_callbacks:
-        altered_function_response = callback(
-            tool=tool,
-            args=function_args,
-            tool_context=tool_context,
-            tool_response=function_response,
-        )
-        if inspect.isawaitable(altered_function_response):
-          altered_function_response = await altered_function_response
-        if altered_function_response is not None:
-          function_response = altered_function_response
-          break
-
-      if tool.is_long_running:
-        # Allow long running function to return None to not provide function response.
-        if not function_response:
-          continue
-
-      # Builds the function response event.
-      function_response_event = __build_response_event(
-          tool, function_response, tool_context, invocation_context
-      )
-      trace_tool_call(
-          tool=tool,
-          args=function_args,
-          function_response_event=function_response_event,
-      )
-      function_response_events.append(function_response_event)
+    filtered_function_calls.append(function_call)
+
+  if not filtered_function_calls:
+    return None
+
+  # Create one task per function call so the tools start immediately and run
+  # concurrently on the event loop.
+  tasks = []
+  for function_call in filtered_function_calls:
+    task = asyncio.create_task(
+        _execute_single_function_call_async(
+            invocation_context, function_call_event, function_call, tools_dict
+        )
+    )
+    tasks.append(task)
+
+  # Await all function calls together; with return_exceptions=True a failing
+  # tool surfaces as a value instead of cancelling its siblings.
+  function_response_events = await asyncio.gather(
+      *tasks, return_exceptions=True
+  )
+
+  # Filter out None results and handle exceptions.
+  valid_function_response_events: list[Event] = []
+  for result in function_response_events:
+    if isinstance(result, Exception):
+      # Log the exception but continue processing the other function calls.
+      logger.error('Error executing function call: %s', result)
+      continue
+    if result is not None:
+      valid_function_response_events.append(result)
 
-  if not function_response_events:
+  if not valid_function_response_events:
     return None
+
   merged_event = merge_parallel_function_response_events(
-      function_response_events
+      valid_function_response_events
   )
 
-  if len(function_response_events) > 1:
+  if len(valid_function_response_events) > 1:
     # this is needed for debug traces of parallel calls
     # individual response with tool.name is traced in __build_response_event
    # (we drop tool.name from span name here as this is merged event)
@@ -216,89 +258,123 @@ async def handle_function_calls_async(
   return merged_event
 
 
+async def _execute_single_function_call_live(
+    invocation_context: InvocationContext,
+    function_call_event: Event,
+    function_call: types.FunctionCall,
+    tools_dict: dict[str, BaseTool],
+) -> Optional[Event]:
+  """Executes a single live function call and returns the function response event."""
+  from ...agents.llm_agent import LlmAgent
+
+  agent = cast(LlmAgent, invocation_context.agent)
+  tool, tool_context = _get_tool_and_context(
+      invocation_context, function_call_event, function_call, tools_dict
+  )
+  with tracer.start_as_current_span(f'execute_tool {tool.name}'):
+    # Do not name this variable "args": that is a reserved command in the
+    # Python debugger (pdb).
+    function_args = function_call.args or {}
+    function_response = None
+    # Calls the tool only if before_tool_callback does not exist or returns
+    # None.
+    if agent.before_tool_callback:
+      function_response = agent.before_tool_callback(
+          tool=tool, args=function_args, tool_context=tool_context
+      )
+      if inspect.isawaitable(function_response):
+        function_response = await function_response
+
+    if not function_response:
+      function_response = await _process_function_live_helper(
+          tool, tool_context, function_call, function_args, invocation_context
+      )
+
+    # Calls after_tool_callback if it exists.
+    if agent.after_tool_callback:
+      altered_function_response = agent.after_tool_callback(
+          tool=tool,
+          args=function_args,
+          tool_context=tool_context,
+          tool_response=function_response,
+      )
+      if inspect.isawaitable(altered_function_response):
+        altered_function_response = await altered_function_response
+      if altered_function_response is not None:
+        function_response = altered_function_response
+
+    if tool.is_long_running:
+      # Allow an async function to return None to skip the function response.
+      if not function_response:
+        return None
+
+    # Builds the function response event.
+    function_response_event = __build_response_event(
+        tool, function_response, tool_context, invocation_context
+    )
+    trace_tool_call(
+        tool=tool,
+        args=function_args,
+        response_event_id=function_response_event.id,
+        function_response=function_response,
+    )
+    return function_response_event
+
+
 async def handle_function_calls_live(
     invocation_context: InvocationContext,
     function_call_event: Event,
     tools_dict: dict[str, BaseTool],
-) -> Event:
-  """Calls the functions and returns the function response event."""
+) -> Optional[Event]:
+  """Calls the functions concurrently and returns the function response event."""
   from ...agents.llm_agent import LlmAgent
 
   agent = cast(LlmAgent, invocation_context.agent)
   function_calls = function_call_event.get_function_calls()
 
-  function_response_events: list[Event] = []
+  if not function_calls:
+    return None
+
+  # Create one task per function call so the tools run concurrently.
+  tasks = []
   for function_call in function_calls:
-    tool, tool_context = _get_tool_and_context(
-        invocation_context, function_call_event, function_call, tools_dict
-    )
-    with tracer.start_as_current_span(f'execute_tool {tool.name}'):
-      # do not use "args" as the variable name, because it is a reserved keyword
-      # in python debugger.
-      function_args = function_call.args or {}
-      function_response = None
-      # # Calls the tool if before_tool_callback does not exist or returns None.
-      # if agent.before_tool_callback:
-      #   function_response = agent.before_tool_callback(
-      #       tool, function_args, tool_context
-      #   )
-      if agent.before_tool_callback:
-        function_response = agent.before_tool_callback(
-            tool=tool, args=function_args, tool_context=tool_context
-        )
-        if inspect.isawaitable(function_response):
-          function_response = await function_response
-
-      if not function_response:
-        function_response = await _process_function_live_helper(
-            tool, tool_context, function_call, function_args, invocation_context
-        )
-
-      # Calls after_tool_callback if it exists.
-      # if agent.after_tool_callback:
-      #   new_response = agent.after_tool_callback(
-      #       tool,
-      #       function_args,
-      #       tool_context,
-      #       function_response,
-      #   )
-      #   if new_response:
-      #     function_response = new_response
-      if agent.after_tool_callback:
-        altered_function_response = agent.after_tool_callback(
-            tool=tool,
-            args=function_args,
-            tool_context=tool_context,
-            tool_response=function_response,
-        )
-        if inspect.isawaitable(altered_function_response):
-          altered_function_response = await altered_function_response
-        if altered_function_response is not None:
-          function_response = altered_function_response
-
-      if tool.is_long_running:
-        # Allow async function to return None to not provide function response.
-        if not function_response:
-          continue
-
-      # Builds the function response event.
-      function_response_event = __build_response_event(
-          tool, function_response, tool_context, invocation_context
-      )
-      trace_tool_call(
-          tool=tool,
-          args=function_args,
-          response_event_id=function_response_event.id,
-          function_response=function_response,
-      )
-      function_response_events.append(function_response_event)
+    task = asyncio.create_task(
+        _execute_single_function_call_live(
+            invocation_context, function_call_event, function_call, tools_dict
+        )
+    )
+    tasks.append(task)
+
+  # Execute all function calls concurrently.
+  function_response_events = await asyncio.gather(
+      *tasks, return_exceptions=True
+  )
+
+  # Filter out None results and handle exceptions.
+  valid_function_response_events: list[Event] = []
+  for result in function_response_events:
+    if isinstance(result, Exception):
+      # Log the exception but continue processing the other function calls.
+      logger.error('Error executing function call: %s', result)
+      continue
+    if result is not None:
+      valid_function_response_events.append(result)
 
-  if not function_response_events:
+  if not valid_function_response_events:
     return None
+
   merged_event = merge_parallel_function_response_events(
-      function_response_events
+      valid_function_response_events
  )
-  if len(function_response_events) > 1:
+  if len(valid_function_response_events) > 1:
     # this is needed for debug traces of parallel calls
     # individual response with tool.name is traced in __build_response_event
     # (we drop tool.name from span name here as this is merged event)
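The error-isolation behavior both handlers rely on comes straight from asyncio.gather(..., return_exceptions=True): a raised exception is returned as a value rather than cancelling the sibling tasks. A minimal sketch of that contract, using only the standard library:

import asyncio


async def ok(n: int) -> int:
  return n


async def boom() -> int:
  raise ValueError('tool failed')


async def demo() -> None:
  # Exceptions come back as values, so the other results survive.
  results = await asyncio.gather(ok(1), boom(), ok(3), return_exceptions=True)
  valid = [r for r in results if not isinstance(r, Exception)]
  print(valid)  # [1, 3]


asyncio.run(demo())

Note that this isolates exceptions only; a tool that blocks the event loop with synchronous work would still serialize execution, which is why the sample tool uses asyncio.sleep rather than time.sleep.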
diff --git a/tests/unittests/flows/llm_flows/test_functions_concurrent.py b/tests/unittests/flows/llm_flows/test_functions_concurrent.py
new file mode 100644
index 000000000..de9e60316
--- /dev/null
+++ b/tests/unittests/flows/llm_flows/test_functions_concurrent.py
@@ -0,0 +1,279 @@
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Tests for concurrent function execution in LLM flows."""
+
+import asyncio
+import time
+from typing import Any
+from typing import Dict
+
+from google.adk.agents import Agent
+from google.genai import types
+import pytest
+
+from ... import testing_utils
+
+
+@pytest.mark.asyncio
+async def test_concurrent_function_execution():
+  """Tests that multiple functions are executed concurrently, not sequentially."""
+
+  # Track execution order and timing.
+  execution_log = []
+  start_time = time.time()
+
+  async def slow_function_1(delay: float = 0.1) -> Dict[str, Any]:
+    """A function that simulates slow execution."""
+    execution_log.append(f'function_1_start: {time.time() - start_time:.3f}s')
+    await asyncio.sleep(delay)
+    execution_log.append(f'function_1_end: {time.time() - start_time:.3f}s')
+    return {'result': 'function_1_completed', 'delay': delay}
+
+  async def slow_function_2(delay: float = 0.1) -> Dict[str, Any]:
+    """A function that simulates slow execution."""
+    execution_log.append(f'function_2_start: {time.time() - start_time:.3f}s')
+    await asyncio.sleep(delay)
+    execution_log.append(f'function_2_end: {time.time() - start_time:.3f}s')
+    return {'result': 'function_2_completed', 'delay': delay}
+
+  async def slow_function_3(delay: float = 0.1) -> Dict[str, Any]:
+    """A function that simulates slow execution."""
+    execution_log.append(f'function_3_start: {time.time() - start_time:.3f}s')
+    await asyncio.sleep(delay)
+    execution_log.append(f'function_3_end: {time.time() - start_time:.3f}s')
+    return {'result': 'function_3_completed', 'delay': delay}
+
+  # Function calls that should be executed concurrently.
+  function_calls = [
+      types.Part.from_function_call(
+          name='slow_function_1', args={'delay': 0.1}
+      ),
+      types.Part.from_function_call(
+          name='slow_function_2', args={'delay': 0.1}
+      ),
+      types.Part.from_function_call(
+          name='slow_function_3', args={'delay': 0.1}
+      ),
+  ]
+
+  # Expected function responses.
+  function_responses = [
+      types.Part.from_function_response(
+          name='slow_function_1',
+          response={'result': 'function_1_completed', 'delay': 0.1},
+      ),
+      types.Part.from_function_response(
+          name='slow_function_2',
+          response={'result': 'function_2_completed', 'delay': 0.1},
+      ),
+      types.Part.from_function_response(
+          name='slow_function_3',
+          response={'result': 'function_3_completed', 'delay': 0.1},
+      ),
+  ]
+
+  responses: list[types.Content] = [
+      function_calls,
+      'All functions completed successfully',
+  ]
+
+  mock_model = testing_utils.MockModel.create(responses=responses)
+
+  agent = Agent(
+      name='concurrent_agent',
+      model=mock_model,
+      tools=[slow_function_1, slow_function_2, slow_function_3],
+  )
+
+  runner = testing_utils.TestInMemoryRunner(agent)
+
+  # Reset the clock for an accurate measurement.
+  start_time = time.time()
+  execution_log.clear()
+
+  events = await runner.run_async_with_new_session(
+      'Execute all three functions'
+  )
+
+  total_execution_time = time.time() - start_time
+
+  # Verify the event structure.
+  assert testing_utils.simplify_events(events) == [
+      ('concurrent_agent', function_calls),
+      ('concurrent_agent', function_responses),
+      ('concurrent_agent', 'All functions completed successfully'),
+  ]
+
+  # Verify concurrent execution via timing:
+  # concurrent: ~0.1s total (the max of the individual delays);
+  # sequential: ~0.3s total (the sum of the individual delays).
+  assert total_execution_time < 0.25, (
+      'Functions appear to have been executed sequentially. '
+      f'Total time: {total_execution_time:.3f}s, expected < 0.25s. '
+      f'Execution log: {execution_log}'
+  )
+
+  # Verify that every function both started and finished.
+  start_count = len([log for log in execution_log if 'start' in log])
+  end_count = len([log for log in execution_log if 'end' in log])
+
+  assert start_count == 3, f'Expected 3 function starts, got {start_count}'
+  assert end_count == 3, f'Expected 3 function ends, got {end_count}'
+
+
+@pytest.mark.asyncio
+async def test_concurrent_execution_with_exception():
+  """Tests that an exception in one function does not block the others."""
+
+  execution_results = []
+
+  async def successful_function() -> Dict[str, Any]:
+    """A function that succeeds."""
+    execution_results.append('successful_function_completed')
+    return {'result': 'success'}
+
+  async def failing_function() -> Dict[str, Any]:
+    """A function that raises an exception."""
+    execution_results.append('failing_function_started')
+    raise ValueError('This function always fails')
+
+  async def another_successful_function() -> Dict[str, Any]:
+    """Another function that succeeds."""
+    execution_results.append('another_successful_function_completed')
+    return {'result': 'another_success'}
+
+  function_calls = [
+      types.Part.from_function_call(name='successful_function', args={}),
+      types.Part.from_function_call(name='failing_function', args={}),
+      types.Part.from_function_call(
+          name='another_successful_function', args={}
+      ),
+  ]
+
+  responses: list[types.Content] = [
+      function_calls,
+      'Functions completed with some errors',
+  ]
+
+  mock_model = testing_utils.MockModel.create(responses=responses)
+
+  agent = Agent(
+      name='error_handling_agent',
+      model=mock_model,
+      tools=[
+          successful_function,
+          failing_function,
+          another_successful_function,
+      ],
+  )
+
+  runner = testing_utils.TestInMemoryRunner(agent)
+  events = await runner.run_async_with_new_session('Execute all functions')
+
+  # The successful functions completed despite one of them failing.
+  assert 'successful_function_completed' in execution_results
+  assert 'another_successful_function_completed' in execution_results
+  assert 'failing_function_started' in execution_results
+
+  event_content = testing_utils.simplify_events(events)
+
+  # The first event carries the function calls.
+  assert event_content[0] == ('error_handling_agent', function_calls)
+
+  # A follow-up event carries the successful function responses. The exact
+  # structure may vary due to error handling, so only its presence is checked.
+  assert len(event_content) >= 2
+
+
+@pytest.mark.asyncio
+async def test_concurrent_with_mixed_sync_async():
+  """Tests concurrent execution with a mix of sync and async functions."""
+
+  execution_order = []
+
+  def sync_function(value: int) -> Dict[str, Any]:
+    """A synchronous function."""
+    execution_order.append(f'sync_function_{value}')
+    return {'result': f'sync_{value}'}
+
+  async def async_function(value: int) -> Dict[str, Any]:
+    """An asynchronous function."""
+    execution_order.append(f'async_function_{value}')
+    await asyncio.sleep(0.05)  # A small delay to ensure async behavior.
+    return {'result': f'async_{value}'}
+
+  function_calls = [
+      types.Part.from_function_call(name='sync_function', args={'value': 1}),
+      types.Part.from_function_call(name='async_function', args={'value': 2}),
+      types.Part.from_function_call(name='sync_function', args={'value': 3}),
+  ]
+
+  # Expected function responses.
+  function_responses = [
+      types.Part.from_function_response(
+          name='sync_function', response={'result': 'sync_1'}
+      ),
+      types.Part.from_function_response(
+          name='async_function', response={'result': 'async_2'}
+      ),
+      types.Part.from_function_response(
+          name='sync_function', response={'result': 'sync_3'}
+      ),
+  ]
+
+  responses: list[types.Content] = [
+      function_calls,
+      'Mixed functions completed',
+  ]
+
+  mock_model = testing_utils.MockModel.create(responses=responses)
+
+  agent = Agent(
+      name='mixed_agent',
+      model=mock_model,
+      tools=[sync_function, async_function],
+  )
+
+  runner = testing_utils.TestInMemoryRunner(agent)
+
+  start_time = time.time()
+  events = await runner.run_async_with_new_session('Execute mixed functions')
+  total_time = time.time() - start_time
+
+  # Verify the event structure.
+  assert testing_utils.simplify_events(events) == [
+      ('mixed_agent', function_calls),
+      ('mixed_agent', function_responses),
+      ('mixed_agent', 'Mixed functions completed'),
+  ]
+
+  # Verify that all functions executed.
+  assert len(execution_order) == 3
+  assert 'sync_function_1' in execution_order
+  assert 'async_function_2' in execution_order
+  assert 'sync_function_3' in execution_order
+
+  # Verify execution was reasonably fast (i.e. concurrent).
+  assert total_time < 0.2, f'Execution took too long: {total_time:.3f}s'
\ No newline at end of file
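The wall-clock thresholds in these tests can be sensitive to slow CI machines. One possible complement, sketched below against the execution_log format that test_concurrent_function_execution already produces, is an ordering check: under concurrent execution all three starts should be logged before the first end. This helper is illustrative only and not part of the change.

def assert_all_started_before_first_end(execution_log: list[str]) -> None:
  """Asserts every function started before any finished, i.e. runs overlapped."""
  first_end = next(
      i for i, entry in enumerate(execution_log) if 'end' in entry
  )
  starts_before_first_end = sum(
      1 for entry in execution_log[:first_end] if 'start' in entry
  )
  assert starts_before_first_end == 3, execution_log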