@@ -85,10 +85,20 @@ public function readLock($id, $staleDuration)
85
85
86
86
while (time () < $ timeoutTimestamp ) {
87
87
$ readerId = new \MongoId ();
88
- $ query = ['_id ' => $ id , 'writing ' => false , 'writePending ' => false ];
88
+ $ query = [
89
+ '_id ' => $ id ,
90
+ '$or ' => [
91
+ //normal case where we have zero or more readers and no writers pending
92
+ ['writing ' => false , 'writePending ' => false ],
93
+ //case where a pending write is stale
94
+ ['writing ' => false , 'writePending ' => true , 'writeStaleTs ' => ['$lte ' => new \MongoDate ()]],
95
+ //case where a writer is stale
96
+ ['writing ' => true , 'writeStaleTs ' => ['$lte ' => new \MongoDate ()]],
97
+ ],
98
+ ];
89
99
$ update = [
90
100
'$push ' => ['readers ' => ['id ' => $ readerId , 'staleTs ' => $ staleTimestamp ]],
91
- '$set ' => ['writeStaleTs ' => null ],
101
+ '$set ' => ['writing ' => false , ' writePending ' => false , ' writeStaleTs ' => null ],
92
102
];
93
103
try {
94
104
if ($ this ->collection ->update ($ query , $ update , ['upsert ' => true ])['n ' ] === 1 ) {
@@ -100,10 +110,6 @@ public function readLock($id, $staleDuration)
100
110
}
101
111
}
102
112
103
- if ($ this ->clearStuckWrite ($ id )) {
104
- continue ;
105
- }
106
-
107
113
usleep ($ this ->pollDuration );
108
114
}
109
115
@@ -119,7 +125,11 @@ public function readLock($id, $staleDuration)
119
125
*/
120
126
public function readUnlock ($ id , \MongoId $ readerId )
121
127
{
122
- $ this ->collection ->update (['_id ' => $ id ], ['$pull ' => ['readers ' => ['id ' => $ readerId ]]]);
128
+ $ this ->collection ->update (
129
+ ['_id ' => $ id ],
130
+ //pull this reader id, or any stale readers
131
+ ['$pull ' => ['readers ' => ['$or ' => [['id ' => $ readerId ], ['staleTs ' => ['$lte ' => new \MongoDate ()]]]]]]
132
+ );
123
133
$ this ->collection ->remove (['_id ' => $ id , 'writing ' => false , 'readers ' => ['$size ' => 0 ]]);
124
134
}
125
135
@@ -144,7 +154,17 @@ public function writeLock($id, $staleDuration)
144
154
$ staleTimestamp = new \MongoDate ((int )min (time () + $ staleDuration , PHP_INT_MAX ));
145
155
146
156
while (time () < $ timeoutTimestamp ) {
147
- $ query = ['_id ' => $ id , 'writing ' => false , 'readers ' => ['$size ' => 0 ]];
157
+ $ query = [
158
+ '_id ' => $ id ,
159
+ '$or ' => [
160
+ //normal case when readers are done
161
+ ['writing ' => false , 'readers ' => ['$size ' => 0 ]],
162
+ //to clean where writer is stuck
163
+ ['writing ' => true , 'writeStaleTs ' => ['$lte ' => new \MongoDate ()]],
164
+ //to clean where all readers are stuck
165
+ ['writing ' => false , 'readers.staleTs ' => ['$not ' => ['$gt ' => new \MongoDate ()]]],
166
+ ],
167
+ ];
148
168
$ update = [
149
169
'_id ' => $ id ,
150
170
'writing ' => true ,
@@ -162,11 +182,10 @@ public function writeLock($id, $staleDuration)
162
182
}
163
183
}
164
184
165
- if ($ this ->clearStuckWrite ($ id ) || $ this ->clearStuckRead ($ id )) {
166
- continue ;
167
- }
168
-
169
- $ this ->collection ->update (['_id ' => $ id ], ['$set ' => ['writePending ' => true ]]);
185
+ $ this ->collection ->update (
186
+ ['_id ' => $ id ],
187
+ ['$set ' => ['writePending ' => true , 'writeStaleTs ' => $ staleTimestamp ]]
188
+ );
170
189
171
190
usleep ($ this ->pollDuration );
172
191
}
@@ -184,19 +203,4 @@ public function writeUnlock($id)
184
203
{
185
204
$ this ->collection ->remove (['_id ' => $ id ]);
186
205
}
187
-
188
- private function clearStuckWrite ($ id )
189
- {
190
- return $ this ->collection ->remove (
191
- ['_id ' => $ id , 'writing ' => true , 'writeStaleTs ' => ['$lte ' => new \MongoDate ()]]
192
- )['n ' ] === 1 ;
193
- }
194
-
195
- private function clearStuckRead ($ id )
196
- {
197
- $ now = new \MongoDate ();
198
- $ query = ['_id ' => $ id , 'writing ' => false , 'readers.staleTs ' => ['$lte ' => $ now ]];
199
- $ update = ['$pull ' => ['readers ' => ['staleTs ' => ['$lte ' => $ now ]]]];
200
- return $ this ->collection ->update ($ query , $ update )['n ' ] === 1 ;
201
- }
202
206
}
0 commit comments