feat(bedrock): Add S3 file upload and batch processing integration to llm_http_handler #13167

Open · colesmcintosh wants to merge 16 commits into BerriAI:main from feat/bedrock-batch-and-files
Commits (16)
f84e40e
feat(bedrock): Add S3 file upload handler for batch processing
colesmcintosh Jul 31, 2025
8730642
feat: add bedrock files handler tests
colesmcintosh Jul 31, 2025
e1e8e95
fix: remove cross-branch integration test
colesmcintosh Jul 31, 2025
04d3518
style: apply black formatting to bedrock files handler
colesmcintosh Jul 31, 2025
003aa4a
feat: add batch creation support to llm_http_handler
colesmcintosh Jul 31, 2025
8315554
feat: integrate Bedrock batch and file operations into llm_http_handler
colesmcintosh Jul 31, 2025
542f39d
refactor: remove old BedrockFilesHandler and integrate with llm_http_…
colesmcintosh Jul 31, 2025
03201cc
test: add Bedrock batch and file integration tests
colesmcintosh Jul 31, 2025
75acbf2
fix: resolve linting issues for too many statements and import errors
colesmcintosh Jul 31, 2025
faa431c
fix: remove unused uuid import from bedrock common_utils
colesmcintosh Jul 31, 2025
1bbc819
test: skip Bedrock batch retrieval test and remove unused import hand…
colesmcintosh Jul 31, 2025
53b88e6
fix: resolve mypy type errors in files and http handlers
colesmcintosh Jul 31, 2025
5e6e2e9
Merge branch 'BerriAI:main' into feat/bedrock-batch-and-files
colesmcintosh Jul 31, 2025
e56465d
Merge branch 'BerriAI:main' into feat/bedrock-batch-and-files
colesmcintosh Jul 31, 2025
7fc9b22
Merge branch 'BerriAI:main' into feat/bedrock-batch-and-files
colesmcintosh Aug 1, 2025
d720f47
Merge branch 'BerriAI:main' into feat/bedrock-batch-and-files
colesmcintosh Aug 2, 2025
103 changes: 100 additions & 3 deletions litellm/files/main.py
@@ -46,11 +46,90 @@
#################################################


def _create_bedrock_file_handler(
    bedrock_litellm_params: dict,
    create_file_request: CreateFileRequest,
    extra_headers: Optional[Dict[str, str]],
    optional_params: GenericLiteLLMParams,
    logging_obj: LiteLLMLoggingObj,
    is_async: bool,
    client: Optional[Union[HTTPHandler, AsyncHTTPHandler]],
    timeout: Optional[Union[float, httpx.Timeout]],
) -> Union[OpenAIFileObject, Coroutine[Any, Any, OpenAIFileObject]]:
    """Helper function to handle Bedrock file creation"""
    from litellm.llms.base_llm.files.transformation import BaseFilesConfig

    class BedrockFilesConfig(BaseFilesConfig):
        @property
        def custom_llm_provider(self) -> LlmProviders:
            return LlmProviders.BEDROCK

        def get_supported_openai_params(self, model: str):
            return []

        def map_openai_params(
            self,
            non_default_params: dict,
            optional_params: dict,
            model: str,
            drop_params: bool,
        ) -> dict:
            return {}

        def validate_environment(
            self,
            headers: dict,
            model: str,
            messages: list,
            optional_params: dict,
            litellm_params: dict,
            api_key: Optional[str] = None,
            api_base: Optional[str] = None,
        ) -> dict:
            # Basic validation for AWS environment
            return {}

        def get_error_class(self, error_message: str, status_code: int, headers):
            from litellm.llms.bedrock.common_utils import BedrockError

            return BedrockError(message=error_message, status_code=status_code)

        def transform_create_file_request(
            self,
            model: str,
            create_file_data: CreateFileRequest,
            optional_params: dict,
            litellm_params: dict,
        ) -> Union[bytes, str, dict]:
            # CreateFileRequest is a TypedDict, convert to dict
            return dict(create_file_data)

        def transform_create_file_response(
            self, model, raw_response, logging_obj, litellm_params
        ):
            return raw_response

    provider_config = BedrockFilesConfig()
    return base_llm_http_handler.create_file(
        provider_config=provider_config,
        litellm_params=bedrock_litellm_params,
        create_file_data=create_file_request,
        headers=extra_headers or {},
        api_base=optional_params.api_base,
        api_key=optional_params.api_key,
        logging_obj=logging_obj,
        _is_async=is_async,
        client=client
        if client is not None and isinstance(client, (HTTPHandler, AsyncHTTPHandler))
        else None,
        timeout=timeout,
    )


@client
async def acreate_file(
    file: FileTypes,
    purpose: Literal["assistants", "batch", "fine-tune"],
-   custom_llm_provider: Literal["openai", "azure", "vertex_ai"] = "openai",
+   custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock"] = "openai",
    extra_headers: Optional[Dict[str, str]] = None,
    extra_body: Optional[Dict[str, str]] = None,
    **kwargs,
@@ -94,7 +173,9 @@ async def acreate_file(
def create_file(
    file: FileTypes,
    purpose: Literal["assistants", "batch", "fine-tune"],
-   custom_llm_provider: Optional[Literal["openai", "azure", "vertex_ai"]] = None,
+   custom_llm_provider: Optional[
+       Literal["openai", "azure", "vertex_ai", "bedrock"]
+   ] = None,
    extra_headers: Optional[Dict[str, str]] = None,
    extra_body: Optional[Dict[str, str]] = None,
    **kwargs,
@@ -250,9 +331,25 @@ def create_file(
            max_retries=optional_params.max_retries,
            create_file_data=_create_file_request,
        )
    elif custom_llm_provider == "bedrock":
        from litellm.llms.bedrock.common_utils import prepare_bedrock_params

        bedrock_litellm_params = prepare_bedrock_params(
            optional_params, litellm_params_dict
        )
        response = _create_bedrock_file_handler(
            bedrock_litellm_params=bedrock_litellm_params,
            create_file_request=_create_file_request,
            extra_headers=extra_headers,
            optional_params=optional_params,
            logging_obj=logging_obj,
            is_async=_is_async,
            client=client,
            timeout=timeout,
        )
    else:
        raise litellm.exceptions.BadRequestError(
-           message="LiteLLM doesn't support {} for 'create_file'. Only ['openai', 'azure', 'vertex_ai'] are supported.".format(
+           message="LiteLLM doesn't support {} for 'create_file'. Only ['openai', 'azure', 'vertex_ai', 'bedrock'] are supported.".format(
                custom_llm_provider
            ),
            model="n/a",
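With the bedrock branch wired in, Bedrock file uploads flow through the same OpenAI-compatible entry point as the other providers. A minimal usage sketch (the file name, region, and credential resolution are illustrative placeholders; target S3 bucket configuration is handled outside this hunk):

import litellm

# Illustrative call only: "batch_input.jsonl" and the region are placeholders.
file_obj = litellm.create_file(
    file=open("batch_input.jsonl", "rb"),
    purpose="batch",
    custom_llm_provider="bedrock",
    aws_region_name="us-west-2",  # picked up via prepare_bedrock_params
)
print(file_obj.id)  # OpenAI-compatible OpenAIFileObject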
3 changes: 3 additions & 0 deletions litellm/llms/base_llm/batches/__init__.py
@@ -0,0 +1,3 @@
from .transformation import BaseBatchesConfig

__all__ = ["BaseBatchesConfig"]
93 changes: 93 additions & 0 deletions litellm/llms/base_llm/batches/transformation.py
@@ -0,0 +1,93 @@
from abc import abstractmethod
from typing import TYPE_CHECKING, Any, List, Optional, Union

import httpx

from litellm.types.llms.openai import (
    AllMessageValues,
    CreateBatchRequest,
)
from litellm.types.utils import LlmProviders

from ..chat.transformation import BaseConfig

if TYPE_CHECKING:
    from litellm.litellm_core_utils.litellm_logging import Logging as _LiteLLMLoggingObj
    from litellm.router import Router as _Router
    from openai.types import Batch

    LiteLLMLoggingObj = _LiteLLMLoggingObj
    Span = Any
    Router = _Router
else:
    LiteLLMLoggingObj = Any
    Span = Any
    Router = Any


class BaseBatchesConfig(BaseConfig):
    @property
    @abstractmethod
    def custom_llm_provider(self) -> LlmProviders:
        pass

    @abstractmethod
    def get_supported_openai_params(self, model: str) -> List[str]:
        pass

    def get_complete_batch_url(
        self,
        api_base: Optional[str],
        api_key: Optional[str],
        model: str,
        optional_params: dict,
        litellm_params: dict,
        data: CreateBatchRequest,
    ) -> Optional[str]:
        """
        Returns the complete URL for batch creation.
        """
        return api_base

    @abstractmethod
    def transform_create_batch_request(
        self,
        model: str,
        create_batch_data: CreateBatchRequest,
        litellm_params: dict,
        optional_params: dict,
    ) -> Union[bytes, str, dict]:
        """
        Transform the create batch request to the provider's format.
        """
        pass

    @abstractmethod
    def transform_create_batch_response(
        self,
        model: Optional[str],
        raw_response: httpx.Response,
        logging_obj: LiteLLMLoggingObj,
        litellm_params: dict,
    ) -> "Batch":
        """
        Transform the provider's response to OpenAI batch format.
        """
        pass

    def validate_environment(
        self,
        headers: dict,
        model: str,
        messages: List[AllMessageValues],
        optional_params: dict,
        litellm_params: dict,
        api_key: Optional[str] = None,
        api_base: Optional[str] = None,
    ) -> dict:
        """
        Validates the environment for batch creation.
        """
        return headers
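A provider adopts this by subclassing BaseBatchesConfig and filling in the abstract hooks. A rough sketch, not taken from this PR (and note that a real subclass must also satisfy the remaining abstract methods inherited from BaseConfig, such as the chat transforms and error class, which are omitted here):

import httpx

from litellm.llms.base_llm.batches import BaseBatchesConfig
from litellm.types.llms.openai import CreateBatchRequest
from litellm.types.utils import LlmProviders


class MyBatchesConfig(BaseBatchesConfig):  # hypothetical provider config
    @property
    def custom_llm_provider(self) -> LlmProviders:
        return LlmProviders.BEDROCK

    def get_supported_openai_params(self, model: str) -> list:
        return []  # no OpenAI params mapped in this sketch

    def transform_create_batch_request(
        self,
        model: str,
        create_batch_data: CreateBatchRequest,
        litellm_params: dict,
        optional_params: dict,
    ) -> dict:
        # Pass the OpenAI-shaped request through unchanged in this sketch.
        return dict(create_batch_data)

    def transform_create_batch_response(
        self, model, raw_response: httpx.Response, logging_obj, litellm_params
    ):
        # A real config would map provider fields onto openai.types.Batch.
        return raw_response.json()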
84 changes: 84 additions & 0 deletions litellm/llms/bedrock/common_utils.py
@@ -524,3 +524,87 @@ def _parse_message_from_event(self, event) -> Optional[str]:
            return None

        return chunk.decode()  # type: ignore[no-any-return]


# Helper functions for Bedrock file and batch operations
def extract_bedrock_file_content(create_file_data: dict) -> tuple[bytes, str]:
    """Extract file content and filename from create_file_data"""

    file_obj = create_file_data["file"]
    purpose = create_file_data.get("purpose", "batch")

    # Handle different FileTypes formats
    if isinstance(file_obj, tuple):
        # Handle tuple formats: (filename, file), (filename, file, content_type), etc.
        tuple_filename = file_obj[0] if file_obj[0] else f"file.{purpose}"
        actual_file = file_obj[1]

        if hasattr(actual_file, "read"):
            file_content = actual_file.read()
            filename = tuple_filename
        else:
            # Handle bytes directly
            file_content = actual_file
            filename = tuple_filename
    else:
        # Handle direct FileContent (IO[bytes], bytes, or PathLike)
        if hasattr(file_obj, "read"):
            file_content = file_obj.read()
            filename = getattr(file_obj, "name", f"file.{purpose}")
        else:
            # Handle bytes directly
            file_content = file_obj
            filename = f"file.{purpose}"

    return file_content, filename
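The branches above mirror the shapes OpenAI's FileTypes union allows. A quick sketch of the two main inputs the helper normalizes (file names and bytes are illustrative):

# (filename, file-like) tuple: the filename wins, content is read from the handle
content, name = extract_bedrock_file_content(
    {"file": ("input.jsonl", open("input.jsonl", "rb")), "purpose": "batch"}
)

# bare bytes: falls back to a generated name such as "file.batch"
content, name = extract_bedrock_file_content(
    {"file": b'{"custom_id": "req-1"}', "purpose": "batch"}
)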


def generate_bedrock_s3_key(filename: str, purpose: str) -> str:
    """Generate S3 key for Bedrock file upload"""
    import uuid

    file_uuid = str(uuid.uuid4())
    if "." in filename:
        ext = filename.split(".")[-1]
        return f"{purpose}/{file_uuid}.{ext}"
    else:
        return f"{purpose}/{file_uuid}"
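Keys are namespaced by purpose and made unique with a UUID, preserving the original extension when there is one. Illustrative outputs (the UUID varies per call):

generate_bedrock_s3_key("input.jsonl", "batch")
# -> "batch/6f1d9a1e-...-....jsonl"  (example UUID)
generate_bedrock_s3_key("README", "batch")
# -> "batch/6f1d9a1e-...-..."        (no extension to preserve)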


def prepare_bedrock_s3_upload(
    file_content: bytes, s3_key: str, bucket_name: str, region_name: str
) -> tuple[str, dict]:
    """Prepare S3 upload URL and headers for Bedrock file"""
    import hashlib

    # Prepare S3 URL
    url = f"https://{bucket_name}.s3.{region_name}.amazonaws.com/{s3_key}"

    # Calculate content hash
    content_hash = hashlib.sha256(file_content).hexdigest()

    # Prepare headers
    upload_headers = {
        "Content-Type": "application/octet-stream",
        "x-amz-content-sha256": content_hash,
        "Content-Length": str(len(file_content)),
    }

    return url, upload_headers
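The helper only assembles the unsigned pieces of the request; the x-amz-content-sha256 header implies SigV4 signing, which is assumed to happen elsewhere in the handler before the PUT is issued. A hedged usage sketch with placeholder bucket and key:

import httpx

content = b'{"custom_id": "req-1"}'
url, headers = prepare_bedrock_s3_upload(
    file_content=content,
    s3_key="batch/6f1d-example.jsonl",  # placeholder key
    bucket_name="my-batch-bucket",      # placeholder bucket
    region_name="us-west-2",
)
# NOTE: headers still need a SigV4 Authorization header added before this will
# succeed against a private bucket; shown unsigned for illustration only.
response = httpx.put(url, content=content, headers=headers)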


def prepare_bedrock_params(optional_params, litellm_params_dict: dict) -> dict:
    """Helper function to prepare Bedrock-specific parameters"""
    bedrock_litellm_params = litellm_params_dict.copy()
    bedrock_litellm_params.update(
        {
            k: v
            for k, v in {
                "aws_access_key_id": optional_params.get("aws_access_key_id"),
                "aws_secret_access_key": optional_params.get("aws_secret_access_key"),
                "aws_session_token": optional_params.get("aws_session_token"),
                "aws_region_name": optional_params.get("aws_region_name"),
                "aws_role_name": optional_params.get("aws_role_name"),
                "aws_session_name": optional_params.get("aws_session_name"),
                "aws_profile_name": optional_params.get("aws_profile_name"),
                "aws_web_identity_token": optional_params.get("aws_web_identity_token"),
                "aws_sts_endpoint": optional_params.get("aws_sts_endpoint"),
            }.items()
            if v is not None
        }
    )
    return bedrock_litellm_params
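Because of the `if v is not None` filter, only credentials that were actually provided are merged, so unset AWS fields never overwrite entries already present in litellm_params_dict. An illustrative call, with a plain dict standing in for optional_params:

params = prepare_bedrock_params(
    optional_params={"aws_region_name": "us-east-1", "aws_profile_name": None},
    litellm_params_dict={"timeout": 600},
)
# -> {"timeout": 600, "aws_region_name": "us-east-1"}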