Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions src/server/generic_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -830,9 +830,9 @@ TEST_F(GenericFamilyTest, Dump) {
ASSERT_EQ(RDB_SER_VERSION, 9);
uint8_t EXPECTED_STRING_DUMP[13] = {0x00, 0xc0, 0x13, 0x09, 0x00, 0x23, 0x13,
0x6f, 0x4d, 0x68, 0xf6, 0x35, 0x6e};
uint8_t EXPECTED_HASH_DUMP[] = {0x0d, 0x12, 0x12, 0x00, 0x00, 0x00, 0x0d, 0x00, 0x00, 0x00,
0x02, 0x00, 0x00, 0xfe, 0x13, 0x03, 0xc0, 0xd2, 0x04, 0xff,
0x09, 0x00, 0xb1, 0x0b, 0xae, 0x6c, 0x23, 0x5d, 0x17, 0xaa};
uint8_t EXPECTED_HASH_DUMP[] = {0x10, 0xc, 0xc, 0x0, 0x0, 0x0, 0x2, 0x0,
0x13, 0x1, 0xc4, 0xd2, 0x2, 0xff, 0x9, 0x0,
0x68, 0x4d, 0x73, 0xa4, 0xf, 0x23, 0x4f, 0xc7};

uint8_t EXPECTED_LIST_DUMP[] = {0x12, 0x01, 0x02, '\t', '\t', 0x00, 0x00, 0x00,
0x01, 0x00, 0x14, 0x01, 0xff, '\t', 0x00, 0xfb,
Expand All @@ -843,19 +843,19 @@ TEST_F(GenericFamilyTest, Dump) {
EXPECT_EQ(resp, "OK");
resp = Run({"dump", "z"});
auto dump = resp.GetBuf();
CHECK_EQ(ToSV(dump), ToSV(EXPECTED_STRING_DUMP));
ASSERT_EQ(ToSV(dump), ToSV(EXPECTED_STRING_DUMP));

// Check list dump
EXPECT_EQ(1, CheckedInt({"rpush", "l", "20"}));
resp = Run({"dump", "l"});
dump = resp.GetBuf();
CHECK_EQ(ToSV(dump), ToSV(EXPECTED_LIST_DUMP)) << absl::CHexEscape(resp.GetString());
ASSERT_EQ(ToSV(dump), ToSV(EXPECTED_LIST_DUMP)) << absl::CHexEscape(resp.GetString());

// Check for hash dump
EXPECT_EQ(1, CheckedInt({"hset", "z2", "19", "1234"}));
resp = Run({"dump", "z2"});
dump = resp.GetBuf();
CHECK_EQ(ToSV(dump), ToSV(EXPECTED_HASH_DUMP));
ASSERT_EQ(ToSV(dump), ToSV(EXPECTED_HASH_DUMP));

// Check that when running with none existing key we're getting nil
resp = Run({"dump", "foo"});
Expand All @@ -882,7 +882,7 @@ TEST_F(GenericFamilyTest, Restore) {
// note that value for expiration is just some valid unix time stamp from the pass
resp = Run(
{"restore", "exiting-key", "1665476212900", ToSV(STRING_DUMP_REDIS), "ABSTTL", "REPLACE"});
CHECK_EQ(resp, "OK");
ASSERT_EQ(resp, "OK");
resp = Run({"get", "exiting-key"});
EXPECT_EQ(resp.type, RespExpr::NIL); // it was deleted as a result of restore action

Expand All @@ -893,7 +893,7 @@ TEST_F(GenericFamilyTest, Restore) {
EXPECT_EQ("1234", resp);
resp = Run({"dump", "new-key"});
auto dump = resp.GetBuf();
CHECK_EQ(ToSV(dump), ToSV(STRING_DUMP_REDIS));
ASSERT_EQ(ToSV(dump), ToSV(STRING_DUMP_REDIS));

// test for list
EXPECT_EQ(1, CheckedInt({"rpush", "orig-list", "20"}));
Expand Down
4 changes: 2 additions & 2 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
} else if (rdb_type_ == RDB_TYPE_HASH_ZIPLIST || rdb_type_ == RDB_TYPE_HASH_LISTPACK) {
unsigned char* lp = lpNew(blob.size());
switch (rdb_type_) {
case RDB_TYPE_HASH_ZIPLIST:
case RDB_TYPE_HASH_ZIPLIST: // legacy format
if (!ziplistPairsConvertAndValidateIntegrity((const uint8_t*)blob.data(), blob.size(),
&lp)) {
LOG(ERROR) << "Zset ziplist integrity check failed.";
Expand Down Expand Up @@ -951,7 +951,7 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
pv_->InitRobj(OBJ_HASH, kEncodingListPack, lp);
}
return;
} else if (rdb_type_ == RDB_TYPE_ZSET_ZIPLIST) {
} else if (rdb_type_ == RDB_TYPE_ZSET_ZIPLIST) { // legacy format
unsigned char* lp = lpNew(blob.size());
if (!ziplistPairsConvertAndValidateIntegrity((uint8_t*)blob.data(), blob.size(), &lp)) {
LOG(ERROR) << "Zset ziplist integrity check failed.";
Expand Down
57 changes: 11 additions & 46 deletions src/server/rdb_save.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ extern "C" {
#include "redis/rdb.h"
#include "redis/stream.h"
#include "redis/util.h"
#include "redis/ziplist.h"
#include "redis/zmalloc.h"
#include "redis/zset.h"
}
Expand Down Expand Up @@ -49,15 +48,9 @@ ABSL_FLAG(dfly::CompressionMode, compression_mode, dfly::CompressionMode::MULTI_
"set 2 for multi entry zstd compression on df snapshot and single entry on rdb snapshot,"
"set 3 for multi entry lz4 compression on df snapshot and single entry on rdb snapshot");

ABSL_RETIRED_FLAG(
bool, list_rdb_encode_v2, true,
"V2 rdb encoding of list uses listpack encoding format, compatible with redis 7. V1 rdb "
"enconding of list uses ziplist encoding compatible with redis 6");

// TODO: to retire this flag in v1.31
ABSL_FLAG(bool, stream_rdb_encode_v2, true,
"V2 uses format, compatible with redis 7.2 and Dragonfly v1.26+, while v1 format "
"is compatible with redis 6");
ABSL_RETIRED_FLAG(bool, stream_rdb_encode_v2, true,
"Retired. Uses format, compatible with redis 7.2 and Dragonfly v1.26+");

namespace dfly {

Expand Down Expand Up @@ -189,13 +182,13 @@ uint8_t RdbObjectType(const PrimeValue& pv) {
break;
case OBJ_ZSET:
if (compact_enc == OBJ_ENCODING_LISTPACK)
return RDB_TYPE_ZSET_ZIPLIST; // we save using the old ziplist encoding.
return RDB_TYPE_ZSET_LISTPACK;
else if (compact_enc == OBJ_ENCODING_SKIPLIST)
return RDB_TYPE_ZSET_2;
break;
case OBJ_HASH:
if (compact_enc == kEncodingListPack)
return RDB_TYPE_HASH_ZIPLIST;
return RDB_TYPE_HASH_LISTPACK;
else if (compact_enc == kEncodingStrMap2) {
if (((StringMap*)pv.RObjPtr())->ExpirationUsed())
return RDB_TYPE_HASH_WITH_EXPIRY; // Incompatible with Redis
Expand All @@ -204,8 +197,7 @@ uint8_t RdbObjectType(const PrimeValue& pv) {
}
break;
case OBJ_STREAM:
return absl::GetFlag(FLAGS_stream_rdb_encode_v2) ? RDB_TYPE_STREAM_LISTPACKS_3
: RDB_TYPE_STREAM_LISTPACKS;
return RDB_TYPE_STREAM_LISTPACKS_3;
case OBJ_MODULE:
return RDB_TYPE_MODULE_2;
case OBJ_JSON:
Expand Down Expand Up @@ -459,7 +451,8 @@ error_code RdbSerializer::SaveHSetObject(const PrimeValue& pv) {
CHECK_EQ(kEncodingListPack, pv.Encoding());

uint8_t* lp = (uint8_t*)pv.RObjPtr();
RETURN_ON_ERR(SaveListPackAsZiplist(lp));
size_t lp_bytes = lpBytes(lp);
RETURN_ON_ERR(SaveString((uint8_t*)lp, lp_bytes));
}

return error_code{};
Expand Down Expand Up @@ -496,9 +489,11 @@ error_code RdbSerializer::SaveZSetObject(const PrimeValue& pv) {
return true;
});
} else {
CHECK_EQ(pv.Encoding(), unsigned(OBJ_ENCODING_LISTPACK)) << "Unknown zset encoding";
CHECK_EQ(pv.Encoding(), unsigned(OBJ_ENCODING_LISTPACK));
uint8_t* lp = (uint8_t*)robj_wrapper->inner_obj();
RETURN_ON_ERR(SaveListPackAsZiplist(lp));
size_t lp_bytes = lpBytes(lp);

RETURN_ON_ERR(SaveString((uint8_t*)lp, lp_bytes));
}

return error_code{};
Expand Down Expand Up @@ -665,36 +660,6 @@ error_code RdbSerializer::SaveBinaryDouble(double val) {
return WriteRaw(Bytes{buf, sizeof(buf)});
}

error_code RdbSerializer::SaveListPackAsZiplist(uint8_t* lp) {
uint8_t* lpfield = lpFirst(lp);
int64_t entry_len;
uint8_t* entry;
uint8_t buf[32];
uint8_t* zl = ziplistNew();

while (lpfield) {
entry = lpGet(lpfield, &entry_len, buf);
zl = ziplistPush(zl, entry, entry_len, ZIPLIST_TAIL);
lpfield = lpNext(lp, lpfield);
}
size_t ziplen = ziplistBlobLen(zl);
error_code ec = SaveString(string_view{reinterpret_cast<char*>(zl), ziplen});
zfree(zl);

return ec;
}

error_code RdbSerializer::SavePlainNodeAsZiplist(const quicklistNode* node) {
uint8_t* zl = ziplistNew();
zl = ziplistPush(zl, node->entry, node->sz, ZIPLIST_TAIL);

size_t ziplen = ziplistBlobLen(zl);
error_code ec = SaveString(string_view{reinterpret_cast<char*>(zl), ziplen});
zfree(zl);

return ec;
}

error_code RdbSerializer::SaveStreamPEL(rax* pel, bool nacks) {
/* Number of entries in the PEL. */

Expand Down
2 changes: 0 additions & 2 deletions src/server/rdb_save.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,8 @@ class RdbSerializer : public SerializerBase {

std::error_code SaveLongLongAsString(int64_t value);
std::error_code SaveBinaryDouble(double val);
std::error_code SaveListPackAsZiplist(uint8_t* lp);
std::error_code SaveStreamPEL(rax* pel, bool nacks);
std::error_code SaveStreamConsumers(bool save_active, streamCG* cg);
std::error_code SavePlainNodeAsZiplist(const quicklistNode* node);

// Might preempt
void FlushIfNeeded(FlushState flush_state);
Expand Down
3 changes: 0 additions & 3 deletions src/server/stream_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ using namespace testing;
using namespace std;
using namespace util;

ABSL_DECLARE_FLAG(bool, stream_rdb_encode_v2);

namespace dfly {

const auto kMatchNil = ArgType(RespExpr::NIL);
Expand Down Expand Up @@ -1275,7 +1273,6 @@ TEST_F(StreamFamilyTest, SeenActiveTime) {
EXPECT_THAT(consumers, RespElementsAre("name", "Alice", "seen-time", IntArg(1250), "active-time",
IntArg(1100), "pel-count", IntArg(1), "pending", _));

absl::SetFlag(&FLAGS_stream_rdb_encode_v2, true);
resp = Run({"DUMP", "mystream"});
Run({"del", "mystream"});
resp = Run({"RESTORE", "mystream", "0", resp.GetString()});
Expand Down
Loading