Skip to content

Commit 698e651

Browse files
authored
Merge pull request #61 from weka-io/agent-thread-safe
agent: python3 reboot bug fix and use thread-safe
2 parents fc20ee4 + b26b5f1 commit 698e651

File tree

3 files changed

+53
-26
lines changed

3 files changed

+53
-26
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

77
## [Unreleased]
8+
### Fixed
9+
- make all agent threads thread-safe
10+
- fix dict items iteration when doing reboot due to python3 changes
811

912
## [1.8.7] - 2019-11-21
1013
### Changed

talker_agent/talker.py

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from logging import getLogger
2121
from logging.handlers import RotatingFileHandler
2222
from configparser import ConfigParser
23+
from threading import Lock
2324

2425
import redis
2526

@@ -59,6 +60,29 @@ def reraise(tp, value, tb=None):
5960
JOBS_EXPIRATION = 15 # 20 * 60 # how long to keep job ids in the EOS registry (exactly-once-semantics)
6061

6162
config = None
63+
first_exception_info = None
64+
safe_thread_lock = Lock()
65+
66+
67+
class SafeThread(threading.Thread):
68+
def __init__(self, *, target, name, args=(), kwargs=None, daemon=None):
69+
super().__init__(None, target, name, args, kwargs, daemon=daemon)
70+
self.exc_info = None
71+
72+
def run(self):
73+
global first_exception_info
74+
try:
75+
self._target(*self._args, **self._kwargs)
76+
except:
77+
exc_info = sys.exc_info()
78+
logger.info("exception in '%s'", self.name, exc_info=exc_info)
79+
with safe_thread_lock:
80+
if not first_exception_info:
81+
first_exception_info = sys.exc_info()
82+
finally:
83+
# Avoid a refcycle if the thread is running a function with
84+
# an argument that has a member that points to the thread.
85+
del self._target, self._args, self._kwargs
6286

6387

6488
class LineTimeout(Exception):
@@ -439,9 +463,7 @@ def _kill():
439463
except Exception as e:
440464
self.logger.error(e)
441465

442-
thread = threading.Thread(target=_kill, name="killer-%s" % self.job_id)
443-
thread.daemon = True
444-
thread.start()
466+
SafeThread(target=_kill, name="killer-%s" % self.job_id, daemon=True).start()
445467
self.reset_timeout(new_timeout=graceful_timeout + 10)
446468

447469

@@ -451,7 +473,7 @@ def __init__(self, *args, **kwargs):
451473
super(RebootJob, self).__init__(*args, **kwargs)
452474

453475
def start(self):
454-
threading.Thread(target=self.reboot_host, name="Reboot").start()
476+
SafeThread(target=self.reboot_host, name="Reboot").start()
455477

456478
def reboot_host(self):
457479
with open(REBOOT_FILENAME, 'w') as f:
@@ -497,8 +519,6 @@ def __init__(self):
497519
self.output_lock = threading.RLock()
498520
self.redis = None
499521
self.host_id = None
500-
self.redis_fetcher = None
501-
self.redis_sender = None
502522
self.job_poller = None
503523
self.fds_poller = select.poll()
504524
self.fds_to_channels = {}
@@ -620,7 +640,7 @@ def stop_for_reboot(self, requested_by):
620640

621641
requested_by.log("Some jobs not yet finished, setting exit code to 'reboot' and proceeding")
622642
with self.pipeline() as pipeline:
623-
for job_id, job in self.current_processes.items():
643+
for job_id, job in list(self.current_processes.items()):
624644
if job_id == requested_by.job_id:
625645
continue
626646
job.set_result('reboot')
@@ -744,35 +764,24 @@ def sync_jobs_progress(self):
744764
else:
745765
time.sleep(CYCLE_DURATION)
746766

747-
def start_worker(self, worker, name):
748-
749-
def safe_run():
750-
try:
751-
return worker()
752-
except: # noqa
753-
self.exc_info = sys.exc_info()
754-
logger.debug("exception in '%s'", name, exc_info=self.exc_info)
755-
756-
t = threading.Thread(target=safe_run, name=name)
757-
t.daemon = True
758-
t.start()
759-
return t
760-
761767
def start(self):
768+
global first_exception_info
769+
first_exception_info = None
770+
762771
self.finalize_previous_session()
763772
if os.path.isfile(JOBS_SEEN):
764773
with open(JOBS_SEEN, "r") as f:
765774
self.seen_jobs = json.load(f)
766775

767-
self.redis_fetcher = self.start_worker(self.fetch_new_jobs, name="RedisFetcher")
768-
self.redis_sender = self.start_worker(self.sync_jobs_progress, name="JobProgress")
776+
SafeThread(target=self.fetch_new_jobs, name="RedisFetcher", daemon=True).start()
777+
SafeThread(target=self.sync_jobs_progress, name="JobProgress", daemon=True).start()
769778

770779
while not self.stop_agent.is_set():
771780
if not self.get_jobs_outputs():
772781
time.sleep(CYCLE_DURATION / 10.0)
773-
if self.exc_info:
782+
if first_exception_info:
774783
logger.debug("re-raising exception from worker")
775-
reraise(*self.exc_info)
784+
reraise(*first_exception_info)
776785
assert False, "exception should have been raised"
777786

778787
def setup(self):
@@ -819,7 +828,7 @@ def unregister_fileno(self, fileno):
819828

820829

821830
def wait_proc(proc, timeout):
822-
t = threading.Thread(target=proc.wait)
831+
t = SafeThread(target=proc.wait, name='wait_proc')
823832
t.start()
824833
t.join(timeout)
825834
return not t.is_alive()

tests/uts/test_agent.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,24 @@ def raise_file_not_found(*args, **kwargs):
3333
raise OSError(2, 'No such file or directory')
3434

3535

36+
class RebootMockException(Exception):
37+
pass
38+
39+
40+
def reboot_mock_exception(*args):
41+
raise RebootMockException("This is reboot mock exception")
42+
43+
3644
JOBS_DIR = '/tmp/talker/jobs'
3745
EXCEPTION_FILENAME = '/tmp/talker/last_exception'
3846
JOBS_SEEN = os.path.join(JOBS_DIR, 'eos.json')
47+
REBOOT_FILENAME = '/tmp/talker/reboot.id'
3948

4049

4150
@patch('talker_agent.talker.JOBS_DIR', JOBS_DIR)
4251
@patch('talker_agent.talker.JOBS_SEEN', JOBS_SEEN)
4352
@patch('talker_agent.talker.EXCEPTION_FILENAME', EXCEPTION_FILENAME)
53+
@patch('talker_agent.talker.REBOOT_FILENAME', REBOOT_FILENAME)
4454
class TestAgent(unittest.TestCase):
4555

4656
def setUp(self):
@@ -167,3 +177,8 @@ def test_max_output_per_channel(self):
167177
res = get_stdout(self.agent.redis, job_id)
168178
expected_val = val.replace('\\n', '\n') * val_repeats
169179
self.assertEqual(res, expected_val)
180+
181+
@patch('talker_agent.talker.RebootJob.reboot_host', reboot_mock_exception)
182+
def test_safe_thread(self):
183+
_ = self.run_cmd_on_agent('reboot', force=True)
184+
self.assert_agent_exception(RebootMockException)

0 commit comments

Comments
 (0)