|
| 1 | +""" |
| 2 | +Example: Real-time Progress Tracking for Workflow Generation |
| 3 | +
|
| 4 | +This example demonstrates how to use the new on_step_recorded and on_status_update |
| 5 | +callbacks to track workflow generation progress in real-time. |
| 6 | +
|
| 7 | +Usage: |
| 8 | + python examples/progress_tracking_example.py |
| 9 | +""" |
| 10 | + |
| 11 | +import asyncio |
| 12 | +from datetime import datetime |
| 13 | + |
| 14 | +from browser_use.llm import ChatBrowserUse |
| 15 | + |
| 16 | +from workflow_use.healing.service import HealingService |
| 17 | + |
| 18 | + |
| 19 | +# Example 1: Simple console logging |
| 20 | +async def simple_console_example(): |
| 21 | + """Basic example: Print steps to console as they're recorded.""" |
| 22 | + print('=' * 80) |
| 23 | + print('EXAMPLE 1: Simple Console Logging') |
| 24 | + print('=' * 80) |
| 25 | + |
| 26 | + def step_callback(step_data: dict): |
| 27 | + """Called each time a step is recorded.""" |
| 28 | + print(f'\n📍 Step {step_data["step_number"]}: {step_data["description"]}') |
| 29 | + print(f' Type: {step_data["action_type"]}') |
| 30 | + print(f' URL: {step_data["url"]}') |
| 31 | + if step_data.get('target_text'): |
| 32 | + print(f' Target: {step_data["target_text"]}') |
| 33 | + if step_data.get('extracted_data'): |
| 34 | + print(f' Extracted: {step_data["extracted_data"]}') |
| 35 | + |
| 36 | + def status_callback(status: str): |
| 37 | + """Called for general status updates.""" |
| 38 | + print(f'\n🔄 {status}') |
| 39 | + |
| 40 | + # Initialize service |
| 41 | + llm = ChatBrowserUse(model='bu-latest') |
| 42 | + healing_service = HealingService( |
| 43 | + llm=llm, |
| 44 | + use_deterministic_conversion=True, |
| 45 | + enable_variable_extraction=True, |
| 46 | + ) |
| 47 | + |
| 48 | + # Generate workflow with callbacks |
| 49 | + workflow = await healing_service.generate_workflow_from_prompt( |
| 50 | + prompt='Go to example.com and extract the page title', |
| 51 | + agent_llm=llm, |
| 52 | + extraction_llm=llm, |
| 53 | + use_cloud=False, |
| 54 | + on_step_recorded=step_callback, |
| 55 | + on_status_update=status_callback, |
| 56 | + ) |
| 57 | + |
| 58 | + print(f'\n✅ Generated workflow with {len(workflow.steps)} steps!') |
| 59 | + |
| 60 | + |
| 61 | +# Example 2: Store steps in a list (for database storage) |
| 62 | +async def database_storage_example(): |
| 63 | + """Example: Store steps in memory (simulates database storage).""" |
| 64 | + print('\n' + '=' * 80) |
| 65 | + print('EXAMPLE 2: Database Storage Pattern') |
| 66 | + print('=' * 80) |
| 67 | + |
| 68 | + # Simulated database storage |
| 69 | + stored_steps = [] |
| 70 | + status_history = [] |
| 71 | + |
| 72 | + async def step_callback(step_data: dict): |
| 73 | + """Store step in database (simulated with list).""" |
| 74 | + stored_steps.append(step_data) |
| 75 | + print(f'✓ Stored step {step_data["step_number"]} in database') |
| 76 | + |
| 77 | + async def status_callback(status: str): |
| 78 | + """Store status update in database.""" |
| 79 | + status_history.append({'timestamp': datetime.now().isoformat(), 'status': status}) |
| 80 | + print(f'ℹ️ {status}') |
| 81 | + |
| 82 | + # Initialize service |
| 83 | + llm = ChatBrowserUse(model='bu-latest') |
| 84 | + healing_service = HealingService( |
| 85 | + llm=llm, |
| 86 | + use_deterministic_conversion=True, |
| 87 | + enable_variable_extraction=True, |
| 88 | + ) |
| 89 | + |
| 90 | + # Generate workflow with async callbacks |
| 91 | + workflow = await healing_service.generate_workflow_from_prompt( |
| 92 | + prompt='Go to example.com and extract the page title', |
| 93 | + agent_llm=llm, |
| 94 | + extraction_llm=llm, |
| 95 | + use_cloud=False, |
| 96 | + on_step_recorded=lambda data: asyncio.create_task(step_callback(data)), |
| 97 | + on_status_update=lambda status: asyncio.create_task(status_callback(status)), |
| 98 | + ) |
| 99 | + |
| 100 | + # Display stored data |
| 101 | + print(f'\n📊 Stored {len(stored_steps)} steps and {len(status_history)} status updates') |
| 102 | + print('\nStored Steps:') |
| 103 | + for step in stored_steps: |
| 104 | + print(f' {step["step_number"]}. {step["description"]}') |
| 105 | + |
| 106 | + print('\nStatus History:') |
| 107 | + for status in status_history: |
| 108 | + print(f' [{status["timestamp"]}] {status["status"]}') |
| 109 | + |
| 110 | + |
| 111 | +# Example 3: Real-time progress bar |
| 112 | +async def progress_bar_example(): |
| 113 | + """Example: Show progress with a simple progress indicator.""" |
| 114 | + print('\n' + '=' * 80) |
| 115 | + print('EXAMPLE 3: Progress Bar') |
| 116 | + print('=' * 80) |
| 117 | + |
| 118 | + step_count = {'count': 0} |
| 119 | + |
| 120 | + def step_callback(step_data: dict): |
| 121 | + """Update progress bar as steps are recorded.""" |
| 122 | + step_count['count'] = step_data['step_number'] |
| 123 | + # Simple progress indicator |
| 124 | + bar = '█' * step_data['step_number'] + '░' * (10 - step_data['step_number']) |
| 125 | + print(f'\rProgress: [{bar}] Step {step_data["step_number"]}: {step_data["description"][:40]}...', end='') |
| 126 | + |
| 127 | + def status_callback(status: str): |
| 128 | + """Display status updates.""" |
| 129 | + print(f'\n\n🔄 {status}') |
| 130 | + |
| 131 | + # Initialize service |
| 132 | + llm = ChatBrowserUse(model='bu-latest') |
| 133 | + healing_service = HealingService( |
| 134 | + llm=llm, |
| 135 | + use_deterministic_conversion=True, |
| 136 | + enable_variable_extraction=True, |
| 137 | + ) |
| 138 | + |
| 139 | + # Generate workflow with callbacks |
| 140 | + workflow = await healing_service.generate_workflow_from_prompt( |
| 141 | + prompt='Go to example.com and extract the page title', |
| 142 | + agent_llm=llm, |
| 143 | + extraction_llm=llm, |
| 144 | + use_cloud=False, |
| 145 | + on_step_recorded=step_callback, |
| 146 | + on_status_update=status_callback, |
| 147 | + ) |
| 148 | + |
| 149 | + print(f'\n\n✅ Completed! Generated workflow with {step_count["count"]} steps') |
| 150 | + |
| 151 | + |
| 152 | +# Example 4: Real-world pattern for Browser-Use Cloud backend |
| 153 | +async def cloud_backend_pattern(): |
| 154 | + """ |
| 155 | + Example: Pattern for Browser-Use Cloud backend integration. |
| 156 | +
|
| 157 | + This shows how to integrate with your database to store steps |
| 158 | + in real-time for frontend polling. |
| 159 | + """ |
| 160 | + print('\n' + '=' * 80) |
| 161 | + print('EXAMPLE 4: Browser-Use Cloud Backend Pattern') |
| 162 | + print('=' * 80) |
| 163 | + |
| 164 | + # Simulated workflow_id (would come from your database) |
| 165 | + workflow_id = 'wf_123abc' |
| 166 | + generation_metadata = {'steps': [], 'status_history': []} |
| 167 | + |
| 168 | + async def step_callback(step_data: dict): |
| 169 | + """ |
| 170 | + Store step immediately in database for real-time display. |
| 171 | +
|
| 172 | + In your actual implementation, this would be: |
| 173 | + async with await database.get_session() as session: |
| 174 | + workflow = await get_workflow(session, workflow_id) |
| 175 | + if workflow and workflow.generation_metadata: |
| 176 | + steps = workflow.generation_metadata.get('steps', []) |
| 177 | + steps.append(step_data) |
| 178 | + workflow.generation_metadata['steps'] = steps |
| 179 | + await session.commit() |
| 180 | + """ |
| 181 | + # Simulated database storage |
| 182 | + generation_metadata['steps'].append(step_data) |
| 183 | + |
| 184 | + print(f'💾 Stored step {step_data["step_number"]} to workflow {workflow_id}') |
| 185 | + print(f' Description: {step_data["description"]}') |
| 186 | + print(f' Type: {step_data["action_type"]}') |
| 187 | + print(f' Timestamp: {step_data["timestamp"]}') |
| 188 | + |
| 189 | + async def status_callback(status: str): |
| 190 | + """Store status updates for display in the frontend.""" |
| 191 | + status_entry = {'timestamp': datetime.now().isoformat(), 'message': status} |
| 192 | + generation_metadata['status_history'].append(status_entry) |
| 193 | + |
| 194 | + print(f'ℹ️ Status update: {status}') |
| 195 | + |
| 196 | + # Initialize service |
| 197 | + llm = ChatBrowserUse(model='bu-latest') |
| 198 | + healing_service = HealingService( |
| 199 | + llm=llm, |
| 200 | + use_deterministic_conversion=True, |
| 201 | + enable_variable_extraction=True, |
| 202 | + ) |
| 203 | + |
| 204 | + # Generate workflow with progress tracking |
| 205 | + print(f'\n🚀 Starting workflow generation for {workflow_id}...') |
| 206 | + |
| 207 | + workflow = await healing_service.generate_workflow_from_prompt( |
| 208 | + prompt='Go to example.com and extract the page title', |
| 209 | + agent_llm=llm, |
| 210 | + extraction_llm=llm, |
| 211 | + use_cloud=False, |
| 212 | + on_step_recorded=lambda data: asyncio.create_task(step_callback(data)), |
| 213 | + on_status_update=lambda status: asyncio.create_task(status_callback(status)), |
| 214 | + ) |
| 215 | + |
| 216 | + # Display final metadata (what would be in your database) |
| 217 | + print('\n' + '=' * 80) |
| 218 | + print('FINAL DATABASE STATE') |
| 219 | + print('=' * 80) |
| 220 | + print(f'\nWorkflow ID: {workflow_id}') |
| 221 | + print(f'Total Steps Recorded: {len(generation_metadata["steps"])}') |
| 222 | + print(f'Total Status Updates: {len(generation_metadata["status_history"])}') |
| 223 | + |
| 224 | + print('\n📋 Steps Timeline:') |
| 225 | + for step in generation_metadata['steps']: |
| 226 | + print(f' [{step["timestamp"]}] Step {step["step_number"]}: {step["description"]}') |
| 227 | + |
| 228 | + print('\n📊 Status Timeline:') |
| 229 | + for status in generation_metadata['status_history']: |
| 230 | + print(f' [{status["timestamp"]}] {status["message"]}') |
| 231 | + |
| 232 | + print(f'\n✅ Workflow generation complete! Final workflow has {len(workflow.steps)} steps') |
| 233 | + |
| 234 | + |
| 235 | +# Run all examples |
| 236 | +async def main(): |
| 237 | + """Run all examples (commented out to avoid actual API calls).""" |
| 238 | + print('Progress Tracking Examples') |
| 239 | + print('=' * 80) |
| 240 | + print('\nThese examples demonstrate different patterns for tracking') |
| 241 | + print('workflow generation progress in real-time.') |
| 242 | + print('\nNote: Examples are commented out to avoid actual API calls.') |
| 243 | + print('Uncomment the examples you want to run.\n') |
| 244 | + |
| 245 | + # Uncomment the examples you want to run: |
| 246 | + |
| 247 | + # await simple_console_example() |
| 248 | + # await database_storage_example() |
| 249 | + # await progress_bar_example() |
| 250 | + # await cloud_backend_pattern() |
| 251 | + |
| 252 | + print('\n✅ Examples completed!') |
| 253 | + |
| 254 | + |
| 255 | +if __name__ == '__main__': |
| 256 | + asyncio.run(main()) |
0 commit comments