@@ -1155,7 +1155,7 @@ uint32_t PassiveStream::setDead(end_stream_status_t status) {
1155
1155
uint32_t unackedBytes = clearBuffer ();
1156
1156
LOG (EXTENSION_LOG_WARNING, " %s (vb %d) Setting stream to dead state,"
1157
1157
" last_seqno is %llu, unackedBytes is %u, status is %s" ,
1158
- consumer->logHeader (), vb_, last_seqno, unackedBytes,
1158
+ consumer->logHeader (), vb_, last_seqno. load () , unackedBytes,
1159
1159
getEndStreamStatusStr (status));
1160
1160
return unackedBytes;
1161
1161
}
@@ -1189,7 +1189,7 @@ void PassiveStream::reconnectStream(RCPtr<VBucket> &vb,
1189
1189
start_seqno, end_seqno_, snap_start_seqno_, snap_end_seqno_);
1190
1190
1191
1191
LockHolder lh (streamMutex);
1192
- last_seqno = start_seqno;
1192
+ last_seqno. store ( start_seqno) ;
1193
1193
pushToReadyQ (new StreamRequest (vb_, new_opaque, flags_, start_seqno,
1194
1194
end_seqno_, vb_uuid_, snap_start_seqno_,
1195
1195
snap_end_seqno_));
@@ -1216,29 +1216,29 @@ ENGINE_ERROR_CODE PassiveStream::messageReceived(DcpResponse* resp) {
1216
1216
{
1217
1217
MutationResponse* m = static_cast <MutationResponse*>(resp);
1218
1218
uint64_t bySeqno = m->getBySeqno ();
1219
- if (bySeqno <= last_seqno) {
1219
+ if (bySeqno <= last_seqno. load () ) {
1220
1220
LOG (EXTENSION_LOG_WARNING, " %s (vb %d) Erroneous (out of "
1221
1221
" sequence) mutation received, with opaque: %ld, its "
1222
1222
" seqno (%llu) is not greater than last received seqno "
1223
1223
" (%llu); Dropping mutation!" , consumer->logHeader (),
1224
- vb_, opaque_, bySeqno, last_seqno);
1224
+ vb_, opaque_, bySeqno, last_seqno. load () );
1225
1225
delete m;
1226
1226
return ENGINE_ERANGE;
1227
1227
}
1228
- last_seqno = bySeqno;
1228
+ last_seqno. store ( bySeqno) ;
1229
1229
break ;
1230
1230
}
1231
1231
case DCP_SNAPSHOT_MARKER:
1232
1232
{
1233
1233
SnapshotMarker* s = static_cast <SnapshotMarker*>(resp);
1234
1234
uint64_t snapStart = s->getStartSeqno ();
1235
1235
uint64_t snapEnd = s->getEndSeqno ();
1236
- if (snapStart < last_seqno && snapEnd <= last_seqno) {
1236
+ if (snapStart < last_seqno. load () && snapEnd <= last_seqno. load () ) {
1237
1237
LOG (EXTENSION_LOG_WARNING, " %s (vb %d) Erroneous snapshot "
1238
1238
" marker received, with opaque: %ld, its start (%llu), and"
1239
1239
" end (%llu) are less than last received seqno (%llu); "
1240
1240
" Dropping marker!" , consumer->logHeader (), vb_, opaque_,
1241
- snapStart, snapEnd, last_seqno);
1241
+ snapStart, snapEnd, last_seqno. load () );
1242
1242
delete s;
1243
1243
return ENGINE_ERANGE;
1244
1244
}
@@ -1333,19 +1333,20 @@ ENGINE_ERROR_CODE PassiveStream::processMutation(MutationResponse* mutation) {
1333
1333
return ENGINE_NOT_MY_VBUCKET;
1334
1334
}
1335
1335
1336
- if (mutation->getBySeqno () > cur_snapshot_end) {
1336
+ if (mutation->getBySeqno () > cur_snapshot_end. load () ) {
1337
1337
LOG (EXTENSION_LOG_WARNING, " %s (vb %d) Erroneous mutation [sequence "
1338
1338
" number (%llu) greater than current snapshot end seqno (%llu)] "
1339
1339
" being processed; Dropping the mutation!" , consumer->logHeader (),
1340
- vb_, mutation->getBySeqno (), cur_snapshot_end);
1340
+ vb_, mutation->getBySeqno (), cur_snapshot_end. load () );
1341
1341
return ENGINE_ERANGE;
1342
1342
}
1343
1343
1344
1344
ENGINE_ERROR_CODE ret;
1345
1345
if (saveSnapshot) {
1346
1346
LockHolder lh = vb->getSnapshotLock ();
1347
1347
ret = commitMutation (mutation, vb->isBackfillPhase ());
1348
- vb->setCurrentSnapshot_UNLOCKED (cur_snapshot_start, cur_snapshot_end);
1348
+ vb->setCurrentSnapshot_UNLOCKED (cur_snapshot_start.load (),
1349
+ cur_snapshot_end.load ());
1349
1350
saveSnapshot = false ;
1350
1351
lh.unlock ();
1351
1352
} else {
@@ -1385,19 +1386,20 @@ ENGINE_ERROR_CODE PassiveStream::processDeletion(MutationResponse* deletion) {
1385
1386
return ENGINE_NOT_MY_VBUCKET;
1386
1387
}
1387
1388
1388
- if (deletion->getBySeqno () > cur_snapshot_end) {
1389
+ if (deletion->getBySeqno () > cur_snapshot_end. load () ) {
1389
1390
LOG (EXTENSION_LOG_WARNING, " %s (vb %d) Erroneous deletion [sequence "
1390
1391
" number (%llu) greater than current snapshot end seqno (%llu)] "
1391
1392
" being processed; Dropping the deletion!" , consumer->logHeader (),
1392
- vb_, deletion->getBySeqno (), cur_snapshot_end);
1393
+ vb_, deletion->getBySeqno (), cur_snapshot_end. load () );
1393
1394
return ENGINE_ERANGE;
1394
1395
}
1395
1396
1396
1397
ENGINE_ERROR_CODE ret;
1397
1398
if (saveSnapshot) {
1398
1399
LockHolder lh = vb->getSnapshotLock ();
1399
1400
ret = commitDeletion (deletion, vb->isBackfillPhase ());
1400
- vb->setCurrentSnapshot_UNLOCKED (cur_snapshot_start, cur_snapshot_end);
1401
+ vb->setCurrentSnapshot_UNLOCKED (cur_snapshot_start.load (),
1402
+ cur_snapshot_end.load ());
1401
1403
saveSnapshot = false ;
1402
1404
lh.unlock ();
1403
1405
} else {
@@ -1435,9 +1437,9 @@ ENGINE_ERROR_CODE PassiveStream::commitDeletion(MutationResponse* deletion,
1435
1437
void PassiveStream::processMarker (SnapshotMarker* marker) {
1436
1438
RCPtr<VBucket> vb = engine->getVBucket (vb_);
1437
1439
1438
- cur_snapshot_start = marker->getStartSeqno ();
1439
- cur_snapshot_end = marker->getEndSeqno ();
1440
- cur_snapshot_type = ( marker->getFlags () & MARKER_FLAG_DISK) ? disk : memory;
1440
+ cur_snapshot_start. store ( marker->getStartSeqno () );
1441
+ cur_snapshot_end. store ( marker->getEndSeqno () );
1442
+ cur_snapshot_type. store (( marker->getFlags () & MARKER_FLAG_DISK) ? disk : memory) ;
1441
1443
saveSnapshot = true ;
1442
1444
1443
1445
if (vb) {
@@ -1474,8 +1476,8 @@ void PassiveStream::processSetVBucketState(SetVBucketState* state) {
1474
1476
}
1475
1477
1476
1478
void PassiveStream::handleSnapshotEnd (RCPtr<VBucket>& vb, uint64_t byseqno) {
1477
- if (byseqno == cur_snapshot_end) {
1478
- if (cur_snapshot_type == disk && vb->isBackfillPhase ()) {
1479
+ if (byseqno == cur_snapshot_end. load () ) {
1480
+ if (cur_snapshot_type. load () == disk && vb->isBackfillPhase ()) {
1479
1481
vb->setBackfillPhase (false );
1480
1482
uint64_t id = vb->checkpointManager .getOpenCheckpointId () + 1 ;
1481
1483
vb->checkpointManager .checkAndAddNewCheckpoint (id, vb);
@@ -1499,7 +1501,7 @@ void PassiveStream::handleSnapshotEnd(RCPtr<VBucket>& vb, uint64_t byseqno) {
1499
1501
}
1500
1502
cur_snapshot_ack = false ;
1501
1503
}
1502
- cur_snapshot_type = none;
1504
+ cur_snapshot_type. store ( none) ;
1503
1505
vb->setCurrentSnapshot (byseqno, byseqno);
1504
1506
}
1505
1507
}
@@ -1523,18 +1525,18 @@ void PassiveStream::addStats(ADD_STAT add_stat, const void *c) {
1523
1525
snprintf (buf, bsize, " %s:stream_%d_items_ready" , name_.c_str (), vb_);
1524
1526
add_casted_stat (buf, itemsReady.load () ? " true" : " false" , add_stat, c);
1525
1527
snprintf (buf, bsize, " %s:stream_%d_last_received_seqno" , name_.c_str (), vb_);
1526
- add_casted_stat (buf, last_seqno, add_stat, c);
1528
+ add_casted_stat (buf, last_seqno. load () , add_stat, c);
1527
1529
snprintf (buf, bsize, " %s:stream_%d_ready_queue_memory" , name_.c_str (), vb_);
1528
1530
add_casted_stat (buf, getReadyQueueMemory (), add_stat, c);
1529
1531
1530
1532
snprintf (buf, bsize, " %s:stream_%d_cur_snapshot_type" , name_.c_str (), vb_);
1531
- add_casted_stat (buf, snapshotTypeToString (cur_snapshot_type), add_stat, c);
1533
+ add_casted_stat (buf, snapshotTypeToString (cur_snapshot_type. load () ), add_stat, c);
1532
1534
1533
- if (cur_snapshot_type != none) {
1535
+ if (cur_snapshot_type. load () != none) {
1534
1536
snprintf (buf, bsize, " %s:stream_%d_cur_snapshot_start" , name_.c_str (), vb_);
1535
- add_casted_stat (buf, cur_snapshot_start, add_stat, c);
1537
+ add_casted_stat (buf, cur_snapshot_start. load () , add_stat, c);
1536
1538
snprintf (buf, bsize, " %s:stream_%d_cur_snapshot_end" , name_.c_str (), vb_);
1537
- add_casted_stat (buf, cur_snapshot_end, add_stat, c);
1539
+ add_casted_stat (buf, cur_snapshot_end. load () , add_stat, c);
1538
1540
}
1539
1541
}
1540
1542
0 commit comments