diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 23232fb1563599..4feea01ba229c1 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -27,6 +27,7 @@ #include #include +#include #include #include #include @@ -427,7 +428,9 @@ void CloudTablet::delete_rowsets(const std::vector& to_delete, } std::vector rs_metas; rs_metas.reserve(to_delete.size()); + int64_t now = ::time(nullptr); for (auto&& rs : to_delete) { + rs->rowset_meta()->set_stale_at(now); rs_metas.push_back(rs->rowset_meta()); _stale_rs_version_map[rs->version()] = rs; } diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index 1ec5b30f0feb9e..d0fab772045910 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -20,6 +20,8 @@ #include +#include +#include #include #include #include @@ -205,6 +207,15 @@ class RowsetMeta : public MetadataAdder { return _rowset_meta_pb.set_creation_time(creation_time); } + int64_t stale_at() const { + int64_t stale_time = _stale_at_s.load(); + return stale_time > 0 ? stale_time : _rowset_meta_pb.creation_time(); + } + + bool has_stale_at() const { return _stale_at_s.load() > 0; } + + void set_stale_at(int64_t stale_at) { _stale_at_s.store(stale_at); } + int64_t partition_id() const { return _rowset_meta_pb.partition_id(); } void set_partition_id(int64_t partition_id) { @@ -402,6 +413,7 @@ class RowsetMeta : public MetadataAdder { StorageResource _storage_resource; bool _is_removed_from_rowset_meta = false; DorisCallOnce> _determine_encryption_once; + std::atomic _stale_at_s {0}; }; } // namespace doris diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index b78347ddf57daf..a79de2964620b8 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -310,8 +310,14 @@ Status Tablet::_init_once_action() { } // init stale rowset + int64_t now = ::time(nullptr); for (const auto& stale_rs_meta : _tablet_meta->all_stale_rs_metas()) { Version version = stale_rs_meta->version(); + + if (!stale_rs_meta->has_stale_at()) { + stale_rs_meta->set_stale_at(now); + } + RowsetSharedPtr rowset; res = create_rowset(stale_rs_meta, &rowset); if (!res.ok()) { @@ -571,11 +577,13 @@ Status Tablet::modify_rowsets(std::vector& to_add, } std::vector rs_metas_to_delete; + int64_t now = ::time(nullptr); for (auto& rs : to_delete) { rs_metas_to_delete.push_back(rs->rowset_meta()); _rs_version_map.erase(rs->version()); if (!same_version) { + rs->rowset_meta()->set_stale_at(now); // put compaction rowsets in _stale_rs_version_map. _stale_rs_version_map[rs->version()] = rs; } @@ -631,7 +639,11 @@ Status Tablet::delete_rowsets(const std::vector& to_delete, boo } std::vector rs_metas; rs_metas.reserve(to_delete.size()); + int64_t now = ::time(nullptr); for (const auto& rs : to_delete) { + if (move_to_stale) { + rs->rowset_meta()->set_stale_at(now); + } rs_metas.push_back(rs->rowset_meta()); _rs_version_map.erase(rs->version()); } diff --git a/be/src/olap/version_graph.cpp b/be/src/olap/version_graph.cpp index 010e7ce9fd89a5..c5f8aff9d47def 100644 --- a/be/src/olap/version_graph.cpp +++ b/be/src/olap/version_graph.cpp @@ -92,8 +92,8 @@ void TimestampedVersionTracker::_init_stale_version_path_map( } else if (diff > 0) { return false; } - // When the version diff is equal, compare the rowset`s create time - return a->creation_time() < b->creation_time(); + // When the version diff is equal, compare the rowset`s stale time + return a->stale_at() < b->stale_at(); }); // first_version -> (second_version -> rowset_meta) @@ -306,8 +306,7 @@ void TimestampedVersionTracker::add_stale_path_version( PathVersionListSharedPtr ptr(new TimestampedVersionPathContainer()); for (auto rs : stale_rs_metas) { - TimestampedVersionSharedPtr vt_ptr( - new TimestampedVersion(rs->version(), rs->creation_time())); + TimestampedVersionSharedPtr vt_ptr(new TimestampedVersion(rs->version(), rs->stale_at())); ptr->add_timestamped_version(vt_ptr); } diff --git a/be/test/olap/stale_at_test.cpp b/be/test/olap/stale_at_test.cpp new file mode 100644 index 00000000000000..7fe2b48574fd41 --- /dev/null +++ b/be/test/olap/stale_at_test.cpp @@ -0,0 +1,101 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include + +#include "olap/rowset/rowset_meta.h" +#include "olap/version_graph.h" + +namespace doris { + +class StaleAtTest : public testing::Test { +public: + void SetUp() override {} + void TearDown() override {} +}; + +TEST_F(StaleAtTest, TestRowsetMetaStaleAt) { + // Create a RowsetMeta and test stale_at functionality + RowsetMeta rowset_meta; + + int64_t creation_time = 1000000; + int64_t stale_at_time = 2000000; + + // Set creation time + rowset_meta.set_creation_time(creation_time); + + // Initially, stale_at should return creation_time since stale_at is not set + EXPECT_EQ(rowset_meta.stale_at(), creation_time); + EXPECT_FALSE(rowset_meta.has_stale_at()); + + rowset_meta.set_stale_at(stale_at_time); + + EXPECT_EQ(rowset_meta.stale_at(), stale_at_time); + EXPECT_TRUE(rowset_meta.has_stale_at()); +} + +TEST_F(StaleAtTest, TestTimestampedVersionWithStaleTime) { + // Test that TimestampedVersion works correctly with stale_time + RowsetMetaSharedPtr rowset_meta = std::make_shared(); + + int64_t creation_time = 1000000; + int64_t stale_at_time = 2000000; + + rowset_meta->set_creation_time(creation_time); + rowset_meta->set_stale_at(stale_at_time); + + // Create a TimestampedVersion using stale_at + Version version(1, 5); + TimestampedVersionSharedPtr tv_ptr(new TimestampedVersion(version, rowset_meta->stale_at())); + + EXPECT_EQ(tv_ptr->get_create_time(), stale_at_time); + EXPECT_EQ(tv_ptr->version(), version); +} + +TEST_F(StaleAtTest, TestStalePathVersionWithStaleAt) { + // Test that add_stale_path_version uses stale_at correctly + TimestampedVersionTracker tracker; + + std::vector stale_rs_metas; + + // Create rowset metas with different creation and stale times + for (int i = 0; i < 3; ++i) { + RowsetMetaSharedPtr rs_meta = std::make_shared(); + rs_meta->set_creation_time(1000000 + i * 1000); + rs_meta->set_stale_at(2000000); + rs_meta->set_version(Version(i * 2 + 1, i * 2 + 2)); + stale_rs_metas.push_back(rs_meta); + } + + // Add stale path version + tracker.add_stale_path_version(stale_rs_metas); + + // Check that expired paths are captured correctly using stale_at time + std::vector expired_paths; + + // With endtime before stale_at, no paths should be expired + tracker.capture_expired_paths(1999999, &expired_paths); + EXPECT_EQ(expired_paths.size(), 0); + + // With endtime after stale_at, paths should be expired + tracker.capture_expired_paths(2000001, &expired_paths); + EXPECT_EQ(expired_paths.size(), 1); +} + +} // namespace doris \ No newline at end of file