Skip to content

Commit d532f12

Browse files
committed
HTEX submit returns type-aware future
Rather than a stock Future, that typing doesn't know will have a `.parsl_executor_task_id` attached, formalize the attribute with a subclass.
1 parent 9d61d8c commit d532f12

File tree

1 file changed

+8
-3
lines changed

1 file changed

+8
-3
lines changed

parsl/executors/high_throughput/executor.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,12 @@
160160
""" # Documentation for params used by both HTEx and MPIEx
161161

162162

163+
class HTEXFuture(Future):
164+
def __init__(self, task_id) -> None:
165+
super().__init__()
166+
self.parsl_executor_task_id = task_id
167+
168+
163169
class HighThroughputExecutor(BlockProviderExecutor, RepresentationMixin, UsageInformation):
164170
__doc__ = f"""Executor designed for cluster-scale
165171
@@ -670,7 +676,7 @@ def _hold_block(self, block_id):
670676
logger.debug("Sending hold to manager: {}".format(manager['manager']))
671677
self._hold_manager(manager['manager'])
672678

673-
def submit(self, func, resource_specification, *args, **kwargs):
679+
def submit(self, func: Callable, resource_specification: dict, *args, **kwargs) -> HTEXFuture:
674680
"""Submits work to the outgoing_q.
675681
676682
The outgoing_q is an external process listens on this
@@ -702,8 +708,7 @@ def submit(self, func, resource_specification, *args, **kwargs):
702708
args_to_print = tuple([ar if len(ar := repr(arg)) < 100 else (ar[:100] + '...') for arg in args])
703709
logger.debug("Pushing function {} to queue with args {}".format(func, args_to_print))
704710

705-
fut = Future()
706-
fut.parsl_executor_task_id = task_id
711+
fut = HTEXFuture(task_id)
707712
self.tasks[task_id] = fut
708713

709714
try:

0 commit comments

Comments
 (0)