2
2
import math
3
3
import time
4
4
from concurrent .futures import ThreadPoolExecutor
5
-
5
+ import torch . multiprocessing as mp
6
6
from deepview_profile .analysis .runner import analyze_project
7
7
from deepview_profile .exceptions import AnalysisError
8
8
from deepview_profile .nvml import NVML
@@ -24,6 +24,7 @@ def __init__(self, enqueue_response, message_sender, connection_manager):
24
24
self ._message_sender = message_sender
25
25
self ._connection_manager = connection_manager
26
26
self ._nvml = NVML ()
27
+ mp .set_start_method ("spawn" )
27
28
28
29
def start (self ):
29
30
self ._nvml .start ()
@@ -52,18 +53,13 @@ def _handle_analysis_request(self, analysis_request, context):
52
53
context .sequence_number ,
53
54
* (context .address ),
54
55
)
55
- connection = self ._connection_manager .get_connection (context .address )
56
+ connection = self ._connection_manager .get_connection (
57
+ context .address )
56
58
analyzer = analyze_project (
57
59
connection .project_root , connection .entry_point , self ._nvml )
58
60
59
61
# Abort early if the connection has been closed
60
- if not context .state .connected :
61
- logger .error (
62
- 'Aborting request %d from (%s:%d) early '
63
- 'because the client has disconnected.' ,
64
- context .sequence_number ,
65
- * (context .address ),
66
- )
62
+ if self ._early_disconnection_error (context ):
67
63
return
68
64
69
65
breakdown = next (analyzer )
@@ -73,13 +69,7 @@ def _handle_analysis_request(self, analysis_request, context):
73
69
context ,
74
70
)
75
71
76
- if not context .state .connected :
77
- logger .error (
78
- 'Aborting request %d from (%s:%d) early '
79
- 'because the client has disconnected.' ,
80
- context .sequence_number ,
81
- * (context .address ),
82
- )
72
+ if self ._early_disconnection_error (context ):
83
73
return
84
74
85
75
throughput = next (analyzer )
@@ -90,13 +80,7 @@ def _handle_analysis_request(self, analysis_request, context):
90
80
)
91
81
92
82
# send habitat response
93
- if not context .state .connected :
94
- logger .error (
95
- 'Aborting request %d from (%s:%d) early '
96
- 'because the client has disconnected.' ,
97
- context .sequence_number ,
98
- * (context .address ),
99
- )
83
+ if self ._early_disconnection_error (context ):
100
84
return
101
85
102
86
habitat_resp = next (analyzer )
@@ -106,14 +90,19 @@ def _handle_analysis_request(self, analysis_request, context):
106
90
context ,
107
91
)
108
92
93
+ # send utilization data
94
+ if self ._early_disconnection_error (context ):
95
+ return
96
+
97
+ utilization_resp = next (analyzer )
98
+ self ._enqueue_response (
99
+ self ._send_utilization_response ,
100
+ utilization_resp ,
101
+ context
102
+ )
103
+
109
104
# send energy response
110
- if not context .state .connected :
111
- logger .error (
112
- 'Aborting request %d from (%s:%d) early '
113
- 'because the client has disconnected.' ,
114
- context .sequence_number ,
115
- * (context .address ),
116
- )
105
+ if self ._early_disconnection_error (context ):
117
106
return
118
107
119
108
energy_resp = next (analyzer )
@@ -198,3 +187,24 @@ def _send_energy_response(self, energy_resp, context):
198
187
except Exception :
199
188
logger .exception (
200
189
'Exception occurred when sending an energy response.' )
190
+
191
+ def _send_utilization_response (self , utilization_resp , context ):
192
+ # Called from the main executor. Do not call directly!
193
+ try :
194
+ self ._message_sender .send_utilization_response (
195
+ utilization_resp , context )
196
+ except Exception :
197
+ logger .exception (
198
+ 'Exception occurred when sending utilization response.' )
199
+
200
+ def _early_disconnection_error (self , context ):
201
+ if not context .state .connected :
202
+ logger .error (
203
+ 'Aborting request %d from (%s:%d) early '
204
+ 'because the client has disconnected.' ,
205
+ context .sequence_number ,
206
+ * (context .address ),
207
+ )
208
+ return True
209
+
210
+ return False
0 commit comments