7
7
8
8
from .quiet_console import console
9
9
10
- from .claude_log_utils import (
11
- get_claude_session_log_path ,
12
- parse_jsonl_line ,
13
- read_existing_log_lines ,
14
- validate_log_entry ,
15
- format_log_for_api
16
- )
10
+ from .claude_log_utils import get_claude_session_log_path , parse_jsonl_line , read_existing_log_lines , validate_log_entry , format_log_for_api
17
11
from .claude_session_api import send_claude_session_log
18
12
19
13
20
14
class ClaudeLogWatcher :
21
15
"""Watches Claude Code session log files for new entries and sends them to the API."""
22
-
23
- def __init__ (
24
- self ,
25
- session_id : str ,
26
- org_id : Optional [int ] = None ,
27
- poll_interval : float = 1.0 ,
28
- on_log_entry : Optional [Callable [[Dict [str , Any ]], None ]] = None
29
- ):
16
+
17
+ def __init__ (self , session_id : str , org_id : Optional [int ] = None , poll_interval : float = 1.0 , on_log_entry : Optional [Callable [[Dict [str , Any ]], None ]] = None ):
30
18
"""Initialize the log watcher.
31
-
19
+
32
20
Args:
33
21
session_id: The Claude session ID to watch
34
22
org_id: Organization ID for API calls
@@ -39,56 +27,56 @@ def __init__(
39
27
self .org_id = org_id
40
28
self .poll_interval = poll_interval
41
29
self .on_log_entry = on_log_entry
42
-
30
+
43
31
self .log_path = get_claude_session_log_path (session_id )
44
32
self .last_line_count = 0
45
33
self .is_running = False
46
34
self .watcher_thread : Optional [threading .Thread ] = None
47
-
35
+
48
36
# Stats
49
37
self .total_entries_processed = 0
50
38
self .total_entries_sent = 0
51
39
self .total_send_failures = 0
52
-
40
+
53
41
def start (self ) -> bool :
54
42
"""Start the log watcher in a background thread.
55
-
43
+
56
44
Returns:
57
45
True if started successfully, False otherwise
58
46
"""
59
47
if self .is_running :
60
48
console .print (f"⚠️ Log watcher for session { self .session_id [:8 ]} ... is already running" , style = "yellow" )
61
49
return False
62
-
50
+
63
51
# Initialize line count
64
52
self .last_line_count = read_existing_log_lines (self .log_path )
65
-
53
+
66
54
self .is_running = True
67
55
self .watcher_thread = threading .Thread (target = self ._watch_loop , daemon = True )
68
56
self .watcher_thread .start ()
69
-
57
+
70
58
console .print (f"📋 Started log watcher for session { self .session_id [:8 ]} ..." , style = "green" )
71
59
console .print (f" Log file: { self .log_path } " , style = "dim" )
72
60
console .print (f" Starting from line: { self .last_line_count + 1 } " , style = "dim" )
73
-
61
+
74
62
return True
75
-
63
+
76
64
def stop (self ) -> None :
77
65
"""Stop the log watcher."""
78
66
if not self .is_running :
79
67
return
80
-
68
+
81
69
self .is_running = False
82
-
70
+
83
71
if self .watcher_thread and self .watcher_thread .is_alive ():
84
72
self .watcher_thread .join (timeout = 2.0 )
85
-
73
+
86
74
console .print (f"📋 Stopped log watcher for session { self .session_id [:8 ]} ..." , style = "dim" )
87
75
console .print (f" Processed: { self .total_entries_processed } entries" , style = "dim" )
88
76
console .print (f" Sent: { self .total_entries_sent } entries" , style = "dim" )
89
77
if self .total_send_failures > 0 :
90
78
console .print (f" Failures: { self .total_send_failures } entries" , style = "yellow" )
91
-
79
+
92
80
def _watch_loop (self ) -> None :
93
81
"""Main watching loop that runs in a background thread."""
94
82
while self .is_running :
@@ -98,104 +86,104 @@ def _watch_loop(self) -> None:
98
86
except Exception as e :
99
87
console .print (f"⚠️ Error in log watcher: { e } " , style = "yellow" )
100
88
time .sleep (self .poll_interval * 2 ) # Back off on errors
101
-
89
+
102
90
def _check_for_new_entries (self ) -> None :
103
91
"""Check for new log entries and process them."""
104
92
if not self .log_path .exists ():
105
93
return
106
-
94
+
107
95
try :
108
96
current_line_count = read_existing_log_lines (self .log_path )
109
-
97
+
110
98
if current_line_count > self .last_line_count :
111
99
new_entries = self ._read_new_lines (self .last_line_count , current_line_count )
112
-
100
+
113
101
for entry in new_entries :
114
102
self ._process_log_entry (entry )
115
-
103
+
116
104
self .last_line_count = current_line_count
117
-
105
+
118
106
except Exception as e :
119
107
console .print (f"⚠️ Error reading log file: { e } " , style = "yellow" )
120
-
108
+
121
109
def _read_new_lines (self , start_line : int , end_line : int ) -> list [Dict [str , Any ]]:
122
110
"""Read new lines from the log file.
123
-
111
+
124
112
Args:
125
113
start_line: Line number to start from (0-indexed)
126
114
end_line: Line number to end at (0-indexed, exclusive)
127
-
115
+
128
116
Returns:
129
117
List of parsed log entries
130
118
"""
131
119
entries = []
132
-
120
+
133
121
try :
134
- with open (self .log_path , 'r' , encoding = ' utf-8' ) as f :
122
+ with open (self .log_path , "r" , encoding = " utf-8" ) as f :
135
123
lines = f .readlines ()
136
-
124
+
137
125
# Read only the new lines
138
126
for i in range (start_line , min (end_line , len (lines ))):
139
127
line = lines [i ]
140
128
entry = parse_jsonl_line (line )
141
-
129
+
142
130
if entry is not None :
143
131
entries .append (entry )
144
-
132
+
145
133
except (OSError , UnicodeDecodeError ) as e :
146
134
console .print (f"⚠️ Error reading log file: { e } " , style = "yellow" )
147
-
135
+
148
136
return entries
149
-
137
+
150
138
def _process_log_entry (self , log_entry : Dict [str , Any ]) -> None :
151
139
"""Process a single log entry.
152
-
140
+
153
141
Args:
154
142
log_entry: The parsed log entry
155
143
"""
156
144
self .total_entries_processed += 1
157
-
145
+
158
146
# Validate the entry
159
147
if not validate_log_entry (log_entry ):
160
148
console .print (f"⚠️ Invalid log entry skipped: { log_entry } " , style = "yellow" )
161
149
return
162
-
150
+
163
151
# Format for API
164
152
formatted_entry = format_log_for_api (log_entry )
165
-
153
+
166
154
# Call optional callback
167
155
if self .on_log_entry :
168
156
try :
169
157
self .on_log_entry (formatted_entry )
170
158
except Exception as e :
171
159
console .print (f"⚠️ Error in log entry callback: { e } " , style = "yellow" )
172
-
160
+
173
161
# Send to API
174
162
self ._send_log_entry (formatted_entry )
175
-
163
+
176
164
def _send_log_entry (self , log_entry : Dict [str , Any ]) -> None :
177
165
"""Send a log entry to the API.
178
-
166
+
179
167
Args:
180
168
log_entry: The formatted log entry
181
169
"""
182
170
try :
183
171
success = send_claude_session_log (self .session_id , log_entry , self .org_id )
184
-
172
+
185
173
if success :
186
174
self .total_entries_sent += 1
187
175
# Only show verbose output in debug mode
188
176
console .print (f"📤 Sent log entry: { log_entry .get ('type' , 'unknown' )} " , style = "dim" )
189
177
else :
190
178
self .total_send_failures += 1
191
-
179
+
192
180
except Exception as e :
193
181
self .total_send_failures += 1
194
182
console .print (f"⚠️ Failed to send log entry: { e } " , style = "yellow" )
195
-
183
+
196
184
def get_stats (self ) -> Dict [str , Any ]:
197
185
"""Get watcher statistics.
198
-
186
+
199
187
Returns:
200
188
Dictionary with watcher stats
201
189
"""
@@ -208,96 +196,79 @@ def get_stats(self) -> Dict[str, Any]:
208
196
"total_entries_processed" : self .total_entries_processed ,
209
197
"total_entries_sent" : self .total_entries_sent ,
210
198
"total_send_failures" : self .total_send_failures ,
211
- "success_rate" : (
212
- self .total_entries_sent / max (1 , self .total_entries_processed ) * 100
213
- if self .total_entries_processed > 0 else 0
214
- )
199
+ "success_rate" : (self .total_entries_sent / max (1 , self .total_entries_processed ) * 100 if self .total_entries_processed > 0 else 0 ),
215
200
}
216
201
217
202
218
203
class ClaudeLogWatcherManager :
219
204
"""Manages multiple log watchers for different sessions."""
220
-
205
+
221
206
def __init__ (self ):
222
207
self .watchers : Dict [str , ClaudeLogWatcher ] = {}
223
-
224
- def start_watcher (
225
- self ,
226
- session_id : str ,
227
- org_id : Optional [int ] = None ,
228
- poll_interval : float = 1.0 ,
229
- on_log_entry : Optional [Callable [[Dict [str , Any ]], None ]] = None
230
- ) -> bool :
208
+
209
+ def start_watcher (self , session_id : str , org_id : Optional [int ] = None , poll_interval : float = 1.0 , on_log_entry : Optional [Callable [[Dict [str , Any ]], None ]] = None ) -> bool :
231
210
"""Start a log watcher for a session.
232
-
211
+
233
212
Args:
234
213
session_id: The Claude session ID
235
214
org_id: Organization ID for API calls
236
215
poll_interval: How often to check for new entries (seconds)
237
216
on_log_entry: Optional callback for each new log entry
238
-
217
+
239
218
Returns:
240
219
True if started successfully, False otherwise
241
220
"""
242
221
if session_id in self .watchers :
243
222
console .print (f"⚠️ Watcher for session { session_id [:8 ]} ... already exists" , style = "yellow" )
244
223
return False
245
-
246
- watcher = ClaudeLogWatcher (
247
- session_id = session_id ,
248
- org_id = org_id ,
249
- poll_interval = poll_interval ,
250
- on_log_entry = on_log_entry
251
- )
252
-
224
+
225
+ watcher = ClaudeLogWatcher (session_id = session_id , org_id = org_id , poll_interval = poll_interval , on_log_entry = on_log_entry )
226
+
253
227
if watcher .start ():
254
228
self .watchers [session_id ] = watcher
255
229
return True
256
230
return False
257
-
231
+
258
232
def stop_watcher (self , session_id : str ) -> None :
259
233
"""Stop a log watcher for a session.
260
-
234
+
261
235
Args:
262
236
session_id: The Claude session ID
263
237
"""
264
238
if session_id in self .watchers :
265
239
self .watchers [session_id ].stop ()
266
240
del self .watchers [session_id ]
267
-
241
+
268
242
def stop_all_watchers (self ) -> None :
269
243
"""Stop all active watchers."""
270
244
for session_id in list (self .watchers .keys ()):
271
245
self .stop_watcher (session_id )
272
-
246
+
273
247
def get_active_sessions (self ) -> list [str ]:
274
248
"""Get list of active session IDs being watched.
275
-
249
+
276
250
Returns:
277
251
List of session IDs
278
252
"""
279
253
return list (self .watchers .keys ())
280
-
254
+
281
255
def get_watcher_stats (self , session_id : str ) -> Optional [Dict [str , Any ]]:
282
256
"""Get stats for a specific watcher.
283
-
257
+
284
258
Args:
285
259
session_id: The Claude session ID
286
-
260
+
287
261
Returns:
288
262
Watcher stats or None if not found
289
263
"""
290
264
if session_id in self .watchers :
291
265
return self .watchers [session_id ].get_stats ()
292
266
return None
293
-
267
+
294
268
def get_all_stats (self ) -> Dict [str , Dict [str , Any ]]:
295
269
"""Get stats for all active watchers.
296
-
270
+
297
271
Returns:
298
272
Dictionary mapping session IDs to their stats
299
273
"""
300
- return {
301
- session_id : watcher .get_stats ()
302
- for session_id , watcher in self .watchers .items ()
303
- }
274
+ return {session_id : watcher .get_stats () for session_id , watcher in self .watchers .items ()}
0 commit comments