|
31 | 31 |
|
32 | 32 | logger = logging.getLogger(__name__) |
33 | 33 |
|
| 34 | + |
34 | 35 | _MAX_CONSECUTIVE_REDIRECTS = 10 |
35 | 36 | _MONITOR_OAUTH_SCOPE = "https://monitor.azure.com//.default" |
36 | 37 | _requests_lock = threading.Lock() |
37 | 38 | _requests_map = {} |
38 | 39 | _REACHED_INGESTION_STATUS_CODES = (200, 206, 402, 408, 429, 439, 500) |
39 | 40 | REDIRECT_STATUS_CODES = (307, 308) |
40 | 41 | RETRYABLE_STATUS_CODES = ( |
| 42 | + 206, # Partial success |
41 | 43 | 401, # Unauthorized |
42 | 44 | 403, # Forbidden |
43 | 45 | 408, # Request Timeout |
44 | | - 429, # Too many requests |
| 46 | + 429, # Too Many Requests - retry after |
45 | 47 | 500, # Internal server error |
46 | 48 | 502, # Bad Gateway |
47 | 49 | 503, # Service unavailable |
48 | 50 | 504, # Gateway timeout |
49 | 51 | ) |
50 | | -THROTTLE_STATUS_CODES = (402, 439) |
| 52 | +THROTTLE_STATUS_CODES = ( |
| 53 | + 402, # Quota, too Many Requests over extended time |
| 54 | + 439, # Quota, too Many Requests over extended time (legacy) |
| 55 | +) |
51 | 56 |
|
52 | 57 |
|
53 | 58 | class TransportStatusCode: |
@@ -190,6 +195,46 @@ def _transmit(self, envelopes): |
190 | 195 | if self._check_stats_collection(): |
191 | 196 | _update_requests_map('success') |
192 | 197 | return TransportStatusCode.SUCCESS |
| 198 | + elif status_code == 206: # Partial Content |
| 199 | + data = None |
| 200 | + try: |
| 201 | + data = json.loads(text) |
| 202 | + except Exception as ex: |
| 203 | + if not self._is_stats_exporter(): |
| 204 | + logger.warning('Error while reading response body %s for partial content.', ex) # noqa: E501 |
| 205 | + if self._check_stats_collection(): |
| 206 | + _update_requests_map('exception', value=ex.__class__.__name__) # noqa: E501 |
| 207 | + return TransportStatusCode.DROP |
| 208 | + if data: |
| 209 | + try: |
| 210 | + resend_envelopes = [] |
| 211 | + for error in data['errors']: |
| 212 | + if _status_code_is_retryable(error['statusCode']): |
| 213 | + resend_envelopes.append(envelopes[error['index']]) |
| 214 | + if self._check_stats_collection(): |
| 215 | + _update_requests_map('retry', value=error['statusCode']) # noqa: E501 |
| 216 | + else: |
| 217 | + if not self._is_stats_exporter(): |
| 218 | + logger.error( |
| 219 | + 'Data drop %s: %s %s.', |
| 220 | + error['statusCode'], |
| 221 | + error['message'], |
| 222 | + envelopes[error['index']], |
| 223 | + ) |
| 224 | + if self.storage and resend_envelopes: |
| 225 | + self.storage.put(resend_envelopes) |
| 226 | + except Exception as ex: |
| 227 | + if not self._is_stats_exporter(): |
| 228 | + logger.error( |
| 229 | + 'Error while processing %s: %s %s.', |
| 230 | + status_code, |
| 231 | + text, |
| 232 | + ex, |
| 233 | + ) |
| 234 | + if self._check_stats_collection(): |
| 235 | + _update_requests_map('exception', value=ex.__class__.__name__) # noqa: E501 |
| 236 | + return TransportStatusCode.DROP |
| 237 | + # cannot parse response body, fallback to retry |
193 | 238 | elif _status_code_is_redirect(status_code): # Redirect |
194 | 239 | # for statsbeat, these are not tracked as success nor failures |
195 | 240 | self._consecutive_redirects += 1 |
@@ -255,45 +300,6 @@ def _transmit(self, envelopes): |
255 | 300 | if self._check_stats_collection(): |
256 | 301 | _update_requests_map('retry', value=status_code) |
257 | 302 | return TransportStatusCode.RETRY |
258 | | - elif status_code == 206: # Partial Content |
259 | | - data = None |
260 | | - try: |
261 | | - data = json.loads(text) |
262 | | - except Exception as ex: |
263 | | - if not self._is_stats_exporter(): |
264 | | - logger.warning('Error while reading response body %s for partial content.', ex) # noqa: E501 |
265 | | - if self._check_stats_collection(): |
266 | | - _update_requests_map('exception', value=ex.__class__.__name__) # noqa: E501 |
267 | | - return TransportStatusCode.DROP |
268 | | - if data: |
269 | | - try: |
270 | | - resend_envelopes = [] |
271 | | - for error in data['errors']: |
272 | | - if _status_code_is_retryable(error['statusCode']): |
273 | | - resend_envelopes.append(envelopes[error['index']]) |
274 | | - if self._check_stats_collection(): |
275 | | - _update_requests_map('retry', value=error['statusCode']) # noqa: E501 |
276 | | - else: |
277 | | - logger.error( |
278 | | - 'Data drop %s: %s %s.', |
279 | | - error['statusCode'], |
280 | | - error['message'], |
281 | | - envelopes[error['index']], |
282 | | - ) |
283 | | - if self.storage and resend_envelopes: |
284 | | - self.storage.put(resend_envelopes) |
285 | | - except Exception as ex: |
286 | | - if not self._is_stats_exporter(): |
287 | | - logger.error( |
288 | | - 'Error while processing %s: %s %s.', |
289 | | - status_code, |
290 | | - text, |
291 | | - ex, |
292 | | - ) |
293 | | - if self._check_stats_collection(): |
294 | | - _update_requests_map('exception', value=ex.__class__.__name__) # noqa: E501 |
295 | | - return TransportStatusCode.DROP |
296 | | - # cannot parse response body, fallback to retry |
297 | 303 | else: |
298 | 304 | # 400 and 404 will be tracked as failure count |
299 | 305 | # 400 - Invalid - The server cannot or will not process the request due to the invalid telemetry (invalid data, iKey) # noqa: E501 |
@@ -332,21 +338,18 @@ def _statsbeat_failure_reached_threshold(): |
332 | 338 | return state.get_statsbeat_initial_failure_count() >= 3 |
333 | 339 |
|
334 | 340 |
|
335 | | -def _update_requests_map(type, value=None): |
336 | | - if value is None: |
337 | | - value = 0 |
| 341 | +def _update_requests_map(type_name, value=None): |
| 342 | + # value is either None, duration, status_code or exc_name |
338 | 343 | with _requests_lock: |
339 | | - if type == "count": |
340 | | - _requests_map['count'] = _requests_map.get('count', 0) + 1 # noqa: E501 |
341 | | - elif type == "duration": # value will be duration |
342 | | - _requests_map['duration'] = _requests_map.get('duration', 0) + value # noqa: E501 |
343 | | - elif type == "success": |
344 | | - _requests_map['success'] = _requests_map.get('success', 0) + 1 # noqa: E501 |
345 | | - else: |
346 | | - # value will be a key (status_code/error message) |
| 344 | + if type_name == "success" or type_name == "count": # success, count |
| 345 | + _requests_map[type_name] = _requests_map.get(type_name, 0) + 1 |
| 346 | + elif type_name == "duration": # value will be duration |
| 347 | + _requests_map[type_name] = _requests_map.get(type_name, 0) + value # noqa: E501 |
| 348 | + else: # exception, failure, retry, throttle |
| 349 | + # value will be a key (status_code/exc_name) |
347 | 350 | prev = 0 |
348 | | - if _requests_map.get(type): |
349 | | - prev = _requests_map.get(type).get(value, 0) |
| 351 | + if _requests_map.get(type_name): |
| 352 | + prev = _requests_map.get(type_name).get(value, 0) |
350 | 353 | else: |
351 | | - _requests_map[type] = {} |
352 | | - _requests_map[type][value] = prev + 1 |
| 354 | + _requests_map[type_name] = {} |
| 355 | + _requests_map[type_name][value] = prev + 1 |
0 commit comments