Skip to content

v1 for support for embeddings in llmx #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions llmx/embeddings/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .oai_embedding import OpenAITextEmbedding
85 changes: 85 additions & 0 deletions llmx/embeddings/cohere_embedding.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import cohere
from typing import Union, List
import numpy as np
import os
from .text_embedding import TextEmbedding
from ..utils import cache_request


class CohereTextEmbedding(TextEmbedding):
"""
Text embedding using Cohere models.

Attributes:
model (str): Name of the Cohere embedding model to use.
api_key (str): API key for accessing the Cohere service. Will use the
COHERE_API_KEY environment variable if not provided.
"""

def __init__(
self,
model: str = "embed-english-v2.0",
api_key: str = os.environ.get("COHERE_API_KEY", None),
):
super().__init__(model)

if api_key is None:
raise ValueError(
"Cohere API key is not set. Please set the COHERE_API_KEY environment variable."
)

self.client = cohere.Client(api_key)
self.model = model
self._size = self.get_embedding_size_from_model()

def get_embedding_size_from_model(self) -> int:
"""
Get the embedding size for the current model.

Returns:
int: Size of the embeddings returned by the model.
"""
size_map = {
"embed-english-v3.0": 1024,
"embed-multilingual-v3.0": 1024,
"embed-english-light-v3.0": 384,
"embed-multilingual-light-v3.0": 384,
"embed-english-v2.0": 4096,
"embed-english-light-v2.0": 1024,
"embed-multilingual-v2.0": 768,
}
size = size_map.get(self.model)
if not size:
raise ValueError("Invalid model name. Please provide a valid Cohere embed model.")
return size

def embed(
self, text: Union[str, List[str]], use_cache: bool = True, **kwargs
) -> np.ndarray:
"""
Compute embedding for the given text using the Cohere model.

Args:
text (Union[str, List[str]]): The input text(s) to compute embeddings for.
use_cache (bool, optional): Whether to use cached embeddings if available.
Defaults to True.
**kwargs: Additional keyword arguments.

Returns:
np.ndarray: The computed embeddings for the input text.
"""
# if text is numpy.ndarray, convert to list
if isinstance(text, np.ndarray):
text = text.tolist()

cache_params = dict(text=text, model=self.model, **kwargs)
if use_cache:
response = cache_request(cache=self.cache, params=cache_params)
if response:
return np.array(response).astype(np.float32)

co_response = self.client.embed(model=self.model, texts=text)
embeddings = co_response.embeddings

cache_request(cache=self.cache, params=cache_params, values=embeddings)
return np.array(embeddings).astype(np.float32)
51 changes: 51 additions & 0 deletions llmx/embeddings/oai_embedding.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import os
import numpy as np
import openai
from typing import Union, List
from .text_embedding import TextEmbedding
from ..utils import cache_request


class OpenAITextEmbedding(TextEmbedding):
"""Text embedding using OpenAI models."""

def __init__(
self,
model: str = "text-embedding-ada-002",
api_base: str = None,
api_version: str = None,
api_type: str = None,
api_key: str = os.environ.get("OPENAI_API_KEY", None),
):
super().__init__(model)

if api_key is None:
raise ValueError(
"OpenAI API key is not set. Please set the OPENAI_API_KEY environment variable."
)
self.api_key = api_key
openai.api_key = self.api_key
openai.api_base = api_base or openai.api_base
openai.api_version = api_version or openai.api_version
openai.api_type = api_type or openai.api_type
self.model = model
self._size = 1536

def embed(
self, text: Union[str, List[str]], use_cache: bool = True, **kwargs
) -> np.ndarray:
"""Compute embedding for text."""
# if text is numpy.ndarray, convert to list
if isinstance(text, np.ndarray):
text = text.tolist()

cache_params = dict(text=text, model=self.model, **kwargs)
if use_cache:
response = cache_request(cache=self.cache, params=cache_params)
if response:
return np.array(response).astype(np.float32)
response = openai.Embedding.create(input=text, model=self.model)
embeddings = [x["embedding"] for x in response["data"]]

cache_request(cache=self.cache, params=cache_params, values=embeddings)
return np.array(embeddings).astype(np.float32)
42 changes: 42 additions & 0 deletions llmx/embeddings/text_embedding.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import os
import numpy as np
from typing import Union, List
from diskcache import Cache
from abc import ABC, abstractmethod
from ..utils import get_user_cache_dir
from ..version import APP_NAME


class TextEmbedding(ABC):
"""Interface for computing Text Embeddings."""

def __init__(self, model_name: str, **kwargs):
"""
Initialize the text embedding model.

:param model_name: str, the name of the model
:param cache_dir: str, optional, the path to the directory where cache files are stored
"""
self.model_name = model_name
self._size = None

app_name = APP_NAME
cache_dir_default = get_user_cache_dir(app_name)
cache_dir_based_on_model = os.path.join(cache_dir_default, model_name)
self.cache_dir = kwargs.get("cache_dir", cache_dir_based_on_model)
self.cache = Cache(self.cache_dir)

@abstractmethod
def embed(self, text: Union[str, List[str]]) -> np.ndarray:
"""
Compute embedding for text.

:param text: Union[str, List[str]], the input text or a list of texts
:return: np.ndarray, the computed embedding(s)
"""
pass

@property
def size(self):
"""Return the embedding size."""
return self._size
122 changes: 106 additions & 16 deletions llmx/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from dataclasses import asdict
import logging
import json
from typing import Any, Union, Dict
from typing import Any, List, Tuple, Union, Dict
import tiktoken
from diskcache import Cache
import hashlib
Expand All @@ -12,6 +12,10 @@
from google.oauth2 import service_account
import requests
import yaml
import numpy as np
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.cluster import KMeans

logger = logging.getLogger("llmx")

Expand Down Expand Up @@ -72,8 +76,10 @@ def get_user_cache_dir(app_name: str) -> str:
return cache_path


def get_gcp_credentials(service_account_key_file: str = None, scopes: list[str] = [
'https://www.googleapis.com/auth/cloud-platform']):
def get_gcp_credentials(
service_account_key_file: str = None,
scopes: list[str] = ["https://www.googleapis.com/auth/cloud-platform"],
):
try:
# Attempt to use Application Default Credentials
credentials, project_id = google.auth.default(scopes=scopes)
Expand All @@ -87,7 +93,8 @@ def get_gcp_credentials(service_account_key_file: str = None, scopes: list[str]
"Service account key file is not set. Please set the PALM_SERVICE_ACCOUNT_KEY_FILE environment variable."
)
credentials = service_account.Credentials.from_service_account_file(
service_account_key_file, scopes=scopes)
service_account_key_file, scopes=scopes
)
auth_req = google.auth.transport.requests.Request()
credentials.refresh(auth_req)
return credentials
Expand All @@ -102,7 +109,6 @@ def gcp_request(
request_timeout: int = 60,
**kwargs,
):

headers = headers or {}

if "key" not in url:
Expand All @@ -115,7 +121,12 @@ def gcp_request(
headers["Content-Type"] = "application/json"

response = requests.request(
method=method, url=url, json=body, headers=headers, timeout=request_timeout, **kwargs
method=method,
url=url,
json=body,
headers=headers,
timeout=request_timeout,
**kwargs,
)

if response.status_code not in range(200, 300):
Expand All @@ -135,35 +146,40 @@ def load_config():
config_path = os.environ.get("LLMX_CONFIG_PATH", None)
if config_path is None or os.path.exists(config_path) is False:
config_path = os.path.join(
os.path.dirname(__file__),
"configs/config.default.yml")
os.path.dirname(__file__), "configs/config.default.yml"
)
logger.info(
"Info: LLMX_CONFIG_PATH environment variable is not set to a valid config file. Using default config file at '%s'.",
config_path)
config_path,
)
if config_path is not None:
try:
with open(config_path, "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
logger.info(
"Loaded config from '%s'.",
config_path)
logger.info("Loaded config from '%s'.", config_path)
return config
except FileNotFoundError as file_not_found:
logger.info(
"Error: Config file not found at '%s'. Please check the LLMX_CONFIG_PATH environment variable. %s",
config_path,
str(file_not_found))
str(file_not_found),
)
except IOError as io_error:
logger.info(
"Error: Could not read the config file at '%s'. %s",
config_path, str(io_error))
config_path,
str(io_error),
)
except yaml.YAMLError as yaml_error:
logger.info(
"Error: Malformed YAML in config file at '%s'. %s",
config_path, str(yaml_error))
config_path,
str(yaml_error),
)
else:
logger.info(
"Info:LLMX_CONFIG_PATH environment variable is not set. Please set it to the path of your config file to setup your default model.")
"Info:LLMX_CONFIG_PATH environment variable is not set. Please set it to the path of your config file to setup your default model."
)
except Exception as error:
logger.info("Error: An unexpected error occurred: %s", str(error))

Expand All @@ -177,3 +193,77 @@ def get_models_maxtoken_dict(models_list):
details = model["model"]["parameters"]
models_dict[details["model"]] = model["max_tokens"]
return models_dict


def reduce_dimensions(
embeddings: np.ndarray, method: str = "pca", n_components: int = 2, **kwargs
) -> np.ndarray:
"""
Reduce the dimensionality of the embeddings using the specified method.

:param embeddings: The input embeddings as a NumPy array of shape (n_samples, n_features).
:param method: The dimensionality reduction method to use ('pca' or 'tsne').
:param n_components: The number of dimensions to reduce the embeddings to.
:param kwargs: Additional keyword arguments specific to the chosen method.
:return: The reduced embeddings as a NumPy array of shape (n_samples, n_components).
"""
if method.lower() == "pca":
reducer = PCA(n_components=n_components, **kwargs)
elif method.lower() == "tsne":
reducer = TSNE(n_components=n_components, **kwargs)
else:
raise ValueError(f"Unsupported dimensionality reduction method: {method}")

reduced_embeddings = reducer.fit_transform(embeddings)
return reduced_embeddings


def cluster_embeddings(
reduced_embeddings: np.ndarray,
method: str = "kmeans",
n_clusters: int = 3,
**kwargs,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Cluster the reduced embeddings using the specified method and return the centroids.

:param reduced_embeddings: The reduced embeddings as a NumPy array of shape (n_samples, n_components).
:param method: The clustering method to use ('kmeans').
:param n_clusters: The number of clusters for KMeans clustering.
:param kwargs: Additional keyword arguments specific to the chosen method.
:return: A tuple containing the cluster labels (shape (n_samples,)) and the centroids (shape (n_clusters, n_components)).
"""
if method.lower() == "kmeans":
clusterer = KMeans(n_clusters=n_clusters, **kwargs)
clusters = clusterer.fit_predict(reduced_embeddings)
centroids = clusterer.cluster_centers_
else:
raise ValueError(f"Unsupported clustering method: {method}")

return clusters, centroids


def closest_samples_to_centroids(
embeddings: np.ndarray, clusters: np.ndarray, centroids: np.ndarray, n: int = 1
) -> List[List[int]]:
"""
Find the indices of n samples closest to the centroid of each cluster.

:param embeddings: The embeddings (reduced or not) as a NumPy array of shape (n_samples, n_components).
:param clusters: The cluster labels as a NumPy array of shape (n_samples,).
:param centroids: The centroids of the clusters as a NumPy array of shape (n_clusters, n_components).
:param n: The number of samples to find for each cluster.
:return: A list of lists containing the indices of the n closest samples for each cluster.
"""
cluster_samples = [[] for _ in range(centroids.shape[0])]

for i, cluster in enumerate(clusters):
cluster_samples[cluster].append(i)

closest_samples = []
for i, centroid in enumerate(centroids):
distances = np.linalg.norm(embeddings[cluster_samples[i]] - centroid, axis=1)
closest_indices = np.argsort(distances)[:n]
closest_samples.append([cluster_samples[i][index] for index in closest_indices])

return closest_samples
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies = [
"google.auth",
"typer",
"pyyaml",
"scikit-learn"
]
optional-dependencies = {web = ["fastapi", "uvicorn"], transformers = ["transformers[torch]>=4.26"]}

Expand Down