-
Notifications
You must be signed in to change notification settings - Fork 25
Timescaledb: add column nullability check on insert and switch to psycopg3 #588
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: development
Are you sure you want to change the base?
Conversation
0b50684
to
32c858a
Compare
32c858a
to
d3d2e7a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great, but I did have a few comments :)
storey/timescaledb_target.py
Outdated
rows = await cur.fetchall() | ||
|
||
if not rows: | ||
raise ValueError(f"Table '{schema_name}.{self._table}' not found or has no columns") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be nice to tell the user which one it is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will fix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant telling the user that the table doesn't exist, or that it has no columns. Pretty sure we can tell from whether rows is None
or an empty list. It's nice to avoid ambiguity in the error message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
return | ||
|
||
# Get table schema for validation on first use | ||
schema = await self._get_table_schema() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we shouldn't just do it in _init()
, so that there's no need to cache it, just initialize it on startup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The issue is that the current architecture deliberately keeps _init() synchronous and defers async operations. If we wanted to do schema initialization in _init(), we'd need to use synchronous database operations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, the init sequence is synchronous. It would be good to do it there, but it's not very important.
invalid_data = "this_is_not_a_dictionary" | ||
|
||
# Should raise TypeError for non-dictionary data (the error occurs in the parent Writer class) | ||
with pytest.raises(TypeError, match=r"string indices must be integers"): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we're already testing it, maybe we should raise a better error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will fix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you might have missed this one though since this line wasn't updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually the error is raise by Writer::_event_to_writer_entry(), so not sure what to test there
ea770dd
to
8356ba6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good stuff. A couple more things to consider.
integration/test_timescaledb.py
Outdated
except Exception as e: | ||
print(f"Cleanup error: {e}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just let it be raised normally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
integration/test_timescaledb.py
Outdated
# Cleanup | ||
with connection.cursor() as cursor: | ||
cursor.execute(f"DROP TABLE IF EXISTS {validation_table} CASCADE;") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cleanup block will not run in case of a test failure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will fix
integration/test_timescaledb.py
Outdated
# Cleanup | ||
with connection.cursor() as cursor: | ||
cursor.execute(f"DROP TABLE IF EXISTS {schema_table} CASCADE;") | ||
cursor.execute(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE;") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cleanup block will not run in case of a test failure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will fix
integration/test_timescaledb.py
Outdated
# Cleanup | ||
await target._terminate() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cleanup block will not run in case of a test failure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will fix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will fix
integration/test_timescaledb.py
Outdated
""" | ||
) | ||
cursor.execute(f"SELECT create_hypertable('{validation_table}', 'time');") | ||
cursor.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This won't run in case of error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will fix
storey/timescaledb_target.py
Outdated
rows = await cur.fetchall() | ||
|
||
if not rows: | ||
raise ValueError(f"Table '{schema_name}.{self._table}' not found or has no columns") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant telling the user that the table doesn't exist, or that it has no columns. Pretty sure we can tell from whether rows is None
or an empty list. It's nice to avoid ambiguity in the error message.
return | ||
|
||
# Get table schema for validation on first use | ||
schema = await self._get_table_schema() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, the init sequence is synchronous. It would be good to do it there, but it's not very important.
invalid_data = "this_is_not_a_dictionary" | ||
|
||
# Should raise TypeError for non-dictionary data (the error occurs in the parent Writer class) | ||
with pytest.raises(TypeError, match=r"string indices must be integers"): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you might have missed this one though since this line wasn't updated.
integration/test_timescaledb.py
Outdated
target = TimescaleDBTarget( | ||
dsn=dsn_url, | ||
table=table_name, | ||
time_col="time", | ||
columns=columns_config, | ||
time_format=time_format, | ||
max_events=10, | ||
) | ||
|
||
# Initialize the target | ||
await target._async_init() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here too, I think it would be better to test within a graph rather than by calling the target's private methods.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will fix
No description provided.