Skip to content

Commit 89defbe

Browse files
authored
refactor: Moving threadpool configs to the library worker (#1741)
* Moving threadpool configs to the library worker * Added unit tests * Added log warning message * Flake8 fixes
1 parent 3a87cb0 commit 89defbe

File tree

12 files changed

+481
-83
lines changed

12 files changed

+481
-83
lines changed

runtimes/v1/azure_functions_runtime_v1/__init__.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,17 @@
66
function_environment_reload_request,
77
invocation_request,
88
function_load_request)
9+
from .utils.threadpool import (
10+
start_threadpool_executor,
11+
stop_threadpool_executor,
12+
get_threadpool_executor,
13+
)
914

1015
__all__ = ('worker_init_request',
1116
'functions_metadata_request',
1217
'function_environment_reload_request',
1318
'invocation_request',
14-
'function_load_request')
19+
'function_load_request',
20+
'start_threadpool_executor',
21+
'stop_threadpool_executor',
22+
'get_threadpool_executor')

runtimes/v1/azure_functions_runtime_v1/handle_event.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
WORKER_OPEN_TELEMETRY_ENABLED,
3333
PYTHON_ENABLE_DEBUG_LOGGING)
3434
from .utils.executor import get_current_loop, execute_async, run_sync_func
35+
from .utils.threadpool import get_threadpool_executor
3536
from .utils.app_setting_manager import is_envvar_true
3637
from .utils.helpers import change_cwd, get_worker_metadata
3738
from .utils.tracing import serialize_exception
@@ -144,10 +145,14 @@ async def invocation_request(request):
144145
invoc_request = request.request.invocation_request
145146
invocation_id = invoc_request.invocation_id
146147
function_id = invoc_request.function_id
147-
threadpool = request.properties.get("threadpool")
148-
logger.debug("All variables obtained from proxy worker."
149-
" Invocation ID: %s, Function ID: %s, Threadpool: %s",
150-
invocation_id, function_id, threadpool)
148+
threadpool = get_threadpool_executor()
149+
logger.debug(
150+
"Invocation context prepared. Invocation ID: %s, Function ID: %s, "
151+
"Threadpool id: %s",
152+
invocation_id,
153+
function_id,
154+
id(threadpool) if threadpool else None,
155+
)
151156

152157
try:
153158
fi: FunctionInfo = _functions.get_function(
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
from __future__ import annotations
2+
3+
from concurrent.futures import ThreadPoolExecutor
4+
from typing import Optional
5+
6+
from .app_setting_manager import get_app_setting
7+
from .constants import (
8+
PYTHON_THREADPOOL_THREAD_COUNT,
9+
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT,
10+
PYTHON_THREADPOOL_THREAD_COUNT_MIN,
11+
PYTHON_THREADPOOL_THREAD_COUNT_MAX,
12+
)
13+
from ..logging import logger
14+
15+
_threadpool_executor: Optional[ThreadPoolExecutor] = None
16+
17+
18+
def _validate_thread_count(value: str) -> bool:
19+
try:
20+
int_value = int(value)
21+
except ValueError:
22+
logger.warning('%s must be an integer', PYTHON_THREADPOOL_THREAD_COUNT)
23+
return False
24+
25+
if (int_value < PYTHON_THREADPOOL_THREAD_COUNT_MIN
26+
or int_value > PYTHON_THREADPOOL_THREAD_COUNT_MAX):
27+
logger.warning(
28+
'%s must be set to a value between %s and %s. Reverting to '
29+
'default value (%s).',
30+
PYTHON_THREADPOOL_THREAD_COUNT,
31+
PYTHON_THREADPOOL_THREAD_COUNT_MIN,
32+
PYTHON_THREADPOOL_THREAD_COUNT_MAX,
33+
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT,
34+
)
35+
return False
36+
return True
37+
38+
39+
def _get_max_workers() -> Optional[int]:
40+
threadpool_count = get_app_setting(
41+
setting=PYTHON_THREADPOOL_THREAD_COUNT,
42+
validator=_validate_thread_count,
43+
)
44+
if threadpool_count is None:
45+
return None
46+
try:
47+
return int(threadpool_count)
48+
except (TypeError, ValueError) as e:
49+
logger.warning(
50+
'Failed to convert %s value "%s" to integer: %s',
51+
PYTHON_THREADPOOL_THREAD_COUNT, threadpool_count, e
52+
)
53+
return None
54+
55+
56+
def start_threadpool_executor() -> None:
57+
global _threadpool_executor
58+
max_workers = _get_max_workers()
59+
60+
if _threadpool_executor is not None:
61+
try:
62+
_threadpool_executor.shutdown(wait=False)
63+
except Exception:
64+
pass
65+
66+
_threadpool_executor = ThreadPoolExecutor(max_workers=max_workers)
67+
logger.debug(
68+
'Started threadpool executor (id=%s) with max_workers=%s',
69+
id(_threadpool_executor),
70+
max_workers,
71+
)
72+
73+
74+
def stop_threadpool_executor() -> None:
75+
global _threadpool_executor
76+
if _threadpool_executor is not None:
77+
try:
78+
_threadpool_executor.shutdown(wait=True)
79+
logger.debug('Stopped threadpool executor (id=%s)',
80+
id(_threadpool_executor))
81+
finally:
82+
_threadpool_executor = None
83+
84+
85+
def get_threadpool_executor() -> Optional[ThreadPoolExecutor]:
86+
return _threadpool_executor

runtimes/v1/tests/unittests/test_code_quality.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import sys
66
import unittest
77

8-
ROOT_PATH = pathlib.Path(__file__).parent.parent.parent.parent
8+
ROOT_PATH = pathlib.Path(__file__).parent.parent.parent.parent.parent
99

1010

1111
class TestCodeQuality(unittest.TestCase):
@@ -20,10 +20,12 @@ def test_flake8(self):
2020
if not config_path.exists():
2121
raise unittest.SkipTest('could not locate the .flake8 file')
2222

23+
project_path = pathlib.Path(ROOT_PATH,'runtimes','v1' ,'azure_functions_runtime_v1')
24+
2325
try:
2426
subprocess.run(
2527
[sys.executable, '-m', 'flake8', '--config', str(config_path),
26-
'azure_functions_runtime_v1',],
28+
project_path],
2729
check=True,
2830
stdout=subprocess.PIPE,
2931
stderr=subprocess.PIPE,
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
from azure_functions_runtime_v1.utils import threadpool as tp
2+
3+
4+
def _reset(): # helper for clean state
5+
if tp.get_threadpool_executor() is not None: # pragma: no cover - cleanup
6+
tp.stop_threadpool_executor()
7+
8+
9+
def test_start_and_get_threadpool():
10+
_reset()
11+
tp.start_threadpool_executor()
12+
ex = tp.get_threadpool_executor()
13+
assert ex is not None
14+
first_id = id(ex)
15+
tp.start_threadpool_executor() # restart replaces
16+
ex2 = tp.get_threadpool_executor()
17+
assert ex2 is not None and id(ex2) != first_id
18+
_reset()
19+
20+
21+
def test_stop_threadpool():
22+
_reset()
23+
tp.start_threadpool_executor()
24+
assert tp.get_threadpool_executor() is not None
25+
tp.stop_threadpool_executor()
26+
assert tp.get_threadpool_executor() is None
27+
28+
29+
def test_validate_thread_count_invalid(monkeypatch):
30+
def fake_get_app_setting(setting, validator):
31+
assert validator("not-int") is False
32+
return "not-int"
33+
monkeypatch.setattr(tp, 'get_app_setting', fake_get_app_setting)
34+
assert tp._get_max_workers() is None
35+
36+
37+
def test_validate_thread_count_range(monkeypatch):
38+
def fake_get_app_setting(setting, validator):
39+
assert validator("0") is False
40+
return "0"
41+
monkeypatch.setattr(tp, 'get_app_setting', fake_get_app_setting)
42+
assert tp._get_max_workers() == 0
43+
44+
45+
def test_max_workers_valid(monkeypatch):
46+
def fake_get_app_setting(setting, validator):
47+
assert validator("10") is True
48+
return "10"
49+
monkeypatch.setattr(tp, 'get_app_setting', fake_get_app_setting)
50+
assert tp._get_max_workers() == 10

runtimes/v2/azure_functions_runtime/__init__.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,17 @@
55
function_environment_reload_request,
66
invocation_request,
77
function_load_request)
8+
from .utils.threadpool import (
9+
start_threadpool_executor,
10+
stop_threadpool_executor,
11+
get_threadpool_executor,
12+
)
813

914
__all__ = ('worker_init_request',
1015
'functions_metadata_request',
1116
'function_environment_reload_request',
1217
'invocation_request',
13-
'function_load_request')
18+
'function_load_request',
19+
'start_threadpool_executor',
20+
'stop_threadpool_executor',
21+
'get_threadpool_executor')

runtimes/v2/azure_functions_runtime/handle_event.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
WORKER_OPEN_TELEMETRY_ENABLED,
4848
WORKER_STATUS)
4949
from .utils.executor import get_current_loop, execute_async, run_sync_func
50+
from .utils.threadpool import get_threadpool_executor
5051
from .utils.helpers import change_cwd, get_sdk_version, get_worker_metadata
5152
from .utils.tracing import serialize_exception
5253
from .utils.validators import validate_script_file_name
@@ -165,10 +166,15 @@ async def invocation_request(request):
165166
invocation_id = invoc_request.invocation_id
166167
function_id = invoc_request.function_id
167168
http_v2_enabled = False
168-
threadpool = request.properties.get("threadpool")
169-
logger.debug("All variables obtained from proxy worker."
170-
" Invocation ID: %s, Function ID: %s, Threadpool: %s",
171-
invocation_id, function_id, threadpool)
169+
170+
threadpool = get_threadpool_executor()
171+
logger.debug(
172+
"Invocation context prepared. Invocation ID: %s, Function ID: %s, "
173+
"Threadpool id: %s",
174+
invocation_id,
175+
function_id,
176+
id(threadpool) if threadpool else None,
177+
)
172178

173179
try:
174180
fi: FunctionInfo = _functions.get_function(
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
from __future__ import annotations
2+
3+
from concurrent.futures import ThreadPoolExecutor
4+
from typing import Optional
5+
6+
from .app_setting_manager import get_app_setting
7+
from .constants import (
8+
PYTHON_THREADPOOL_THREAD_COUNT,
9+
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT,
10+
PYTHON_THREADPOOL_THREAD_COUNT_MIN,
11+
PYTHON_THREADPOOL_THREAD_COUNT_MAX,
12+
)
13+
from ..logging import logger
14+
15+
_threadpool_executor: Optional[ThreadPoolExecutor] = None
16+
17+
18+
def _validate_thread_count(value: str) -> bool:
19+
try:
20+
int_value = int(value)
21+
except ValueError:
22+
logger.warning('%s must be an integer', PYTHON_THREADPOOL_THREAD_COUNT)
23+
return False
24+
25+
if (int_value < PYTHON_THREADPOOL_THREAD_COUNT_MIN
26+
or int_value > PYTHON_THREADPOOL_THREAD_COUNT_MAX):
27+
logger.warning(
28+
'%s must be set to a value between %s and %s. Reverting to '
29+
'default value (%s).',
30+
PYTHON_THREADPOOL_THREAD_COUNT,
31+
PYTHON_THREADPOOL_THREAD_COUNT_MIN,
32+
PYTHON_THREADPOOL_THREAD_COUNT_MAX,
33+
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT,
34+
)
35+
return False
36+
return True
37+
38+
39+
def _get_max_workers() -> Optional[int]:
40+
threadpool_count = get_app_setting(
41+
setting=PYTHON_THREADPOOL_THREAD_COUNT,
42+
validator=_validate_thread_count,
43+
)
44+
if threadpool_count is None:
45+
return None
46+
try:
47+
return int(threadpool_count)
48+
except (TypeError, ValueError) as e:
49+
logger.warning(
50+
'Failed to convert %s value "%s" to integer: %s',
51+
PYTHON_THREADPOOL_THREAD_COUNT, threadpool_count, e
52+
)
53+
return None
54+
55+
56+
def start_threadpool_executor() -> None:
57+
global _threadpool_executor
58+
max_workers = _get_max_workers()
59+
60+
if _threadpool_executor is not None:
61+
try:
62+
_threadpool_executor.shutdown(wait=False)
63+
except Exception:
64+
pass
65+
66+
_threadpool_executor = ThreadPoolExecutor(max_workers=max_workers)
67+
logger.debug(
68+
'Started threadpool executor (id=%s) with max_workers=%s',
69+
id(_threadpool_executor),
70+
max_workers,
71+
)
72+
73+
74+
def stop_threadpool_executor() -> None:
75+
global _threadpool_executor
76+
if _threadpool_executor is not None:
77+
try:
78+
_threadpool_executor.shutdown(wait=True)
79+
logger.debug(
80+
'Stopped threadpool executor (id=%s)',
81+
id(_threadpool_executor)
82+
)
83+
finally:
84+
_threadpool_executor = None
85+
86+
87+
def get_threadpool_executor() -> Optional[ThreadPoolExecutor]:
88+
return _threadpool_executor

runtimes/v2/tests/unittests/test_code_quality.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55
import sys
66
import unittest
77

8-
ROOT_PATH = pathlib.Path(__file__).parent.parent.parent.parent
9-
8+
ROOT_PATH = pathlib.Path(__file__).parent.parent.parent.parent.parent
109

1110
class TestCodeQuality(unittest.TestCase):
1211

@@ -19,16 +18,17 @@ def test_flake8(self):
1918
config_path = ROOT_PATH / '.flake8'
2019
if not config_path.exists():
2120
raise unittest.SkipTest('could not locate the .flake8 file')
21+
project_path = pathlib.Path(ROOT_PATH,'runtimes','v2' ,'azure_functions_runtime')
2222

2323
try:
2424
subprocess.run(
2525
[sys.executable, '-m', 'flake8', '--config', str(config_path),
26-
'azure_functions_runtime',],
26+
project_path],
2727
check=True,
2828
stdout=subprocess.PIPE,
2929
stderr=subprocess.PIPE,
3030
cwd=str(ROOT_PATH))
3131
except subprocess.CalledProcessError as ex:
3232
output = ex.output.decode()
3333
raise AssertionError(
34-
'flake8 validation failed:\n%s', output) from None
34+
f'flake8 validation failed:\n{output}') from None

0 commit comments

Comments
 (0)