Feature: improve relationship builders for better async and reduced memory utilization #2077

Open · wants to merge 5 commits into base: main

1 change: 1 addition & 0 deletions ragas/pyproject.toml
@@ -63,6 +63,7 @@ dev = [
"haystack-ai",
"sacrebleu",
"r2r",
"scipy",
]
test = [
"pytest",
113 changes: 83 additions & 30 deletions ragas/src/ragas/testset/transforms/relationship_builders/cosine.py
@@ -12,59 +12,111 @@ class CosineSimilarityBuilder(RelationshipBuilder):
property_name: str = "embedding"
new_property_name: str = "cosine_similarity"
threshold: float = 0.9
block_size: int = 1024

def _find_similar_embedding_pairs(
self, embeddings: np.ndarray, threshold: float
) -> t.List[t.Tuple[int, int, float]]:
# Normalize the embeddings
normalized = embeddings / np.linalg.norm(embeddings, axis=1)[:, np.newaxis]
def _validate_embedding_shapes(self, embeddings: t.List[t.Any]):
if not embeddings:
raise ValueError(f"No nodes have a valid {self.property_name}")
first_len = len(embeddings[0])
for idx, emb in enumerate(embeddings):
if len(emb) != first_len:
raise ValueError(
f"Embedding at index {idx} has length {len(emb)}, expected {first_len}. "
"All embeddings must have the same length."
)

# Calculate cosine similarity matrix
similarity_matrix = np.dot(normalized, normalized.T)
# Find pairs with similarity >= threshold
similar_pairs = np.argwhere(similarity_matrix >= threshold)
def _block_cosine_similarity(self, i: np.ndarray, j: np.ndarray):
"""Calculate cosine similarity matrix between two sets of embeddings."""
i_norm = i / np.linalg.norm(i, axis=1, keepdims=True)
j_norm = j / np.linalg.norm(j, axis=1, keepdims=True)
return np.dot(i_norm, j_norm.T)

# Filter out self-comparisons and duplicate pairs
return [
(pair[0], pair[1], similarity_matrix[pair[0], pair[1]])
for pair in similar_pairs
if pair[0] < pair[1]
]
async def _find_similar_embedding_pairs(
self, embeddings: np.ndarray, threshold: float, block_size: int = 1024
) -> t.Set[t.Tuple[int, int, float]]:
"""Sharded computation of cosine similarity to find similar pairs."""

async def transform(self, kg: KnowledgeGraph) -> t.List[Relationship]:
if self.property_name is None:
self.property_name = "embedding"
def process_block(i: int, j: int) -> t.Set[t.Tuple[int, int, float]]:
end_i = min(i + block_size, n_embeddings)
end_j = min(j + block_size, n_embeddings)
block = self._block_cosine_similarity(
embeddings[i:end_i, :], embeddings[j:end_j, :]
)
similar_idx = np.argwhere(block >= threshold)
return {
(int(i + ii), int(j + jj), float(block[ii, jj]))
for ii, jj in similar_idx
if int(i + ii) < int(j + jj)
}

n_embeddings, _dimension = embeddings.shape
triplets = set()

for i in range(0, n_embeddings, block_size):
for j in range(i, n_embeddings, block_size):
triplets.update(process_block(i, j))

return triplets

async def transform(self, kg: KnowledgeGraph) -> t.List[Relationship]:
embeddings = []
for node in kg.nodes:
embedding = node.get_property(self.property_name)
if embedding is None:
raise ValueError(f"Node {node.id} has no {self.property_name}")
embeddings.append(embedding)

similar_pairs = self._find_similar_embedding_pairs(
np.array(embeddings), self.threshold
self._validate_embedding_shapes(embeddings)
similar_pairs = await self._find_similar_embedding_pairs(
np.array(embeddings), self.threshold, self.block_size
)

return [
Relationship(
source=kg.nodes[i],
target=kg.nodes[j],
type="cosine_similarity",
type=self.new_property_name,
properties={self.new_property_name: similarity_float},
bidirectional=True,
)
for i, j, similarity_float in similar_pairs
]

def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]:
"""
Generates a coroutine task for finding similar embedding pairs, which can be scheduled/executed by an Executor.
"""
embeddings = []
for node in kg.nodes:
embedding = node.get_property(self.property_name)
if embedding is None:
raise ValueError(f"Node {node.id} has no {self.property_name}")
embeddings.append(embedding)
self._validate_embedding_shapes(embeddings)

async def find_and_add_relationships():
similar_pairs = await self._find_similar_embedding_pairs(
np.array(embeddings), self.threshold, self.block_size
)
for i, j, similarity_float in similar_pairs:
rel = Relationship(
source=kg.nodes[i],
target=kg.nodes[j],
type=self.new_property_name,
properties={self.new_property_name: similarity_float},
bidirectional=True,
)
kg.relationships.append(rel)

return [find_and_add_relationships()]


@dataclass
class SummaryCosineSimilarityBuilder(CosineSimilarityBuilder):
property_name: str = "summary_embedding"
new_property_name: str = "summary_cosine_similarity"
threshold: float = 0.1
block_size: int = 1024

def filter(self, kg: KnowledgeGraph) -> KnowledgeGraph:
def _document_summary_filter(self, kg: KnowledgeGraph) -> KnowledgeGraph:
"""
Filters the knowledge graph to only include nodes with a summary embedding.
"""
@@ -78,21 +130,22 @@ def filter(self, kg: KnowledgeGraph) -> KnowledgeGraph:
return KnowledgeGraph(nodes=nodes)

async def transform(self, kg: KnowledgeGraph) -> t.List[Relationship]:
filtered_kg = self._document_summary_filter(kg)
embeddings = [
node.get_property(self.property_name)
for node in kg.nodes
for node in filtered_kg.nodes
if node.get_property(self.property_name) is not None
]
if not embeddings:
raise ValueError(f"No nodes have a valid {self.property_name}")
similar_pairs = self._find_similar_embedding_pairs(
np.array(embeddings), self.threshold
similar_pairs = await self._find_similar_embedding_pairs(
np.array(embeddings), self.threshold, self.block_size
)
return [
Relationship(
source=kg.nodes[i],
target=kg.nodes[j],
type="summary_cosine_similarity",
source=filtered_kg.nodes[i],
target=filtered_kg.nodes[j],
type=self.new_property_name,
properties={self.new_property_name: similarity_float},
bidirectional=True,
)
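
The core of the change above is the tile-by-tile similarity computation in `_find_similar_embedding_pairs`: instead of materializing the full n x n cosine-similarity matrix, the builder compares embeddings one `block_size` x `block_size` tile at a time, so peak memory scales with the tile rather than the node count. A minimal, self-contained NumPy sketch of that idea (the helper name `blocked_similar_pairs` is illustrative, not part of the PR):

```python
import numpy as np


def blocked_similar_pairs(embeddings: np.ndarray, threshold: float, block_size: int = 1024):
    """Yield (i, j, similarity) with i < j and cosine similarity >= threshold,
    comparing embeddings one block at a time instead of building the full n x n matrix."""
    normed = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    n = normed.shape[0]
    for i in range(0, n, block_size):
        rows = normed[i : i + block_size]
        for j in range(i, n, block_size):
            cols = normed[j : j + block_size]
            block = rows @ cols.T  # at most block_size x block_size floats in memory
            for ii, jj in np.argwhere(block >= threshold):
                gi, gj = i + int(ii), j + int(jj)
                if gi < gj:  # drop self-pairs and mirrored duplicates
                    yield gi, gj, float(block[ii, jj])


# Example: pairs among 10 random 8-dimensional embeddings
# (random vectors rarely clear a 0.9 threshold, so this is usually empty).
rng = np.random.default_rng(0)
pairs = list(blocked_similar_pairs(rng.normal(size=(10, 8)), threshold=0.9))
```

Because the inner loop starts at `j = i`, each off-diagonal tile is visited once and the diagonal tiles still contain self-comparisons; the `gi < gj` guard is what removes self-pairs and duplicates, matching the check in `process_block` above.
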
@@ -1,3 +1,4 @@
import itertools
import typing as t
from collections import Counter
from dataclasses import dataclass
@@ -19,39 +20,62 @@ def _jaccard_similarity(self, set1: t.Set[str], set2: t.Set[str]) -> float:
union = len(set1.union(set2))
return intersection / union if union > 0 else 0.0

async def transform(self, kg: KnowledgeGraph) -> t.List[Relationship]:
if self.property_name is None:
self.property_name

similar_pairs = []
for i, node1 in enumerate(kg.nodes):
for j, node2 in enumerate(kg.nodes):
if i >= j:
continue
items1 = node1.get_property(self.property_name)
items2 = node2.get_property(self.property_name)
if items1 is None or items2 is None:
raise ValueError(
f"Node {node1.id} or {node2.id} has no {self.property_name}"
)
if self.key_name is not None:
items1 = items1.get(self.key_name, [])
items2 = items2.get(self.key_name, [])
similarity = self._jaccard_similarity(set(items1), set(items2))
if similarity >= self.threshold:
similar_pairs.append((i, j, similarity))
async def _find_similar_embedding_pairs(
self, kg: KnowledgeGraph
) -> t.Set[t.Tuple[int, int, float]]:
"""
Finds all node index pairs with Jaccard similarity above the threshold.
Returns a set of (i, j, similarity) tuples.
"""

similar_pairs = set()
for (i, node1), (j, node2) in itertools.combinations(enumerate(kg.nodes), 2):
items1 = node1.get_property(self.property_name)
items2 = node2.get_property(self.property_name)
if items1 is None or items2 is None:
raise ValueError(
f"Node {node1.id} or {node2.id} has no {self.property_name}"
)
if self.key_name is not None:
items1 = items1.get(self.key_name, [])
items2 = items2.get(self.key_name, [])
similarity = self._jaccard_similarity(set(items1), set(items2))
if similarity >= self.threshold:
similar_pairs.add((i, j, similarity))
return similar_pairs

async def transform(self, kg: KnowledgeGraph) -> t.List[Relationship]:
similar_pairs = await self._find_similar_embedding_pairs(kg)
return [
Relationship(
source=kg.nodes[i],
target=kg.nodes[j],
type="jaccard_similarity",
type=self.new_property_name,
properties={self.new_property_name: similarity_float},
bidirectional=True,
)
for i, j, similarity_float in similar_pairs
]

def generate_execution_plan(self, kg: KnowledgeGraph) -> t.List[t.Coroutine]:
"""
Generates a coroutine task for finding similar pairs, which can be scheduled/executed by an Executor.
"""

async def find_and_add_relationships():
similar_pairs = await self._find_similar_embedding_pairs(kg)
for i, j, similarity_float in similar_pairs:
rel = Relationship(
source=kg.nodes[i],
target=kg.nodes[j],
type=self.new_property_name,
properties={self.new_property_name: similarity_float},
bidirectional=True,
)
kg.relationships.append(rel)

return [find_and_add_relationships()]


@dataclass
class OverlapScoreBuilder(RelationshipBuilder):
@@ -65,6 +89,7 @@ class OverlapScoreBuilder(RelationshipBuilder):
def __post_init__(self):
try:
from rapidfuzz import distance

except ImportError:
raise ImportError(
"rapidfuzz is required for string distance. Please install it using `pip install rapidfuzz`"
@@ -78,13 +103,11 @@ def __post_init__(self):
}

def _overlap_score(self, overlaps: t.List[bool]) -> float:

return sum(overlaps) / len(overlaps) if len(overlaps) > 0 else 0.0

def _get_noisy_items(
self, nodes: t.List[Node], property_name: str, percent_cut_off: float = 0.05
) -> t.List[str]:

all_items = []
for node in nodes:
items = node.get_property(property_name)
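
The Jaccard builder gets the same treatment: pair enumeration moves into `_find_similar_embedding_pairs`, which walks `itertools.combinations(enumerate(kg.nodes), 2)` instead of a nested loop guarded by `i >= j`, and `generate_execution_plan` wraps the work in a coroutine for the Executor. A small sketch of that pairing pattern, using plain dicts as stand-ins for nodes (illustrative only, not the ragas Node API):

```python
import itertools


def jaccard(a: set, b: set) -> float:
    """Jaccard similarity of two sets; 0.0 when both are empty."""
    union = len(a | b)
    return len(a & b) / union if union else 0.0


# Dict stand-ins for knowledge-graph nodes carrying an "entities" property.
nodes = [
    {"entities": {"apple", "banana"}},
    {"entities": {"banana", "cherry"}},
    {"entities": {"apple", "banana", "cherry"}},
]

# combinations(...) yields each unordered pair exactly once, so no i >= j guard is needed.
pairs = {
    (i, j, jaccard(a["entities"], b["entities"]))
    for (i, a), (j, b) in itertools.combinations(enumerate(nodes), 2)
}
```

The coroutine returned by `generate_execution_plan` appends relationships directly to `kg` when awaited, so an Executor can schedule plans from several builders side by side.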