Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions parsl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@

"""
import logging
import multiprocessing as _multiprocessing
import os
import platform

from parsl.app.app import bash_app, join_app, python_app
from parsl.config import Config
Expand All @@ -32,9 +29,6 @@
from parsl.monitoring import MonitoringHub
from parsl.version import VERSION

if platform.system() == 'Darwin':
_multiprocessing.set_start_method('fork', force=True)

__author__ = 'The Parsl Team'
__version__ = VERSION

Expand Down Expand Up @@ -74,6 +68,3 @@


logging.getLogger('parsl').addHandler(logging.NullHandler())

if platform.system() == 'Darwin':
os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'
16 changes: 8 additions & 8 deletions parsl/monitoring/db_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,9 @@ def start(self,
exception_happened = False

while (not self._kill_event.is_set() or
self.pending_priority_queue.qsize() != 0 or self.pending_resource_queue.qsize() != 0 or
self.pending_node_queue.qsize() != 0 or self.pending_block_queue.qsize() != 0 or
resource_queue.qsize() != 0):
not self.pending_priority_queue.empty() or not self.pending_resource_queue.empty() or
not self.pending_node_queue.empty() or not self.pending_block_queue.empty() or
not resource_queue.empty()):

"""
WORKFLOW_INFO and TASK_INFO messages (i.e. priority messages)
Expand All @@ -357,9 +357,9 @@ def start(self,
try:
logger.debug("""Checking STOP conditions: {}, {}, {}, {}, {}, {}""".format(
self._kill_event.is_set(),
self.pending_priority_queue.qsize() != 0, self.pending_resource_queue.qsize() != 0,
self.pending_node_queue.qsize() != 0, self.pending_block_queue.qsize() != 0,
resource_queue.qsize() != 0))
not self.pending_priority_queue.empty(), not self.pending_resource_queue.empty(),
not self.pending_node_queue.empty(), not self.pending_block_queue.empty(),
not resource_queue.empty()))

# This is the list of resource messages which can be reprocessed as if they
# had just arrived because the corresponding first task message has been
Expand Down Expand Up @@ -558,9 +558,9 @@ def start(self,
def _migrate_logs_to_internal(self, logs_queue: mpq.Queue, kill_event: threading.Event) -> None:
logger.info("Starting _migrate_logs_to_internal")

while not kill_event.is_set() or logs_queue.qsize() != 0:
while not kill_event.is_set() or not logs_queue.empty():
logger.debug("Checking STOP conditions: kill event: %s, queue has entries: %s",
kill_event.is_set(), logs_queue.qsize() != 0)
kill_event.is_set(), not logs_queue.empty())

try:
x = logs_queue.get(timeout=0.1)
Expand Down
4 changes: 2 additions & 2 deletions parsl/monitoring/monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@

from parsl.monitoring.types import TaggedMonitoringMessage
from parsl.multiprocessing import (
SizedQueue,
SpawnEvent,
SpawnProcess,
SpawnQueue,
join_terminate_close_proc,
)
from parsl.utils import RepresentationMixin
Expand Down Expand Up @@ -126,7 +126,7 @@ def start(self, dfk_run_dir: str, config_run_dir: Union[str, os.PathLike]) -> No
self.monitoring_hub_active = True

self.resource_msgs: Queue[TaggedMonitoringMessage]
self.resource_msgs = SizedQueue()
self.resource_msgs = SpawnQueue()

self.dbm_exit_event: ms.Event
self.dbm_exit_event = SpawnEvent()
Expand Down
4 changes: 2 additions & 2 deletions parsl/monitoring/radios/udp_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
from parsl.monitoring.radios.base import MonitoringRadioReceiver
from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender
from parsl.multiprocessing import (
SizedQueue,
SpawnEvent,
SpawnProcess,
SpawnQueue,
join_terminate_close_proc,
)
from parsl.process_loggers import wrap_with_logs
Expand Down Expand Up @@ -198,7 +198,7 @@ def start_udp_receiver(*,
hmac_digest: str) -> UDPRadioReceiver:

udp_comm_q: Queue[Union[int, str]]
udp_comm_q = SizedQueue(maxsize=10)
udp_comm_q = SpawnQueue(maxsize=10)

router_exit_event = SpawnEvent()

Expand Down
4 changes: 2 additions & 2 deletions parsl/monitoring/radios/zmq_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
from parsl.monitoring.radios.multiprocessing import MultiprocessingQueueRadioSender
from parsl.monitoring.types import TaggedMonitoringMessage
from parsl.multiprocessing import (
SizedQueue,
SpawnEvent,
SpawnProcess,
SpawnQueue,
join_terminate_close_proc,
)
from parsl.process_loggers import wrap_with_logs
Expand Down Expand Up @@ -158,7 +158,7 @@ def start_zmq_receiver(*,
port_range: Tuple[int, int],
logdir: str,
worker_debug: bool) -> ZMQRadioReceiver:
comm_q = SizedQueue(maxsize=10)
comm_q = SpawnQueue(maxsize=10)

router_exit_event = SpawnEvent()

Expand Down
49 changes: 0 additions & 49 deletions parsl/multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import logging
import multiprocessing
import multiprocessing.queues
import platform
from multiprocessing.context import ForkProcess as ForkProcessType
from multiprocessing.context import SpawnProcess as SpawnProcessType
from typing import Callable
Expand All @@ -21,54 +20,6 @@
SpawnQueue = SpawnContext.Queue


class MacSafeQueue(multiprocessing.queues.Queue):
""" Multiprocessing queues do not have qsize attributes on MacOS.
This is slower but more portable version of the multiprocessing Queue
that adds a explicit counter

Reference : https://github.com/keras-team/autokeras/commit/4ddd568b06b4045ace777bc0fb7bc18573b85a75
"""

def __init__(self, *args, **kwargs):
if 'ctx' not in kwargs:
kwargs['ctx'] = multiprocessing.get_context('spawn')
super().__init__(*args, **kwargs)
self._counter = multiprocessing.Value('i', 0)

def put(self, *args, **kwargs):
# logger.critical("Putting item {}".format(args))
x = super().put(*args, **kwargs)
with self._counter.get_lock():
self._counter.value += 1
return x

def get(self, *args, **kwargs):
x = super().get(*args, **kwargs)
with self._counter.get_lock():
self._counter.value -= 1
# logger.critical("Getting item {}".format(x))
return x

def qsize(self):
return self._counter.value

def empty(self):
return not self._counter.value


# SizedQueue should be constructable using the same calling
# convention as multiprocessing.Queue but that entire signature
# isn't expressible in mypy 0.790
SizedQueue: Callable[..., multiprocessing.Queue]


if platform.system() != 'Darwin':
import multiprocessing
SizedQueue = SpawnQueue
else:
SizedQueue = MacSafeQueue


def join_terminate_close_proc(process: SpawnProcessType, *, timeout: int = 30) -> None:
"""Increasingly aggressively terminate a process.

Expand Down
62 changes: 0 additions & 62 deletions parsl/tests/test_regression/test_854.py

This file was deleted.