Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions chromadb/test/distributed/test_task_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,71 @@ def test_function_remove_nonexistent(basic_http_client: System) -> None:
# Trying to detach this function again should raise NotFoundError
with pytest.raises(NotFoundError, match="does not exist"):
attached_fn.detach(delete_output_collection=True)


def test_count_function_attach_and_detach_attach_attach(basic_http_client: System) -> None:
"""Test creating and removing a function with the record_counter operator"""
client = ClientCreator.from_system(basic_http_client)
client.reset()

# Create a collection
collection = client.get_or_create_collection(
name="my_document",
metadata={"description": "Sample documents for task processing"},
)

# Create a task that counts records in the collection
attached_fn = collection.attach_function(
name="count_my_docs",
function_id="record_counter", # Built-in operator that counts records
output_collection="my_documents_counts",
params=None,
)

# Verify task creation succeeded
assert attached_fn is not None
initial_version = get_collection_version(client, collection.name)

# Add documents
collection.add(
ids=["doc_{}".format(i) for i in range(0, 300)],
documents=["test document"] * 300,
)

# Verify documents were added
assert collection.count() == 300

wait_for_version_increase(client, collection.name, initial_version)
# Give some time to invalidate the frontend query cache
sleep(60)

result = client.get_collection("my_documents_counts").get("function_output")
assert result["metadatas"] is not None
assert result["metadatas"][0]["total_count"] == 300

# Remove the task
success = attached_fn.detach(
delete_output_collection=True,
)

# Verify task removal succeeded
assert success is True

# Create a task that counts records in the collection
attached_fn = collection.attach_function(
name="count_my_docs",
function_id="record_counter", # Built-in operator that counts records
output_collection="my_documents_counts",
params=None,
)
assert attached_fn is not None

# Create a task that counts records in the collection
attached_fn = collection.attach_function(
name="count_my_docs",
function_id="record_counter", # Built-in operator that counts records
output_collection="my_documents_counts",
params=None,
)
assert attached_fn is not None
Comment on lines +232 to +296
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommended

[Maintainability] [CodeDuplication] The new test test_count_function_attach_and_detach_attach_attach is very similar to the existing test_count_function_attach_and_detach test in the same file. A significant portion of the code, including setting up the collection, attaching the function, adding data, and verifying the initial run, is duplicated.

To improve maintainability and reduce redundancy, consider refactoring the common setup and execution logic into a helper function that both tests can call.

Context for Agents
[CodeDuplication] The new test `test_count_function_attach_and_detach_attach_attach` is very similar to the existing `test_count_function_attach_and_detach` test in the same file. A significant portion of the code, including setting up the collection, attaching the function, adding data, and verifying the initial run, is duplicated.

To improve maintainability and reduce redundancy, consider refactoring the common setup and execution logic into a helper function that both tests can call.

File: chromadb/test/distributed/test_task_api.py
Line: 296


12 changes: 6 additions & 6 deletions go/pkg/sysdb/coordinator/create_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (suite *AttachFunctionTestSuite) setupAttachFunctionMocks(ctx context.Conte
// Phase 1: Create attached function in transaction
// Check if any attached function exists for this collection
suite.mockMetaDomain.On("AttachedFunctionDb", mock.Anything).Return(suite.mockAttachedFunctionDb).Once()
suite.mockAttachedFunctionDb.On("GetByCollectionID", inputCollectionID).
suite.mockAttachedFunctionDb.On("GetReadyOrNotReadyByCollectionID", inputCollectionID).
Return([]*dbmodel.AttachedFunction{}, nil).Once()

suite.mockMetaDomain.On("DatabaseDb", mock.Anything).Return(suite.mockDatabaseDb).Once()
Expand Down Expand Up @@ -167,7 +167,7 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_SuccessfulCreation() {
// Setup mocks that will be called within the transaction (using mock.Anything for context)
// Check if any attached function exists for this collection
suite.mockMetaDomain.On("AttachedFunctionDb", mock.Anything).Return(suite.mockAttachedFunctionDb).Once()
suite.mockAttachedFunctionDb.On("GetByCollectionID", inputCollectionID).
suite.mockAttachedFunctionDb.On("GetReadyOrNotReadyByCollectionID", inputCollectionID).
Return([]*dbmodel.AttachedFunction{}, nil).Once()

// Look up database
Expand Down Expand Up @@ -285,7 +285,7 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_IdempotentRequest_Alrea

// Inside transaction: check for existing attached functions
suite.mockMetaDomain.On("AttachedFunctionDb", txCtx).Return(suite.mockAttachedFunctionDb).Once()
suite.mockAttachedFunctionDb.On("GetByCollectionID", inputCollectionID).
suite.mockAttachedFunctionDb.On("GetReadyOrNotReadyByCollectionID", inputCollectionID).
Return([]*dbmodel.AttachedFunction{existingAttachedFunction}, nil).Once()

// Validate function by ID
Expand Down Expand Up @@ -364,7 +364,7 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_RecoveryFlow() {

// Phase 1: Create attached function in transaction
suite.mockMetaDomain.On("AttachedFunctionDb", mock.Anything).Return(suite.mockAttachedFunctionDb).Once()
suite.mockAttachedFunctionDb.On("GetByCollectionID", inputCollectionID).
suite.mockAttachedFunctionDb.On("GetReadyOrNotReadyByCollectionID", inputCollectionID).
Return([]*dbmodel.AttachedFunction{}, nil).Once()

suite.mockMetaDomain.On("DatabaseDb", mock.Anything).Return(suite.mockDatabaseDb).Once()
Expand Down Expand Up @@ -420,7 +420,7 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_RecoveryFlow() {

// Inside transaction: check for existing attached functions
suite.mockMetaDomain.On("AttachedFunctionDb", txCtx).Return(suite.mockAttachedFunctionDb).Once()
suite.mockAttachedFunctionDb.On("GetByCollectionID", inputCollectionID).
suite.mockAttachedFunctionDb.On("GetReadyOrNotReadyByCollectionID", inputCollectionID).
Return([]*dbmodel.AttachedFunction{incompleteAttachedFunction}, nil).Once()

// Validate function matches
Expand Down Expand Up @@ -511,7 +511,7 @@ func (suite *AttachFunctionTestSuite) TestAttachFunction_IdempotentRequest_Param

// Inside transaction: check for existing attached functions
suite.mockMetaDomain.On("AttachedFunctionDb", txCtx).Return(suite.mockAttachedFunctionDb).Once()
suite.mockAttachedFunctionDb.On("GetByCollectionID", inputCollectionID).
suite.mockAttachedFunctionDb.On("GetReadyOrNotReadyByCollectionID", inputCollectionID).
Return([]*dbmodel.AttachedFunction{existingAttachedFunction}, nil).Once()

// Validate function - returns DIFFERENT function name
Expand Down
6 changes: 4 additions & 2 deletions go/pkg/sysdb/coordinator/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
err := s.catalog.txImpl.Transaction(ctx, func(txCtx context.Context) error {
// Check if there's any active (ready, non-deleted) attached function for this collection
// We only allow one active attached function per collection
existingAttachedFunctions, err := s.catalog.metaDomain.AttachedFunctionDb(txCtx).GetByCollectionID(req.InputCollectionId)
existingAttachedFunctions, err := s.catalog.metaDomain.AttachedFunctionDb(txCtx).GetReadyOrNotReadyByCollectionID(req.InputCollectionId)
if err != nil {
log.Error("AttachFunction: failed to check for existing attached function", zap.Error(err))
return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Re: line +102]

Ok now this error is wrong. If you have more than one ready attached function that's a problem. But it could be possible to have a bunch of partially attached nonready function.

See this comment inline on Graphite.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would we allow that? That seems like a buggy state to allow.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if they never become ready and are the cumulative result of multiple failed backfill requests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds like we're letting our invariants go loose. If there can only be one, better not make them fight for that distinction.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Invariant is that there can only be one ready function, that reminds me that I need to add validation for that before making a function ready in the commit phase of AttachFunction 2PC.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Re: line +125]

This line of code should be doing what your change down below intends on doing. I wonder with your addition of GetReadyOrNotReadyByCollectionID whether the right thing to do now is to get rid of the fall-through on line 198.

We also now need to make sure line 121 doesn't error out too early if it happens to read a non-matching unready function first.

See this comment inline on Graphite.

Expand Down Expand Up @@ -194,7 +194,9 @@ func (s *Coordinator) AttachFunction(ctx context.Context, req *coordinatorpb.Att
}

err = s.catalog.metaDomain.AttachedFunctionDb(txCtx).Insert(attachedFunction)
if err != nil {
if err == common.ErrAttachedFunctionAlreadyExists {
// idempotent fall through
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we verify that the nonready function that exists actually matches the currently requested function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't there a bunch of code above that does this?

Copy link
Contributor

@tanujnay112 tanujnay112 Dec 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does that only for functions that are ready. You might be able to reuse the above checks by changing GetByCollectionID to a function called GetAnyByCollectionID that returns deleted (consistency with other GetAny methods) and non-ready functions.

} else if err != nil {
log.Error("AttachFunction: failed to insert attached function", zap.Error(err))
return err
}
Expand Down
17 changes: 17 additions & 0 deletions go/pkg/sysdb/metastore/db/dao/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,23 @@ func (s *attachedFunctionDb) GetByCollectionID(inputCollectionID string) ([]*dbm
return attachedFunctions, nil
}

// Returns the non-deleted functions, without regard for `is_ready`. Deleted functions will still
// be excluded.
func (s *attachedFunctionDb) GetReadyOrNotReadyByCollectionID(inputCollectionID string) ([]*dbmodel.AttachedFunction, error) {
var attachedFunctions []*dbmodel.AttachedFunction
err := s.db.
Where("input_collection_id = ?", inputCollectionID).
Where("is_deleted = ?", false).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically all the other GetAny methods return deleted functions too so this is an inconsistency. I can add this to my to-clean-up list for cleanup day.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning "Deleted" breaks this case. What would you call between GetBy and GetAnyBy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've renamed it ReadyOrNotReady and documented.

Find(&attachedFunctions).Error

if err != nil {
log.Error("GetReadyOrNotReadyByCollectionID failed", zap.Error(err), zap.String("input_collection_id", inputCollectionID))
return nil, err
}

return attachedFunctions, nil
}

func (s *attachedFunctionDb) SoftDelete(inputCollectionID string, name string) error {
// Update name and is_deleted in a single query
// Format: _deleted_<original_name>_<id>
Expand Down
30 changes: 30 additions & 0 deletions go/pkg/sysdb/metastore/db/dbmodel/mocks/IAttachedFunctionDb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions go/pkg/sysdb/metastore/db/dbmodel/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type IAttachedFunctionDb interface {
GetByID(id uuid.UUID) (*AttachedFunction, error)
GetAnyByID(id uuid.UUID) (*AttachedFunction, error) // TODO(tanujnay112): Consolidate all the getters.
GetByCollectionID(inputCollectionID string) ([]*AttachedFunction, error)
GetReadyOrNotReadyByCollectionID(inputCollectionID string) ([]*AttachedFunction, error)
Update(attachedFunction *AttachedFunction) error
Finish(id uuid.UUID) error
SoftDelete(inputCollectionID string, name string) error
Expand Down
Loading