diff --git a/tests/test_web/test_webapi.py b/tests/test_web/test_webapi.py index 60d06b9311..e708283a50 100644 --- a/tests/test_web/test_webapi.py +++ b/tests/test_web/test_webapi.py @@ -673,7 +673,7 @@ def track_to_file(self, fname): return original_to_file(self, fname) # mock start to interrupt run() after upload and to_file - def mock_start_interrupt(self): + def mock_start_interrupt(self, *args, **kwargs): # at this point, upload() and to_file() should have been called assert batch_file_saved["saved"], "Batch file should be saved before start()" assert batch_file_saved["has_task_ids"], "Batch file should have task_ids" diff --git a/tidy3d/web/api/asynchronous.py b/tidy3d/web/api/asynchronous.py index 37f625c966..8beef901b8 100644 --- a/tidy3d/web/api/asynchronous.py +++ b/tidy3d/web/api/asynchronous.py @@ -23,6 +23,7 @@ def run_async( parent_tasks: Optional[dict[str, list[str]]] = None, reduce_simulation: Literal["auto", True, False] = "auto", pay_type: Union[PayType, str] = PayType.AUTO, + priority: Optional[int] = None, ) -> BatchData: """Submits a set of Union[:class:`.Simulation`, :class:`.HeatSimulation`, :class:`.EMESimulation`] objects to server, starts running, monitors progress, downloads, and loads results as a :class:`.BatchData` object. @@ -52,7 +53,9 @@ def run_async( Whether to reduce structures in the simulation to the simulation domain only. Note: currently only implemented for the mode solver. pay_type: Union[PayType, str] = PayType.AUTO Specify the payment method. - + priority: int = None + Priority of the simulation in the Virtual GPU (vGPU) queue (1 = lowest, 10 = highest). + It affects only simulations from vGPU licenses and does not impact simulations using FlexCredits. Returns ------ :class:`BatchData` @@ -90,5 +93,5 @@ def run_async( pay_type=pay_type, ) - batch_data = batch.run(path_dir=path_dir) + batch_data = batch.run(path_dir=path_dir, priority=priority) return batch_data diff --git a/tidy3d/web/api/autograd/autograd.py b/tidy3d/web/api/autograd/autograd.py index f11f0446cb..0190dfb595 100644 --- a/tidy3d/web/api/autograd/autograd.py +++ b/tidy3d/web/api/autograd/autograd.py @@ -218,6 +218,7 @@ def run( local_gradient=local_gradient, max_num_adjoint_per_fwd=max_num_adjoint_per_fwd, pay_type=pay_type, + priority=priority, ) return run_webapi( @@ -253,6 +254,7 @@ def run_async( max_num_adjoint_per_fwd: int = MAX_NUM_ADJOINT_PER_FWD, reduce_simulation: typing.Literal["auto", True, False] = "auto", pay_type: typing.Union[PayType, str] = PayType.AUTO, + priority: typing.Optional[int] = None, ) -> BatchData: """Submits a set of Union[:class:`.Simulation`, :class:`.HeatSimulation`, :class:`.EMESimulation`] objects to server, starts running, monitors progress, downloads, and loads results as a :class:`.BatchData` object. @@ -303,6 +305,10 @@ def run_async( :class:`Batch` Interface for submitting several :class:`Simulation` objects to sever. """ + # validate priority if specified + if priority is not None and (priority < 1 or priority > 10): + raise ValueError("Priority must be between '1' and '10' if specified.") + if is_valid_for_autograd_async(simulations): return _run_async( simulations=simulations, @@ -317,6 +323,7 @@ def run_async( local_gradient=local_gradient, max_num_adjoint_per_fwd=max_num_adjoint_per_fwd, pay_type=pay_type, + priority=priority, ) return run_async_webapi( @@ -331,6 +338,7 @@ def run_async( parent_tasks=parent_tasks, reduce_simulation=reduce_simulation, pay_type=pay_type, + priority=priority, ) @@ -1272,10 +1280,11 @@ def _run_tidy3d( verbose = run_kwargs.get("verbose", False) upload_sim_fields_keys(run_kwargs["sim_fields_keys"], task_id=job.task_id, verbose=verbose) path = run_kwargs.get("path", DEFAULT_DATA_PATH) + priority = run_kwargs.get("priority") if task_name.endswith("_adjoint"): path_parts = basename(path).split(".") path = join(dirname(path), path_parts[0] + "_adjoint." + ".".join(path_parts[1:])) - data = job.run(path) + data = job.run(path, priority=priority) return data, job.task_id @@ -1286,6 +1295,7 @@ def _run_async_tidy3d( batch_init_kwargs = parse_run_kwargs(**run_kwargs) path_dir = run_kwargs.pop("path_dir", None) + priority = run_kwargs.get("priority") batch = Batch(simulations=simulations, **batch_init_kwargs) td.log.info(f"running {batch.simulation_type} batch with '_run_async_tidy3d()'") @@ -1305,9 +1315,9 @@ def _run_async_tidy3d( upload_sim_fields_keys(sim_fields_keys, task_id=task_id, verbose=verbose) if path_dir: - batch_data = batch.run(path_dir) + batch_data = batch.run(path_dir, priority=priority) else: - batch_data = batch.run() + batch_data = batch.run(priority=priority) task_ids = {key: job.task_id for key, job in batch.jobs.items()} return batch_data, task_ids @@ -1324,7 +1334,8 @@ def _run_async_tidy3d_bwd( batch = Batch(simulations=simulations, **batch_init_kwargs) td.log.info(f"running {batch.simulation_type} batch with '_run_async_tidy3d_bwd()'") - batch.start() + priority = run_kwargs.get("priority") + batch.start(priority=priority) batch.monitor() vjp_traced_fields_dict = {} diff --git a/tidy3d/web/api/container.py b/tidy3d/web/api/container.py index 7cba298c3d..e66d336cc3 100644 --- a/tidy3d/web/api/container.py +++ b/tidy3d/web/api/container.py @@ -227,21 +227,28 @@ def to_file(self, fname: str) -> None: self = self.updated_copy(task_id_cached=task_id_cached) super(Job, self).to_file(fname=fname) # noqa: UP008 - def run(self, path: str = DEFAULT_DATA_PATH) -> WorkflowDataType: + def run( + self, path: str = DEFAULT_DATA_PATH, priority: Optional[int] = None + ) -> WorkflowDataType: """Run :class:`Job` all the way through and return data. Parameters ---------- - path_dir : str = "./simulation_data.hdf5" - Base directory where data will be downloaded, by default current working directory. - + path : str = "./simulation_data.hdf5" + Path to download results file (.hdf5), including filename. + priority: int = None + Priority of the simulation in the Virtual GPU (vGPU) queue (1 = lowest, 10 = highest). + It affects only simulations from vGPU licenses and does not impact simulations using FlexCredits. Returns ------- - Union[:class:`.SimulationData`, :class:`.HeatSimulationData`, :class:`.EMESimulationData`] + :class:`WorkflowDataType` Object containing simulation results. """ self.upload() - self.start() + if priority is None: + self.start() + else: + self.start(priority=priority) self.monitor() return self.load(path=path) @@ -280,14 +287,25 @@ def status(self): """Return current status of :class:`Job`.""" return self.get_info().status - def start(self) -> None: + def start(self, priority: Optional[int] = None) -> None: """Start running a :class:`Job`. + Parameters + ---------- + + priority: int = None + Priority of the simulation in the Virtual GPU (vGPU) queue (1 = lowest, 10 = highest). + It affects only simulations from vGPU licenses and does not impact simulations using FlexCredits. Note ---- To monitor progress of the :class:`Job`, call :meth:`Job.monitor` after started. """ - web.start(self.task_id, solver_version=self.solver_version, pay_type=self.pay_type) + web.start( + self.task_id, + solver_version=self.solver_version, + pay_type=self.pay_type, + priority=priority, + ) def get_run_info(self) -> RunInfo: """Return information about the running :class:`Job`. @@ -581,14 +599,20 @@ class Batch(WebContainer): _job_type = Job - def run(self, path_dir: str = DEFAULT_DATA_DIR) -> BatchData: + def run( + self, + path_dir: str = DEFAULT_DATA_DIR, + priority: Optional[int] = None, + ) -> BatchData: """Upload and run each simulation in :class:`Batch`. Parameters ---------- path_dir : str Base directory where data will be downloaded, by default current working directory. - + priority: int = None + Priority of the simulation in the Virtual GPU (vGPU) queue (1 = lowest, 10 = highest). + It affects only simulations from vGPU licenses and does not impact simulations using FlexCredits. Returns ------ :class:`BatchData` @@ -612,7 +636,10 @@ def run(self, path_dir: str = DEFAULT_DATA_DIR) -> BatchData: self._check_path_dir(path_dir) self.upload() self.to_file(self._batch_path(path_dir=path_dir)) - self.start() + if priority is None: + self.start() + else: + self.start(priority=priority) self.monitor() return self.load(path_dir=path_dir) @@ -715,9 +742,18 @@ def get_info(self) -> dict[TaskName, TaskInfo]: info_dict[task_name] = task_info return info_dict - def start(self) -> None: + def start( + self, + priority: Optional[int] = None, + ) -> None: """Start running all tasks in the :class:`Batch`. + Parameters + ---------- + + priority: int = None + Priority of the simulation in the Virtual GPU (vGPU) queue (1 = lowest, 10 = highest). + It affects only simulations from vGPU licenses and does not impact simulations using FlexCredits. Note ---- To monitor the running simulations, can call :meth:`Batch.monitor`. @@ -728,7 +764,10 @@ def start(self) -> None: with ThreadPoolExecutor(max_workers=self.num_workers) as executor: for _, job in self.jobs.items(): - executor.submit(job.start) + if priority is None: + executor.submit(job.start) + else: + executor.submit(job.start, priority=priority) def get_run_info(self) -> dict[TaskName, RunInfo]: """get information about a each of the tasks in the :class:`Batch`. diff --git a/tidy3d/web/api/webapi.py b/tidy3d/web/api/webapi.py index ca07dcf3f5..39597b660d 100644 --- a/tidy3d/web/api/webapi.py +++ b/tidy3d/web/api/webapi.py @@ -137,9 +137,10 @@ def run( reduce_simulation : Literal["auto", True, False] = "auto" Whether to reduce structures in the simulation to the simulation domain only. Note: currently only implemented for the mode solver. pay_type: Union[PayType, str] = PayType.AUTO - Which method to pay the simulation. + Which method to pay the simulation. priority: int = None - Task priority for vGPU queue (1=lowest, 10=highest). + Priority of the simulation in the Virtual GPU (vGPU) queue (1 = lowest, 10 = highest). + It affects only simulations from vGPU licenses and does not impact simulations using FlexCredits. Returns ------- Union[:class:`.SimulationData`, :class:`.HeatSimulationData`, :class:`.EMESimulationData`] @@ -450,7 +451,8 @@ def start( pay_type: Union[PayType, str] = PayType.AUTO Which method to pay the simulation priority: int = None - Task priority for vGPU queue (1=lowest, 10=highest). + Priority of the simulation in the Virtual GPU (vGPU) queue (1 = lowest, 10 = highest). + It affects only simulations from vGPU licenses and does not impact simulations using FlexCredits. Note ---- To monitor progress, can call :meth:`monitor` after starting simulation. diff --git a/tidy3d/web/core/task_core.py b/tidy3d/web/core/task_core.py index 50ef0a484f..3ca7a69e03 100644 --- a/tidy3d/web/core/task_core.py +++ b/tidy3d/web/core/task_core.py @@ -477,7 +477,8 @@ def submit( pay_type: Union[PayType, str] = PayType.AUTO Which method to pay the simulation. priority: int = None - Task priority for vGPU queue (1=lowest, 10=highest). + Priority of the simulation in the Virtual GPU (vGPU) queue (1 = lowest, 10 = highest). + It affects only simulations from vGPU licenses and does not impact simulations using FlexCredits. """ pay_type = PayType(pay_type) if not isinstance(pay_type, PayType) else pay_type