33 changes: 33 additions & 0 deletions README.md
@@ -311,6 +311,39 @@ e.g
}
```

#### Performance boost via `watched_columns`

If your system runs under high load and performs many SQL updates that don't actually change any data (for example, on many-to-many or related tables), or if you have large, frequently updated tables of which only certain fields need to be reflected in OpenSearch/Elasticsearch, you can use the `watched_columns` parameter to specify which columns should trigger document updates.

This prevents unnecessary re-indexing and significantly reduces load on both the database and the search index.

Imagine your `author` table has many columns and is updated frequently, but you only need `name` for searching. In that case this approach can help:

```json
{
"table": "book",
"columns": [
"isbn",
"title",
"description"
],
"children": [
{
"table": "author",
"columns": [
"name"
],
"watched_columns": [
"name"
]
}
]
}
```
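
Under the hood, the trigger only serializes primary keys, foreign keys, and watched columns into the change payload, so an `UPDATE` that touches nothing in that set produces identical `old`/`new` rows and is dropped before re-indexing. Below is a minimal sketch of that decision; `should_skip`, `WATCHED_TABLES`, and the `bio` column are illustrative names, not part of the config:

```python
# Sketch of the watched-columns skip check. The trigger only serializes
# primary keys, foreign keys and watched columns, so an UPDATE that
# touches nothing in that set yields identical old/new rows.

WATCHED_TABLES = {"author"}  # tables whose nodes define "watched_columns"

def should_skip(payload: dict) -> bool:
    if payload.get("tg_op") != "UPDATE":
        return False
    if payload["table"] not in WATCHED_TABLES:
        return False
    return payload["old"] == payload["new"]

# UPDATE author SET bio = '...'  -> "name" unchanged, payload is skipped
assert should_skip(
    {"tg_op": "UPDATE", "table": "author",
     "old": {"id": 1, "name": "Ann"}, "new": {"id": 1, "name": "Ann"}}
)
# UPDATE author SET name = 'Anna' -> watched column changed, re-index
assert not should_skip(
    {"tg_op": "UPDATE", "table": "author",
     "old": {"id": 1, "name": "Ann"}, "new": {"id": 1, "name": "Anna"}}
)
```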


PGSync addresses the following challenges:
- What if we update the author's name in the database?
- What if we wanted to add another author for an existing book?
27 changes: 26 additions & 1 deletion README.rst
@@ -76,7 +76,32 @@ Example spec
}
]

### Environment variables
#### Watched columns in the config

If your system runs under high load and performs many SQL updates that don't actually change any data (for example, on many-to-many or related tables), or if you have large, frequently updated tables of which only certain fields need to be reflected in OpenSearch/Elasticsearch, you can use the ``watched_columns`` parameter to specify which columns should trigger document updates.

This prevents unnecessary re-indexing and significantly reduces load on both the database and the search index.

.. code-block::

    {
        "database": "[database name]",
        "index": "[Elasticsearch or OpenSearch index]",
        "nodes": {
            "table": "[table A]",
            "schema": "[table A schema]",
            "columns": [
                "column 1 from table A",
                "column 2 from table A"
            ],
            "watched_columns": [
                "column 1 from table A",
                "column 2 from table A"
            ]
        }
    }

### Environment variables

Setup environment variables required for the application

2 changes: 2 additions & 0 deletions pgsync/base.py
@@ -776,6 +776,7 @@ def create_view(
schema: str,
tables: t.Set,
user_defined_fkey_tables: dict,
watched_columns_for_table: t.Dict[str, t.List[str]],
) -> None:
create_view(
self.engine,
@@ -786,6 +787,7 @@
tables,
user_defined_fkey_tables,
self._materialized_views(schema),
watched_columns_for_table,
)

def drop_view(self, schema: str) -> None:
2 changes: 2 additions & 0 deletions pgsync/constants.py
@@ -42,6 +42,7 @@
"schema",
"table",
"transform",
"watched_columns",
]

# Relationship attributes
@@ -198,6 +199,7 @@
"indices",
"primary_keys",
"table_name",
"watched_columns",
]

# Primary key delimiter
5 changes: 5 additions & 0 deletions pgsync/node.py
@@ -134,6 +134,7 @@ class Node(object):
parent: t.Optional[Node] = None
base_tables: t.Optional[list] = None
is_through: bool = False
watched_columns: t.Optional[list] = None

def __post_init__(self):
self.model: sa.sql.Alias = self.models(self.table, self.schema)
@@ -284,6 +285,7 @@ class Tree(threading.local):

def __post_init__(self):
self.tables: t.Set[str] = set()
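# tables that declare watched_columns in the schema config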
self.watched_columns_tables: t.Set[str] = set()
self.__nodes: t.Dict[Node] = {}
self.__schemas: t.Set[str] = set()
self.root: t.Optional[Node] = None
@@ -324,11 +326,14 @@ def build(self, nodes: dict) -> Node:
columns=nodes.get("columns", []),
relationship=nodes.get("relationship", {}),
base_tables=nodes.get("base_tables", []),
watched_columns=nodes.get("watched_columns", []),
)
if self.root is None:
self.root = node

self.tables.add(node.table)
if node.watched_columns:
self.watched_columns_tables.add(node.table)
for through_node in node.relationship.throughs:
through_node.is_through = True
self.tables.add(through_node.table)
30 changes: 28 additions & 2 deletions pgsync/sync.py
@@ -121,6 +121,8 @@ def __init__(
self.query_builder: QueryBuilder = QueryBuilder(verbose=verbose)
self.count: dict = dict(xlog=0, db=0, redis=0)
self.tasks: t.List[asyncio.Task] = []
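# xmins of UPDATE payloads skipped by the watched-columns filter;
# _on_publish folds them back into the checkpoint calculation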
self.skipped_xmins: t.List[int] = []
self.lock_skipped_xmins: threading.Lock = threading.Lock()
self.lock: threading.Lock = threading.Lock()

@property
@@ -307,6 +309,7 @@ def setup(self, no_create: bool = False) -> None:
tables: t.Set = set()
# tables with user defined foreign keys
user_defined_fkey_tables: dict = {}
watched_columns_for_table: dict = {}

for node in self.tree.traverse_breadth_first():
if node.schema != schema:
@@ -333,6 +336,8 @@
if columns:
user_defined_fkey_tables.setdefault(node.table, set())
user_defined_fkey_tables[node.table] |= set(columns)

# record each node's watched columns (possibly empty) so the
# materialized view can expose them to the trigger function
watched_columns_for_table[node.table] = node.watched_columns
if tables:
if if_not_exists or not self.view_exists(
MATERIALIZED_VIEW, schema
@@ -343,6 +348,7 @@
schema,
tables,
user_defined_fkey_tables,
watched_columns_for_table,
)

self.create_triggers(
@@ -1348,6 +1354,19 @@ async def async_poll_redis(self) -> None:
while True:
await self._async_poll_redis()

def _should_skip_update_due_to_watched_columns(self, payload: dict) -> bool:
"""
Returns True if this UPDATE payload should be skipped because none of the watched
columns changed; False otherwise.
"""
if payload.get("tg_op") != UPDATE:
return False

if payload["table"] not in self.tree.watched_columns_tables:
return False

return payload["old"] == payload["new"]

@threaded
@exception
def poll_db(self) -> None:
@@ -1404,7 +1423,12 @@ def poll_db(self) -> None:
and self.index in payload["indices"]
and payload["schema"] in self.tree.schemas
):
payloads.append(payload)
if self._should_skip_update_due_to_watched_columns(payload):
logger.info(f"Skipping payload due to no change: {payload['new']}")
with self.lock_skipped_xmins:
self.skipped_xmins.append(payload["xmin"])
else:
payloads.append(payload)
logger.debug(f"poll_db: {payload}")
with self.lock:
self.count["db"] += 1
@@ -1504,7 +1528,9 @@ def _on_publish(self, payloads: t.List[Payload]) -> None:
)
_payloads: list = []

txids: t.Set = set(map(lambda x: x.xmin, payloads))
with self.lock_skipped_xmins:
txids: t.Set = set(map(lambda x: x.xmin, payloads)) | set(self.skipped_xmins)
self.skipped_xmins = []
# for truncate, tg_op txids is None so skip setting the checkpoint
if txids != set([None]):
self.checkpoint: int = min(min(txids), self.txid_current) - 1
9 changes: 5 additions & 4 deletions pgsync/trigger.py
@@ -18,6 +18,7 @@
_indices TEXT [];
_primary_keys TEXT [];
_foreign_keys TEXT [];
_watched_columns TEXT [];
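-- watched columns are serialized into old/new alongside the keys so
-- the poller can tell whether a relevant column actually changed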

BEGIN
-- database is also the channel name.
@@ -40,23 +41,23 @@
ELSE
IF TG_OP <> 'TRUNCATE' THEN

SELECT primary_keys, foreign_keys, indices
INTO _primary_keys, _foreign_keys, _indices
SELECT primary_keys, foreign_keys, indices, watched_columns
INTO _primary_keys, _foreign_keys, _indices, _watched_columns
FROM {MATERIALIZED_VIEW}
WHERE table_name = TG_TABLE_NAME;

new_row = ROW_TO_JSON(NEW);
new_row := (
SELECT JSONB_OBJECT_AGG(key, value)
FROM JSON_EACH(new_row)
WHERE key = ANY(_primary_keys || _foreign_keys)
WHERE key = ANY(_primary_keys || _foreign_keys || _watched_columns)
);
IF TG_OP = 'UPDATE' THEN
old_row = ROW_TO_JSON(OLD);
old_row := (
SELECT JSONB_OBJECT_AGG(key, value)
FROM JSON_EACH(old_row)
WHERE key = ANY(_primary_keys || _foreign_keys)
WHERE key = ANY(_primary_keys || _foreign_keys || _watched_columns)
);
END IF;
xmin := NEW.xmin;
36 changes: 25 additions & 11 deletions pgsync/view.py
@@ -340,6 +340,7 @@ def create_view(
tables: t.Set,
user_defined_fkey_tables: dict,
views: t.List[str],
watched_columns_for_tables: t.Dict[str, t.List[str]],
) -> None:
"""
This module defines a function `create_view` that creates a view describing primary_keys and foreign_keys for each table
@@ -371,18 +372,18 @@
So if 'specie' was the only row before, and the next query returns
'unit' and 'structure', we want to end up with the result below.

table_name | primary_keys | foreign_keys | indices
------------+--------------+------------------+------------
specie | {id} | {id, user_id} | {foo, bar}
unit | {id} | {id, profile_id} | {foo, bar}
structure | {id} | {id} | {foo, bar}
unit | {id} | {id, profile_id} | {foo, bar}
structure | {id} | {id} | {foo, bar}
table_name | primary_keys | foreign_keys | indices | watched_columns
------------+--------------+------------------+------------+----------------
specie | {id} | {id, user_id} | {foo, bar} | {foo, bar}
unit | {id} | {id, profile_id} | {foo, bar} | {foo}
structure | {id} | {id} | {foo, bar} | {bar}
unit | {id} | {id, profile_id} | {foo, bar} | {foo}
structure | {id} | {id} | {foo, bar} | {bar}
"""

rows: dict = {}
if MATERIALIZED_VIEW in views:
for table_name, primary_keys, foreign_keys, indices in fetchall(
for table_name, primary_keys, foreign_keys, indices, watched_columns in fetchall(
sa.select("*").select_from(
sa.text(f"{schema}.{MATERIALIZED_VIEW}")
)
Expand All @@ -393,6 +394,7 @@ def create_view(
"primary_keys": set(),
"foreign_keys": set(),
"indices": set(),
"watched_columns": set(),
},
)
if primary_keys:
Expand All @@ -401,6 +403,8 @@ def create_view(
rows[table_name]["foreign_keys"] = set(foreign_keys)
if indices:
rows[table_name]["indices"] = set(indices)
if watched_columns:
rows[table_name]["watched_columns"] = set(watched_columns)
with engine.connect().execution_options(
isolation_level="AUTOCOMMIT"
) as conn:
@@ -413,7 +417,7 @@
for table_name, columns in fetchall(_primary_keys(models, schema, tables)):
rows.setdefault(
table_name,
{"primary_keys": set(), "foreign_keys": set(), "indices": set()},
{"primary_keys": set(), "foreign_keys": set(), "indices": set(), "watched_columns": set()},
)
if columns:
rows[table_name]["primary_keys"] |= set(columns)
@@ -422,7 +426,7 @@
for table_name, columns in fetchall(_foreign_keys(models, schema, tables)):
rows.setdefault(
table_name,
{"primary_keys": set(), "foreign_keys": set(), "indices": set()},
{"primary_keys": set(), "foreign_keys": set(), "indices": set(), "watched_columns": set()},
)
if columns:
rows[table_name]["foreign_keys"] |= set(columns)
@@ -436,6 +440,7 @@
"primary_keys": set(),
"foreign_keys": set(),
"indices": set(),
"watched_columns": set(),
},
)
if columns:
@@ -445,15 +450,19 @@
if not rows:
rows.setdefault(
None,
{"primary_keys": set(), "foreign_keys": set(), "indices": set()},
{"primary_keys": set(), "foreign_keys": set(), "indices": set(), "watched_columns": set()},
)

# overlay the watched columns declared in the schema config onto the
# per-table rows that back the materialized view
for table_name, watched_columns in watched_columns_for_tables.items():
    rows[table_name]["watched_columns"] = set(watched_columns)

statement = sa.select(
sa.sql.Values(
sa.column("table_name"),
sa.column("primary_keys"),
sa.column("foreign_keys"),
sa.column("indices"),
sa.column("watched_columns"),
)
.data(
[
Expand All @@ -474,6 +483,11 @@ def create_view(
if fields.get("indices")
else None
),
(
array(fields.get("watched_columns"))
if fields.get("watched_columns")
else None
),
)
for table_name, fields in rows.items()
]
1 change: 1 addition & 0 deletions tests/test_sync.py
@@ -922,6 +922,7 @@ def test_setup(self, mock_teardown, sync):
"public",
{"publisher", "book"},
{"publisher": {"publisher_id", "id"}},
{"book": [], "publisher": []},
)
mock_create_function.assert_called_once_with("public")
mock_teardown.assert_called_once_with(drop_view=False)