diff --git a/common/Makefile.am b/common/Makefile.am index 4c9a08976..dd693fb65 100644 --- a/common/Makefile.am +++ b/common/Makefile.am @@ -53,10 +53,12 @@ common_libswsscommon_la_SOURCES = \ common/notificationproducer.cpp \ common/linkcache.cpp \ common/portmap.cpp \ + common/publishereventtable.cpp \ common/pubsub.cpp \ common/tokenize.cpp \ common/exec.cpp \ common/saiaclschema.cpp \ + common/subscribereventtable.cpp \ common/subscriberstatetable.cpp \ common/timestamp.cpp \ common/warm_restart.cpp \ diff --git a/common/configdb.cpp b/common/configdb.cpp index ececb2d3f..fe418bf27 100644 --- a/common/configdb.cpp +++ b/common/configdb.cpp @@ -1,9 +1,13 @@ #include #include +#include #include #include "configdb.h" +#include "dbconnector.h" #include "pubsub.h" #include "converter.h" +#include "table.h" +#include "publishereventtable.h" using namespace std; using namespace swss; @@ -19,6 +23,7 @@ void ConfigDBConnector_Native::db_connect(string db_name, bool wait_for_init, bo { m_db_name = db_name; m_key_separator = m_table_name_separator = get_db_separator(db_name); + m_event_tables = SonicDBConfig::getEventTables(m_db_name); SonicV2Connector_Native::connect(m_db_name, retry_on); if (wait_for_init) @@ -62,6 +67,19 @@ void ConfigDBConnector_Native::connect(bool wait_for_init, bool retry_on) db_connect("CONFIG_DB", wait_for_init, retry_on); } +shared_ptr ConfigDBConnector_Native::getTableWriter(const string &table_name) +{ + auto& client = get_redis_client(m_db_name); + if (m_event_tables.find(to_upper(table_name)) != m_event_tables.end()) + { + return make_shared(&client, to_upper(table_name)); + } + else + { + return make_shared
(&client, to_upper(table_name)); + } +} + // Write a table entry to config db. // Remove extra fields in the db which are not in the data. // Args: @@ -71,23 +89,30 @@ void ConfigDBConnector_Native::connect(bool wait_for_init, bool retry_on) // Pass {} as data will delete the entry. void ConfigDBConnector_Native::set_entry(string table, string key, const map& data) { - auto& client = get_redis_client(m_db_name); + auto table_writer = getTableWriter(table); string _hash = to_upper(table) + m_table_name_separator + key; if (data.empty()) { - client.del(_hash); + table_writer->del(key); } else { auto original = get_entry(table, key); - client.hmset(_hash, data.begin(), data.end()); + // Convert map to vector + std::vector values; + values.reserve(data.size()); + for (const auto& kv : data) + { + values.emplace_back(kv.first, kv.second); + } + table_writer->set(key, values); for (auto& it: original) { auto& k = it.first; bool found = data.find(k) != data.end(); if (!found) { - client.hdel(_hash, k); + table_writer->hdel(key, k); } } } @@ -102,15 +127,21 @@ void ConfigDBConnector_Native::set_entry(string table, string key, const map& data) { - auto& client = get_redis_client(m_db_name); - string _hash = to_upper(table) + m_table_name_separator + key; + auto table_writer = getTableWriter(table); if (data.empty()) { - client.del(_hash); + table_writer->del(key); } else { - client.hmset(_hash, data.begin(), data.end()); + // Convert map to vector + std::vector values; + values.reserve(data.size()); + for (const auto& kv : data) + { + values.emplace_back(kv.first, kv.second); + } + table_writer->set(key, values); } } @@ -195,12 +226,12 @@ map> ConfigDBConnector_Native::get_table(string tabl // table: Table name. void ConfigDBConnector_Native::delete_table(string table) { - auto& client = get_redis_client(m_db_name); - string pattern = to_upper(table) + m_table_name_separator + "*"; - const auto& keys = client.keys(pattern); + auto table_writer = getTableWriter(table); + vector keys; + table_writer->getKeys(keys); for (auto& key: keys) { - client.del(key); + table_writer->del(key); } } diff --git a/common/configdb.h b/common/configdb.h index 1611117a8..072490546 100644 --- a/common/configdb.h +++ b/common/configdb.h @@ -2,8 +2,10 @@ #include #include +#include #include "sonicv2connector.h" #include "redistran.h" +#include "table.h" namespace swss { @@ -31,8 +33,11 @@ class ConfigDBConnector_Native : public SonicV2Connector_Native std::string getDbName() const; protected: + std::shared_ptr getTableWriter(const std::string &table_name); + std::string m_table_name_separator = "|"; std::string m_key_separator = "|"; + std::set m_event_tables; std::string m_db_name; }; diff --git a/common/database_config.json b/common/database_config.json index 67b5cda88..186b2b82e 100644 --- a/common/database_config.json +++ b/common/database_config.json @@ -35,7 +35,8 @@ "CONFIG_DB" : { "id" : 4, "separator": "|", - "instance" : "redis" + "instance" : "redis", + "event_tables" : [ "PORT" ] }, "PFC_WD_DB" : { "id" : 5, @@ -50,7 +51,8 @@ "STATE_DB" : { "id" : 6, "separator": "|", - "instance" : "redis" + "instance" : "redis", + "event_tables" : [ "PORT_TABLE", "TRANSCEIVER_INFO" ] }, "SNMP_OVERLAY_DB" : { "id" : 7, diff --git a/common/dbconnector.cpp b/common/dbconnector.cpp index 47fe80d3b..39e63e3f9 100755 --- a/common/dbconnector.cpp +++ b/common/dbconnector.cpp @@ -33,27 +33,42 @@ void SonicDBConfig::parseDatabaseConfig(const string &file, i >> j; for (auto it = j["INSTANCES"].begin(); it!= j["INSTANCES"].end(); it++) { - string instName = it.key(); - string socket; - auto path = it.value().find("unix_socket_path"); - if (path != it.value().end()) - { + string instName = it.key(); + string socket; + auto path = it.value().find("unix_socket_path"); + if (path != it.value().end()) + { socket = *path; - } - string hostname = it.value().at("hostname"); - int port = it.value().at("port"); - inst_entry[instName] = {socket, hostname, port}; + } + string hostname = it.value().at("hostname"); + int port = it.value().at("port"); + inst_entry[instName] = {socket, hostname, port}; } for (auto it = j["DATABASES"].begin(); it!= j["DATABASES"].end(); it++) { - string dbName = it.key(); - string instName = it.value().at("instance"); - int dbId = it.value().at("id"); - string separator = it.value().at("separator"); - db_entry[dbName] = {instName, dbId, separator}; + string dbName = it.key(); + string instName = it.value().at("instance"); + int dbId = it.value().at("id"); + string separator = it.value().at("separator"); + + set eventTables; + auto path = it.value().find("event_tables"); + if (path != it.value().end()) + { + for (auto& eventTable: it.value()["event_tables"]) + { + eventTables.emplace(eventTable); + // printf("Event table: %s %s\n", dbName.c_str(), eventTable.get().c_str()); + } + } + else + { + // printf("Warning: event_tables not found for database %s, using empty set\n", dbName.c_str()); + } + db_entry[dbName] = {instName, dbId, separator, eventTables}; - separator_entry.emplace(dbId, separator); + separator_entry.emplace(dbId, separator); } } @@ -179,8 +194,11 @@ void SonicDBConfig::initialize(const string &file) SonicDBKey empty_key; parseDatabaseConfig(file, inst_entry, db_entry, separator_entry); + // printf("SonicDBConfig::initialize: parsed %zu Redis instances, %zu databases, %zu separators, %lu\n", + // inst_entry.size(), db_entry.size(), separator_entry.size(), db_entry["CONFIG_DB"].eventTables.size()); m_inst_info.emplace(empty_key, std::move(inst_entry)); m_db_info.emplace(empty_key, std::move(db_entry)); + // printf("m_db_info %lu\n", m_db_info[empty_key]["CONFIG_DB"].eventTables.size()); m_db_separator.emplace(empty_key, std::move(separator_entry)); // Set it as the config file is already parsed and init done. @@ -392,6 +410,41 @@ string SonicDBConfig::getSeparator(const DBConnector* db) } } +set SonicDBConfig::getEventTables(const string &dbName, const string &netns, const std::string &containerName) +{ + SonicDBKey key; + // printf("SonicDBConfig::getEventTables: dbName=%s, netns=%s, containerName=%s, %lu\n", + // dbName.c_str(), netns.c_str(), containerName.c_str(), m_db_info[key][dbName].eventTables.size()); + key.netns = netns; + key.containerName = containerName; + // printf("SonicDBConfig::getEventTables: dbName=%s, netns=%s, containerName=%s, %lu\n", + // dbName.c_str(), netns.c_str(), containerName.c_str(), m_db_info[key][dbName].eventTables.size()); + auto ret = getEventTables(dbName, key); + // printf("SonicDBConfig::getEventTables: %lu\n", ret.size()); + return ret; +} + +set SonicDBConfig::getEventTables(const string &dbName, const SonicDBKey &key) +{ + std::lock_guard guard(m_db_info_mutex); + + if (!m_init) + initialize(DEFAULT_SONIC_DB_CONFIG_FILE); + + if (!key.isEmpty()) + { + if (!m_global_init) + { + SWSS_LOG_THROW("Initialize global DB config using API SonicDBConfig::initializeGlobalConfig"); + } + } + // printf("SonicDBConfig::getEventTables2: dbName=%s, %lu\n", + // dbName.c_str(), m_db_info[key][dbName].eventTables.size()); + auto ret = getDbInfo(dbName, key).eventTables; + // printf("SonicDBConfig::getEventTables2: %lu\n", ret.size()); + return ret; +} + string SonicDBConfig::getDbSock(const string &dbName, const string &netns, const std::string &containerName) { SonicDBKey key; diff --git a/common/dbconnector.h b/common/dbconnector.h index 832983ed9..53ab49e7b 100644 --- a/common/dbconnector.h +++ b/common/dbconnector.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -36,6 +37,7 @@ class SonicDBInfo std::string instName; int dbId; std::string separator; + std::set eventTables; }; struct SonicDBKey @@ -120,6 +122,8 @@ class SonicDBConfig static std::string getSeparator(int dbId, const std::string &netns = EMPTY_NAMESPACE, const std::string &containerName=EMPTY_CONTAINERNAME); static std::string getSeparator(int dbId, const SonicDBKey &key); static std::string getSeparator(const DBConnector* db); + static std::set getEventTables(const std::string &dbName, const std::string &netns = EMPTY_NAMESPACE, const std::string &containerName=EMPTY_CONTAINERNAME); + static std::set getEventTables(const std::string &dbName, const SonicDBKey &key); static std::string getDbSock(const std::string &dbName, const std::string &netns = EMPTY_NAMESPACE, const std::string &containerName=EMPTY_CONTAINERNAME); static std::string getDbSock(const std::string &dbName, const SonicDBKey &key); static std::string getDbHostname(const std::string &dbName, const std::string &netns = EMPTY_NAMESPACE, const std::string &containerName=EMPTY_CONTAINERNAME); diff --git a/common/publishereventtable.cpp b/common/publishereventtable.cpp new file mode 100644 index 000000000..25ecc815e --- /dev/null +++ b/common/publishereventtable.cpp @@ -0,0 +1,109 @@ +#include +#include "publishereventtable.h" +#include "rediscommand.h" +#include "schema.h" +#include "table.h" + +using namespace std; +using namespace swss; + +string buildJsonWithKey(const FieldValueTuple &fvHead, const vector &fv) +{ + nlohmann::json j = nlohmann::json::array(); + j.push_back(fvField(fvHead)); + j.push_back(fvValue(fvHead)); + + // we use array to save order + for (const auto &i : fv) + { + j.push_back(fvField(i)); + j.push_back(fvValue(i)); + } + + return j.dump(); +} + +PublisherEventTable::PublisherEventTable(const DBConnector *db, const std::string &tableName) + : Table(db, tableName) +{ + m_channel = getChannelName(m_pipe->getDbId()); +} + +PublisherEventTable::PublisherEventTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered) + : Table(pipeline, tableName, buffered) +{ + m_channel = getChannelName(m_pipe->getDbId()); +} + +PublisherEventTable::~PublisherEventTable() +{ +} + +void PublisherEventTable::set(const string &key, const vector &values, + const string &op, const string &prefix) +{ + if (values.size() == 0) + return; + + RedisCommand cmd; + + cmd.formatHSET(getKeyName(key), values.begin(), values.end()); + m_pipe->push(cmd, REDIS_REPLY_INTEGER); + + + FieldValueTuple opdata(SET_COMMAND, key); + std::string msg = buildJsonWithKey(opdata, values); + + SWSS_LOG_DEBUG("channel %s, publish: %s", m_channel.c_str(), msg.c_str()); + + RedisCommand command; + command.format("PUBLISH %s %s", m_channel.c_str(), msg.c_str()); + m_pipe->push(command, REDIS_REPLY_INTEGER); + + if (!m_buffered) + { + m_pipe->flush(); + } +} + +void PublisherEventTable::del(const string &key, const string& op, const string& /*prefix*/) +{ + RedisCommand del_key; + del_key.format("DEL %s", getKeyName(key).c_str()); + m_pipe->push(del_key, REDIS_REPLY_INTEGER); + + FieldValueTuple opdata(DEL_COMMAND, key); + std::string msg = buildJsonWithKey(opdata, {}); + + SWSS_LOG_DEBUG("channel %s, publish: %s", m_channel.c_str(), msg.c_str()); + + RedisCommand command; + command.format("PUBLISH %s %s", m_channel.c_str(), msg.c_str()); + m_pipe->push(command, REDIS_REPLY_INTEGER); + + if (!m_buffered) + { + m_pipe->flush(); + } +} + +void PublisherEventTable::hdel(const string &key, const string &field, const string& op, const string& /*prefix*/) +{ + RedisCommand hdel_cmd; + hdel_cmd.format("HDEL %s %s", getKeyName(key).c_str(), field.c_str()); + m_pipe->push(hdel_cmd, REDIS_REPLY_INTEGER); + + FieldValueTuple opdata(HDEL_COMMAND, key); + std::string msg = buildJsonWithKey(opdata, {FieldValueTuple(field, "")}); + + SWSS_LOG_DEBUG("channel %s, publish: %s", m_channel.c_str(), msg.c_str()); + + RedisCommand command; + command.format("PUBLISH %s %s", m_channel.c_str(), msg.c_str()); + m_pipe->push(command, REDIS_REPLY_INTEGER); + + if (!m_buffered) + { + m_pipe->flush(); + } +} diff --git a/common/publishereventtable.h b/common/publishereventtable.h new file mode 100644 index 000000000..ae15f9a51 --- /dev/null +++ b/common/publishereventtable.h @@ -0,0 +1,68 @@ +#pragma once + +#include +#include "dbconnector.h" +#include "table.h" + +namespace swss { + +class PublisherEventTable : public Table { +public: + PublisherEventTable(const DBConnector *db, const std::string &tableName); + PublisherEventTable(RedisPipeline *pipeline, const std::string &tableName, bool buffered); + ~PublisherEventTable() override; + + /* Set an entry in the DB directly (op not in use) */ + virtual void set(const std::string &key, + const std::vector &values, + const std::string &op = "", + const std::string &prefix = EMPTY_PREFIX); + + /* Set an entry in the DB directly and configure ttl for it (op not in use) */ + // explicitly disable the virtual function + virtual void set(const std::string &key, + const std::vector &values, + const std::string &op, + const std::string &prefix, + const int64_t &ttl) + { + throw std::runtime_error("set with ttl is not supported in PublisherEventTable"); + } + + /* Delete an entry in the table */ + virtual void del(const std::string &key, + const std::string &op = "", + const std::string &prefix = EMPTY_PREFIX); + + virtual void hdel(const std::string &key, + const std::string &field, + const std::string &op = "", + const std::string &prefix = EMPTY_PREFIX); + + virtual void hset(const std::string &key, + const std::string &field, + const std::string &value, + const std::string &op = "", + const std::string &prefix = EMPTY_PREFIX) + { + throw std::runtime_error("hset is not supported in PublisherEventTable"); + } + + /* Read a value from the DB directly */ + /* Returns false if the key doesn't exists */ + // virtual bool get(const std::string &key, std::vector &ovalues); + + // void getKeys(std::vector &keys); + + // void setBuffered(bool buffered); + + // void flush(); + + // void dump(TableDump &tableDump); + +private: + std::string m_channel; +}; + +} + diff --git a/common/schema.h b/common/schema.h index c2efb9368..082524274 100644 --- a/common/schema.h +++ b/common/schema.h @@ -632,6 +632,7 @@ namespace swss { #define SET_COMMAND "SET" #define DEL_COMMAND "DEL" +#define HDEL_COMMAND "HDEL" #define EMPTY_PREFIX "" #define CFG_DTEL_TABLE_NAME "DTEL" diff --git a/common/subscribereventtable.cpp b/common/subscribereventtable.cpp new file mode 100644 index 000000000..798bf9ee8 --- /dev/null +++ b/common/subscribereventtable.cpp @@ -0,0 +1,175 @@ +#include +#include +#include +#include +#include "json.h" +#include "dbconnector.h" +#include "rediscommand.h" +#include "table.h" +#include "selectable.h" +#include "redisselect.h" +#include "redisapi.h" +#include "tokenize.h" +#include "subscriberstatetable.h" +#include "subscribereventtable.h" + +using namespace std; + +namespace swss { + +// TODO: reuse +#define REDIS_PUBLISH_MESSAGE_ELEMNTS (3) +#define REDIS_PUBLISH_MESSAGE_INDEX (2) +string processReply(redisReply *reply) +{ + SWSS_LOG_ENTER(); + + if (reply->type != REDIS_REPLY_ARRAY) + { + // SWSS_LOG_ERROR("expected ARRAY redis reply on channel %s, got: %d", m_channel.c_str(), reply->type); + + throw std::runtime_error("getRedisReply operation failed"); + } + + if (reply->elements != REDIS_PUBLISH_MESSAGE_ELEMNTS) + { + // SWSS_LOG_ERROR("expected %d elements in redis reply on channel %s, got: %zu", + // REDIS_PUBLISH_MESSAGE_ELEMNTS, + // m_channel.c_str(), + // reply->elements); + + throw std::runtime_error("getRedisReply operation failed"); + } + + std::string msg = std::string(reply->element[REDIS_PUBLISH_MESSAGE_INDEX]->str); + + SWSS_LOG_DEBUG("got message: %s", msg.c_str()); + + return msg; +} + +SubscriberEventTable::SubscriberEventTable(DBConnector *db, const string &tableName, int popBatchSize, int pri) + : ConsumerTableBase(db, tableName, popBatchSize, pri), m_table(db, tableName) +{ + m_channel = getChannelName(m_db->getDbId()); + + subscribe(m_db, m_channel); + + vector keys; + m_table.getKeys(keys); + + for (const auto &key: keys) + { + KeyOpFieldsValuesTuple kco; + + kfvKey(kco) = key; + kfvOp(kco) = SET_COMMAND; + + if (!m_table.get(key, kfvFieldsValues(kco))) + { + continue; + } + + m_buffer.push_back(kco); + } +} + +uint64_t SubscriberEventTable::readData() +{ + redisReply *reply = nullptr; + + /* Read data from redis. This call is non blocking. This method + * is called from Select framework when data is available in socket. + * NOTE: All data should be stored in event buffer. It won't be possible to + * read them second time. */ + if (redisGetReply(m_subscribe->getContext(), reinterpret_cast(&reply)) != REDIS_OK) + { + throw std::runtime_error("Unable to read redis reply"); + } + + m_event_buffer.emplace_back(make_shared(reply)); + + /* Try to read data from redis cacher. + * If data exists put it to event buffer. + * NOTE: channel event is not persistent and it won't + * be possible to read it second time. If it is not stored in + * the buffer it will be lost. */ + + reply = nullptr; + int status; + do + { + status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast(&reply)); + if(reply != nullptr && status == REDIS_OK) + { + m_event_buffer.emplace_back(make_shared(reply)); + } + } + while(reply != nullptr && status == REDIS_OK); + + if (status != REDIS_OK) + { + throw std::runtime_error("Unable to read redis reply"); + } + return 0; +} + +bool SubscriberEventTable::hasData() +{ + return m_buffer.size() > 0 || m_event_buffer.size() > 0; +} + +bool SubscriberEventTable::hasCachedData() +{ + return m_buffer.size() + m_event_buffer.size() > 1; +} + +void SubscriberEventTable::pops(deque &vkco, const string& /*prefix*/) +{ + vkco.clear(); + + if (!m_buffer.empty()) + { + vkco.insert(vkco.end(), m_buffer.begin(), m_buffer.end()); + m_buffer.clear(); + return; + } + + while (auto event = popEventBuffer()) + { + + KeyOpFieldsValuesTuple kco; + auto &values = kfvFieldsValues(kco); + string msg = processReply(event.get()->getContext()); + + JSon::readJson(msg, values); + + FieldValueTuple fvHead = values.at(0); + + kfvOp(kco) = fvField(fvHead); + kfvKey(kco) = fvValue(fvHead); + + values.erase(values.begin()); + + vkco.push_back(kco); + } + + m_event_buffer.clear(); + + return; +} + +shared_ptr SubscriberEventTable::popEventBuffer() +{ + if (m_event_buffer.empty()) + { + return nullptr; + } + + auto reply = m_event_buffer.front(); + m_event_buffer.pop_front(); + + return reply; +} + +} diff --git a/common/subscribereventtable.h b/common/subscribereventtable.h new file mode 100644 index 000000000..c9c5c051b --- /dev/null +++ b/common/subscribereventtable.h @@ -0,0 +1,34 @@ +#pragma once + +#include "subscriberstatetable.h" + +namespace swss { + +class SubscriberEventTable : public ConsumerTableBase +{ +public: + SubscriberEventTable(DBConnector *db, const std::string &tableName, int popBatchSize = DEFAULT_POP_BATCH_SIZE, int pri = 0); + + /* Get all available events from pub/sub channel */ + void pops(std::deque &vkco, const std::string &prefix = EMPTY_PREFIX); + + /* Read event from redis channel*/ + uint64_t readData() override; + bool hasData() override; + bool hasCachedData() override; + bool initializedWithData() override + { + return !m_buffer.empty(); + } + +private: + /* Pop event from event buffer. Caller should free resources. */ + std::shared_ptr popEventBuffer(); + + std::string m_channel; + + std::deque> m_event_buffer; + Table m_table; +}; + +} diff --git a/pyext/swsscommon.i b/pyext/swsscommon.i index b3d015e03..f0e1c92a7 100644 --- a/pyext/swsscommon.i +++ b/pyext/swsscommon.i @@ -32,11 +32,13 @@ #include "redisselect.h" #include "redistran.h" #include "producerstatetable.h" +#include "publishereventtable.h" #include "consumertablebase.h" #include "consumerstatetable.h" #include "producertable.h" #include "profileprovider.h" #include "consumertable.h" +#include "subscribereventtable.h" #include "subscriberstatetable.h" #ifdef ENABLE_YANG_MODULES #include "decoratortable.h" @@ -296,6 +298,7 @@ T castSelectableObj(swss::Selectable *temp) %apply std::vector>& OUTPUT {std::vector> &ovalues}; %apply std::string& OUTPUT {std::string &value}; %include "table.h" +%include "publishereventtable.h" #ifdef ENABLE_YANG_MODULES %include "decoratortable.h" #endif @@ -337,6 +340,7 @@ T castSelectableObj(swss::Selectable *temp) %include "consumertable.h" %include "consumerstatetable.h" +%include "subscribereventtable.h" %include "subscriberstatetable.h" #ifdef ENABLE_YANG_MODULES %include "decoratorsubscriberstatetable.h" diff --git a/tests/redis_multi_db_ut_config/database_config.json b/tests/redis_multi_db_ut_config/database_config.json index 7ce7820ae..b13ba271b 100644 --- a/tests/redis_multi_db_ut_config/database_config.json +++ b/tests/redis_multi_db_ut_config/database_config.json @@ -50,7 +50,8 @@ "CONFIG_DB" : { "id" : 4, "separator": "|", - "instance" : "redis" + "instance" : "redis", + "event_tables" : [ "PORT" ] }, "PFC_WD_DB" : { "id" : 5, @@ -65,7 +66,8 @@ "STATE_DB" : { "id" : 6, "separator": "|", - "instance" : "redis" + "instance" : "redis", + "event_tables" : [ "PORT_TABLE", "TRANSCEIVER_INFO" ] }, "SNMP_OVERLAY_DB" : { "id" : 7, diff --git a/tests/redis_subscriber_state_ut.cpp b/tests/redis_subscriber_state_ut.cpp index 34db9d48a..f5c804a5c 100644 --- a/tests/redis_subscriber_state_ut.cpp +++ b/tests/redis_subscriber_state_ut.cpp @@ -8,6 +8,8 @@ #include "common/selectableevent.h" #include "common/table.h" #include "common/subscriberstatetable.h" +#include "common/publishereventtable.h" +#include "common/subscribereventtable.h" using namespace std; using namespace swss; @@ -222,6 +224,86 @@ TEST(SubscriberStateTable, set) } } +TEST(SubscribeEventTable, set) +{ + clearDB(); + + // Test for event_tables + set eventTables = SonicDBConfig::getEventTables("CONFIG_DB"); + EXPECT_EQ(eventTables.size(), 1); + // for (auto &table: eventTables) + // { + // cout << "Event table: " << table << endl; + // } + + /* Prepare producer */ + int index = 0; + DBConnector db("TEST_DB", 0, true); + PublisherEventTable p(&db, testTableName); + string key = "TheKey"; + int maxNumOfFields = 2; + + /* Set operation before any subscriber is created */ + { + vector fields; + FieldValueTuple t("before_subscriber", "long_ago"); + fields.push_back(t); + p.set(key, fields); + } + + /* Prepare subscriber */ + SubscriberEventTable c(&db, testTableName); + Select cs; + Selectable *selectcs; + cs.addSelectable(&c); + + /* Set operation */ + { + vector fields; + for (int j = 0; j < maxNumOfFields; j++) + { + FieldValueTuple t(field(index, j), value(index, j)); + fields.push_back(t); + } + p.set(key, fields); + } + + /* Pop operation */ + { + int ret = cs.select(&selectcs); + EXPECT_EQ(ret, Select::OBJECT); + KeyOpFieldsValuesTuple kco; + c.pop(kco); + EXPECT_EQ(kfvKey(kco), key); + EXPECT_EQ(kfvOp(kco), "SET"); + + auto fvs = kfvFieldsValues(kco); + EXPECT_EQ(fvs.size(), 1); + EXPECT_EQ(fvs[0].first, "before_subscriber"); + EXPECT_EQ(fvs[0].second, "long_ago"); + + ret = cs.select(&selectcs); + EXPECT_EQ(ret, Select::OBJECT); + c.pop(kco); + EXPECT_EQ(kfvKey(kco), key); + EXPECT_EQ(kfvOp(kco), "SET"); + + fvs = kfvFieldsValues(kco); + EXPECT_EQ(fvs.size(), (unsigned int)(maxNumOfFields)); + + map mm; + for (auto fv: fvs) + { + mm[fvField(fv)] = fvValue(fv); + } + + for (int j = 0; j < maxNumOfFields; j++) + { + EXPECT_EQ(mm[field(index, j)], value(index, j)); + } + } +} + TEST(SubscriberStateTable, set2_pop1_set1_pop1) { clearDB(); diff --git a/tests/test_redis_ut.py b/tests/test_redis_ut.py index bf395a34f..e795f92a6 100644 --- a/tests/test_redis_ut.py +++ b/tests/test_redis_ut.py @@ -84,6 +84,97 @@ def test_SubscriberStateTable(): sel = swsscommon.Select() cst = swsscommon.SubscriberStateTable(db, "testsst") sel.addSelectable(cst) + + # Set one field and pop + fvs = swsscommon.FieldValuePairs([('a','b')]) + t.set("aaa", fvs) + (state, c) = sel.select() + assert state == swsscommon.Select.OBJECT + (key, op, cfvs) = cst.pop() + assert key == "aaa" + assert op == "SET" + assert len(cfvs) == 1 + assert cfvs[0] == ('a', 'b') + + # Set one field twice and then pop twice. First pop should return the final field/values, second pop should return the same final field/values + fvs = swsscommon.FieldValuePairs([('c','b0')]) + t.set("aaa", fvs) + fvs = swsscommon.FieldValuePairs([('c','b1')]) + t.set("aaa", fvs) + (state, c) = sel.select() + assert state == swsscommon.Select.OBJECT + (key, op, cfvs) = cst.pop() + assert key == "aaa" + assert op == "SET" + for fv in cfvs: + if fv[0] == 'c': + assert fv[1] == 'b1' + break + else: + assert False, "Field 'c' not found in the final field/values" + (key, op, cfvs) = cst.pop() + assert key == "aaa" + assert op == "SET" + for fv in cfvs: + print(fv) + if fv[0] == 'c': + assert fv[1] == 'b1' + break + else: + assert False, "Field 'c' not found in the final field/values" + + +def test_SubscriberEventTable(): + db = swsscommon.DBConnector("CONFIG_DB", 0, True) + db.flushdb() + t = swsscommon.PublisherEventTable(db, "TestEventTable") + cst = swsscommon.SubscriberEventTable(db, "TestEventTable") + sel = swsscommon.Select() + sel.addSelectable(cst) + fvs = swsscommon.FieldValuePairs([('a','b')]) + t.set("aaa", fvs) + (state, c) = sel.select() + assert state == swsscommon.Select.OBJECT + (key, op, cfvs) = cst.pop() + assert key == "aaa" + assert op == "SET" + assert len(cfvs) == 1 + assert cfvs[0] == ('a', 'b') + + # Set one field twice and then pop twice. First pop should return the first set field/value, second pop should return the second field/value + fvs = swsscommon.FieldValuePairs([('c','b0')]) + t.set("aaa", fvs) + fvs = swsscommon.FieldValuePairs([('c','b1')]) + t.set("aaa", fvs) + (state, c) = sel.select() + assert state == swsscommon.Select.OBJECT + (key, op, cfvs) = cst.pop() + assert key == "aaa" + assert op == "SET" + for fv in cfvs: + if fv[0] == 'c': + assert fv[1] == 'b0' + break + else: + assert False, "Field 'c' not found in the final field/values" + (key, op, cfvs) = cst.pop() + assert key == "aaa" + assert op == "SET" + for fv in cfvs: + print(fv) + if fv[0] == 'c': + assert fv[1] == 'b1' + break + else: + assert False, "Field 'c' not found in the final field/values" + +def test_PublisherEventTable_SubscriberStateTable(): + db = swsscommon.DBConnector("CONFIG_DB", 0, True) + db.flushdb() + t = swsscommon.PublisherEventTable(db, "TestPublisherTable") + cst = swsscommon.SubscriberStateTable(db, "TestPublisherTable") + sel = swsscommon.Select() + sel.addSelectable(cst) fvs = swsscommon.FieldValuePairs([('a','b')]) t.set("aaa", fvs) (state, c) = sel.select()