Skip to content

Commit d7977c4

Browse files
JohnSullyJohn Sully
authored andcommitted
Fastsync in-mem initial implementation
1 parent 7ae27f6 commit d7977c4

7 files changed

Lines changed: 94 additions & 47 deletions

File tree

src/SnapshotPayloadParseState.cpp

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -136,14 +136,33 @@ void SnapshotPayloadParseState::flushQueuedKeys() {
136136
int idb = current_database;
137137
serverAssert(vecqueuedKeys.size() == vecqueuedVals.size());
138138
auto sizePrev = vecqueuedKeys.size();
139-
(*insertsInFlight)++;
140-
std::weak_ptr<std::atomic<int>> insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous
141139
if (current_database < cserver.dbnum) {
142-
g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable {
143-
g_pserver->db[idb]->bulkDirectStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size());
144-
(*(insertsInFlightTmp.lock()))--;
145-
delete pallocator;
146-
});
140+
if (g_pserver->m_pstorageFactory) {
141+
(*insertsInFlight)++;
142+
std::weak_ptr<std::atomic<int>> insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous
143+
g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable {
144+
g_pserver->db[idb]->bulkDirectStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size());
145+
(*(insertsInFlightTmp.lock()))--;
146+
delete pallocator;
147+
});
148+
} else {
149+
for (size_t ival = 0; ival < vecqueuedKeys.size(); ++ival) {
150+
size_t offset = 0;
151+
auto spexpire = deserializeExpire(vecqueuedVals[ival], vecqueuedValsCb[ival], &offset);
152+
auto o = deserializeStoredObject(vecqueuedVals[ival] + offset, vecqueuedValsCb[ival] - offset);
153+
sds sdsKey = sdsnewlen(vecqueuedKeys[ival], -static_cast<ssize_t>(vecqueuedKeysCb[ival]));
154+
if (dbMerge(g_pserver->db[idb], sdsKey, o, false)) {
155+
if (spexpire != nullptr)
156+
g_pserver->db[idb]->setExpire(sdsKey, std::move(*spexpire));
157+
} else {
158+
sdsfree(sdsKey);
159+
}
160+
}
161+
vecqueuedKeys.clear();
162+
vecqueuedKeysCb.clear();
163+
vecqueuedVals.clear();
164+
vecqueuedValsCb.clear();
165+
}
147166
} else {
148167
// else drop the data
149168
vecqueuedKeys.clear();

src/config.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2983,6 +2983,7 @@ standardConfig configs[] = {
29832983
createIntConfig("overload-protect-percent", NULL, MODIFIABLE_CONFIG, 0, 200, g_pserver->overload_protect_threshold, 0, INTEGER_CONFIG, NULL, NULL),
29842984
createIntConfig("force-eviction-percent", NULL, MODIFIABLE_CONFIG, 0, 100, g_pserver->force_eviction_percent, 0, INTEGER_CONFIG, NULL, NULL),
29852985
createBoolConfig("enable-async-rehash", NULL, MODIFIABLE_CONFIG, g_pserver->enable_async_rehash, 1, NULL, NULL),
2986+
createBoolConfig("enable-keydb-fastsync", NULL, MODIFIABLE_CONFIG, g_pserver->fEnableFastSync, 1, NULL, NULL),
29862987

29872988
#ifdef USE_OPENSSL
29882989
createIntConfig("tls-port", NULL, MODIFIABLE_CONFIG, 0, 65535, g_pserver->tls_port, 0, INTEGER_CONFIG, NULL, updateTLSPort), /* TCP port. */

src/db.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ int expireIfNeeded(redisDb *db, robj *key, robj *o);
5656
void slotToKeyUpdateKeyCore(const char *key, size_t keylen, int add);
5757

5858
std::unique_ptr<expireEntry> deserializeExpire(const char *str, size_t cch, size_t *poffset);
59-
sds serializeStoredObjectAndExpire(robj_roptr o);
6059

6160
dictType dictChangeDescType {
6261
dictSdsHash, /* hash function */

src/replication.cpp

Lines changed: 52 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1019,8 +1019,6 @@ class replicationBuffer {
10191019
void addReplica(client *replica) {
10201020
replicas.push_back(replica);
10211021
replicationSetupSlaveForFullResync(replica,getPsyncInitialOffset());
1022-
// Optimize the socket for bulk transfer
1023-
//connDisableTcpNoDelay(replica->conn);
10241022
}
10251023

10261024
bool isActive() const { return !replicas.empty(); }
@@ -1093,7 +1091,7 @@ class replicationBuffer {
10931091
reply->used += size;
10941092
}
10951093

1096-
void addLongWithPrefix(long val, char prefix) {
1094+
void addLongLongWithPrefix(long long val, char prefix) {
10971095
char buf[128];
10981096
int len;
10991097

@@ -1105,15 +1103,19 @@ class replicationBuffer {
11051103
}
11061104

11071105
void addArrayLen(int len) {
1108-
addLongWithPrefix(len, '*');
1106+
addLongLongWithPrefix(len, '*');
11091107
}
11101108

11111109
void addLong(long val) {
1112-
addLongWithPrefix(val, ':');
1110+
addLongLongWithPrefix(val, ':');
1111+
}
1112+
1113+
void addLongLong(long long val) {
1114+
addLongLongWithPrefix(val, ':');
11131115
}
11141116

11151117
void addString(const char *s, unsigned long len) {
1116-
addLongWithPrefix(len, '$');
1118+
addLongLongWithPrefix(len, '$');
11171119
addData(s, len);
11181120
addData("\r\n", 2);
11191121
}
@@ -1149,22 +1151,19 @@ int rdbSaveSnapshotForReplication(rdbSaveInfo *rsi) {
11491151
// TODO: This needs to be on a background thread
11501152
int retval = C_OK;
11511153
serverAssert(GlobalLocksAcquired());
1152-
serverLog(LL_NOTICE, "Starting storage provider fast full sync with target: %s", "disk");
1154+
serverLog(LL_NOTICE, "Starting fast full sync with target: %s", "disk");
11531155

11541156
std::shared_ptr<replicationBuffer> spreplBuf = std::make_shared<replicationBuffer>();
11551157
listNode *ln;
11561158
listIter li;
1157-
client *replica = nullptr;
11581159
listRewind(g_pserver->slaves, &li);
1159-
while (replica == nullptr && (ln = listNext(&li))) {
1160+
while ((ln = listNext(&li))) {
11601161
client *replicaCur = (client*)listNodeValue(ln);
11611162
if ((replicaCur->slave_capa & SLAVE_CAPA_KEYDB_FASTSYNC) && (replicaCur->replstate == SLAVE_STATE_WAIT_BGSAVE_START)) {
1162-
replica = replicaCur;
1163-
spreplBuf->addReplica(replica);
1163+
spreplBuf->addReplica(replicaCur);
11641164
replicaCur->replstate = SLAVE_STATE_FASTSYNC_TX;
11651165
}
11661166
}
1167-
serverAssert(replica != nullptr);
11681167

11691168
spreplBuf->addArrayLen(2); // Two sections: Metadata and databases
11701169

@@ -1179,7 +1178,7 @@ int rdbSaveSnapshotForReplication(rdbSaveInfo *rsi) {
11791178
spreplBuf->addString(rsi->repl_id, CONFIG_RUN_ID_SIZE);
11801179
spreplBuf->addArrayLen(2);
11811180
spreplBuf->addString("repl-offset", 11);
1182-
spreplBuf->addLong(rsi->master_repl_offset);
1181+
spreplBuf->addLongLong(rsi->master_repl_offset);
11831182

11841183
if (dictSize(g_pserver->lua_scripts)) {
11851184
dictEntry *de;
@@ -1195,13 +1194,13 @@ int rdbSaveSnapshotForReplication(rdbSaveInfo *rsi) {
11951194
di = NULL; /* So that we don't release it again on error. */
11961195
}
11971196

1198-
std::shared_ptr<std::vector<std::unique_ptr<const StorageCache>>> spvecspsnapshot = std::make_shared<std::vector<std::unique_ptr<const StorageCache>>>();
1197+
std::shared_ptr<std::vector<const redisDbPersistentDataSnapshot*>> spvecsnapshot = std::make_shared<std::vector<const redisDbPersistentDataSnapshot*>>();
11991198
for (int idb = 0; idb < cserver.dbnum; ++idb) {
1200-
spvecspsnapshot->emplace_back(g_pserver->db[idb]->CloneStorageCache());
1199+
spvecsnapshot->emplace_back(g_pserver->db[idb]->createSnapshot(getMvccTstamp(), false));
12011200
}
12021201
aeReleaseLock();
12031202

1204-
g_pserver->asyncworkqueue->AddWorkFunction([spreplBuf = std::move(spreplBuf), spvecspsnapshot = std::move(spvecspsnapshot)]{
1203+
g_pserver->asyncworkqueue->AddWorkFunction([spreplBuf = std::move(spreplBuf), spvecsnapshot = std::move(spvecsnapshot)]{
12051204
int retval = C_OK;
12061205
auto timeStart = ustime();
12071206
auto lastLogTime = timeStart;
@@ -1211,15 +1210,16 @@ int rdbSaveSnapshotForReplication(rdbSaveInfo *rsi) {
12111210
// Databases
12121211
replBuf.addArrayLen(cserver.dbnum);
12131212
for (int idb = 0; idb < cserver.dbnum; ++idb) {
1214-
auto &spsnapshot = (*spvecspsnapshot)[idb];
1215-
size_t snapshotDeclaredCount = spsnapshot->count();
1213+
auto &spsnapshot = (*spvecsnapshot)[idb];
1214+
size_t snapshotDeclaredCount = spsnapshot->size();
12161215
replBuf.addArrayLen(snapshotDeclaredCount);
12171216
size_t count = 0;
1218-
bool result = spsnapshot->enumerate([&replBuf, &count, &cbData, &lastLogTime, &cbLastUpdate](const char *rgchKey, size_t cchKey, const void *rgchVal, size_t cchVal) -> bool{
1217+
bool result = spsnapshot->iterate_threadsafe([&replBuf, &count, &cbData, &lastLogTime, &cbLastUpdate](const char *strKey, robj_roptr o) -> bool{
12191218
replBuf.addArrayLen(2);
12201219

1221-
replBuf.addString(rgchKey, cchKey);
1222-
replBuf.addString((const char *)rgchVal, cchVal);
1220+
replBuf.addString(strKey, sdslen(strKey));
1221+
sds strT = serializeStoredObjectAndExpire(o);
1222+
replBuf.addString(strT, sdslen(strT));
12231223
++count;
12241224
if ((count % 8092) == 0) {
12251225
auto curTime = ustime();
@@ -1230,7 +1230,8 @@ int rdbSaveSnapshotForReplication(rdbSaveInfo *rsi) {
12301230
lastLogTime = ustime();
12311231
}
12321232
}
1233-
cbData += cchKey + cchVal;
1233+
cbData += sdslen(strKey) + sdslen(strT);
1234+
sdsfree(strT);
12341235
return replBuf.isActive();
12351236
});
12361237

@@ -1302,7 +1303,7 @@ int startBgsaveForReplication(int mincapa) {
13021303
/* Only do rdbSave* when rsiptr is not NULL,
13031304
* otherwise replica will miss repl-stream-db. */
13041305
if (rsiptr) {
1305-
if (mincapa & SLAVE_CAPA_KEYDB_FASTSYNC && g_pserver->m_pstorageFactory)
1306+
if (mincapa & SLAVE_CAPA_KEYDB_FASTSYNC && FFastSyncEnabled())
13061307
retval = rdbSaveSnapshotForReplication(rsiptr);
13071308
else if (socket_target)
13081309
retval = rdbSaveToSlavesSockets(rsiptr);
@@ -1481,7 +1482,7 @@ void syncCommand(client *c) {
14811482
}
14821483

14831484
/* CASE 0: Fast Sync */
1484-
if ((c->slave_capa & SLAVE_CAPA_KEYDB_FASTSYNC) && g_pserver->m_pstorageFactory) {
1485+
if (c->slave_capa & SLAVE_CAPA_KEYDB_FASTSYNC && FFastSyncEnabled()) {
14851486
serverLog(LL_NOTICE,"Fast SYNC on next replication cycle");
14861487
/* CASE 1: BGSAVE is in progress, with disk target. */
14871488
} else if (g_pserver->FRdbSaveInProgress() &&
@@ -1672,7 +1673,7 @@ void replconfCommand(client *c) {
16721673
c->slave_capa |= SLAVE_CAPA_PSYNC2;
16731674
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire"))
16741675
c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE;
1675-
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "rocksdb-snapshot-load"))
1676+
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "keydb-fastsync"))
16761677
c->slave_capa |= SLAVE_CAPA_KEYDB_FASTSYNC;
16771678

16781679
fCapaCommand = true;
@@ -1738,10 +1739,14 @@ void replconfCommand(client *c) {
17381739

17391740
if (fCapaCommand) {
17401741
sds reply = sdsnew("+OK");
1741-
if (g_pserver->fActiveReplica)
1742+
if (g_pserver->fActiveReplica) {
17421743
reply = sdscat(reply, " active-replica");
1743-
if (g_pserver->m_pstorageFactory && (c->slave_capa & SLAVE_CAPA_KEYDB_FASTSYNC) && !g_pserver->fActiveReplica)
1744-
reply = sdscat(reply, " rocksdb-snapshot-save");
1744+
}
1745+
if ((c->slave_capa & SLAVE_CAPA_KEYDB_FASTSYNC) && FFastSyncEnabled()) {
1746+
reply = sdscat(reply, " keydb-fastsync-save");
1747+
} else {
1748+
c->slave_capa = (c->slave_capa & (~SLAVE_CAPA_KEYDB_FASTSYNC)); // never try to fast sync for this as they won't expect it
1749+
}
17451750
reply = sdscat(reply, "\r\n");
17461751
addReplySds(c, reply);
17471752
} else {
@@ -2515,6 +2520,7 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi
25152520
}
25162521
}
25172522

2523+
serverAssert(mi->parseState != nullptr);
25182524
for (int iter = 0; iter < 10; ++iter) {
25192525
if (mi->parseState->shouldThrottle())
25202526
return false;
@@ -2525,11 +2531,16 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi
25252531

25262532
auto nread = connRead(conn, mi->bulkreadBuffer+qblen, readlen);
25272533
if (nread <= 0) {
2528-
if (connGetState(conn) != CONN_STATE_CONNECTED)
2534+
if (connGetState(conn) != CONN_STATE_CONNECTED) {
2535+
serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
2536+
(nread == -1) ? strerror(errno) : "connection lost");
25292537
cancelReplicationHandshake(mi, true);
2538+
}
25302539
return false;
25312540
}
2541+
g_pserver->stat_net_input_bytes += nread;
25322542
mi->repl_transfer_lastio = g_pserver->unixtime;
2543+
mi->repl_transfer_read += nread;
25332544
sdsIncrLen(mi->bulkreadBuffer,nread);
25342545

25352546
size_t offset = 0;
@@ -2601,8 +2612,9 @@ bool readSnapshotBulkPayload(connection *conn, redisMaster *mi, rdbSaveInfo &rsi
26012612
rsi.repl_stream_db = mi->parseState->getMetaDataLongLong("repl-stream-db");
26022613
rsi.repl_offset = mi->parseState->getMetaDataLongLong("repl-offset");
26032614
sds str = mi->parseState->getMetaDataStr("repl-id");
2604-
if (sdslen(str) == CONFIG_RUN_ID_SIZE+1) {
2615+
if (sdslen(str) == CONFIG_RUN_ID_SIZE) {
26052616
memcpy(rsi.repl_id, str, CONFIG_RUN_ID_SIZE+1);
2617+
rsi.repl_id_is_set = 1;
26062618
}
26072619

26082620
fFinished = true;
@@ -3018,13 +3030,11 @@ void readSyncBulkPayload(connection *conn) {
30183030
/* We need to handle the case where the initial querybuf data was read by fast sync */
30193031
/* This should match the work readQueryFromClient would do for a master client */
30203032
mi->master->querybuf = sdscatsds(mi->master->querybuf, mi->bulkreadBuffer);
3033+
mi->master->pending_querybuf = sdscatsds(mi->master->pending_querybuf, mi->bulkreadBuffer);
3034+
mi->master->read_reploff += sdslen(mi->bulkreadBuffer);
3035+
30213036
sdsfree(mi->bulkreadBuffer);
30223037
mi->bulkreadBuffer = nullptr;
3023-
3024-
mi->master->pending_querybuf = sdscatlen(mi->master->pending_querybuf,
3025-
mi->master->querybuf,sdslen(mi->master->querybuf));
3026-
3027-
mi->master->read_reploff += sdslen(mi->master->querybuf);
30283038
}
30293039
mi->repl_transfer_s = nullptr;
30303040
mi->repl_state = REPL_STATE_CONNECTED;
@@ -3402,7 +3412,7 @@ void parseMasterCapa(redisMaster *mi, sds strcapa)
34023412
// Parse the word
34033413
if (strncmp(szStart, "active-replica", pchEnd - szStart) == 0) {
34043414
mi->isActive = true;
3405-
} else if (strncmp(szStart, "rocksdb-snapshot-save", pchEnd - szStart) == 0) {
3415+
} else if (strncmp(szStart, "keydb-fastsync-save", pchEnd - szStart) == 0) {
34063416
mi->isKeydbFastsync = true;
34073417
}
34083418
szStart = pchEnd + 1;
@@ -3550,9 +3560,9 @@ void syncWithMaster(connection *conn) {
35503560
"capa","psync2",
35513561
"capa","activeExpire",
35523562
};
3553-
if (g_pserver->m_pstorageFactory && !g_pserver->fActiveReplica && g_pserver->repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) {
3563+
if (FFastSyncEnabled() && g_pserver->repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) {
35543564
veccapabilities.push_back("capa");
3555-
veccapabilities.push_back("rocksdb-snapshot-load");
3565+
veccapabilities.push_back("keydb-fastsync");
35563566
}
35573567

35583568
err = sendCommandArgv(conn, veccapabilities.size(), veccapabilities.data(), nullptr);
@@ -3864,6 +3874,10 @@ int cancelReplicationHandshake(redisMaster *mi, int reconnect) {
38643874
delete mi->parseState;
38653875
mi->parseState = nullptr;
38663876
}
3877+
if (mi->bulkreadBuffer) {
3878+
sdsfree(mi->bulkreadBuffer);
3879+
mi->bulkreadBuffer = nullptr;
3880+
}
38673881

38683882
if (mi->repl_state == REPL_STATE_TRANSFER) {
38693883
replicationAbortSyncTransfer(mi);

src/sds.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,10 @@ class sdsstring : public sdsview
418418
sdsstring &operator=(const sdsstring &other)
419419
{
420420
sdsfree(m_str);
421-
m_str = sdsdup(other.m_str);
421+
if (other.m_str != nullptr)
422+
m_str = sdsdup(other.m_str);
423+
else
424+
m_str = nullptr;
422425
return *this;
423426
}
424427

src/server.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2688,6 +2688,7 @@ struct redisServer {
26882688

26892689
int fActiveReplica; /* Can this replica also be a master? */
26902690
int fWriteDuringActiveLoad; /* Can this active-replica write during an RDB load? */
2691+
int fEnableFastSync = true;
26912692

26922693
// Format:
26932694
// Lower 20 bits: a counter incrementing for each command executed in the same millisecond
@@ -3182,6 +3183,7 @@ void trimStringObjectIfNeeded(robj *o);
31823183
robj *deserializeStoredObject(const void *data, size_t cb);
31833184
std::unique_ptr<expireEntry> deserializeExpire(const char *str, size_t cch, size_t *poffset);
31843185
sds serializeStoredObject(robj_roptr o, sds sdsPrefix = nullptr);
3186+
sds serializeStoredObjectAndExpire(robj_roptr o);
31853187

31863188
#define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR)
31873189

@@ -3971,6 +3973,10 @@ inline int ielFromEventLoop(const aeEventLoop *eventLoop)
39713973
return iel;
39723974
}
39733975

3976+
inline bool FFastSyncEnabled() {
3977+
return g_pserver->fEnableFastSync && !g_pserver->fActiveReplica;
3978+
}
3979+
39743980
inline int FCorrectThread(client *c)
39753981
{
39763982
return (c->conn == nullptr)

tests/integration/replication.tcl

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -584,6 +584,7 @@ start_server {tags {"repl"}} {
584584
set master [srv 0 client]
585585
$master config set repl-diskless-sync yes
586586
$master config set repl-diskless-sync-delay 1
587+
$master config set enable-keydb-fastsync no
587588
set master_host [srv 0 host]
588589
set master_port [srv 0 port]
589590
set master_pid [srv 0 pid]
@@ -727,6 +728,8 @@ start_server {tags {"repl"}} {
727728
}
728729
}
729730
}
731+
732+
$master config set enable-keydb-fastsync yes
730733
}
731734

732735
if 0 {
@@ -867,8 +870,10 @@ test {replicaof right after disconnection} {
867870
test {Kill rdb child process if its dumping RDB is not useful} {
868871
start_server {tags {"repl"}} {
869872
set slave1 [srv 0 client]
873+
$slave1 config set enable-keydb-fastsync no
870874
start_server {} {
871875
set slave2 [srv 0 client]
876+
$slave2 config set enable-keydb-fastsync no
872877
start_server {} {
873878
set master [srv 0 client]
874879
set master_host [srv 0 host]

0 commit comments

Comments
 (0)