diff --git a/benchmarks/kernels/benchmark_w8a8_block_fp8.py b/benchmarks/kernels/benchmark_w8a8_block_fp8.py
index 4fcdbadd65e..30d9d1ec80f 100644
--- a/benchmarks/kernels/benchmark_w8a8_block_fp8.py
+++ b/benchmarks/kernels/benchmark_w8a8_block_fp8.py
@@ -12,12 +12,12 @@
 import torch
 import tqdm
-import triton
 
 from vllm.model_executor.layers.quantization.utils.fp8_utils import (
     _w8a8_block_fp8_matmul,
 )
 from vllm.platforms import current_platform
+from vllm.triton_utils import triton
 from vllm.utils import FlexibleArgumentParser
 
 mp.set_start_method("spawn", force=True)
diff --git a/tests/conftest.py b/tests/conftest.py
index f50e611a471..728aa2a004d 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1053,8 +1053,7 @@ def score(
         return [req_output.outputs.score for req_output in req_outputs]
 
     def apply_model(self, func: Callable[[nn.Module], _R]) -> list[_R]:
-        executor = self.model.llm_engine.model_executor
-        return executor.apply_model(func)
+        return self.model.apply_model(func)
 
     def __enter__(self):
         return self
diff --git a/tests/models/multimodal/generation/test_qwen2_vl.py b/tests/models/multimodal/generation/test_qwen2_vl.py
index a2793b8c8dd..7c806033eb9 100644
--- a/tests/models/multimodal/generation/test_qwen2_vl.py
+++ b/tests/models/multimodal/generation/test_qwen2_vl.py
@@ -17,11 +17,9 @@
 
 
 @pytest.fixture(scope="function", autouse=True)
-def use_v0_only(monkeypatch):
-    """
-    V1 Test: batch_make_xxxxx_embeddings calls a V0 internal
-    """
-    monkeypatch.setenv('VLLM_USE_V1', '0')
+def enable_pickle(monkeypatch):
+    """`LLM.apply_model` requires pickling a function."""
+    monkeypatch.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1")
 
 
 models = ["Qwen/Qwen2-VL-2B-Instruct"]
@@ -126,9 +124,8 @@ def get_image_embeds(model):
             image_grid_thw_on_device = image_grid_thw.to(visual.device,
                                                          dtype=torch.int64)
             return visual(pixel_values_on_device,
-                          grid_thw=image_grid_thw_on_device)
+                          grid_thw=image_grid_thw_on_device).cpu()
 
-    # V1 Test: this calls a V0 internal.
     image_embeds = torch.concat(llm.apply_model(get_image_embeds))
 
     # split into original batches
@@ -210,7 +207,7 @@ def get_image_embeds(model):
             video_grid_thw_on_device = video_grid_thw.to(visual.device,
                                                          dtype=torch.int64)
             return visual(pixel_values_on_device,
-                          grid_thw=video_grid_thw_on_device)
+                          grid_thw=video_grid_thw_on_device).cpu()
 
     # V1 Test: this calls a V0 internal.
     video_embeds = torch.concat(llm.apply_model(get_image_embeds))
@@ -329,9 +326,13 @@ def run_embedding_input_test(
 @pytest.mark.parametrize("max_tokens", [128])
 @pytest.mark.parametrize("num_logprobs", [10])
 def test_qwen2_vl_image_embeddings_input(vllm_runner, image_assets, model,
-                                         size_factors, dtype: str,
-                                         max_tokens: int,
-                                         num_logprobs: int) -> None:
+                                         size_factors, dtype, max_tokens,
+                                         num_logprobs, monkeypatch) -> None:
+
+    # Test V1: this test hangs after the first generate_greedy_logprobs call
+    # TODO: figure out why and re-enable this on V1.
+    monkeypatch.setenv("VLLM_USE_V1", "0")
+
     images = [asset.pil_image for asset in image_assets]
 
     inputs_per_case: list[tuple[
diff --git a/tests/models/quantization/test_awq.py b/tests/models/quantization/test_awq.py
index bd696198931..7005e435ecf 100644
--- a/tests/models/quantization/test_awq.py
+++ b/tests/models/quantization/test_awq.py
@@ -112,7 +112,7 @@ def test_awq_models(vllm_runner, image_assets, source_model, quant_model,
                     monkeypatch) -> None:
 
     # Test V1: this test hangs during setup on single-scale input.
-    # TODO: fixure out why and re-enable this on V1.
+    # TODO: figure out why and re-enable this on V1.
monkeypatch.setenv("VLLM_USE_V1", "0") run_awq_test( vllm_runner, diff --git a/tests/quantization/test_compressed_tensors.py b/tests/quantization/test_compressed_tensors.py index 516bf451381..72a16ec1368 100644 --- a/tests/quantization/test_compressed_tensors.py +++ b/tests/quantization/test_compressed_tensors.py @@ -41,11 +41,9 @@ @pytest.fixture(scope="function", autouse=True) -def use_v0_only(monkeypatch): - """ - This module relies on V0 internals, so set VLLM_USE_V1=0. - """ - monkeypatch.setenv('VLLM_USE_V1', '0') +def enable_pickle(monkeypatch): + """`LLM.apply_model` requires pickling a function.""" + monkeypatch.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1") @pytest.mark.parametrize( diff --git a/tests/quantization/test_fp8.py b/tests/quantization/test_fp8.py index e5ab7b3dd3c..3848f54b790 100644 --- a/tests/quantization/test_fp8.py +++ b/tests/quantization/test_fp8.py @@ -61,8 +61,8 @@ def test_kv_cache_model_load_and_run(vllm_runner, model_id: str, if use_rocm_aiter: monkeypatch.setenv("VLLM_ROCM_USE_AITER", "1") - # vllm_runner.apply_model() relies on V0 internals. - monkeypatch.setenv("VLLM_USE_V1", "0") + # `LLM.apply_model` requires pickling a function. + monkeypatch.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1") with vllm_runner(model_id, kv_cache_dtype="fp8") as llm: def check_model(model): @@ -106,8 +106,8 @@ def test_load_fp16_model(vllm_runner, kv_cache_dtype: str, force_marlin: bool, if use_rocm_aiter: monkeypatch.setenv("VLLM_ROCM_USE_AITER", "1") - # vllm_runner.apply_model() relies on V0 internals. - monkeypatch.setenv("VLLM_USE_V1", "0") + # `LLM.apply_model` requires pickling a function. + monkeypatch.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1") if force_marlin: monkeypatch.setenv("VLLM_TEST_FORCE_FP8_MARLIN", "1") diff --git a/tests/quantization/test_gptq_dynamic.py b/tests/quantization/test_gptq_dynamic.py index 23b999e7c67..00a5946ed01 100644 --- a/tests/quantization/test_gptq_dynamic.py +++ b/tests/quantization/test_gptq_dynamic.py @@ -31,41 +31,46 @@ @pytest.mark.parametrize("model_id, use_marlin_kernel", MODEL_QUANT) def test_gptq_with_dynamic(vllm_runner, model_id: str, use_marlin_kernel: bool, monkeypatch): - # vllm_runner.apply_model() relies on V0 internals. - monkeypatch.setenv("VLLM_USE_V1", "0") - - vllm_model = vllm_runner(model_id, dtype=torch.float16, max_model_len=2048) + # `LLM.apply_model` requires pickling a function. + monkeypatch.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1") linear_method_cls = GPTQMarlinLinearMethod if use_marlin_kernel else ( GPTQLinearMethod) - for name, submodule in (vllm_model.model.llm_engine.model_executor. 
-                            driver_worker.model_runner.model.named_modules()):
-        if name == "lm_head":
-            assert isinstance(submodule.quant_method, linear_method_cls)
-        elif name == 'model.layers.0.self_attn.qkv_proj':
-            # The first layer is quantized using bits=4, group_size=128
-            # desc_act=True
-            assert isinstance(submodule.quant_method, linear_method_cls)
-            config = submodule.quant_method.quant_config
-            assert config.weight_bits == 4
-            assert config.group_size == 128
-            assert config.desc_act
-        elif name == 'model.layers.1.self_attn.qkv_proj':
-            # The second layer is quantized using bits=8, group_size=32
-            # desc_act=False
-            assert isinstance(submodule.quant_method, linear_method_cls)
-            config = submodule.quant_method.quant_config
-            assert get_dynamic_override(config, layer_name=name,
-                                        key="bits") == 8
-            assert get_dynamic_override(config,
-                                        layer_name=name,
-                                        key="group_size") == 32
-            assert not get_dynamic_override(
-                config, layer_name=name, key="desc_act")
-        elif (name == 'model.layers.2.self_attn.qkv_proj'
-              or name == 'model.layers.2.mlp.gate_up_proj'):
-            # All other layers (layer index >= 2) are not quantized
-            assert isinstance(submodule.quant_method, UnquantizedLinearMethod)
+    with vllm_runner(model_id, dtype=torch.float16, max_model_len=2048) as llm:
+
+        def check_model(model):
+            for name, submodule in model.named_modules():
+                if name == "lm_head":
+                    assert isinstance(submodule.quant_method,
+                                      linear_method_cls)
+                elif name == 'model.layers.0.self_attn.qkv_proj':
+                    # The first layer is quantized using bits=4, group_size=128
+                    # desc_act=True
+                    assert isinstance(submodule.quant_method,
+                                      linear_method_cls)
+                    config = submodule.quant_method.quant_config
+                    assert config.weight_bits == 4
+                    assert config.group_size == 128
+                    assert config.desc_act
+                elif name == 'model.layers.1.self_attn.qkv_proj':
+                    # The second layer is quantized using bits=8, group_size=32
+                    # desc_act=False
+                    assert isinstance(submodule.quant_method,
+                                      linear_method_cls)
+                    config = submodule.quant_method.quant_config
+                    assert get_dynamic_override(config,
+                                                layer_name=name,
+                                                key="bits") == 8
+                    assert get_dynamic_override(config,
+                                                layer_name=name,
+                                                key="group_size") == 32
+                    assert not get_dynamic_override(
+                        config, layer_name=name, key="desc_act")
+                elif (name == 'model.layers.2.self_attn.qkv_proj'
+                      or name == 'model.layers.2.mlp.gate_up_proj'):
+                    # All other layers (layer index >= 2) are not quantized
+                    assert isinstance(submodule.quant_method,
+                                      UnquantizedLinearMethod)
 
-    del vllm_model
+        llm.apply_model(check_model)
diff --git a/tests/quantization/test_lm_head.py b/tests/quantization/test_lm_head.py
index 11f78a23bb4..8bd9ab382cb 100644
--- a/tests/quantization/test_lm_head.py
+++ b/tests/quantization/test_lm_head.py
@@ -32,8 +32,8 @@ def test_lm_head(
     lm_head_quantized: bool,
     monkeypatch,
 ) -> None:
-    # vllm_runner.apply_model() relies on V0 internals.
-    monkeypatch.setenv("VLLM_USE_V1", "0")
+    # `LLM.apply_model` requires pickling a function.
+    monkeypatch.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1")
 
     with vllm_runner(model_id, dtype=torch.float16,
                      max_model_len=2048) as vllm_model:
diff --git a/tests/quantization/test_ptpc_fp8.py b/tests/quantization/test_ptpc_fp8.py
index 5f78bc30504..088b68510cf 100644
--- a/tests/quantization/test_ptpc_fp8.py
+++ b/tests/quantization/test_ptpc_fp8.py
@@ -13,6 +13,16 @@
     PTPCFp8LinearMethod)
 from vllm.platforms import current_platform
 
+UNSUPPORTED_STR = (
+    "Currently torch._scaled_mm (hipBLASLt) rowwise gemm only "
+    "support output dtype of bfloat16. torch.float16 is specified.")
+
+
+@pytest.fixture(scope="function", autouse=True)
+def enable_pickle(monkeypatch):
+    """`LLM.apply_model` requires pickling a function."""
+    monkeypatch.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1")
+
 
 @pytest.mark.skipif(not is_quant_method_supported("ptpc_fp8"),
                     reason="PTPC FP8 is not supported on this GPU type.")
@@ -21,14 +31,22 @@
 @pytest.mark.parametrize("dtype", ["auto", "bfloat16", "float16"])
 @pytest.mark.parametrize("kv_cache_dtype", ["auto", "fp8", "fp8_e4m3"])
 def test_ptpc_fp8_rocm(vllm_runner, dtype: str, kv_cache_dtype: str) -> None:
     try:
-        with vllm_runner("facebook/opt-125m",
-                         dtype=dtype,
-                         quantization="ptpc_fp8",
-                         kv_cache_dtype=kv_cache_dtype) as llm:
+        llm = vllm_runner("facebook/opt-125m",
+                          dtype=dtype,
+                          quantization="ptpc_fp8",
+                          kv_cache_dtype=kv_cache_dtype)
+    except AssertionError as e:
+        if str(e) == UNSUPPORTED_STR:
+            # If the error message matches, the test passes
+            return
+        else:
+            # If the error message does not match, re-raise the exception
+            raise
+
+    with llm:
-            model = llm.model.llm_engine.model_executor.driver_worker.model_runner.model  # noqa: E501
+
+        def check_model(model):
             fc1 = model.model.decoder.layers[0].fc1
             assert isinstance(fc1.quant_method, PTPCFp8LinearMethod)
             if kv_cache_dtype == "ptpc_fp8":
@@ -40,17 +58,8 @@ def test_ptpc_fp8_rocm(vllm_runner, dtype: str, kv_cache_dtype: str) -> None:
             if current_platform.has_device_capability(94):
                 # For GPUs with hardware support, we keep weights in fp8
                 assert fc1.weight.dtype == torch.float8_e4m3fnuz
-            else:
-                pytest.skip()
 
-            output = llm.generate_greedy("Hello my name is", max_tokens=20)
-            assert output
-    except AssertionError as e:
-        if str(
-                e
-        ) == "Currently torch._scaled_mm (hipBLASLt) rowwise gemm only support output dtype of bfloat16. torch.float16 is specified.":  # noqa: E501
-            # If the error message matches, the test passes
-            pass
-        else:
-            # If the error message does not match, re-raise the exception
-            raise
+        llm.apply_model(check_model)
+
+        output = llm.generate_greedy("Hello my name is", max_tokens=20)
+        assert output
diff --git a/tests/quantization/test_quark.py b/tests/quantization/test_quark.py
index 3571f773fb0..a89693c461d 100644
--- a/tests/quantization/test_quark.py
+++ b/tests/quantization/test_quark.py
@@ -14,11 +14,9 @@
 
 
 @pytest.fixture(scope="function", autouse=True)
-def use_v0_only(monkeypatch):
-    """
-    This module relies on V0 internals, so set VLLM_USE_V1=0.
-    """
-    monkeypatch.setenv('VLLM_USE_V1', '0')
+def enable_pickle(monkeypatch):
+    """`LLM.apply_model` requires pickling a function."""
+    monkeypatch.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1")
 
 
 @pytest.mark.parametrize('kv_cache_dtype', ['auto', 'fp8'])
@@ -78,13 +76,12 @@ def test_quark_fp8_parity(vllm_runner):
     }
 
     with (vllm_runner(quark_model_id, **llm_kwargs) as quark_handle,
          vllm_runner(fp8_model_id, **llm_kwargs) as fp8_handle):
-        quark_model = (quark_handle.model.llm_engine.model_executor.
-                       driver_worker.model_runner.model)
-        quark_state_dict = quark_model.state_dict()
-        fp8_model = (fp8_handle.model.llm_engine.model_executor.driver_worker.
-                     model_runner.model)
-        fp8_state_dict = fp8_model.state_dict()
+
+        def get_state_dict(model):
+            return {k: v.cpu() for k, v in model.state_dict().items()}
+
+        quark_state_dict, = quark_handle.apply_model(get_state_dict)
+        fp8_state_dict, = fp8_handle.apply_model(get_state_dict)
 
     assert fp8_state_dict.keys() == quark_state_dict.keys()
diff --git a/tests/quantization/test_register_quantization_config.py b/tests/quantization/test_register_quantization_config.py
index 42081a8c68c..8231a15790b 100644
--- a/tests/quantization/test_register_quantization_config.py
+++ b/tests/quantization/test_register_quantization_config.py
@@ -104,18 +104,21 @@ def test_register_quantization_config():
 ])
 def test_custom_quant(vllm_runner, model, monkeypatch):
     """Test infer with the custom quantization method."""
-    # vllm_runner.apply_model() relies on V0 internals.
-    monkeypatch.setenv("VLLM_USE_V1", "0")
+    # `LLM.apply_model` requires pickling a function.
+    monkeypatch.setenv("VLLM_ALLOW_INSECURE_SERIALIZATION", "1")
+
     with vllm_runner(model_name=model,
                      quantization="custom_quant",
                      enforce_eager=True) as llm:
-        model = llm.model.llm_engine.model_executor.driver_worker.model_runner.model  # noqa: E501
-        layer = model.model.layers[0]
-        qkv_proj = layer.self_attn.qkv_proj
 
+        def check_model(model):
+            layer = model.model.layers[0]
+            qkv_proj = layer.self_attn.qkv_proj
+
+            # Check the quantization method is FakeQuantLinearMethod
+            assert isinstance(qkv_proj.quant_method, FakeQuantLinearMethod)
 
-        # Check the quantization method is FakeQuantLinearMethod
-        assert isinstance(qkv_proj.quant_method, FakeQuantLinearMethod)
+        output = llm.generate_greedy("Hello my name is", max_tokens=20)
+        assert output
 
-        output = llm.generate_greedy("Hello my name is", max_tokens=20)
-        assert output
+        llm.apply_model(check_model)
diff --git a/vllm/engine/llm_engine.py b/vllm/engine/llm_engine.py
index 8fccf9bd2aa..6c1fe1517a8 100644
--- a/vllm/engine/llm_engine.py
+++ b/vllm/engine/llm_engine.py
@@ -14,6 +14,7 @@
 from typing import Set, Type, Union, cast
 
 import torch
+import torch.nn as nn
 from typing_extensions import TypeVar
 
 import vllm.envs as envs
@@ -61,6 +62,7 @@
 from vllm.utils import Counter, Device, resolve_obj_by_qualname, weak_bind
 from vllm.version import __version__ as VLLM_VERSION
 from vllm.worker.model_runner_base import InputProcessingError
+from vllm.worker.worker_base import WorkerBase
 
 logger = init_logger(__name__)
 _LOCAL_LOGGING_INTERVAL_SEC = 5
@@ -2084,13 +2086,16 @@ def _build_logits_processors(
         return sampling_params
 
     def collective_rpc(self,
-                       method: Union[str, Callable[..., _R]],
+                       method: Union[str, Callable[[WorkerBase], _R]],
                        timeout: Optional[float] = None,
                        args: tuple = (),
                        kwargs: Optional[dict[str, Any]] = None) -> list[_R]:
         return self.model_executor.collective_rpc(method, timeout, args,
                                                   kwargs)
 
+    def apply_model(self, func: Callable[[nn.Module], _R]) -> list[_R]:
+        return self.collective_rpc("apply_model", args=(func, ))
+
 
 if envs.is_set("VLLM_USE_V1") and envs.VLLM_USE_V1:
     from vllm.v1.engine.llm_engine import LLMEngine as V1LLMEngine
diff --git a/vllm/entrypoints/llm.py b/vllm/entrypoints/llm.py
index 05e0be61ada..9c1e7b843e4 100644
--- a/vllm/entrypoints/llm.py
+++ b/vllm/entrypoints/llm.py
@@ -528,9 +528,14 @@ def apply_model(self, func: Callable[[nn.Module], _R]) -> list[_R]:
         """
         Run a function directly on the model inside each worker,
         returning the result for each of them.
+
+        !!! warning
+            To reduce the overhead of data transfer, avoid returning large
+            arrays or tensors from this method. If you must return them,
+            make sure you move them to CPU first to avoid taking up additional
+            VRAM!
         """
-        executor = self.llm_engine.model_executor
-        return executor.apply_model(func)
+        return self.llm_engine.apply_model(func)
 
     def _get_beam_search_lora_requests(
         self,
diff --git a/vllm/executor/executor_base.py b/vllm/executor/executor_base.py
index 99e12201c96..579739cfeb7 100644
--- a/vllm/executor/executor_base.py
+++ b/vllm/executor/executor_base.py
@@ -4,11 +4,10 @@
 import asyncio
 import time
 from abc import ABC, abstractmethod
-from typing import (Any, Awaitable, Callable, Dict, List, Optional, Set, Tuple,
-                    Union)
+from typing import Any, Awaitable, Callable, List, Optional, Set, Union
 
 import torch.nn as nn
-from typing_extensions import TypeVar
+from typing_extensions import TypeVar, deprecated
 
 import vllm.platforms
 from vllm.config import VllmConfig
@@ -60,10 +59,10 @@ def _init_executor(self) -> None:
 
     @abstractmethod
     def collective_rpc(self,
-                       method: Union[str, Callable[..., _R]],
+                       method: Union[str, Callable[[WorkerBase], _R]],
                        timeout: Optional[float] = None,
-                       args: Tuple = (),
-                       kwargs: Optional[Dict[str, Any]] = None) -> List[_R]:
+                       args: tuple = (),
+                       kwargs: Optional[dict[str, Any]] = None) -> list[_R]:
         """
         Execute an RPC call on all workers.
 
@@ -88,7 +87,7 @@ def collective_rpc(self,
         """
         raise NotImplementedError
 
-    def determine_num_available_blocks(self) -> Tuple[int, int]:
+    def determine_num_available_blocks(self) -> tuple[int, int]:
         """Determine the number of available blocks for the GPU KV cache and
         swappable CPU KV cache.
 
@@ -96,9 +95,10 @@ def determine_num_available_blocks(self) -> Tuple[int, int]:
         ExecutorBase may require modification of the result, e.g. to ensure
         the selected cache sizes are compatible with all workers.
 
-        Returns a Tuple[num_gpu_blocks, num_cpu_blocks], where num_gpu_blocks
-        are blocks that are "active" on the device and can be appended to.
-        num_cpu_blocks refers to "swapped" blocks in CPU memory and cannot be
+        Returns a tuple `(num_gpu_blocks, num_cpu_blocks)`, where
+        `num_gpu_blocks` are blocks that are "active" on the device and can be
+        appended to.
+        `num_cpu_blocks` refers to "swapped" blocks in CPU memory and cannot be
         appended to.
         """
         results = self.collective_rpc("determine_num_available_blocks")
@@ -124,16 +124,15 @@ def initialize_cache(self, num_gpu_blocks: int,
                          num_cpu_blocks) -> None:
         self.collective_rpc("initialize_cache",
                             args=(num_gpu_blocks, num_cpu_blocks))
 
+    @deprecated("`llm_engine.model_executor.apply_model` will no longer work "
+                "in V1 Engine. Please replace with `llm_engine.apply_model` "
+                "and set `VLLM_ALLOW_INSECURE_SERIALIZATION=1`.")
     def apply_model(self, func: Callable[[nn.Module], _R]) -> list[_R]:
         """
         Run a function directly on the model inside each worker,
         returning the result for each of them.
""" - - def rpc_func(worker: WorkerBase) -> _R: - return func(worker.get_model()) - - return self.collective_rpc(rpc_func) + return self.collective_rpc("apply_model", args=(func, )) def execute_model( self, execute_model_req: ExecuteModelRequest @@ -327,8 +326,8 @@ def _driver_execute_model( def collective_rpc(self, method: Union[str, Callable], timeout: Optional[float] = None, - args: Tuple = (), - kwargs: Optional[Dict] = None) -> List[Any]: + args: tuple = (), + kwargs: Optional[dict[str, Any]] = None) -> list[Any]: return self._run_workers(method, *args, **(kwargs or {})) @abstractmethod diff --git a/vllm/v1/engine/llm_engine.py b/vllm/v1/engine/llm_engine.py index 1932cd10bb1..ba1cc6a4c56 100644 --- a/vllm/v1/engine/llm_engine.py +++ b/vllm/v1/engine/llm_engine.py @@ -5,6 +5,7 @@ from copy import copy from typing import Any, Callable, Optional, Union +import torch.nn as nn from typing_extensions import TypeVar import vllm.envs as envs @@ -32,6 +33,7 @@ StatLoggerFactory) from vllm.v1.metrics.reader import Metric, get_metrics_snapshot from vllm.v1.metrics.stats import IterationStats +from vllm.v1.worker.worker_base import WorkerBase logger = init_logger(__name__) @@ -306,12 +308,15 @@ def pin_lora(self, lora_id: int) -> bool: return self.engine_core.pin_lora(lora_id) def collective_rpc(self, - method: Union[str, Callable[..., _R]], + method: Union[str, Callable[[WorkerBase], _R]], timeout: Optional[float] = None, args: tuple = (), kwargs: Optional[dict[str, Any]] = None) -> list[_R]: return self.engine_core.collective_rpc(method, timeout, args, kwargs) + def apply_model(self, func: Callable[[nn.Module], _R]) -> list[_R]: + return self.collective_rpc("apply_model", args=(func, )) + def __del__(self): if dp_group := getattr(self, "dp_group", None): stateless_destroy_torch_distributed_process_group(dp_group) diff --git a/vllm/worker/worker_base.py b/vllm/worker/worker_base.py index c382b29ad19..35426ab0bca 100644 --- a/vllm/worker/worker_base.py +++ b/vllm/worker/worker_base.py @@ -5,7 +5,8 @@ import os import time from abc import abstractmethod -from typing import Any, Dict, List, Optional, Set, Tuple, Type, Union +from typing import (Any, Callable, Dict, List, Optional, Set, Tuple, Type, + TypeVar, Union) import cloudpickle import torch @@ -28,6 +29,8 @@ logger = init_logger(__name__) +_R = TypeVar("_R") + @warn_for_unimplemented_methods class WorkerBase: @@ -71,6 +74,10 @@ def initialize_cache(self, num_gpu_blocks: int, def get_model(self) -> nn.Module: raise NotImplementedError + def apply_model(self, fn: Callable[[nn.Module], _R]) -> _R: + """Apply a function on the model inside this worker.""" + return fn(self.get_model()) + def load_model(self) -> None: """Load model onto target device.""" raise NotImplementedError