Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <rapidjson/stringbuffer.h>

#include <atomic>
#include <cstdint>
#include <memory>
#include <shared_mutex>
#include <unordered_map>
Expand Down Expand Up @@ -427,7 +428,9 @@ void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
}
std::vector<RowsetMetaSharedPtr> rs_metas;
rs_metas.reserve(to_delete.size());
int64_t now = ::time(nullptr);
for (auto&& rs : to_delete) {
rs->rowset_meta()->set_stale_at(now);
rs_metas.push_back(rs->rowset_meta());
_stale_rs_version_map[rs->version()] = rs;
}
Expand Down
12 changes: 12 additions & 0 deletions be/src/olap/rowset/rowset_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

#include <gen_cpp/olap_file.pb.h>

#include <atomic>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>
Expand Down Expand Up @@ -205,6 +207,15 @@ class RowsetMeta : public MetadataAdder<RowsetMeta> {
return _rowset_meta_pb.set_creation_time(creation_time);
}

int64_t stale_at() const {
int64_t stale_time = _stale_at_s.load();
return stale_time > 0 ? stale_time : _rowset_meta_pb.creation_time();
}

bool has_stale_at() const { return _stale_at_s.load() > 0; }

void set_stale_at(int64_t stale_at) { _stale_at_s.store(stale_at); }

int64_t partition_id() const { return _rowset_meta_pb.partition_id(); }

void set_partition_id(int64_t partition_id) {
Expand Down Expand Up @@ -402,6 +413,7 @@ class RowsetMeta : public MetadataAdder<RowsetMeta> {
StorageResource _storage_resource;
bool _is_removed_from_rowset_meta = false;
DorisCallOnce<Result<EncryptionAlgorithmPB>> _determine_encryption_once;
std::atomic<int64_t> _stale_at_s {0};
};

} // namespace doris
Expand Down
12 changes: 12 additions & 0 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,14 @@ Status Tablet::_init_once_action() {
}

// init stale rowset
int64_t now = ::time(nullptr);
for (const auto& stale_rs_meta : _tablet_meta->all_stale_rs_metas()) {
Version version = stale_rs_meta->version();

if (!stale_rs_meta->has_stale_at()) {
stale_rs_meta->set_stale_at(now);
}

RowsetSharedPtr rowset;
res = create_rowset(stale_rs_meta, &rowset);
if (!res.ok()) {
Expand Down Expand Up @@ -571,11 +577,13 @@ Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
}

std::vector<RowsetMetaSharedPtr> rs_metas_to_delete;
int64_t now = ::time(nullptr);
for (auto& rs : to_delete) {
rs_metas_to_delete.push_back(rs->rowset_meta());
_rs_version_map.erase(rs->version());

if (!same_version) {
rs->rowset_meta()->set_stale_at(now);
// put compaction rowsets in _stale_rs_version_map.
_stale_rs_version_map[rs->version()] = rs;
}
Expand Down Expand Up @@ -631,7 +639,11 @@ Status Tablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete, boo
}
std::vector<RowsetMetaSharedPtr> rs_metas;
rs_metas.reserve(to_delete.size());
int64_t now = ::time(nullptr);
for (const auto& rs : to_delete) {
if (move_to_stale) {
rs->rowset_meta()->set_stale_at(now);
}
rs_metas.push_back(rs->rowset_meta());
_rs_version_map.erase(rs->version());
}
Expand Down
7 changes: 3 additions & 4 deletions be/src/olap/version_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ void TimestampedVersionTracker::_init_stale_version_path_map(
} else if (diff > 0) {
return false;
}
// When the version diff is equal, compare the rowset`s create time
return a->creation_time() < b->creation_time();
// When the version diff is equal, compare the rowset`s stale time
return a->stale_at() < b->stale_at();
});

// first_version -> (second_version -> rowset_meta)
Expand Down Expand Up @@ -306,8 +306,7 @@ void TimestampedVersionTracker::add_stale_path_version(

PathVersionListSharedPtr ptr(new TimestampedVersionPathContainer());
for (auto rs : stale_rs_metas) {
TimestampedVersionSharedPtr vt_ptr(
new TimestampedVersion(rs->version(), rs->creation_time()));
TimestampedVersionSharedPtr vt_ptr(new TimestampedVersion(rs->version(), rs->stale_at()));
ptr->add_timestamped_version(vt_ptr);
}

Expand Down
101 changes: 101 additions & 0 deletions be/test/olap/stale_at_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gtest/gtest.h>

#include <ctime>

#include "olap/rowset/rowset_meta.h"
#include "olap/version_graph.h"

namespace doris {

class StaleAtTest : public testing::Test {
public:
void SetUp() override {}
void TearDown() override {}
};

TEST_F(StaleAtTest, TestRowsetMetaStaleAt) {
// Create a RowsetMeta and test stale_at functionality
RowsetMeta rowset_meta;

int64_t creation_time = 1000000;
int64_t stale_at_time = 2000000;

// Set creation time
rowset_meta.set_creation_time(creation_time);

// Initially, stale_at should return creation_time since stale_at is not set
EXPECT_EQ(rowset_meta.stale_at(), creation_time);
EXPECT_FALSE(rowset_meta.has_stale_at());

rowset_meta.set_stale_at(stale_at_time);

EXPECT_EQ(rowset_meta.stale_at(), stale_at_time);
EXPECT_TRUE(rowset_meta.has_stale_at());
}

TEST_F(StaleAtTest, TestTimestampedVersionWithStaleTime) {
// Test that TimestampedVersion works correctly with stale_time
RowsetMetaSharedPtr rowset_meta = std::make_shared<RowsetMeta>();

int64_t creation_time = 1000000;
int64_t stale_at_time = 2000000;

rowset_meta->set_creation_time(creation_time);
rowset_meta->set_stale_at(stale_at_time);

// Create a TimestampedVersion using stale_at
Version version(1, 5);
TimestampedVersionSharedPtr tv_ptr(new TimestampedVersion(version, rowset_meta->stale_at()));

EXPECT_EQ(tv_ptr->get_create_time(), stale_at_time);
EXPECT_EQ(tv_ptr->version(), version);
}

TEST_F(StaleAtTest, TestStalePathVersionWithStaleAt) {
// Test that add_stale_path_version uses stale_at correctly
TimestampedVersionTracker tracker;

std::vector<RowsetMetaSharedPtr> stale_rs_metas;

// Create rowset metas with different creation and stale times
for (int i = 0; i < 3; ++i) {
RowsetMetaSharedPtr rs_meta = std::make_shared<RowsetMeta>();
rs_meta->set_creation_time(1000000 + i * 1000);
rs_meta->set_stale_at(2000000);
rs_meta->set_version(Version(i * 2 + 1, i * 2 + 2));
stale_rs_metas.push_back(rs_meta);
}

// Add stale path version
tracker.add_stale_path_version(stale_rs_metas);

// Check that expired paths are captured correctly using stale_at time
std::vector<int64_t> expired_paths;

// With endtime before stale_at, no paths should be expired
tracker.capture_expired_paths(1999999, &expired_paths);
EXPECT_EQ(expired_paths.size(), 0);

// With endtime after stale_at, paths should be expired
tracker.capture_expired_paths(2000001, &expired_paths);
EXPECT_EQ(expired_paths.size(), 1);
}

} // namespace doris
Loading