From 242bbacc36844238b2fa2a0884e867db1c9f0766 Mon Sep 17 00:00:00 2001 From: Alistair Grant Date: Thu, 17 Apr 2025 09:52:15 +0200 Subject: [PATCH 1/2] Prevent multiple processes entering Monitor>>critical: when a process is terminated. fix: https://github.com/pharo-project/pharo/issues/18104 --- src/Kernel-Tests/MonitorTest.class.st | 209 +++++++++++++++++++++++++- src/Kernel/Monitor.class.st | 32 +++- 2 files changed, 236 insertions(+), 5 deletions(-) diff --git a/src/Kernel-Tests/MonitorTest.class.st b/src/Kernel-Tests/MonitorTest.class.st index 83a31648e1c..ff5ce7d6a07 100644 --- a/src/Kernel-Tests/MonitorTest.class.st +++ b/src/Kernel-Tests/MonitorTest.class.st @@ -4,11 +4,47 @@ SUnit tests for monitors Class { #name : 'MonitorTest', #superclass : 'TestCase', + #instVars : [ + 'monitor', + 'forkedProcesses' + ], #category : 'Kernel-Tests-Processes', #package : 'Kernel-Tests', #tag : 'Processes' } +{ #category : 'running - helpers' } +MonitorTest >> fork: aBlock [ + | newProcess | + newProcess := aBlock forkNamed: testSelector. + forkedProcesses add: newProcess. + ^newProcess +] + +{ #category : 'running - helpers' } +MonitorTest >> fork: aBlock at: priority [ + + | newProcess | + newProcess := aBlock forkAt: priority named: testSelector. + forkedProcesses add: newProcess. + ^newProcess +] + +{ #category : 'running' } +MonitorTest >> setUp [ + super setUp. + + forkedProcesses := OrderedCollection new. + monitor := Monitor new +] + +{ #category : 'running' } +MonitorTest >> tearDown [ + forkedProcesses do: #terminate. + + super tearDown +] + { #category : 'tests - examples' } MonitorTest >> testExample1 [ | producer1 producer2 monitor goal work counter goalReached finished | @@ -91,9 +127,180 @@ MonitorTest >> testExample2 [ self assert: goal equals: work ] +{ #category : 'tests' } +MonitorTest >> testExecutionCriticalSection [ + + | actual | + actual := monitor critical: [ #result ]. + + self assert: actual equals: #result +] + +{ #category : 'tests' } +MonitorTest >> testFailedCriticalSectionShouldUnblockWaitingOne [ + | lastCriticalExecuted semaphoreToHoldMutex | + lastCriticalExecuted := false. + semaphoreToHoldMutex := Semaphore new. + self fork: [ + [ + monitor critical: [ + semaphoreToHoldMutex wait. "here we grab mutex and control it with semaphore" + self error: 'critical section failed' ] ] onErrorDo: [ ] ]. + self waitLastProcessLock. "wait until first process grabs the mutex" + + self fork: [ monitor critical: [ lastCriticalExecuted := true ] ]. + self waitLastProcessLock. + + semaphoreToHoldMutex signal. + self waitLastProcessTerminate. + self assert: lastCriticalExecuted +] + +{ #category : 'tests' } +MonitorTest >> testTerminatedCriticalSectionShouldUnblockWaitingOne [ + | lastCriticalExecuted semaphoreToHoldMutex processHoldingMutex | + lastCriticalExecuted := false. + semaphoreToHoldMutex := Semaphore new. + + processHoldingMutex := self fork: [ + monitor critical: [ semaphoreToHoldMutex wait. "here we grab mutex and control it with semaphore" + self error: 'should not happen' ]]. + self waitLastProcessLock. + + self fork: [monitor critical: [ lastCriticalExecuted := true ]]. + self waitLastProcessLock. + + processHoldingMutex terminate. + self waitLastProcessTerminate. + self assert: lastCriticalExecuted +] + +{ #category : 'tests' } +MonitorTest >> testTerminatingBlockedCriticalSectionShouldNotUnblockAnotherWaitingSection [ + | semaphoreToHoldMutex holdingCriticalExecutedFirst firstWaitingProcess lastCriticalExecuted | + holdingCriticalExecutedFirst := false. + semaphoreToHoldMutex := Semaphore new. + lastCriticalExecuted := false. + self fork: [ + monitor critical: [ semaphoreToHoldMutex wait. "here we grab mutex and control it with semaphore" + holdingCriticalExecutedFirst := lastCriticalExecuted not ]]. + self waitLastProcessLock. + + firstWaitingProcess := self fork: [monitor critical: [ self error: 'should not happen' ]]. + self waitLastProcessLock. + self fork: [monitor critical: [ lastCriticalExecuted := true]]. + self waitLastProcessLock. + firstWaitingProcess terminate. + self waitLastProcessLock. "check that last process is still waiting" + semaphoreToHoldMutex signal. "here we resume first process execution" + self waitLastProcessTerminate. + self assert: holdingCriticalExecutedFirst. + self assert: lastCriticalExecuted +] + +{ #category : 'tests' } +MonitorTest >> testTerminatingBlockedCriticalWhichWasSignalledButNotResumedYet [ + | processWaitingForMutex firstCriticalExecuted lastCriticalExecuted semaphoreToHoldMutex | + firstCriticalExecuted := false. + lastCriticalExecuted := false. + semaphoreToHoldMutex := Semaphore new. + + self fork: [ + monitor critical: [ semaphoreToHoldMutex wait. + firstCriticalExecuted := true ]] at: Processor activeProcess priority + 1. + self waitLastProcessLock. + "for second critical we choose small priority. So it can't be resumed automatically by scheduler in our scenario." + processWaitingForMutex := self fork: [monitor critical: [ self error: 'should not happen' ]] at: Processor activeProcess priority - 1. + self waitLastProcessLock. + self deny: firstCriticalExecuted. + semaphoreToHoldMutex signal. + self assert: firstCriticalExecuted. + processWaitingForMutex terminate. "Here the process waits for monitor and being terminated at the point when monitor was already signalled but process was not resumed. + Correct critical implementation should allow execution of new consequent criticals" + self fork: [ monitor critical: [ lastCriticalExecuted := true ]]. + self waitLastProcessTerminate. + self assert: lastCriticalExecuted description: 'consequent last critical should be executed' +] + +{ #category : 'tests' } +MonitorTest >> testTwoCriticalsShouldWaitEachOther [ + + | lastCriticalExecuted firstCriticalExecutedFirst semaphoreToHoldMutex | + lastCriticalExecuted := false. + firstCriticalExecutedFirst := false. + semaphoreToHoldMutex := Semaphore new. + + self fork: [ + monitor critical: [ semaphoreToHoldMutex wait. "here we grab monitor and control it with semaphore" + firstCriticalExecutedFirst := lastCriticalExecuted not ]]. + self waitLastProcessLock. + + self fork: [monitor critical: [ lastCriticalExecuted := true ]]. + self waitLastProcessLock. + + semaphoreToHoldMutex signal. + self waitLastProcessTerminate. + self assert: lastCriticalExecuted. + self assert: firstCriticalExecutedFirst +] + +{ #category : 'tests' } +MonitorTest >> testTwoRecursiveCriticalsShouldNotWaitEachOther [ + + | executed | + executed := false. + + self fork: [monitor critical: [ monitor critical: [ executed := true ]]]. + self waitLastProcessTerminate. + + self assert: executed +] + { #category : 'tests' } MonitorTest >> testWaitMaxMilliseconds [ - | monitor | + monitor := Monitor new. monitor critical: [ monitor waitMaxMilliseconds: 10 ] ] + +{ #category : 'running - helpers' } +MonitorTest >> waitFor: aBlock [ + + [ 10 milliSeconds wait. aBlock value ] whileFalse +] + +{ #category : 'running - helpers' } +MonitorTest >> waitLastProcessLock [ + + self waitProcessLock: forkedProcesses last +] + +{ #category : 'running - helpers' } +MonitorTest >> waitLastProcessSuspend [ + + self waitProcessSuspend: forkedProcesses last +] + +{ #category : 'running - helpers' } +MonitorTest >> waitLastProcessTerminate [ + + self waitProcessTermination: forkedProcesses last +] + +{ #category : 'running - helpers' } +MonitorTest >> waitProcessLock: aProcess [ + + self waitFor: [ aProcess suspendingList isEmptyOrNil not ] +] + +{ #category : 'running - helpers' } +MonitorTest >> waitProcessSuspend: aProcess [ + + self waitFor: [ aProcess isSuspended ] +] + +{ #category : 'running - helpers' } +MonitorTest >> waitProcessTermination: aProcess [ + + self waitFor: [ aProcess isTerminated ] +] diff --git a/src/Kernel/Monitor.class.st b/src/Kernel/Monitor.class.st index 0577e449bf8..198ed22f0df 100644 --- a/src/Kernel/Monitor.class.st +++ b/src/Kernel/Monitor.class.st @@ -107,11 +107,35 @@ Monitor >> critical: aBlock [ in a critical section. NOTE: All the following synchronization operations are only valid inside the critical section of the monitor!" + | requestingProcess blockValue caught | + + requestingProcess := Processor activeProcess. + caught := false. + [ + "See Semaphore>>critical: for a description of the structure of the code below." + requestingProcess == ownerProcess ifTrue: [ + "Don't move the caught assignment outside the ifTrue:ifFalse, see below" + caught := true. + nestingLevel := nestingLevel + 1. + ] ifFalse: [ + "Set caught immediately before the mutex wait to ensure the process isn't interrupted" + caught := true. + mutex wait. + ownerProcess := requestingProcess. + nestingLevel := 1. + ]. - ^[ - self enter. - aBlock value] - ensure: [self exit] + "The critical section is only exited if: + 1. the ensured block was entered (caught), and + 2. the process exiting was the owning process + If the process being terminted was waiting on the monitor and there is no + current owner, the mutex needs to be signalled to reestablish the excess signal." + blockValue := aBlock value ] ensure: + [ caught ifTrue: [ requestingProcess == ownerProcess + ifTrue: [ self exit ] + ifFalse: [ (ownerProcess == nil and: [ nestingLevel = 0 ]) + ifTrue: [ mutex signal ] ] ] ]. + ^ blockValue ] { #category : 'private' } From 24474a2dc6d3a15cd79f0c76a0132fa3949b32b6 Mon Sep 17 00:00:00 2001 From: Alistair Grant Date: Mon, 28 Apr 2025 06:22:20 +0200 Subject: [PATCH 2/2] Monitor>>critical: handle the case where the requestingProcess gets the signal, but the ownerProcess isn't yet set. fix: https://github.com/pharo-project/pharo/issues/18104 --- src/Kernel/Monitor.class.st | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/src/Kernel/Monitor.class.st b/src/Kernel/Monitor.class.st index 198ed22f0df..29f19522ac6 100644 --- a/src/Kernel/Monitor.class.st +++ b/src/Kernel/Monitor.class.st @@ -107,35 +107,34 @@ Monitor >> critical: aBlock [ in a critical section. NOTE: All the following synchronization operations are only valid inside the critical section of the monitor!" - | requestingProcess blockValue caught | + | requestingProcess caught | requestingProcess := Processor activeProcess. caught := false. - [ - "See Semaphore>>critical: for a description of the structure of the code below." + ^ [ "See Semaphore>>critical: for a description of the structure of the code below." requestingProcess == ownerProcess ifTrue: [ "Don't move the caught assignment outside the ifTrue:ifFalse, see below" caught := true. nestingLevel := nestingLevel + 1. ] ifFalse: [ - "Set caught immediately before the mutex wait to ensure the process isn't interrupted" + "Only one process should be able to enter the critical: section at a time, + however there is obviously a window where terminating a process can allow two + processes in at the same time. + Protect against this by rewaiting if someone else owns the monitor." + [ "Set caught immediately before the mutex wait to ensure both are executed as an atomic operation" caught := true. - mutex wait. + mutex wait] doWhileFalse: [ ownerProcess isNil ]. ownerProcess := requestingProcess. - nestingLevel := 1. - ]. - - "The critical section is only exited if: - 1. the ensured block was entered (caught), and - 2. the process exiting was the owning process - If the process being terminted was waiting on the monitor and there is no - current owner, the mutex needs to be signalled to reestablish the excess signal." - blockValue := aBlock value ] ensure: + nestingLevel := 1. ]. + aBlock value ] ensure: + "The critical section is only exited if: + 1. the ensured block was entered (caught), and + 2. the process exiting was the owning process + If the process being terminted was waiting on the monitor (caught) and there is no + current owner, the mutex needs to be signalled to reestablish the excess signal." [ caught ifTrue: [ requestingProcess == ownerProcess ifTrue: [ self exit ] - ifFalse: [ (ownerProcess == nil and: [ nestingLevel = 0 ]) - ifTrue: [ mutex signal ] ] ] ]. - ^ blockValue + ifFalse: [ ownerProcess ifNil: [ mutex signal ] ] ] ]. ] { #category : 'private' }