
Introduce peer checkpoints so CouchDB can safely remove deleted documents entirely #5558

Open

rnewson wants to merge 27 commits into main from auto-delete-3

Conversation

@rnewson rnewson commented Jun 4, 2025

Overview

Apache CouchDB retains some information (at minimum, the doc id, the doc revision tree, and a deleted flag) for every deleted document forever, so that replication is guaranteed to converge. This is excessively pessimistic and we would like to improve matters.

This PR introduces a number of changes to achieve its goal:

  1. Database shards (.couch files under the shards/ directory) gain an additional header property called drop_seq. Once it is set to a positive integer, any deleted document with a lower update sequence is skipped entirely at the next compaction (see the sketch after this list).

  2. The notion of a peer checkpoint document. These are all _local docs and their ids must have the prefix `_local/peer-checkpoint-`.

  3. All indexers (mrview, search, nouveau) and the replicator have been taught to create and update peer checkpoints with the update sequence they have seen, at appropriate times (i.e., after they have made every effort to commit the changes they've seen to durable storage).

  4. A new endpoint, POST /$dbname/_update_drop_seq, which gathers information about the shards of the database, the update sequences from all peer checkpoint documents, and the internal shard sync documents, computes the drop_seq for each shard, and then sends RPC requests to those shard databases to update the drop_seq.
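
To make item 1 concrete, here is a minimal sketch (not the code in this PR; the record shape is simplified) of the compaction-time decision that drop_seq enables:

% Simplified illustration, assuming only the description above: a deleted
% document whose update sequence is below the shard's drop_seq is not copied
% into the compacted file at all, so its id and rev tree finally go away.
-record(full_doc_info, {id, update_seq, deleted = false, rev_tree}).

skip_at_compaction(#full_doc_info{deleted = true, update_seq = Seq}, DropSeq) when
    is_integer(DropSeq), DropSeq > 0, Seq < DropSeq
->
    true;
skip_at_compaction(#full_doc_info{}, _DropSeq) ->
    false.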

Testing recommendations

There are some simple tests in the eunit and elixir suites which will be run via the normal Makefile targets.

Additionally, there is a stateful property-based test that exercises the code more comprehensively; it can be started with make elixir-cluster. This will start a 3-node cluster with a nouveau server running and perform random permutations of all relevant operations that could alter which deleted documents are dropped (making docs, deleting docs, creating indexes, creating and updating peer checkpoints, splitting shards).

Related Issues or Pull Requests

N/A

Checklist

  • Code is written and works correctly
  • Changes are covered by tests
  • Any new configurable parameters are documented in rel/overlay/etc/default.ini
  • [TODO] Documentation changes were made in the src/docs folder
  • Documentation changes were backported (separated PR) to affected branches

@rnewson rnewson force-pushed the auto-delete-3 branch 2 times, most recently from 57c035c to ade8dd1 on June 10, 2025 16:05
@rnewson rnewson force-pushed the auto-delete-3 branch 3 times, most recently from 9d75bbd to da49f86 on June 18, 2025 15:07
increment_drop_count(#st{header = Header} = St, Inc) when is_integer(Inc), Inc >= 0 ->
    CurrentDropCount = get_drop_count(St),
    NewSt = St#st{
        header = couch_bt_engine_header:set(Header, [{drop_count, CurrentDropCount + Inc}]),
Contributor

get_drop_count/1 is implemented in terms of couch_bt_engine_header:get/2, which uses undefined as the default value. An attempt to do undefined + Inc would cause an exception, which would terminate the compaction process. I don't know whether we can have a state where drop_count is not present in the header when increment_drop_count is called.
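
For illustration only, the kind of defensive default this hints at might look like the sketch below; the helper name is made up and it assumes couch_bt_engine_header:get/3 accepts a default value:

% Hypothetical sketch, not the PR code: read drop_count with an explicit
% default so a header written before this feature existed yields 0 rather
% than undefined, avoiding a badarith on undefined + Inc.
get_drop_count_safe(#st{header = Header}) ->
    couch_bt_engine_header:get(Header, drop_count, 0).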

Member Author

thanks, have been focused on drop seq itself for too long and neglected upgrade implications. will fix.

Member Author

we call upgrade on the header as we open it, so drop_seq/drop_count get filled in as 0. Experimentally I have confirmed this: 1) on main, create a database; 2) switch to this branch; 3) GET /dbname shows a drop_count of 0 (and not a crash from trying to do undefined + undefined).

@@ -210,7 +215,17 @@ finish_update(State) ->

commit(State) ->
    Header = {State#mrst.sig, couch_mrview_util:make_header(State)},
    couch_file:write_header(State#mrst.fd, Header).
    ok = couch_file:sync(State#mrst.fd),
Contributor

I understand why you do the sync here before and after. It feels like we are breaking the abstraction boundaries (and bypassing ioq), though: the decision to do a sync belongs to the couch application, or more specifically to the storage engine, as we do in couch_bt_engine:commit_data/1. Unfortunately we don't have anything like that for indexes.
So I was thinking maybe we should update couch_file to keep the old header checksum in its state. The checksum is a prefix of the header, so it is easy to get access to (we need to know the size of the checksum, though). However, maintaining the checksum to be able to bypass the IO is just one side of the coin; the other is the need to wrap write_header in two sync calls to do a commit. This is where I stopped my train of thought, because we would need to be able to pass the NeedsCommit flag.

Maybe we should add a separate call to couch_file. We might keep couch_file:write_header/2, which wouldn't do the sync (no change), and add a separate couch_file:commit_header/2 which would do the sync calls.

Member Author

the "pluggable storage engine" abstraction has only ever applied to primary data. And, at this stage, I think we would remove the abstraction rather than extend it to indexes, so I don't agree that "The decision to do sync belongs to couch application or more specifically to the storage engine".

I'd prefer not to have the file:sync the .mrview files at all but it seems the simplest way to be as sure as we realistically can be that the index updates are durable (and thus allowing us to permanently drop deleted documents that have already been applied to it).

In nouveau this is achieved differently as index commits there have to be infrequent for performance reasons. I just don't see any easy way to do the same approach for mrview.
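
For readers following the thread, the commit shape being debated is roughly the sketch below (not verbatim PR code; it assumes the #mrst record and couch_file calls shown in the diff above):

% Rough shape of the discussed commit path for view files, mirroring what
% couch_bt_engine:commit_data/1 does for primary data: sync so all preceding
% index writes are on disk, write the new header, then sync again so the
% header itself is durable.
commit(State) ->
    ok = couch_file:sync(State#mrst.fd),
    Header = {State#mrst.sig, couch_mrview_util:make_header(State)},
    couch_file:write_header(State#mrst.fd, Header),
    ok = couch_file:sync(State#mrst.fd).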

Contributor

@iilyak iilyak Jun 19, 2025

> I'd prefer not to have the file:sync the .mrview files at all but it seems the simplest way to be as sure as we realistically can be that the index updates are durable

Adding couch_file:commit_header/2 seems easy and it wouldn't bypass ioq.

Member Author

I don't understand your bypassing ioq point or how moving this to a new function in couch_file would prevent it.

Contributor

Here is the implementation of [write_header](https://github.com/apache/couchdb/blob/4adf245b99d5ac2d8ff7f5f49821c4911c6d7688/src/couch/src/couch_file.erl#L435C1-L440C69):

write_header(Fd, Data) ->
    Bin = ?term_to_bin(Data),
    Checksum = generate_checksum(Bin),
    % now we assemble the final header binary and write to disk
    FinalBin = <<Checksum/binary, Bin/binary>>,
    ioq:call(Fd, {write_header, FinalBin}, erlang:get(io_priority)).

As you can see, the call reaches couch_file's gen_server via ioq.

The sync doesn't go via ioq:

sync(Fd) ->
    case gen_server:call(Fd, sync, infinity) of
        ok ->
            ok;
        {error, Reason} ->
            erlang:error({fsync_error, Reason})
    end.

Here is what I propose:

commit_header(Fd, Data) ->
    Bin = ?term_to_bin(Data),
    Checksum = generate_checksum(Bin),
    % now we assemble the final header binary and write to disk
    FinalBin = <<Checksum/binary, Bin/binary>>,
    ioq:call(Fd, {commit_header, FinalBin}, erlang:get(io_priority)).

...

handle_call({commit_header, Bin}, From, #file{fd = Fd} = File) ->
    sync(Fd),
    Result = handle_call({write_header, Bin}, From, File),
    sync(Fd),
    Result;

Contributor

This implementation replaces three gen_server calls with a single call and ensures a durable write of the header.

Member Author

hm, ok. I think that's a separate PR to this effort though, as couch_bt_engine:commit_data does the same thing as I do here.

Contributor

#5576 is ready for review.

        end
    end),
    receive
        {'DOWN', OpenRef, _, _, Revs} ->
Contributor

The shape of this message is the same as the one on line 530. You probably want to check the shape of Revs; otherwise a {checkpoint_fetch_failure, Reason} exit reason would end up bound to Revs.

17> {_, OpenRef} = spawn_monitor(fun() -> throw(fff) end).
{<0.112.0>,#Ref<0.2499101716.2367684609.212273>}
=ERROR REPORT==== 18-Jun-2025::12:09:20.981098 ===
Error in process <0.112.0> with exit value:
{{nocatch,fff},[{shell,apply_fun,3,[{file,"shell.erl"},{line,914}]}]}

18> flush().
Shell got {'DOWN',#Ref<0.2499101716.2367684609.212273>,process,<0.112.0>,
                  {{nocatch,fff},
                   [{shell,apply_fun,3,[{file,"shell.erl"},{line,914}]}]}}

MrArgs = #mrargs{
    view_type = map,
    include_docs = true,
    start_key = <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", SubType/binary, "-">>,
Contributor

I know we have many places in the CouchDB codebase which do something like this. However, it makes the code harder to read for newcomers, who are often unfamiliar with the fact that we rely on ASCII ordering.

Would you mind adding a define or a helper function (with inline) so it is self-documenting? I haven't finished reading the PR; maybe there is a better place to define these helpers.

% approach using define
-define(LOWEST_KEY(SubType), <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", SubType/binary, "-">>).

-define(HIGHEST_KEY(SubType), <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", SubType/binary, ".">>).

...
start_key = ?LOWEST_KEY(SubType),
end_key = ?HIGHEST_KEY(SubType),
% approach with inline function
-compile({inline, [
    start_key/1,
    end_key/1
]}).
start_key(SubType) -> 
    <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", SubType/binary, "-">>.

end_key(SubType) ->
    <<?LOCAL_DOC_PREFIX, "peer-checkpoint-", SubType/binary, ".">>.

...
start_key = start_key(SubType),
end_key = end_key(SubType),

Member Author

yes, I like that.

        Targets
    ),
    State#state{targets = Targets1}.

spread(Amount, N) when is_integer(Amount), Amount >= 0, is_integer(N), N > 0 ->
Contributor

Really nice.

[B, E] = Worker#shard.range,
BHex = couch_util:to_hex(<<B:32/integer>>),
EHex = couch_util:to_hex(<<E:32/integer>>),
Range = list_to_binary([BHex, "-", EHex]),
Contributor

There is an optimized version of to_hex which avoids extra conversion.

BHex = couch_util:to_hex_bin(<<B:32/integer>>),
EHex = couch_util:to_hex_bin(<<E:32/integer>>),
Range = <<BHex/binary, "-", EHex/binary>>,

merge_peers(_Key, {Uuid1, Val1}, {Uuid2, Val2}) when
    is_binary(Uuid1), is_binary(Uuid2), is_integer(Val1), is_integer(Val2)
->
    true = uuids_match([Uuid1, Uuid2]),
Contributor

merge_peers is used in many maps:merge_with invocations. All of them would crash if the uuids_match/1 result is not true. Is that intentional (like an assertion)?

Member Author

@rnewson rnewson Jun 19, 2025

it is an assertion, yes (to guard against bugs in my code if I had a mix-up; the uuids serve as validation that I'm talking about the right shard file).

Contributor

Could you add a comment?

% Assert we are talking about the right shards file...
% ...
true = uuids_match([Uuid1, Uuid2]),

or maybe use the ?assert macro.

-include_lib("stdlib/include/assert.hrl").

merge_peers(_Key, {Uuid1, Val1}, {Uuid2, Val2}) when
    is_binary(Uuid1), is_binary(Uuid2), is_integer(Val1), is_integer(Val2)
->
    ?assert(uuids_match([Uuid1, Uuid2]), "UUIDs belong to different shard files"),

if
    ExpectedUuidPrefix /= ActualUuidPrefix ->
        {error, uuid_mismatch};
    NewDropSeq < CurrentDropSeq ->
Member

this implies we are okay with setting drop_seq to the same value again, in which case we can save ourselves writing a new header maybe?

Member Author

it's so you can call _update_drop_seq repeatedly without getting an error (if you make no other changes that would increment NewDropSeq). I think it might be ok to make this an error but I don't see much need to do so.

Member

ah, I didn't mean we should raise an error, but that we could skip writing a new header if the drop seqs are the same, because presumably that value already made it into a header the last time we were here. But no bother if it's an edge case.
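
A sketch of the skip-the-write idea from this exchange; the helper name and clause structure are assumptions, not code from the PR:

% Hypothetical decision helper: an equal drop_seq already reached a durable
% header on a previous call, so only a strictly greater value warrants
% writing (and committing) a new header.
needs_header_write(DropSeq, DropSeq) ->
    false;
needs_header_write(NewDropSeq, CurrentDropSeq) when NewDropSeq > CurrentDropSeq ->
    true.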

@@ -796,6 +807,10 @@ set_security(_, _) ->
set_user_ctx(#db{} = Db, UserCtx) ->
    {ok, Db#db{user_ctx = UserCtx}}.

set_drop_seq(#db{main_pid = Pid} = Db, UuidPrefix, DropSeq) ->
    check_is_admin(Db),
Member

does this inherit its user context from the _calculate_drop_seq HTTP API call? Do we need to adjust this if we get here via ken?

Member Author

ken will not call /dbname/_update_drop_seq, only users will. ken will update indexes and those updates will update peer checkpoint docs, but that's all.

Member

ah boo I meant smoosh for when we no longer require folks to call the endpoint manually

substitute_splits(Shards, UuidMap, PeerCheckpoints) ->
    maps:fold(
        fun({[PS, PE], Node}, {Uuid, Seq}, Acc) ->
            ShardsInRange = [
Contributor

We could re-use this functionality from mem3.

%% mem3.erl

filter_shards_by_range(Range, Shards) ->
    mem3_shards:filter_shards_by_range(Range, Shards).

%% mem3_shards.erl

-export([filter_shards_by_range/2]).

Member Author

@rnewson rnewson Jun 23, 2025

hm, I don't think it's the same calculation. For splits I want to find the ranges that are subsets of the peer checkpoint's range; it wouldn't be enough just for them to overlap.
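
To make the subset-versus-overlap distinction concrete, a containment check along these lines (the helper is illustrative, not from the PR) is what is needed rather than a mere overlap test:

% A candidate shard range [B, E] only substitutes for a split if it lies
% entirely within the peer checkpoint's range [PS, PE]; overlapping alone
% is not sufficient.
range_within([B, E], [PS, PE]) ->
    PS =< B andalso E =< PE.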

case
    lists:search(
        fun({SU, SS, _TU, _TS}) ->
            uuids_match([Uuid, SU]) andalso SS =< Seq
Contributor

Minor optimization: it would be faster if you swapped the order of the checks.

SS =< Seq andalso uuids_match([Uuid, SU])

Member Author

that's a good idea. It's the SS =< Seq condition that matters ("find the most recent checkpoint that includes the seq I care about"); the uuid check is a safeguard.

Member Author

rnewson commented Jun 24, 2025

Noting that we may need a downgrade option, as this work adds two items to the db_header (drop_seq and drop_count).
