Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion runtimes/v1/azure_functions_runtime_v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,17 @@
function_environment_reload_request,
invocation_request,
function_load_request)
from .utils.threadpool import (
start_threadpool_executor,
stop_threadpool_executor,
get_threadpool_executor,
)

__all__ = ('worker_init_request',
'functions_metadata_request',
'function_environment_reload_request',
'invocation_request',
'function_load_request')
'function_load_request',
'start_threadpool_executor',
'stop_threadpool_executor',
'get_threadpool_executor')
13 changes: 9 additions & 4 deletions runtimes/v1/azure_functions_runtime_v1/handle_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
WORKER_OPEN_TELEMETRY_ENABLED,
PYTHON_ENABLE_DEBUG_LOGGING)
from .utils.executor import get_current_loop, execute_async, run_sync_func
from .utils.threadpool import get_threadpool_executor
from .utils.app_setting_manager import is_envvar_true
from .utils.helpers import change_cwd, get_worker_metadata
from .utils.tracing import serialize_exception
Expand Down Expand Up @@ -144,10 +145,14 @@ async def invocation_request(request):
invoc_request = request.request.invocation_request
invocation_id = invoc_request.invocation_id
function_id = invoc_request.function_id
threadpool = request.properties.get("threadpool")
logger.debug("All variables obtained from proxy worker."
" Invocation ID: %s, Function ID: %s, Threadpool: %s",
invocation_id, function_id, threadpool)
threadpool = get_threadpool_executor()
logger.debug(
"Invocation context prepared. Invocation ID: %s, Function ID: %s, "
"Threadpool id: %s",
invocation_id,
function_id,
id(threadpool) if threadpool else None,
)

try:
fi: FunctionInfo = _functions.get_function(
Expand Down
86 changes: 86 additions & 0 deletions runtimes/v1/azure_functions_runtime_v1/utils/threadpool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
from typing import Optional

from .app_setting_manager import get_app_setting
from .constants import (
PYTHON_THREADPOOL_THREAD_COUNT,
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT,
PYTHON_THREADPOOL_THREAD_COUNT_MIN,
PYTHON_THREADPOOL_THREAD_COUNT_MAX,
)
from ..logging import logger

_threadpool_executor: Optional[ThreadPoolExecutor] = None


def _validate_thread_count(value: str) -> bool:
try:
int_value = int(value)
except ValueError:
logger.warning('%s must be an integer', PYTHON_THREADPOOL_THREAD_COUNT)
return False

if (int_value < PYTHON_THREADPOOL_THREAD_COUNT_MIN
or int_value > PYTHON_THREADPOOL_THREAD_COUNT_MAX):
logger.warning(
'%s must be set to a value between %s and %s. Reverting to '
'default value (%s).',
PYTHON_THREADPOOL_THREAD_COUNT,
PYTHON_THREADPOOL_THREAD_COUNT_MIN,
PYTHON_THREADPOOL_THREAD_COUNT_MAX,
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT,
)
return False
return True


def _get_max_workers() -> Optional[int]:
threadpool_count = get_app_setting(
setting=PYTHON_THREADPOOL_THREAD_COUNT,
validator=_validate_thread_count,
)
if threadpool_count is None:
return None
try:
return int(threadpool_count)
except (TypeError, ValueError) as e:
logger.warning(
'Failed to convert %s value "%s" to integer: %s',
PYTHON_THREADPOOL_THREAD_COUNT, threadpool_count, e
)
return None


def start_threadpool_executor() -> None:
global _threadpool_executor
max_workers = _get_max_workers()

if _threadpool_executor is not None:
try:
_threadpool_executor.shutdown(wait=False)
except Exception:
pass

_threadpool_executor = ThreadPoolExecutor(max_workers=max_workers)
logger.debug(
'Started threadpool executor (id=%s) with max_workers=%s',
id(_threadpool_executor),
max_workers,
)


def stop_threadpool_executor() -> None:
global _threadpool_executor
if _threadpool_executor is not None:
try:
_threadpool_executor.shutdown(wait=True)
logger.debug('Stopped threadpool executor (id=%s)',
id(_threadpool_executor))
finally:
_threadpool_executor = None


def get_threadpool_executor() -> Optional[ThreadPoolExecutor]:
return _threadpool_executor
6 changes: 4 additions & 2 deletions runtimes/v1/tests/unittests/test_code_quality.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys
import unittest

ROOT_PATH = pathlib.Path(__file__).parent.parent.parent.parent
ROOT_PATH = pathlib.Path(__file__).parent.parent.parent.parent.parent


class TestCodeQuality(unittest.TestCase):
Expand All @@ -20,10 +20,12 @@ def test_flake8(self):
if not config_path.exists():
raise unittest.SkipTest('could not locate the .flake8 file')

project_path = pathlib.Path(ROOT_PATH,'runtimes','v1' ,'azure_functions_runtime_v1')

try:
subprocess.run(
[sys.executable, '-m', 'flake8', '--config', str(config_path),
'azure_functions_runtime_v1',],
project_path],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
Expand Down
50 changes: 50 additions & 0 deletions runtimes/v1/tests/unittests/test_threadpool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from azure_functions_runtime_v1.utils import threadpool as tp


def _reset(): # helper for clean state
if tp.get_threadpool_executor() is not None: # pragma: no cover - cleanup
tp.stop_threadpool_executor()


def test_start_and_get_threadpool():
_reset()
tp.start_threadpool_executor()
ex = tp.get_threadpool_executor()
assert ex is not None
first_id = id(ex)
tp.start_threadpool_executor() # restart replaces
ex2 = tp.get_threadpool_executor()
assert ex2 is not None and id(ex2) != first_id
_reset()


def test_stop_threadpool():
_reset()
tp.start_threadpool_executor()
assert tp.get_threadpool_executor() is not None
tp.stop_threadpool_executor()
assert tp.get_threadpool_executor() is None


def test_validate_thread_count_invalid(monkeypatch):
def fake_get_app_setting(setting, validator):
assert validator("not-int") is False
return "not-int"
monkeypatch.setattr(tp, 'get_app_setting', fake_get_app_setting)
assert tp._get_max_workers() is None


def test_validate_thread_count_range(monkeypatch):
def fake_get_app_setting(setting, validator):
assert validator("0") is False
return "0"
monkeypatch.setattr(tp, 'get_app_setting', fake_get_app_setting)
assert tp._get_max_workers() == 0


def test_max_workers_valid(monkeypatch):
def fake_get_app_setting(setting, validator):
assert validator("10") is True
return "10"
monkeypatch.setattr(tp, 'get_app_setting', fake_get_app_setting)
assert tp._get_max_workers() == 10
10 changes: 9 additions & 1 deletion runtimes/v2/azure_functions_runtime/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,17 @@
function_environment_reload_request,
invocation_request,
function_load_request)
from .utils.threadpool import (
start_threadpool_executor,
stop_threadpool_executor,
get_threadpool_executor,
)

__all__ = ('worker_init_request',
'functions_metadata_request',
'function_environment_reload_request',
'invocation_request',
'function_load_request')
'function_load_request',
'start_threadpool_executor',
'stop_threadpool_executor',
'get_threadpool_executor')
14 changes: 10 additions & 4 deletions runtimes/v2/azure_functions_runtime/handle_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
WORKER_OPEN_TELEMETRY_ENABLED,
WORKER_STATUS)
from .utils.executor import get_current_loop, execute_async, run_sync_func
from .utils.threadpool import get_threadpool_executor
from .utils.helpers import change_cwd, get_sdk_version, get_worker_metadata
from .utils.tracing import serialize_exception
from .utils.validators import validate_script_file_name
Expand Down Expand Up @@ -165,10 +166,15 @@ async def invocation_request(request):
invocation_id = invoc_request.invocation_id
function_id = invoc_request.function_id
http_v2_enabled = False
threadpool = request.properties.get("threadpool")
logger.debug("All variables obtained from proxy worker."
" Invocation ID: %s, Function ID: %s, Threadpool: %s",
invocation_id, function_id, threadpool)

threadpool = get_threadpool_executor()
logger.debug(
"Invocation context prepared. Invocation ID: %s, Function ID: %s, "
"Threadpool id: %s",
invocation_id,
function_id,
id(threadpool) if threadpool else None,
)

try:
fi: FunctionInfo = _functions.get_function(
Expand Down
88 changes: 88 additions & 0 deletions runtimes/v2/azure_functions_runtime/utils/threadpool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
from typing import Optional

from .app_setting_manager import get_app_setting
from .constants import (
PYTHON_THREADPOOL_THREAD_COUNT,
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT,
PYTHON_THREADPOOL_THREAD_COUNT_MIN,
PYTHON_THREADPOOL_THREAD_COUNT_MAX,
)
from ..logging import logger

_threadpool_executor: Optional[ThreadPoolExecutor] = None


def _validate_thread_count(value: str) -> bool:
try:
int_value = int(value)
except ValueError:
logger.warning('%s must be an integer', PYTHON_THREADPOOL_THREAD_COUNT)
return False

if (int_value < PYTHON_THREADPOOL_THREAD_COUNT_MIN
or int_value > PYTHON_THREADPOOL_THREAD_COUNT_MAX):
logger.warning(
'%s must be set to a value between %s and %s. Reverting to '
'default value (%s).',
PYTHON_THREADPOOL_THREAD_COUNT,
PYTHON_THREADPOOL_THREAD_COUNT_MIN,
PYTHON_THREADPOOL_THREAD_COUNT_MAX,
PYTHON_THREADPOOL_THREAD_COUNT_DEFAULT,
)
return False
return True


def _get_max_workers() -> Optional[int]:
threadpool_count = get_app_setting(
setting=PYTHON_THREADPOOL_THREAD_COUNT,
validator=_validate_thread_count,
)
if threadpool_count is None:
return None
try:
return int(threadpool_count)
except (TypeError, ValueError) as e:
logger.warning(
'Failed to convert %s value "%s" to integer: %s',
PYTHON_THREADPOOL_THREAD_COUNT, threadpool_count, e
)
return None


def start_threadpool_executor() -> None:
global _threadpool_executor
max_workers = _get_max_workers()

if _threadpool_executor is not None:
try:
_threadpool_executor.shutdown(wait=False)
except Exception:
pass

_threadpool_executor = ThreadPoolExecutor(max_workers=max_workers)
logger.debug(
'Started threadpool executor (id=%s) with max_workers=%s',
id(_threadpool_executor),
max_workers,
)


def stop_threadpool_executor() -> None:
global _threadpool_executor
if _threadpool_executor is not None:
try:
_threadpool_executor.shutdown(wait=True)
logger.debug(
'Stopped threadpool executor (id=%s)',
id(_threadpool_executor)
)
finally:
_threadpool_executor = None


def get_threadpool_executor() -> Optional[ThreadPoolExecutor]:
return _threadpool_executor
8 changes: 4 additions & 4 deletions runtimes/v2/tests/unittests/test_code_quality.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
import sys
import unittest

ROOT_PATH = pathlib.Path(__file__).parent.parent.parent.parent

ROOT_PATH = pathlib.Path(__file__).parent.parent.parent.parent.parent

class TestCodeQuality(unittest.TestCase):

Expand All @@ -19,16 +18,17 @@ def test_flake8(self):
config_path = ROOT_PATH / '.flake8'
if not config_path.exists():
raise unittest.SkipTest('could not locate the .flake8 file')
project_path = pathlib.Path(ROOT_PATH,'runtimes','v2' ,'azure_functions_runtime')

try:
subprocess.run(
[sys.executable, '-m', 'flake8', '--config', str(config_path),
'azure_functions_runtime',],
project_path],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=str(ROOT_PATH))
except subprocess.CalledProcessError as ex:
output = ex.output.decode()
raise AssertionError(
'flake8 validation failed:\n%s', output) from None
f'flake8 validation failed:\n{output}') from None
Loading
Loading