Add stale reads to spanner #1639
Conversation
Separating out some fixes from #1639 to unblock while the stale reads are still in discussion (the failing tests were making it harder to evaluate performance, so it'd be helpful to get the fixes in sooner).

Includes:
* Sorting observation query test results for determinism in tests
* Adding "distinct" to all chaining queries to avoid duplicates. "Match any" is not sufficient if there are multiple paths between nodes (see the sketch below).
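The duplicate issue is easy to see with a two-hop chain: if there are two different intermediate nodes between the same endpoints, a plain self-join returns the endpoint pair twice. A rough sketch, where the Edge table and column names are illustrative and not the repo's actual chaining query:

```go
package store

import "cloud.google.com/go/spanner"

// chainStatement is an illustrative sketch, not the repo's actual chaining
// query: a two-hop traversal written as a self-join on an Edge table returns
// one row per intermediate node between the same endpoints, so DISTINCT is
// needed to collapse the duplicates.
func chainStatement(predicate string) spanner.Statement {
	return spanner.Statement{
		SQL: `
			SELECT DISTINCT e1.subject_id, e2.object_id
			FROM Edge AS e1
			JOIN Edge AS e2 ON e1.object_id = e2.subject_id
			WHERE e1.predicate = @p AND e2.predicate = @p`,
		Params: map[string]interface{}{"p": predicate},
	}
}
```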
Going to rethink this a bit after some discussion with Vishal. tl;dr: having an in-memory cache could cause inconsistency across shards.
vish-cs left a comment
Thanks for making the changes!
const (
	// CACHE_DURATION defines how long the CompletionTimestamp is kept in memory before being refetched.
	CACHE_DURATION = 5 * time.Second
nit: can we rename this to TIMESTAMP_CACHE_DURATION just to be explicit?
	withStruct func(interface{}),
) error {
	iter := sc.client.Single().Query(ctx, stmt)
	timestampBound, err := sc.GetStalenessTimestampBound(ctx)
Do we want a feature flag to disable stale reads?
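For illustration, such a flag could guard the bound lookup and fall back to a plain strong read when disabled. A minimal sketch, where the enableStaleReads field and getBound helper are hypothetical stand-ins and not part of this PR:

```go
package store

import (
	"context"

	"cloud.google.com/go/spanner"
)

// flaggedClient is a hypothetical sketch, not this PR's SpannerClient.
type flaggedClient struct {
	client           *spanner.Client
	enableStaleReads bool
	// getBound stands in for something like GetStalenessTimestampBound.
	getBound func(context.Context) (spanner.TimestampBound, error)
}

// singleUse returns a single-use read-only transaction. With the flag off, or
// if the bound cannot be resolved, it falls back to a strong read.
func (sc *flaggedClient) singleUse(ctx context.Context) *spanner.ReadOnlyTransaction {
	if !sc.enableStaleReads {
		return sc.client.Single()
	}
	bound, err := sc.getBound(ctx)
	if err != nil {
		return sc.client.Single()
	}
	return sc.client.Single().WithTimestampBound(bound)
}
```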
		return nil, err
	}

	timestampBound := spanner.ReadTimestamp(*completionTimestamp)
Curious, what's the difference between completionTimestamp and timestampBound?
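For context, completionTimestamp is the raw time.Time read from the IngestionHistory table (when the last ingestion finished), while timestampBound is the spanner.TimestampBound built from it, which tells Spanner to serve data exactly as it existed at that instant. A minimal sketch of the relationship (the helper name is hypothetical):

```go
package store

import (
	"time"

	"cloud.google.com/go/spanner"
)

// staleBound converts the raw completion time into the read option handed to
// the Spanner client: an exact-timestamp bound rather than a strong read.
func staleBound(completionTimestamp time.Time) spanner.TimestampBound {
	return spanner.ReadTimestamp(completionTimestamp)
}
```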
	err = sc.processRows(iter, newStruct, withStruct)

	// Check if the error is due to an expired timestamp (FAILED_PRECONDITION).
Curious when that can happen... the max 7-day retention timeout?
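For reference, Spanner rejects reads at a timestamp older than the database's version retention period with FAILED_PRECONDITION, so with retention at the 7-day maximum this fires only if the cached timestamp is more than 7 days old. A rough sketch of the fallback, assuming a hypothetical runQuery helper in place of processRows:

```go
package store

import (
	"context"

	"cloud.google.com/go/spanner"
	"google.golang.org/grpc/codes"
)

// queryWithStaleFallback retries with a strong read when the stale read fails
// because the requested timestamp is outside the version retention window.
// runQuery is a hypothetical helper that consumes (and stops) the iterator.
func queryWithStaleFallback(
	ctx context.Context,
	client *spanner.Client,
	stmt spanner.Statement,
	bound spanner.TimestampBound,
	runQuery func(*spanner.RowIterator) error,
) error {
	err := runQuery(client.Single().WithTimestampBound(bound).Query(ctx, stmt))
	if spanner.ErrCode(err) == codes.FailedPrecondition {
		// The timestamp is older than the retention period; retry strongly.
		return runQuery(client.Single().Query(ctx, stmt))
	}
	return err
}
```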
// It prioritizes returning a value from an in-memory cache to reduce Spanner traffic.
func (sc *SpannerClient) getCompletionTimestamp(ctx context.Context) (*time.Time, error) {
	// Check cache
	sc.cacheMutex.RLock()
As discussed, we need to think through how consistency would be ensured across caches in different mixer instances.
Timestamps are read from the IngestionHistory table, but are cached in memory for 5 seconds to avoid a large increase in traffic. I can adjust the expiry as needed, depending on how frequently we expect the incremental data to be refreshed and how fresh we want the data to be.

Since the running Spanner instance (dc_graph_2025_09_15) doesn't have incremental ingestion, the timestamp will eventually go stale (it's manually set), so I added a temporary fallback to strong reads. Once we switch to incremental ingestion, we should make this check stronger and revisit whether we want these queries to actually fail, since a stale timestamp indicates that something went wrong in ingestion. (I set the version retention to the maximum of 7 days.)
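To make the caching behavior concrete, here is a minimal sketch of a read-through cache guarded by a sync.RWMutex; the struct, field, and column names are assumptions rather than the PR's exact code, and each instance caches independently, so different instances can briefly observe different timestamps:

```go
package store

import (
	"context"
	"sync"
	"time"

	"cloud.google.com/go/spanner"
)

const timestampCacheDuration = 5 * time.Second

// timestampCache is a sketch of the in-memory cache; names are assumptions.
type timestampCache struct {
	client *spanner.Client

	mu        sync.RWMutex
	cached    *time.Time
	fetchedAt time.Time
}

// completionTimestamp serves the latest ingestion completion time from memory
// for timestampCacheDuration, then re-reads it from the IngestionHistory table
// (the completion_time column name is assumed) with a strong read.
func (c *timestampCache) completionTimestamp(ctx context.Context) (*time.Time, error) {
	c.mu.RLock()
	if c.cached != nil && time.Since(c.fetchedAt) < timestampCacheDuration {
		ts := c.cached
		c.mu.RUnlock()
		return ts, nil
	}
	c.mu.RUnlock()

	// Cache miss or expired: fetch the newest completion time.
	stmt := spanner.Statement{
		SQL: `SELECT completion_time FROM IngestionHistory ORDER BY completion_time DESC LIMIT 1`,
	}
	iter := c.client.Single().Query(ctx, stmt)
	defer iter.Stop()
	row, err := iter.Next()
	if err != nil {
		return nil, err
	}
	var ts time.Time
	if err := row.Columns(&ts); err != nil {
		return nil, err
	}

	c.mu.Lock()
	c.cached = &ts
	c.fetchedAt = time.Now()
	c.mu.Unlock()
	return &ts, nil
}
```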