Skip to content

Prevent multiple processes entering Monitor>>critical: #18105

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: Pharo13
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 208 additions & 1 deletion src/Kernel-Tests/MonitorTest.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,47 @@ SUnit tests for monitors
Class {
#name : 'MonitorTest',
#superclass : 'TestCase',
#instVars : [
'monitor',
'forkedProcesses'
],
#category : 'Kernel-Tests-Processes',
#package : 'Kernel-Tests',
#tag : 'Processes'
}

{ #category : 'running - helpers' }
MonitorTest >> fork: aBlock [
| newProcess |
newProcess := aBlock forkNamed: testSelector.
forkedProcesses add: newProcess.
^newProcess
]

{ #category : 'running - helpers' }
MonitorTest >> fork: aBlock at: priority [

| newProcess |
newProcess := aBlock forkAt: priority named: testSelector.
forkedProcesses add: newProcess.
^newProcess
]

{ #category : 'running' }
MonitorTest >> setUp [
super setUp.

forkedProcesses := OrderedCollection new.
monitor := Monitor new
]

{ #category : 'running' }
MonitorTest >> tearDown [
forkedProcesses do: #terminate.

super tearDown
]

{ #category : 'tests - examples' }
MonitorTest >> testExample1 [
| producer1 producer2 monitor goal work counter goalReached finished |
Expand Down Expand Up @@ -91,9 +127,180 @@ MonitorTest >> testExample2 [
self assert: goal equals: work
]

{ #category : 'tests' }
MonitorTest >> testExecutionCriticalSection [

| actual |
actual := monitor critical: [ #result ].

self assert: actual equals: #result
]

{ #category : 'tests' }
MonitorTest >> testFailedCriticalSectionShouldUnblockWaitingOne [
| lastCriticalExecuted semaphoreToHoldMutex |
lastCriticalExecuted := false.
semaphoreToHoldMutex := Semaphore new.
self fork: [
[
monitor critical: [
semaphoreToHoldMutex wait. "here we grab mutex and control it with semaphore"
self error: 'critical section failed' ] ] onErrorDo: [ ] ].
self waitLastProcessLock. "wait until first process grabs the mutex"

self fork: [ monitor critical: [ lastCriticalExecuted := true ] ].
self waitLastProcessLock.

semaphoreToHoldMutex signal.
self waitLastProcessTerminate.
self assert: lastCriticalExecuted
]

{ #category : 'tests' }
MonitorTest >> testTerminatedCriticalSectionShouldUnblockWaitingOne [
| lastCriticalExecuted semaphoreToHoldMutex processHoldingMutex |
lastCriticalExecuted := false.
semaphoreToHoldMutex := Semaphore new.

processHoldingMutex := self fork: [
monitor critical: [ semaphoreToHoldMutex wait. "here we grab mutex and control it with semaphore"
self error: 'should not happen' ]].
self waitLastProcessLock.

self fork: [monitor critical: [ lastCriticalExecuted := true ]].
self waitLastProcessLock.

processHoldingMutex terminate.
self waitLastProcessTerminate.
self assert: lastCriticalExecuted
]

{ #category : 'tests' }
MonitorTest >> testTerminatingBlockedCriticalSectionShouldNotUnblockAnotherWaitingSection [
| semaphoreToHoldMutex holdingCriticalExecutedFirst firstWaitingProcess lastCriticalExecuted |
holdingCriticalExecutedFirst := false.
semaphoreToHoldMutex := Semaphore new.
lastCriticalExecuted := false.
self fork: [
monitor critical: [ semaphoreToHoldMutex wait. "here we grab mutex and control it with semaphore"
holdingCriticalExecutedFirst := lastCriticalExecuted not ]].
self waitLastProcessLock.

firstWaitingProcess := self fork: [monitor critical: [ self error: 'should not happen' ]].
self waitLastProcessLock.
self fork: [monitor critical: [ lastCriticalExecuted := true]].
self waitLastProcessLock.
firstWaitingProcess terminate.
self waitLastProcessLock. "check that last process is still waiting"
semaphoreToHoldMutex signal. "here we resume first process execution"
self waitLastProcessTerminate.
self assert: holdingCriticalExecutedFirst.
self assert: lastCriticalExecuted
]

{ #category : 'tests' }
MonitorTest >> testTerminatingBlockedCriticalWhichWasSignalledButNotResumedYet [
| processWaitingForMutex firstCriticalExecuted lastCriticalExecuted semaphoreToHoldMutex |
firstCriticalExecuted := false.
lastCriticalExecuted := false.
semaphoreToHoldMutex := Semaphore new.

self fork: [
monitor critical: [ semaphoreToHoldMutex wait.
firstCriticalExecuted := true ]] at: Processor activeProcess priority + 1.
self waitLastProcessLock.
"for second critical we choose small priority. So it can't be resumed automatically by scheduler in our scenario."
processWaitingForMutex := self fork: [monitor critical: [ self error: 'should not happen' ]] at: Processor activeProcess priority - 1.
self waitLastProcessLock.
self deny: firstCriticalExecuted.
semaphoreToHoldMutex signal.
self assert: firstCriticalExecuted.
processWaitingForMutex terminate. "Here the process waits for monitor and being terminated at the point when monitor was already signalled but process was not resumed.
Correct critical implementation should allow execution of new consequent criticals"
self fork: [ monitor critical: [ lastCriticalExecuted := true ]].
self waitLastProcessTerminate.
self assert: lastCriticalExecuted description: 'consequent last critical should be executed'
]

{ #category : 'tests' }
MonitorTest >> testTwoCriticalsShouldWaitEachOther [

| lastCriticalExecuted firstCriticalExecutedFirst semaphoreToHoldMutex |
lastCriticalExecuted := false.
firstCriticalExecutedFirst := false.
semaphoreToHoldMutex := Semaphore new.

self fork: [
monitor critical: [ semaphoreToHoldMutex wait. "here we grab monitor and control it with semaphore"
firstCriticalExecutedFirst := lastCriticalExecuted not ]].
self waitLastProcessLock.

self fork: [monitor critical: [ lastCriticalExecuted := true ]].
self waitLastProcessLock.

semaphoreToHoldMutex signal.
self waitLastProcessTerminate.
self assert: lastCriticalExecuted.
self assert: firstCriticalExecutedFirst
]

{ #category : 'tests' }
MonitorTest >> testTwoRecursiveCriticalsShouldNotWaitEachOther [

| executed |
executed := false.

self fork: [monitor critical: [ monitor critical: [ executed := true ]]].
self waitLastProcessTerminate.

self assert: executed
]

{ #category : 'tests' }
MonitorTest >> testWaitMaxMilliseconds [
| monitor |

monitor := Monitor new.
monitor critical: [ monitor waitMaxMilliseconds: 10 ]
]

{ #category : 'running - helpers' }
MonitorTest >> waitFor: aBlock [

[ 10 milliSeconds wait. aBlock value ] whileFalse
]

{ #category : 'running - helpers' }
MonitorTest >> waitLastProcessLock [

self waitProcessLock: forkedProcesses last
]

{ #category : 'running - helpers' }
MonitorTest >> waitLastProcessSuspend [

self waitProcessSuspend: forkedProcesses last
]

{ #category : 'running - helpers' }
MonitorTest >> waitLastProcessTerminate [

self waitProcessTermination: forkedProcesses last
]

{ #category : 'running - helpers' }
MonitorTest >> waitProcessLock: aProcess [

self waitFor: [ aProcess suspendingList isEmptyOrNil not ]
]

{ #category : 'running - helpers' }
MonitorTest >> waitProcessSuspend: aProcess [

self waitFor: [ aProcess isSuspended ]
]

{ #category : 'running - helpers' }
MonitorTest >> waitProcessTermination: aProcess [

self waitFor: [ aProcess isTerminated ]
]
33 changes: 28 additions & 5 deletions src/Kernel/Monitor.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,34 @@ Monitor >> critical: aBlock [
in a critical section.
NOTE: All the following synchronization operations are only valid inside the critical section
of the monitor!"

^[
self enter.
aBlock value]
ensure: [self exit]
| requestingProcess caught |

requestingProcess := Processor activeProcess.
caught := false.
^ [ "See Semaphore>>critical: for a description of the structure of the code below."
requestingProcess == ownerProcess ifTrue: [
"Don't move the caught assignment outside the ifTrue:ifFalse, see below"
caught := true.
nestingLevel := nestingLevel + 1.
] ifFalse: [
"Only one process should be able to enter the critical: section at a time,
however there is obviously a window where terminating a process can allow two
processes in at the same time.
Protect against this by rewaiting if someone else owns the monitor."
[ "Set caught immediately before the mutex wait to ensure both are executed as an atomic operation"
caught := true.
mutex wait] doWhileFalse: [ ownerProcess isNil ].
ownerProcess := requestingProcess.
nestingLevel := 1. ].
aBlock value ] ensure:
"The critical section is only exited if:
1. the ensured block was entered (caught), and
2. the process exiting was the owning process
If the process being terminted was waiting on the monitor (caught) and there is no
current owner, the mutex needs to be signalled to reestablish the excess signal."
[ caught ifTrue: [ requestingProcess == ownerProcess
ifTrue: [ self exit ]
ifFalse: [ ownerProcess ifNil: [ mutex signal ] ] ] ].
]

{ #category : 'private' }
Expand Down