diff --git a/.github/workflows/linux.yml b/.github/workflows/linux.yml new file mode 100644 index 0000000..4413e9f --- /dev/null +++ b/.github/workflows/linux.yml @@ -0,0 +1,55 @@ +# GitHub Actions workflow for FishStore that runs all CMake tests + +name: CMake + +on: [ push ] + +env: + # Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.) + BUILD_TYPE: Release + +jobs: + build: + # The CMake configure and build commands are platform agnostic and should work equally + # well on Windows or Mac. You can convert this to a matrix build if you need + # cross-platform coverage. + # See: https://docs.github.com/en/free-pro-team@latest/actions/learn-github-actions/managing-complex-workflows#using-a-build-matrix + runs-on: ubuntu-22.04 + + steps: + - name: Checkout repository + uses: actions/checkout@master + + - name: Checkout submodules + run: git submodule update --init --recursive + + - name: Install dependencies + run: sudo apt install -y g++ libaio-dev uuid-dev libtbb-dev + + - name: Create Build Environment + # Some projects don't allow in-source building, so create a separate build directory + # We'll use this as our working directory for all subsequent commands + run: cmake -E make_directory ${{runner.workspace}}/build + + - name: Configure CMake + # Use a bash shell so we can use the same syntax for environment variable + # access regardless of the host operating system + shell: bash + working-directory: ${{runner.workspace}}/build + # The current convention is to use the -S and -B options here to specify the source + # and build directories; this requires CMake 3.13 or higher, which the ubuntu-22.04 + # runners already provide, so either form works. + run: cmake $GITHUB_WORKSPACE -DCMAKE_BUILD_TYPE=$BUILD_TYPE + + - name: Build + working-directory: ${{runner.workspace}}/build + shell: bash + # Execute the build. You can specify a specific target with "--target " + run: cmake --build . --config $BUILD_TYPE + + - name: Test + working-directory: ${{runner.workspace}}/build + shell: bash + # Execute tests defined by the CMake configuration. 
+ # See https://cmake.org/cmake/help/latest/manual/ctest.1.html for more detail + run: ctest -j 1 --interactive-debug-mode 0 --output-on-failure -R .*_test \ No newline at end of file diff --git a/.gitignore b/.gitignore index c4c809f..fd35748 100644 --- a/.gitignore +++ b/.gitignore @@ -330,4 +330,8 @@ ASALocalRun/ # MFractors (Xamarin productivity tool) working folder .mfractor/ + +# Build folder /build +/cmake-build-debug +/cmake-build-release diff --git a/CMakeLists.txt b/CMakeLists.txt index 2fd5049..d688d0c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,39 +18,26 @@ if (MSVC) set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /DEBUG /OPT:REF /OPT:NOICF /INCREMENTAL:NO") set(CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} /DEBUG /OPT:REF /OPT:NOICF /INCREMENTAL:NO") else() - set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Og -g -D_DEBUG") - set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3") + set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -Og -g -D_DEBUG -fPIC") + set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3 -fPIC") endif() #Always set _DEBUG compiler directive when compiling bits regardless of target OS set_directory_properties(PROPERTIES COMPILE_DEFINITIONS_DEBUG "_DEBUG") ##### BEGIN GOOGLE TEST INSTALLATION ##### -# Copied from https://github.com/google/googletest/tree/master/googletest#incorporating-into-an-existing-cmake-project +# Copied from http://google.github.io/googletest/quickstart-cmake.html # Download and unpack googletest at configure time -configure_file(CMakeLists.txt.in googletest-download/CMakeLists.txt) -execute_process(COMMAND ${CMAKE_COMMAND} -G "${CMAKE_GENERATOR}" . - RESULT_VARIABLE result - WORKING_DIRECTORY ${CMAKE_BINARY_DIR}/googletest-download ) -if(result) - message(FATAL_ERROR "CMake step for googletest failed: ${result}") -endif() -execute_process(COMMAND ${CMAKE_COMMAND} --build . - RESULT_VARIABLE result - WORKING_DIRECTORY ${CMAKE_BINARY_DIR}/googletest-download ) -if(result) - message(FATAL_ERROR "Build step for googletest failed: ${result}") -endif() - -# Prevent overriding the parent project's compiler/linker -# settings on Windows +include(FetchContent) +FetchContent_Declare( + googletest + URL https://github.com/google/googletest/archive/03597a01ee50ed33e9dfd640b249b4be3799d395.zip +) +# For Windows: Prevent overriding the parent project's compiler/linker settings set(gtest_force_shared_crt ON CACHE BOOL "" FORCE) +FetchContent_MakeAvailable(googletest) -# Add googletest directly to our build. This defines -# the gtest and gtest_main targets. 
-add_subdirectory(${CMAKE_BINARY_DIR}/googletest-src - ${CMAKE_BINARY_DIR}/googletest-build - EXCLUDE_FROM_ALL) +include(GoogleTest) ##### END GOOGLE TEST INSTALLATION ##### diff --git a/examples/online_demo-dir/online_demo.cc b/examples/online_demo-dir/online_demo.cc index a3998aa..1f07a4a 100644 --- a/examples/online_demo-dir/online_demo.cc +++ b/examples/online_demo-dir/online_demo.cc @@ -184,7 +184,7 @@ int main(int argc, char* argv[]) { int n_threads = atoi(argv[2]); std::ifstream fin(argv[1]); std::vector batches; - const uint32_t json_batch_size = 1; + const uint32_t json_batch_size = 8; uint32_t json_batch_cnt = 0; size_t line_cnt = 0; size_t record_cnt = 0; diff --git a/src/adapters/README.md b/src/adapters/README.md index dd1b268..8bab965 100644 --- a/src/adapters/README.md +++ b/src/adapters/README.md @@ -9,7 +9,7 @@ using store_t = fishstore::core::FishStore; # General Parser Interface [`parser_api.h`](parser_api.h) provides a general parser interface which extension developer should comply on. Generally speaking, a parser should be able to construct with a given list of field names, parse a batch of documents (or a single document) returning an interator to iterate through all the records and all required fields. For each parsed field, user should be able to get the value in its corresponding format through interfaces like `GetAsInt()` or `GetAsDouble()`. Such functions return values in `NullableInt` and `NullableDouble` defined under `fishstore::adapter` scope. -**Note that `NullableInt` and `NullableStringRef` defined under `fishstore::core` scope has different interfaces thatn that in `fishstore::adapter`. Please do not be confused with them.** +**Note that `NullableInt` and `NullableStringRef` defined under the `fishstore::core` scope have different interfaces from those in `fishstore::adapter`. Do not confuse the two.** # Parser Adapter @@ -37,4 +37,4 @@ User should explicitly define the parser type in `parser_t`, parsed field type i There are a few known limitations with simdjson parser wrapper and adapter: - Note that simdjson currently only supports parsing one JSON record at a time. Thus, users can only feed one record in raw text to `BatchInsert()` at a time. As a result, user need to implement their own logic to delimit record boundaries within a batch in application level. -- `SIMDJsonParser` and `SIMDJsonAdapter` only supports object-based field names (e.g., `actor.id`, `payload.action.type`). Arrays (like `a[0].b`) and wildcards `a.*.b` are not supported. +- `SIMDJsonParser` and `SIMDJsonAdapter` support object-based field names (e.g., `actor.id`, `payload.action.type`) and array indexing (e.g., `a[0].b`); however, every field name must start with an object key (`[0].xyz` is not allowed). Wildcards such as `a.*.b` are not supported. diff --git a/src/adapters/common_utils.h b/src/adapters/common_utils.h index 20d6799..ff0ab8b 100644 --- a/src/adapters/common_utils.h +++ b/src/adapters/common_utils.h @@ -2,53 +2,61 @@ // Licensed under the MIT license.
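As an illustration (not part of this patch), the parser-interface contract the README hunk above describes — every `GetAs*()` call returns a `Nullable` wrapper, and callers must check `HasValue()` before reading `Value()` — looks like the following against the `fishstore::adapter` types in the `common_utils.h` hunk just below. The function name `PrintField`, the sample values, and the include path are assumptions made for this sketch only.

// Illustrative sketch: the HasValue()/Value() calling convention for the
// fishstore::adapter Nullable types declared in src/adapters/common_utils.h.
#include <cstdio>
#include <string>
#include "adapters/common_utils.h"  // assumes src/ is on the include path

using fishstore::adapter::NullableInt;
using fishstore::adapter::NullableStringRef;
using fishstore::adapter::StringRef;

// Typical consumer pattern: never read Value() without checking HasValue().
void PrintField(NullableInt id, NullableStringRef name) {
  if (id.HasValue()) {
    std::printf("id = %d\n", id.Value());
  }
  if (name.HasValue()) {
    StringRef ref = name.Value();
    // StringRef is a non-owning view; copy it out if the payload buffer may be reused.
    std::string owned(ref.Data(), ref.Length());
    std::printf("name = %s\n", owned.c_str());
  }
}

int main() {
  NullableInt missing;      // default-constructed: no value
  NullableInt present(42);  // wraps a value
  PrintField(missing, NullableStringRef());
  PrintField(present, NullableStringRef(StringRef("alice", 5)));
  return 0;
}

The new two-argument `Nullable(has_value, value)` constructor added in the hunk below is what the rewritten `simdjson_adapter.h` uses to return possibly-missing values in a single expression.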
#pragma once + #include namespace fishstore { -namespace adapter { - -class StringRef { -public: - StringRef() : ptr(nullptr), size(0) {} - StringRef(const char* ptr_, size_t size_) : ptr(ptr_), size(size_) {} - - const char* Data() const { - return ptr; - } - - const size_t Length() const { - return size; - } - -private: - const char* ptr; - size_t size; -}; - -template -struct Nullable { - Nullable() : has_value(false), value() {} - Nullable(const T& value_) : has_value(true), value(value_) {} - - bool HasValue() const { - return has_value; - } - const T& Value() { - return value; - } - -private: - bool has_value; - T value; -}; - -using NullableInt = Nullable; -using NullableLong = Nullable; -using NullableFloat = Nullable; -using NullableDouble = Nullable; -using NullableBool = Nullable; -using NullableStringRef = Nullable; -using NullableString = Nullable; + namespace adapter { -} + class StringRef { + public: + StringRef() : ptr(nullptr), size(0) {} + + StringRef(const char *ptr_, size_t size_) : ptr(ptr_), size(size_) {} + + const char *Data() const { + return ptr; + } + + const size_t Length() const { + return size; + } + + private: + const char *ptr; + size_t size; + }; + + template + struct Nullable { + Nullable() : has_value(false), value() {} + + Nullable(const T &value_) : has_value(true), value(value_) {} + + // Constructs a Nullable, which may have a value, and if it does, then + // it will have the given value + Nullable(const bool has_value_, const T &value_) : has_value(has_value_), value(value_) {} + + bool HasValue() const { + return has_value; + } + + const T &Value() { + return value; + } + + private: + bool has_value; + T value; + }; + + using NullableInt = Nullable; + using NullableLong = Nullable; + using NullableFloat = Nullable; + using NullableDouble = Nullable; + using NullableBool = Nullable; + using NullableStringRef = Nullable; + using NullableString = Nullable; + + } } diff --git a/src/adapters/simdjson_adapter.h b/src/adapters/simdjson_adapter.h index 23651c3..a4b9afc 100644 --- a/src/adapters/simdjson_adapter.h +++ b/src/adapters/simdjson_adapter.h @@ -6,172 +6,262 @@ #include #include #include +#include #include #ifdef _MSC_VER #define NOMINMAX #endif -#include -#include + +#include #include "adapters/common_utils.h" +constexpr size_t DEFAULT_BATCH_SIZE = 1 << 24; + using namespace simdjson; namespace fishstore { -namespace adapter { - -class SIMDJsonField { -public: - SIMDJsonField(int64_t id_, const ParsedJson::Iterator& it_) - : field_id(id_), iter(it_) {} - - inline int64_t FieldId() const { - return field_id; - } - - inline NullableBool GetAsBool() const { - switch (iter.get_type()) { - case 't': - return NullableBool(true); - case 'f': - return NullableBool(false); - default: - return NullableBool(); - } - } - - inline NullableInt GetAsInt() const { - if (iter.is_integer()) { - return NullableInt(static_cast(iter.get_integer())); - } else return NullableInt(); - } - - inline NullableLong GetAsLong() const { - if (iter.is_integer()) { - return NullableLong(iter.get_integer()); - } else return NullableLong(); - } - - inline NullableFloat GetAsFloat() const { - if (iter.is_double()) { - return NullableFloat(static_cast(iter.get_double())); - } else return NullableFloat(); - } - - inline NullableDouble GetAsDouble() const { - if (iter.is_double()) { - return NullableDouble(iter.get_double()); - } else return NullableDouble(); - } - - inline NullableString GetAsString() const { - if (iter.is_string()) { - return 
NullableString(std::string(iter.get_string(), iter.get_string_length())); - } else return NullableString(); - } - - inline NullableStringRef GetAsStringRef() const { - if (iter.is_string()) { - return NullableStringRef(StringRef(iter.get_string(), iter.get_string_length())); - } else return NullableStringRef(); - } - -private: - int64_t field_id; - ParsedJson::Iterator iter; -}; - -class SIMDJsonRecord { -public: - friend class SIMDJsonParser; - - SIMDJsonRecord() : original(), fields() {} - - SIMDJsonRecord(const char* data, size_t length) - : original(data, length) { - fields.clear(); - } - - inline const std::vector& GetFields() const { - return fields; - } - - inline StringRef GetRawText() const { - return original; - } - -public: - StringRef original; - std::vector fields; -}; - -class SIMDJsonParser { -public: - SIMDJsonParser(const std::vector& field_names, const size_t alloc_bytes = 1LL << 25) - : fields(field_names) { - auto success = pj.allocate_capacity(alloc_bytes); - assert(success); - has_next = false; - } - - inline void Load(const char* buffer, size_t length) { - record.original = StringRef(buffer, length); - record.fields.clear(); - auto ok = json_parse(buffer, length, pj); - if (ok != 0 || !pj.is_valid()) { - printf("Parsing failed...\n"); - has_next = false; - } else { - has_next = true; + namespace adapter { + + // Represents a SimdJson field + class SIMDJsonField { + public: + // constructs a SimdJsonField with a given simdjson value + SIMDJsonField(int64_t id_, const simdjson_result &value_) + : field_id(id_), simd_value(value_) {} + + inline int64_t FieldId() const { + return field_id; + } + + inline NullableBool GetAsBool() const { + bool val; + bool has_value = (simd_value.get(val) == error_code::SUCCESS); + return {has_value, val}; + } + + inline NullableInt GetAsInt() const { + int64_t val; + bool has_value = (simd_value.get(val) == error_code::SUCCESS); + return {has_value, static_cast(val)}; + } + + inline NullableLong GetAsLong() const { + int64_t val; + bool has_value = (simd_value.get(val) == error_code::SUCCESS); + return {has_value, val}; + } + + inline NullableFloat GetAsFloat() const { + double val; + bool has_value = (simd_value.get(val) == error_code::SUCCESS); + return {has_value, static_cast(val)}; + } + + inline NullableDouble GetAsDouble() const { + double val; + bool has_value = (simd_value.get(val) == error_code::SUCCESS); + return {has_value, val}; + } + + inline NullableString GetAsString() const { + + // the simdjson get method only supports std::string_view, so we must + // turn it into a std::string after + std::string_view temp{}; + bool has_value = (simd_value.get(temp) == error_code::SUCCESS); + return {has_value, std::string(temp)}; + } + + inline NullableStringRef GetAsStringRef() const { + std::string_view temp{}; + bool has_value = (simd_value.get(temp) == error_code::SUCCESS); + StringRef str_ref{temp.data(), temp.length()}; + return {has_value, str_ref}; + } + + private: + int64_t field_id; + mutable simdjson_result simd_value; + }; + + // represents the type of field that will be looked up (object or array) + enum class SIMDJsonFieldType { + OBJECT, + ARRAY + }; + + // represents the type of field that will be looked up and + // the name or index that should be looked up + // this is stored in a vector in SIMDJsonFieldLookup + struct SIMDJsonFieldLookupElement { + SIMDJsonFieldType type; + union { + std::string_view key_name; + size_t array_index; + }; + }; + + // helper class used to look up simdjson fields + class 
SIMDJsonFieldLookup { + public: + // creates a simdjson field from a string + SIMDJsonFieldLookup() = default; + + explicit SIMDJsonFieldLookup(std::string &lookup_str) { + char *start = lookup_str.data(); + char *end = lookup_str.data(); + while (true) { + if (*end == '.' || *end == '\0') { // end of a field + size_t str_len = end - start; + if (str_len == 0) { + break; + } + std::string_view field = {start, str_len}; + SIMDJsonFieldLookupElement item = {.type=SIMDJsonFieldType::OBJECT, .key_name=field}; + lookups.push_back(item); + + if (*end == '\0') { + break; + } + + + start = end + 1; + } else if (*start == '[') { + // strtol will move the end pointer right after the last number, so to the ']'. + size_t index = std::strtol(++start, &end, 10); + SIMDJsonFieldLookupElement item = {.type=SIMDJsonFieldType::ARRAY, .array_index=index}; + lookups.push_back(item); + start = end + 1; + } + end++; + } + } + + simdjson_result find(ondemand::object source) const { + auto fields_it = lookups.begin(); + + // always starts with an object + auto ret = source.find_field_unordered(fields_it->key_name); + fields_it++; + + // iterate through the entire lookups + while (fields_it != lookups.end()) { + // if object, or array find the correct value + if (fields_it->type == SIMDJsonFieldType::OBJECT) { + ret = ret.find_field_unordered(fields_it->key_name); + } else if (fields_it->type == SIMDJsonFieldType::ARRAY) { + ret = ret.at(fields_it->array_index); + } + fields_it++; + } + + return ret; + } + + private: + std::vector lookups; + }; + + class SIMDJsonRecord { + public: + friend class SIMDJsonParser; + + SIMDJsonRecord() = default; + + SIMDJsonRecord(ondemand::document_reference doc, const std::vector &lookups) { + obj = doc.get_object(); + + auto ref = obj.raw_json().value(); + raw_text = {ref.data(), ref.length()}; + obj.reset(); + + + int i = 0; + for (const auto &lookup: lookups) { + const auto value = lookup.find(obj); + // check the value was found if not, don't add to vector + if (value.error() == simdjson::SUCCESS) { + fields.emplace_back(i, value); + } + ++i; + } + } + + inline const std::vector &GetFields() const { + return fields; + } + + inline StringRef GetRawText() const { + return raw_text; + } + + public: + mutable ondemand::object obj; + StringRef raw_text; + std::vector fields; + }; + + class SIMDJsonParser { + public: + SIMDJsonParser(std::vector field_names_) : field_names(std::move(field_names_)) { + field_lookups.reserve(field_names.size()); + for (auto &item: field_names) { + field_lookups.emplace_back(item); + } + } + + inline void Load(const char *buffer, size_t length) { + if (parser.iterate_many(buffer, length, DEFAULT_BATCH_SIZE).get(docs) != simdjson::SUCCESS) + return; + docs_it = docs.begin(); + } + + inline bool HasNext() { + return docs_it != docs.end(); + } + + inline const SIMDJsonRecord &NextRecord() { + assert(docs_it != docs.end()); + record = SIMDJsonRecord(*docs_it, field_lookups); + ++docs_it; + return record; + } + + private: + std::vector field_lookups; + + // keep these around for memory safety reasons + std::vector field_names; + ondemand::parser parser; + ondemand::document_stream docs; + ondemand::document_stream::iterator docs_it; + + SIMDJsonRecord record; + }; + + class SIMDJsonAdapter { + public: + typedef SIMDJsonParser parser_t; + typedef SIMDJsonField field_t; + typedef SIMDJsonRecord record_t; + + inline static parser_t *NewParser(const std::vector &fields) { + return new parser_t{fields}; + } + + inline static void Load(parser_t *const parser, 
const char *payload, size_t length, size_t offset = 0) { + assert(offset <= length); + parser->Load(payload + offset, length - offset); + } + + inline static bool HasNext(parser_t *const parser) { + return parser->HasNext(); + } + + inline static const record_t &NextRecord(parser_t *const parser) { + return parser->NextRecord(); + } + }; } - } - - inline bool HasNext() { - return has_next; - } - - inline const SIMDJsonRecord& NextRecord() { - ParsedJson::Iterator it(pj); - for (auto field_id = 0; field_id < fields.size(); ++field_id) { - if (it.move_to(fields[field_id])) { - record.fields.emplace_back(SIMDJsonField{field_id, it}); - } - } - has_next = false; - return record; - } - -private: - std::vector fields; - ParsedJson pj; - SIMDJsonRecord record; - bool has_next; -}; - -class SIMDJsonAdapter { -public: - typedef SIMDJsonParser parser_t; - typedef SIMDJsonField field_t; - typedef SIMDJsonRecord record_t; - - inline static parser_t* NewParser(const std::vector& fields) { - return new parser_t{ fields }; - } - - inline static void Load(parser_t* const parser, const char* payload, size_t length, size_t offset = 0) { - assert(offset <= length); - parser->Load(payload + offset, length - offset); - } - - inline static bool HasNext(parser_t* const parser) { - return parser->HasNext(); - } - - inline static const record_t& NextRecord(parser_t* const parser) { - return parser->NextRecord(); - } -}; - -} -} - +} \ No newline at end of file diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 03ffcfb..8b2ec8a 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -17,7 +17,7 @@ FUNCTION(ADD_FISHSTORE_TEST TEST_NAME HEADERS) else() target_compile_options(${TEST_NAME} PRIVATE -UNDEBUG) endif() - add_test(${TEST_NAME} ${CMAKE_BINARY_DIR}/${TEST_NAME}) + gtest_discover_tests(${TEST_NAME} ${CMAKE_BINARY_DIR}/${TEST_NAME}) ENDFUNCTION() ADD_FISHSTORE_TEST(in_memory_test "") @@ -30,3 +30,5 @@ ADD_FISHSTORE_TEST(checkpoint_queue_test "checkpoint_test.h") if (MSVC) ADD_FISHSTORE_TEST(checkpoint_threadpool_test "checkpoint_test.h") endif() + +ADD_FISHSTORE_TEST(simdjson_adapter_test "") \ No newline at end of file diff --git a/test/checkpoint_test.h b/test/checkpoint_test.h index cfe9384..fee7120 100644 --- a/test/checkpoint_test.h +++ b/test/checkpoint_test.h @@ -10,294 +10,297 @@ using store_t = FishStore; const size_t n_records = 1500000; const size_t n_threads = 4; -const char* pattern = - "{\"id\": \"%zu\", \"name\": \"name%zu\", \"gender\": \"%s\", \"school\": {\"id\": \"%zu\", \"name\": \"school%zu\"}}"; +const char *pattern = + "{\"id\": \"%zu\", \"name\": \"name%zu\", \"gender\": \"%s\", \"school\": {\"id\": \"%zu\", \"name\": \"school%zu\"}}"; Guid log_token, index_token; class JsonGeneralScanContext : public IAsyncContext { public: - JsonGeneralScanContext(uint16_t psf_id, const char* value, uint32_t expected) - : hash_{ Utility::HashBytesWithPSFID(psf_id, value, strlen(value)) }, - psf_id_{ psf_id }, - value_size_{ static_cast(strlen(value)) }, - value_{ value }, - cnt{ 0 }, - expected{ expected } { - } - - JsonGeneralScanContext(uint16_t psf_id, const char* value, size_t length, uint32_t expected) - : hash_{ Utility::HashBytesWithPSFID(psf_id, value, length) }, - psf_id_{ psf_id }, - value_size_{ static_cast(length) }, - value_{ value }, - cnt{ 0 }, - expected{ expected } { - } - - JsonGeneralScanContext(const JsonGeneralScanContext& other) - : hash_{ other.hash_ }, - psf_id_{ other.psf_id_ }, - value_size_{ other.value_size_ }, - cnt{ other.cnt }, - expected{ other.expected } 
{ - set_from_deep_copy(); - char* res = (char*)malloc(value_size_); - memcpy(res, other.value_, value_size_); - value_ = res; - } - - ~JsonGeneralScanContext() { - if (from_deep_copy()) free((void*)value_); - } - - inline void Touch(const char* payload, uint32_t payload_size) { - ++cnt; - } - - inline void Finalize() { - ASSERT_EQ(cnt, expected); - } - - inline KeyHash get_hash() const { - return hash_; - } - - inline bool check(const KeyPointer* kpt) { - return kpt->mode == 0 && kpt->general_psf_id == psf_id_ && - kpt->value_size == value_size_ && - !memcmp(kpt->get_value(), value_, value_size_); - } + JsonGeneralScanContext(uint16_t psf_id, const char *value, uint32_t expected) + : hash_{Utility::HashBytesWithPSFID(psf_id, value, strlen(value))}, + psf_id_{psf_id}, + value_size_{static_cast(strlen(value))}, + value_{value}, + cnt{0}, + expected{expected} { + } + + JsonGeneralScanContext(uint16_t psf_id, const char *value, size_t length, uint32_t expected) + : hash_{Utility::HashBytesWithPSFID(psf_id, value, length)}, + psf_id_{psf_id}, + value_size_{static_cast(length)}, + value_{value}, + cnt{0}, + expected{expected} { + } + + JsonGeneralScanContext(const JsonGeneralScanContext &other) + : hash_{other.hash_}, + psf_id_{other.psf_id_}, + value_size_{other.value_size_}, + cnt{other.cnt}, + expected{other.expected} { + set_from_deep_copy(); + char *res = (char *) malloc(value_size_); + memcpy(res, other.value_, value_size_); + value_ = res; + } + + ~JsonGeneralScanContext() { + if (from_deep_copy()) free((void *) value_); + } + + inline void Touch(const char *payload, uint32_t payload_size) { + ++cnt; + } + + inline void Finalize() { + ASSERT_EQ(cnt, expected); + } + + inline KeyHash get_hash() const { + return hash_; + } + + inline bool check(const KeyPointer *kpt) { + return kpt->mode == 0 && kpt->general_psf_id == psf_id_ && + kpt->value_size == value_size_ && + !memcmp(kpt->get_value(), value_, value_size_); + } protected: - Status DeepCopy_Internal(IAsyncContext*& context_copy) { - return IAsyncContext::DeepCopy_Internal(*this, context_copy); - } + Status DeepCopy_Internal(IAsyncContext *&context_copy) { + return IAsyncContext::DeepCopy_Internal(*this, context_copy); + } private: - KeyHash hash_; - uint16_t psf_id_; - uint32_t value_size_; - const char* value_; - uint32_t cnt, expected; + KeyHash hash_; + uint16_t psf_id_; + uint32_t value_size_; + const char *value_; + uint32_t cnt, expected; }; class JsonFullScanContext : public IAsyncContext { public: - JsonFullScanContext(const std::vector& field_names, - const general_psf_t& pred, const char* value, uint32_t expected) - : eval_{ pred }, - cnt{ 0 }, - expected{ expected }, - value_{ value }, - value_size_{ static_cast(strlen(value)) }, - field_names{field_names}, - parser{field_names} { - } - - JsonFullScanContext(const JsonFullScanContext& other) - : field_names{ other.field_names }, - eval_{ other.eval_ }, - value_size_{ other.value_size_ }, - cnt{ other.cnt }, - expected{ other.expected }, - parser{other.field_names} { - set_from_deep_copy(); - char* res = (char*)malloc(value_size_); - memcpy(res, other.value_, value_size_); - value_ = res; - } - - ~JsonFullScanContext() { - if (from_deep_copy()) { - free((void*)value_); + JsonFullScanContext(const std::vector &field_names, + const general_psf_t &pred, const char *value, uint32_t expected) + : eval_{pred}, + cnt{0}, + expected{expected}, + value_{value}, + value_size_{static_cast(strlen(value))}, + field_names{field_names}, + parser{field_names} { + } + + 
JsonFullScanContext(const JsonFullScanContext &other) + : field_names{other.field_names}, + eval_{other.eval_}, + value_size_{other.value_size_}, + cnt{other.cnt}, + expected{other.expected}, + parser{other.field_names} { + set_from_deep_copy(); + char *res = (char *) malloc(value_size_); + memcpy(res, other.value_, value_size_); + value_ = res; + } + + ~JsonFullScanContext() { + if (from_deep_copy()) { + free((void *) value_); + } + } + + inline void Touch(const char *payload, uint32_t payload_size) { + ++cnt; } - } - - inline void Touch(const char* payload, uint32_t payload_size) { - ++cnt; - } - - inline void Finalize() { - ASSERT_EQ(cnt, expected); - } - - inline bool check(const char* payload, uint32_t payload_size) { - parser.Load(payload, payload_size); - auto& record = parser.NextRecord(); - tsl::hopscotch_map field_map(field_names.size()); - for (auto& field : record.GetFields()) { - field_map.emplace(static_cast(field.FieldId()), field); + + inline void Finalize() { + ASSERT_EQ(cnt, expected); } - std::vector args; - args.reserve(field_names.size()); - for (uint16_t i = 0; i < field_names.size(); ++i) { - auto it = field_map.find(i); - if (it == field_map.end()) return false; - args.emplace_back(it->second); + + inline bool check(const char *payload, uint32_t payload_size) { + parser.Load(payload, payload_size); + auto &record = parser.NextRecord(); + tsl::hopscotch_map field_map(field_names.size()); + for (auto &field: record.GetFields()) { + field_map.emplace(static_cast(field.FieldId()), field); + } + std::vector args; + args.reserve(field_names.size()); + for (uint16_t i = 0; i < field_names.size(); ++i) { + auto it = field_map.find(i); + if (it == field_map.end()) return false; + args.emplace_back(it->second); + } + auto res = eval_(args); + if (res.is_null) return false; + bool pass = (res.size == value_size_ && !strncmp(res.payload, value_, value_size_)); + if (res.need_free) delete res.payload; + return pass; } - auto res = eval_(args); - if (res.is_null) return false; - bool pass = (res.size == value_size_ && !strncmp(res.payload, value_, value_size_)); - if (res.need_free) delete res.payload; - return pass; - } protected: - Status DeepCopy_Internal(IAsyncContext * &context_copy) { - return IAsyncContext::DeepCopy_Internal(*this, context_copy); - } + Status DeepCopy_Internal(IAsyncContext *&context_copy) { + return IAsyncContext::DeepCopy_Internal(*this, context_copy); + } private: - adapter_t::parser_t parser; - std::vector field_names; - general_psf_t eval_; - uint32_t cnt, expected; - uint32_t value_size_; - const char* value_; + adapter_t::parser_t parser; + std::vector field_names; + general_psf_t eval_; + uint32_t cnt, expected; + uint32_t value_size_; + const char *value_; }; TEST(CLASS, Checkpoint_Concurrent) { - std::experimental::filesystem::remove_all("test"); - std::experimental::filesystem::create_directories("test"); - std::vector guids(n_threads); - - { - store_t store{ 8192, 201326592, "test" }; - store.StartSession(); - auto id_proj = store.MakeProjection("/id"); - auto gender_proj = store.MakeProjection("/gender"); - std::vector actions; - actions.push_back({ REGISTER_GENERAL_PSF, id_proj }); - actions.push_back({ REGISTER_GENERAL_PSF, gender_proj }); - uint64_t safe_register_address, safe_unregister_address; - safe_unregister_address = store.ApplyParserShift( - actions, [&safe_register_address](uint64_t safe_address) { - safe_register_address = safe_address; - }); + std::experimental::filesystem::remove_all("test"); + 
std::experimental::filesystem::create_directories("test"); + std::vector guids(n_threads); + + { + store_t store{8192, 201326592, "test"}; + store.StartSession(); + auto id_proj = store.MakeProjection("id"); + auto gender_proj = store.MakeProjection("gender"); + std::vector actions; + actions.push_back({REGISTER_GENERAL_PSF, id_proj}); + actions.push_back({REGISTER_GENERAL_PSF, gender_proj}); + uint64_t safe_register_address, safe_unregister_address; + safe_unregister_address = store.ApplyParserShift( + actions, [&safe_register_address](uint64_t safe_address) { + safe_register_address = safe_address; + }); + + store.CompleteAction(true); + + static auto checkpoint_callback = [](Status result) { + ASSERT_EQ(result, Status::Ok); + }; + + static auto hybrid_log_persistence_callback = + [](Status result, uint64_t persistent_serial_num, uint32_t persistent_offset) { + ASSERT_EQ(result, Status::Ok); + }; + + + std::atomic_size_t cnt{0}; + std::vector thds; + for (size_t i = 0; i < n_threads; ++i) { + thds.emplace_back([&store, &cnt, &guids](size_t start) { + guids[start] = store.StartSession(); + char buf[1024]; + uint64_t serial_num = 0; + size_t op_cnt = 0; + for (size_t i = start; i < n_records; i += n_threads) { + auto n = sprintf(buf, pattern, i, i, (i % 2) ? "male" : "female", i % 10, i % 10); + cnt += store.BatchInsert(buf, n, serial_num); + ++op_cnt; + if (op_cnt % 256 == 0) store.Refresh(); + ++serial_num; + } + store.StopSession(); + }, i); + } - store.CompleteAction(true); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + store.CheckpointIndex(checkpoint_callback, index_token); + store.CompleteAction(true); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + store.CheckpointHybridLog(hybrid_log_persistence_callback, log_token); + store.CompleteAction(true); - static auto checkpoint_callback = [](Status result) { - ASSERT_EQ(result, Status::Ok); - }; + for (auto &thd: thds) { + thd.join(); + } - static auto hybrid_log_persistence_callback = - [](Status result, uint64_t persistent_serial_num, uint32_t persistent_offset) { - ASSERT_EQ(result, Status::Ok); - }; + ASSERT_EQ(cnt, n_records); + + actions.clear(); + actions.push_back({DEREGISTER_GENERAL_PSF, id_proj}); + actions.push_back({DEREGISTER_GENERAL_PSF, gender_proj}); + safe_unregister_address = store.ApplyParserShift( + actions, [&safe_register_address](uint64_t safe_address) { + safe_register_address = safe_address; + }); + store.CompleteAction(true); + + store.StopSession(); + } - std::atomic_size_t cnt{ 0 }; + store_t new_store{8192, 201326592, "test"}; + uint32_t version; + std::vector recovered_session_ids; + new_store.Recover(index_token, log_token, version, recovered_session_ids); + + new_store.StartSession(); + std::vector> sessions(n_threads); std::vector thds; - for (size_t i = 0; i < n_threads; ++i) { - thds.emplace_back([&store, &cnt, &guids](size_t start) { - guids[start] = store.StartSession(); + auto new_worker_thd = [&](int thread_no) { + uint64_t serial_num; + uint32_t offset; + try { + std::tie(serial_num, offset) = new_store.ContinueSession(guids[thread_no]); + } catch (const std::invalid_argument &ex) { + return; // finished too quickly + } + size_t begin_line = thread_no; + auto callback = [](IAsyncContext *ctxt, Status result) { + assert(false); + }; + new_store.Refresh(); + uint32_t op_cnt = 0; char buf[1024]; - uint64_t serial_num = 0; - size_t op_cnt = 0; - for (size_t i = start; i < n_records; i += n_threads) { - auto n = sprintf(buf, pattern, i, i, (i % 2) ? 
"male" : "female", i % 10, i % 10); - cnt += store.BatchInsert(buf, n, serial_num); - ++op_cnt; - if (op_cnt % 256 == 0) store.Refresh(); - ++serial_num; + bool flag = true; + for (size_t i = begin_line + serial_num * n_threads; i < n_records; i += n_threads) { + auto n = sprintf(buf, pattern, i, i, (i % 2) ? "male" : "female", i % 10, i % 10); + if (flag) { + new_store.BatchInsert(buf, n, serial_num, offset); + flag = false; + } else { + new_store.BatchInsert(buf, n, serial_num); + } + ++op_cnt; + if (op_cnt % 256 == 0) new_store.Refresh(); + ++serial_num; } - store.StopSession(); - }, i); - } + new_store.CompleteAction(true); + new_store.StopSession(); + }; - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - store.CheckpointIndex(checkpoint_callback, index_token); - store.CompleteAction(true); - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - store.CheckpointHybridLog(hybrid_log_persistence_callback, log_token); - store.CompleteAction(true); + for (int i = 0; i < n_threads; ++i) { + thds.emplace_back(std::thread(new_worker_thd, i)); + } - for (auto& thd : thds) { - thd.join(); + for (int i = 0; i < n_threads; ++i) { + thds[i].join(); } - ASSERT_EQ(cnt, n_records); - - actions.clear(); - actions.push_back({ DEREGISTER_GENERAL_PSF, id_proj }); - actions.push_back({ DEREGISTER_GENERAL_PSF, gender_proj }); - safe_unregister_address = store.ApplyParserShift( - actions, [&safe_register_address](uint64_t safe_address) { - safe_register_address = safe_address; - }); - - store.CompleteAction(true); - - store.StopSession(); - } - - store_t new_store{ 8192, 201326592, "test" }; - uint32_t version; - std::vector recovered_session_ids; - new_store.Recover(index_token, log_token, version, recovered_session_ids); - - new_store.StartSession(); - std::vector> sessions(n_threads); - std::vector thds; - auto new_worker_thd = [&](int thread_no) { - uint64_t serial_num; - uint32_t offset; - std::tie(serial_num, offset) = new_store.ContinueSession(guids[thread_no]); - size_t begin_line = thread_no; - auto callback = [](IAsyncContext* ctxt, Status result) { - assert(false); + auto callback = [](IAsyncContext *ctxt, Status result) { + ASSERT_EQ(result, Status::Ok); }; - new_store.Refresh(); - uint32_t op_cnt = 0; - char buf[1024]; - bool flag = true; - for (size_t i = begin_line + serial_num * n_threads; i < n_records; i += n_threads) { - auto n = sprintf(buf, pattern, i, i, (i % 2) ? 
"male" : "female", i % 10, i % 10); - if (flag) { - new_store.BatchInsert(buf, n, serial_num, offset); - flag = false; - } - else { - new_store.BatchInsert(buf, n, serial_num); - } - ++op_cnt; - if (op_cnt % 256 == 0) new_store.Refresh(); - ++serial_num; - } + JsonGeneralScanContext context2{1, "male", n_records / 2}; + auto res = new_store.Scan(context2, callback, 0); + ASSERT_EQ(res, Status::Pending); + new_store.CompletePending(true); + + std::vector actions; + uint64_t safe_register_address, safe_unregister_address; + actions.push_back({DEREGISTER_GENERAL_PSF, 0}); + actions.push_back({DEREGISTER_GENERAL_PSF, 1}); + safe_unregister_address = new_store.ApplyParserShift( + actions, [&safe_register_address](uint64_t safe_address) { + safe_register_address = safe_address; + }); + new_store.CompleteAction(true); + new_store.StopSession(); - }; - - for (int i = 0; i < n_threads; ++i) { - thds.emplace_back(std::thread(new_worker_thd, i)); - } - - for (int i = 0; i < n_threads; ++i) { - thds[i].join(); - } - - auto callback = [](IAsyncContext* ctxt, Status result) { - ASSERT_EQ(result, Status::Ok); - }; - JsonGeneralScanContext context2{ 1, "male", n_records / 2 }; - auto res = new_store.Scan(context2, callback, 0); - ASSERT_EQ(res, Status::Pending); - new_store.CompletePending(true); - - std::vector actions; - uint64_t safe_register_address, safe_unregister_address; - actions.push_back({ DEREGISTER_GENERAL_PSF, 0 }); - actions.push_back({ DEREGISTER_GENERAL_PSF, 1 }); - safe_unregister_address = new_store.ApplyParserShift( - actions, [&safe_register_address](uint64_t safe_address) { - safe_register_address = safe_address; - }); - - new_store.CompleteAction(true); - - new_store.StopSession(); } diff --git a/test/in_memory_test.cc b/test/in_memory_test.cc index 6a4f1fa..15370da 100644 --- a/test/in_memory_test.cc +++ b/test/in_memory_test.cc @@ -160,8 +160,8 @@ class JsonFullScanContext : public IAsyncContext { TEST(InMemFishStore, Ingest_Serial) { store_t store{ 1LL << 24, 1LL << 30, "" }; store.StartSession(); - auto id_proj = store.MakeProjection("/id"); - auto gender_proj = store.MakeProjection("/gender"); + auto id_proj = store.MakeProjection("id"); + auto gender_proj = store.MakeProjection("gender"); std::vector actions; actions.push_back({ REGISTER_GENERAL_PSF, id_proj }); actions.push_back({ REGISTER_GENERAL_PSF, gender_proj }); @@ -213,8 +213,8 @@ TEST(InMemFishStore, Ingest_Serial) { TEST(InMemFishStore, Ingest_Concurrent) { store_t store{ 1LL << 24, 1LL << 30, "" }; store.StartSession(); - auto id_proj = store.MakeProjection("/id"); - auto gender_proj = store.MakeProjection("/gender"); + auto id_proj = store.MakeProjection("id"); + auto gender_proj = store.MakeProjection("gender"); std::vector actions; actions.push_back({ REGISTER_GENERAL_PSF, id_proj }); actions.push_back({ REGISTER_GENERAL_PSF, gender_proj }); @@ -279,8 +279,8 @@ TEST(InMemFishStore, Ingest_Concurrent) { TEST(InMemFishStore, FullScan) { store_t store{ 1LL << 24, 1LL << 30, "" }; store.StartSession(); - auto id_proj = store.MakeProjection("/id"); - auto gender_proj = store.MakeProjection("/gender"); + auto id_proj = store.MakeProjection("id"); + auto gender_proj = store.MakeProjection("gender"); std::vector actions; actions.push_back({ REGISTER_GENERAL_PSF, id_proj }); actions.push_back({ REGISTER_GENERAL_PSF, gender_proj }); @@ -319,12 +319,12 @@ TEST(InMemFishStore, FullScan) { ASSERT_EQ(result, Status::Ok); }; - JsonFullScanContext context1{ {"/id"}, fishstore::core::projection, "1234", 1 }; + JsonFullScanContext 
context1{ {"id"}, fishstore::core::projection, "1234", 1 }; auto res = store.FullScan(context1, callback, 0); ASSERT_EQ(res, Status::Ok); store.CompletePending(); - JsonFullScanContext context2{ {"/gender"}, fishstore::core::projection, "male", n_records / 2 }; + JsonFullScanContext context2{ {"gender"}, fishstore::core::projection, "male", n_records / 2 }; res = store.FullScan(context2, callback, 0); ASSERT_EQ(res, Status::Ok); store.CompletePending(); diff --git a/test/ingest_test.h b/test/ingest_test.h index 55d8a3d..85cd497 100644 --- a/test/ingest_test.h +++ b/test/ingest_test.h @@ -161,8 +161,8 @@ TEST(CLASS, Ingest_Serial) { std::experimental::filesystem::create_directories("test"); store_t store{ 8192, 201326592, "test" }; store.StartSession(); - auto id_proj = store.MakeProjection("/id"); - auto gender_proj = store.MakeProjection("/gender"); + auto id_proj = store.MakeProjection("id"); + auto gender_proj = store.MakeProjection("gender"); std::vector actions; actions.push_back({ REGISTER_GENERAL_PSF, id_proj }); actions.push_back({ REGISTER_GENERAL_PSF, gender_proj }); @@ -217,8 +217,8 @@ TEST(CLASS, Ingest_Concurrent) { { store_t store{ 8192, 201326592, "test" }; store.StartSession(); - auto id_proj = store.MakeProjection("/id"); - auto gender_proj = store.MakeProjection("/gender"); + auto id_proj = store.MakeProjection("id"); + auto gender_proj = store.MakeProjection("gender"); std::vector actions; actions.push_back({ REGISTER_GENERAL_PSF, id_proj }); actions.push_back({ REGISTER_GENERAL_PSF, gender_proj }); @@ -286,8 +286,8 @@ TEST(CLASS, FullScan) { std::experimental::filesystem::create_directories("test"); store_t store{ 8192, 201326592, "test" }; store.StartSession(); - auto id_proj = store.MakeProjection("/id"); - auto gender_proj = store.MakeProjection("/gender"); + auto id_proj = store.MakeProjection("id"); + auto gender_proj = store.MakeProjection("gender"); std::vector actions; actions.push_back({ REGISTER_GENERAL_PSF, id_proj }); actions.push_back({ REGISTER_GENERAL_PSF, gender_proj }); @@ -335,12 +335,12 @@ TEST(CLASS, FullScan) { ASSERT_EQ(result, Status::Ok); }; - JsonFullScanContext context1{ {"/id"}, fishstore::core::projection, "1234", 1 }; + JsonFullScanContext context1{ {"id"}, fishstore::core::projection, "1234", 1 }; auto res = store.FullScan(context1, callback, 0); ASSERT_EQ(res, Status::Pending); store.CompletePending(true); - JsonFullScanContext context2{ {"/gender"}, fishstore::core::projection, "male", n_records / 2 }; + JsonFullScanContext context2{ {"gender"}, fishstore::core::projection, "male", n_records / 2 }; res = store.FullScan(context2, callback, 0); ASSERT_EQ(res, Status::Pending); store.CompletePending(true); diff --git a/test/register_test.cc b/test/register_test.cc index 3e8af94..2e9b92b 100644 --- a/test/register_test.cc +++ b/test/register_test.cc @@ -175,7 +175,7 @@ TEST(Registration, Register_Concurrent) { std::experimental::filesystem::create_directories("test"); store_t store{ 8192, 201326592, "test" }; store.StartSession(); - auto school_id_proj = store.MakeProjection("/school/id"); + auto school_id_proj = store.MakeProjection("school.id"); std::vector actions; actions.push_back({ REGISTER_GENERAL_PSF, school_id_proj }); @@ -220,7 +220,7 @@ TEST(Registration, Register_Concurrent) { auto res = store.Scan(context1, callback, 0, safe_register_address); store.CompletePending(true); - JsonFullScanContext context2{ {"/school/id"}, fishstore::core::projection, "1" }; + JsonFullScanContext context2{ {"school.id"}, fishstore::core::projection, 
"1" }; res = store.FullScan(context2, callback, 0, safe_register_address); store.CompletePending(true); @@ -243,7 +243,7 @@ TEST(Registration, Deregister_Concurrent) { std::experimental::filesystem::create_directories("test"); store_t store{ 8192, 201326592, "test" }; store.StartSession(); - auto school_id_proj = store.MakeProjection("/school/id"); + auto school_id_proj = store.MakeProjection("school.id"); std::vector actions; actions.push_back({ REGISTER_GENERAL_PSF, school_id_proj }); uint64_t safe_register_address, safe_unregister_address; @@ -295,7 +295,7 @@ TEST(Registration, Deregister_Concurrent) { auto res = store.Scan(context1, callback, 0, 0, safe_unregister_address); store.CompletePending(true); - JsonFullScanContext context2{ {"/school/id"}, fishstore::core::projection, "1" }; + JsonFullScanContext context2{ {"school.id"}, fishstore::core::projection, "1" }; res = store.FullScan(context2, callback, 0, 0, safe_unregister_address); store.CompletePending(true); diff --git a/test/simdjson_adapter_test.cc b/test/simdjson_adapter_test.cc new file mode 100644 index 0000000..62fdb13 --- /dev/null +++ b/test/simdjson_adapter_test.cc @@ -0,0 +1,52 @@ +// A basic testing framework for simdjson seperate from FishStore +// +// Created by Max Norfolk on 7/23/23. + +#include "gtest/gtest.h" +#include "adapters/simdjson_adapter.h" + +using adapter_t = fishstore::adapter::SIMDJsonAdapter; +using parser_t = fishstore::adapter::SIMDJsonParser; + +std::string raw_json = R"XX( +{"id":3, "school":{"id":6}, "random":"garbage13"} +{ "school":{"id":7}, "id":6, "random":"garbage24"} +{"id":100, "school":{"id":3}, "random":"garbage35"} +)XX"; + +TEST(SimdJsonTests, BasicTest1) { + parser_t parser({"id", "school.id"}); + parser.Load(raw_json.data(), raw_json.length()); + + ASSERT_TRUE(parser.HasNext()); + + auto rec = parser.NextRecord(); + auto &fields = rec.GetFields(); + ASSERT_EQ(fields.size(), 2); + + + // get raw text first + auto raw_text = rec.GetRawText(); + std::string raw_text_str = {raw_text.Data(), raw_text.Length()}; + + // verify raw text + const std::string correct_text = R"({"id":3, "school":{"id":6}, "random":"garbage13"})"; + ASSERT_EQ(raw_text_str, correct_text); + + // check int + auto n = fields[0].GetAsInt(); + ASSERT_TRUE(n.HasValue()); + ASSERT_EQ(n.Value(), 3); + + // get raw again + raw_text = rec.GetRawText(); + raw_text_str = {raw_text.Data(), raw_text.Length()}; + ASSERT_EQ(raw_text_str, correct_text); + +} + + +int main(int argc, char *argv[]) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/third_party/hopscotch-map b/third_party/hopscotch-map index 7ef9cc4..9415490 160000 --- a/third_party/hopscotch-map +++ b/third_party/hopscotch-map @@ -1 +1 @@ -Subproject commit 7ef9cc4aca326cbe725553ca330d74076f936ff2 +Subproject commit 9415490f2fb2a7127d2b72e5472c11dfc12f6463 diff --git a/third_party/simdjson b/third_party/simdjson index ee66fb1..939844d 160000 --- a/third_party/simdjson +++ b/third_party/simdjson @@ -1 +1 @@ -Subproject commit ee66fb1c602e17563606c6f6eecc225dac5455cc +Subproject commit 939844d79fe183a690cd5df5d50617cd828d7adc