1
1
using DynamicRepository . Filter ;
2
+ using DynamicRepository . MongoDB . Transaction ;
2
3
using DynamicRepository . Transaction ;
3
4
using MongoDB . Bson ;
4
5
using MongoDB . Driver ;
8
9
using System . Linq . Expressions ;
9
10
using System . Threading ;
10
11
using System . Threading . Tasks ;
12
+ using System . Transactions ;
11
13
12
14
namespace DynamicRepository . MongoDB
13
15
{
@@ -57,18 +59,28 @@ namespace DynamicRepository.MongoDB
57
59
/// </summary>
58
60
protected string CollectionName { get ; }
59
61
60
-
62
+ private readonly object _transactionLock = new object ( ) ;
61
63
private MongoDBTransaction _transactionInstance ;
62
- private MongoDBTransaction Transaction
63
- {
64
+ private MongoDBTransaction Transaction
65
+ {
64
66
get
65
67
{
66
- if ( _transactionInstance != null && _transactionInstance . HasBeenDisposed )
68
+ lock ( _transactionLock )
67
69
{
68
- _transactionInstance = null ;
69
- }
70
+ if ( _transactionInstance != null && _transactionInstance . HasBeenDisposed )
71
+ {
72
+ _transactionInstance = null ;
73
+ }
70
74
71
- return _transactionInstance ;
75
+ return _transactionInstance ;
76
+ }
77
+ }
78
+ set
79
+ {
80
+ lock ( _transactionLock )
81
+ {
82
+ _transactionInstance = value ;
83
+ }
72
84
}
73
85
}
74
86
@@ -144,16 +156,45 @@ protected FilterDefinition<Entity> GetIdFilter(Key id)
144
156
return Builders < Entity > . Filter . Eq ( _idPropertyName , id ) ;
145
157
}
146
158
159
+ public TransactionScope StartTransactionScope ( ) => new TransactionScope ( TransactionScopeAsyncFlowOption . Enabled ) ;
160
+
147
161
public ITransaction StartTransaction ( )
148
162
{
149
- _transactionInstance = new MongoDBTransaction ( _mongoDatabase . Client ) ;
163
+ Transaction = new MongoDBTransaction ( _mongoDatabase . Client ) ;
150
164
151
165
return Transaction ;
152
166
}
153
167
154
168
public void RegisterTransaction ( ITransaction transaction )
155
169
{
156
- _transactionInstance = transaction as MongoDBTransaction ;
170
+ Transaction = transaction as MongoDBTransaction ;
171
+ }
172
+
173
+ private void EnlistWithCurrentTransactionScope ( )
174
+ {
175
+ if ( System . Transactions . Transaction . Current != null )
176
+ {
177
+ var ambientTransactionId = System . Transactions . Transaction . Current . TransactionInformation . LocalIdentifier ;
178
+
179
+ if ( AmbientTransactionRegister . AmbientTransactions . ContainsKey ( ambientTransactionId ) )
180
+ {
181
+ RegisterTransaction ( AmbientTransactionRegister . AmbientTransactions [ ambientTransactionId ] ) ;
182
+ }
183
+ else
184
+ {
185
+ StartTransaction ( ) ;
186
+
187
+ AmbientTransactionRegister . AmbientTransactions . TryAdd ( ambientTransactionId , Transaction ) ;
188
+
189
+ System . Transactions . Transaction . Current . TransactionCompleted += ( sender , e ) => {
190
+ AmbientTransactionRegister . AmbientTransactions . TryRemove ( ambientTransactionId , out _ ) ;
191
+ Transaction = null ;
192
+ } ;
193
+
194
+ var enlistment = new MongoDBTransactionScopeEnlistment ( Transaction ) ;
195
+ System . Transactions . Transaction . Current . EnlistVolatile ( enlistment , System . Transactions . EnlistmentOptions . None ) ;
196
+ }
197
+ }
157
198
}
158
199
159
200
/// <summary>
@@ -163,7 +204,19 @@ public void RegisterTransaction(ITransaction transaction)
163
204
/// <returns>Persisted entity if found, otherwise NULL.</returns>
164
205
public Entity Get ( Key id )
165
206
{
166
- var queriedEntity = Collection . Find ( GetIdFilter ( id ) ) . FirstOrDefault ( ) ;
207
+ EnlistWithCurrentTransactionScope ( ) ;
208
+
209
+ Entity queriedEntity ;
210
+ if ( Transaction != null )
211
+ {
212
+ queriedEntity = Collection
213
+ . Find ( Transaction . Session , GetIdFilter ( id ) )
214
+ . FirstOrDefault ( ) ;
215
+ }
216
+ else
217
+ {
218
+ queriedEntity = Collection . Find ( GetIdFilter ( id ) ) . FirstOrDefault ( ) ;
219
+ }
167
220
168
221
return GlobalFilter != null && queriedEntity != null ? new [ ] { queriedEntity } . AsQueryable ( ) . FirstOrDefault ( GlobalFilter ) : queriedEntity ;
169
222
}
@@ -186,7 +239,21 @@ public Task<Entity> GetAsync(Key id)
186
239
/// <returns>Persisted entity if found, otherwise NULL.</returns>
187
240
public async Task < Entity > GetAsync ( Key id , CancellationToken cancellationToken )
188
241
{
189
- var queriedEntity = await ( await Collection . FindAsync ( GetIdFilter ( id ) , cancellationToken : cancellationToken ) ) . FirstOrDefaultAsync ( ) ;
242
+ EnlistWithCurrentTransactionScope ( ) ;
243
+
244
+ Entity queriedEntity ;
245
+ if ( Transaction != null )
246
+ {
247
+ queriedEntity = await ( await Collection . FindAsync ( Transaction . Session , GetIdFilter ( id ) , cancellationToken : cancellationToken ) . ConfigureAwait ( false ) )
248
+ . FirstOrDefaultAsync ( )
249
+ . ConfigureAwait ( false ) ;
250
+ }
251
+ else
252
+ {
253
+ queriedEntity = await ( await Collection . FindAsync ( GetIdFilter ( id ) , cancellationToken : cancellationToken ) . ConfigureAwait ( false ) )
254
+ . FirstOrDefaultAsync ( )
255
+ . ConfigureAwait ( false ) ;
256
+ }
190
257
191
258
return GlobalFilter != null && queriedEntity != null ? new [ ] { queriedEntity } . AsQueryable ( ) . FirstOrDefault ( GlobalFilter ) : queriedEntity ;
192
259
}
@@ -197,6 +264,8 @@ public async Task<Entity> GetAsync(Key id, CancellationToken cancellationToken)
197
264
/// <param name="entity">The new <see cref="Entity"/> instance to be persisted.</param>
198
265
public void Insert ( Entity entity )
199
266
{
267
+ EnlistWithCurrentTransactionScope ( ) ;
268
+
200
269
if ( Transaction != null )
201
270
{
202
271
Collection . InsertOne ( Transaction . Session , entity ) ;
@@ -223,6 +292,8 @@ public Task InsertAsync(Entity entity)
223
292
/// <param name="cancellationToken">A token used for cancelling propagation.</param>
224
293
public Task InsertAsync ( Entity entity , CancellationToken cancellationToken )
225
294
{
295
+ EnlistWithCurrentTransactionScope ( ) ;
296
+
226
297
return Transaction != null ?
227
298
Collection . InsertOneAsync ( Transaction . Session , entity , null , cancellationToken )
228
299
: Collection . InsertOneAsync ( entity , null , cancellationToken ) ;
@@ -234,6 +305,8 @@ public Task InsertAsync(Entity entity, CancellationToken cancellationToken)
234
305
/// <param name="entityToUpdate">The <see cref="Entity"/> instance to be updated.</param>
235
306
public void Update ( Entity entityToUpdate )
236
307
{
308
+ EnlistWithCurrentTransactionScope ( ) ;
309
+
237
310
if ( Transaction != null )
238
311
{
239
312
Collection . ReplaceOne ( Transaction . Session , GetIdFilter ( entityToUpdate ) , entityToUpdate ) ;
@@ -260,6 +333,8 @@ public Task UpdateAsync(Entity entityToUpdate)
260
333
/// <param name="cancellationToken">A token used for cancelling propagation.</param>
261
334
public Task UpdateAsync ( Entity entityToUpdate , CancellationToken cancellationToken )
262
335
{
336
+ EnlistWithCurrentTransactionScope ( ) ;
337
+
263
338
return Transaction != null ?
264
339
Collection . ReplaceOneAsync ( Transaction . Session , GetIdFilter ( entityToUpdate ) , entityToUpdate , cancellationToken : cancellationToken )
265
340
: Collection . ReplaceOneAsync ( GetIdFilter ( entityToUpdate ) , entityToUpdate , cancellationToken : cancellationToken ) ;
@@ -271,6 +346,8 @@ public Task UpdateAsync(Entity entityToUpdate, CancellationToken cancellationTok
271
346
/// <param name="id">The primary key of the <see cref="Entity"/> to be deleted.</param>
272
347
public void Delete ( Key id )
273
348
{
349
+ EnlistWithCurrentTransactionScope ( ) ;
350
+
274
351
if ( Transaction != null )
275
352
{
276
353
Collection . DeleteOne ( Transaction . Session , GetIdFilter ( id ) ) ;
@@ -297,6 +374,8 @@ public Task DeleteAsync(Key id)
297
374
/// <param name="cancellationToken">A token used for cancelling propagation.</param>
298
375
public Task DeleteAsync ( Key id , CancellationToken cancellationToken )
299
376
{
377
+ EnlistWithCurrentTransactionScope ( ) ;
378
+
300
379
return Transaction != null ?
301
380
Collection . DeleteOneAsync ( Transaction . Session , GetIdFilter ( id ) , null , cancellationToken )
302
381
: Collection . DeleteOneAsync ( GetIdFilter ( id ) , cancellationToken ) ;
@@ -308,6 +387,8 @@ public Task DeleteAsync(Key id, CancellationToken cancellationToken)
308
387
/// <param name="entityToDelete">The <see cref="Entity"/> instance to be deleted.</param>
309
388
public void Delete ( Entity entityToDelete )
310
389
{
390
+ EnlistWithCurrentTransactionScope ( ) ;
391
+
311
392
if ( Transaction != null )
312
393
{
313
394
Collection . DeleteOne ( Transaction . Session , GetIdFilter ( entityToDelete ) ) ;
@@ -334,7 +415,16 @@ public Task DeleteAsync(Entity entityToDelete)
334
415
/// <param name="cancellationToken">A token used for cancelling propagation.</param>
335
416
public Task DeleteAsync ( Entity entityToDelete , CancellationToken cancellationToken )
336
417
{
337
- return Collection . DeleteOneAsync ( GetIdFilter ( entityToDelete ) , cancellationToken ) ;
418
+ EnlistWithCurrentTransactionScope ( ) ;
419
+
420
+ if ( Transaction != null )
421
+ {
422
+ return Collection . DeleteOneAsync ( Transaction . Session , GetIdFilter ( entityToDelete ) , null , cancellationToken ) ;
423
+ }
424
+ else
425
+ {
426
+ return Collection . DeleteOneAsync ( GetIdFilter ( entityToDelete ) , cancellationToken ) ;
427
+ }
338
428
}
339
429
340
430
/// <summary>
@@ -350,7 +440,16 @@ public IEnumerable<Entity> ListAll()
350
440
/// </summary>
351
441
public IQueryable < Entity > GetQueryable ( )
352
442
{
353
- return GlobalFilter != null ? Collection . AsQueryable ( ) . Where ( GlobalFilter ) : Collection . AsQueryable ( ) ;
443
+ EnlistWithCurrentTransactionScope ( ) ;
444
+
445
+ if ( Transaction != null )
446
+ {
447
+ return GlobalFilter != null ? Collection . AsQueryable ( Transaction . Session ) . Where ( GlobalFilter ) : Collection . AsQueryable ( Transaction . Session ) ;
448
+ }
449
+ else
450
+ {
451
+ return GlobalFilter != null ? Collection . AsQueryable ( ) . Where ( GlobalFilter ) : Collection . AsQueryable ( ) ;
452
+ }
354
453
}
355
454
356
455
/// <summary>
0 commit comments