[Bug] DSPy with OpenAI async doesn't work #8039

Open
@Akshay1-6180

Description

What happened?

Basically, when running DSPy from async code, the requests run sequentially and it is very slow.

import asyncio
import time
import json
import dspy
from dspy import Predict, InputField, OutputField
import os
import pandas as pd
import pyarrow.parquet as pq
import pyarrow as pa
import logging


# Function to initialize logging with file output
def init_logging(log_file):
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[logging.FileHandler(log_file), logging.StreamHandler()],
    )


# Configure DSPy with OpenAI backend
lm = dspy.LM(
    model="openai/gpt-4o",
    api_key=os.getenv("OPENAI_API_KEY"),
    cache=False,
    cache_in_memory=False,
)
dspy.configure(lm=lm)


# DSPy Signature Definition
class StructuredPatientConversation(dspy.Signature):
    discharge_summary = InputField(
        desc="Detailed discharge summary including patient history, diagnosis, treatment, and discharge instructions."
    )
    disease_category = OutputField(
        desc="List of relevant disease categories extracted from the discharge summary. Format: [disease1, disease2]"
    )
    consultation_transcript = OutputField(
        desc="""
        Structured synthetic doctor-patient conversation as a list of dictionaries:
        [
            {"role": "doctor", "content": "..."},
            {"role": "patient", "content": "..."}
        ]
        Ensure natural dialogue with diverse global names.
    """
    )


# Instantiate the model
conversation_generator = Predict(StructuredPatientConversation)


# Function to throttle requests based on rate limits and append incrementally to Parquet
async def throttle_api_requests(
    discharge_data,
    max_requests_per_minute,
    max_tokens_per_minute,
    batch_size=100,
    output_file="structured_responses.parquet",
    log_file="processing.log",
):
    init_logging(log_file)

    available_request_capacity = max_requests_per_minute
    available_token_capacity = max_tokens_per_minute
    last_update_time = time.time()

    responses = []

    for idx, sample in enumerate(discharge_data):
        note_id = sample["note_id"]  # avoid shadowing the built-in id()
        summary = sample["text"]
        current_time = time.time()
        elapsed_time = current_time - last_update_time

        # Update available capacities
        available_request_capacity = min(
            available_request_capacity + (max_requests_per_minute * elapsed_time / 60),
            max_requests_per_minute,
        )
        available_token_capacity = min(
            available_token_capacity + (max_tokens_per_minute * elapsed_time / 60),
            max_tokens_per_minute,
        )

        estimated_tokens = len(summary.split())

        # Wait if not enough capacity
        while (
            available_request_capacity < 1
            or available_token_capacity < estimated_tokens
        ):
            logging.warning("Rate limit reached. Waiting to regain capacity.")
            await asyncio.sleep(1)
            current_time = time.time()
            elapsed_time = current_time - last_update_time

            available_request_capacity = min(
                available_request_capacity
                + (max_requests_per_minute * elapsed_time / 60),
                max_requests_per_minute,
            )
            available_token_capacity = min(
                available_token_capacity + (max_tokens_per_minute * elapsed_time / 60),
                max_tokens_per_minute,
            )
            last_update_time = current_time

        available_request_capacity -= 1
        available_token_capacity -= estimated_tokens
        last_update_time = current_time

        logging.info(f"Processing summary {idx+1}/{len(discharge_data)}")
        response = conversation_generator(discharge_summary=summary)
        responses.append(
            {
                "note_id": id,
                "discharge_summary": summary,
                "disease_category": response.disease_category,
                "consultation_transcript": response.consultation_transcript,
            }
        )

        if (idx + 1) % batch_size == 0 or idx == len(discharge_data) - 1:
            logging.info(f"Saving batch {(idx + 1) // batch_size}")
            table = pa.Table.from_pandas(pd.DataFrame(responses))
            if idx < batch_size:
                pq.write_table(table, output_file)
            else:
                existing_table = pq.read_table(output_file)
                combined_table = pa.concat_tables([existing_table, table])
                pq.write_table(combined_table, output_file)
            responses = []


# Example usage
if __name__ == "__main__":
    discharge_data = pd.read_csv("discharge.csv")
    data = discharge_data[["note_id", "text"]][:100].to_dict("records")
    MAX_REQUESTS_PER_MINUTE = 10000
    MAX_TOKENS_PER_MINUTE = 2000000

    asyncio.run(
        throttle_api_requests(
            data,
            MAX_REQUESTS_PER_MINUTE,
            MAX_TOKENS_PER_MINUTE,
            batch_size=10,
            log_file="processing.log",
        )
    )
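
Note: in the script above the only statement that is actually awaited is asyncio.sleep(1); the conversation_generator(discharge_summary=summary) call itself is a plain synchronous call, so the requests cannot overlap. Below is a minimal sketch of a concurrent variant, assuming the synchronous Predict call is safe to run from worker threads; asyncio.to_thread and the semaphore are standard-library constructs, and max_concurrency is an illustrative value, not something from the original script.

async def generate_all(discharge_data, max_concurrency=10):
    # Bound the number of in-flight requests (simplified stand-in for the
    # request/token accounting above).
    semaphore = asyncio.Semaphore(max_concurrency)

    async def generate_one(sample):
        async with semaphore:
            # Run the blocking Predict call in a worker thread so the event
            # loop can overlap multiple requests.
            response = await asyncio.to_thread(
                conversation_generator, discharge_summary=sample["text"]
            )
            return {
                "note_id": sample["note_id"],
                "discharge_summary": sample["text"],
                "disease_category": response.disease_category,
                "consultation_transcript": response.consultation_transcript,
            }

    return await asyncio.gather(*(generate_one(s) for s in discharge_data))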

Steps to reproduce

This is the sample code, but the problem is that even though it is async, it runs sequentially, so the logs look like the excerpt below and it is very slow. I feel this is due to DSPy. The goal was to build synthetic datasets.

2025-04-01 07:16:52,093 - INFO - selected model name for cost calculation: openai/gpt-4o-2024-08-06
2025-04-01 07:16:52,096 - INFO -
LiteLLM completion() model= gpt-4o; provider = openai
2025-04-01 07:16:52,097 - INFO - selected model name for cost calculation: openai/gpt-4o-2024-08-06
2025-04-01 07:16:58,377 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-04-01 07:16:58,392 - INFO - Wrapper: Completed Call, calling success_handler
2025-04-01 07:16:58,393 - INFO - selected model name for cost calculation: openai/gpt-4o-2024-08-06
2025-04-01 07:16:58,394 - INFO - Processing summary 18/100
2025-04-01 07:16:58,395 - INFO -
LiteLLM completion() model= gpt-4o; provider = openai
2025-04-01 07:16:58,402 - INFO - selected model name for cost calculation: openai/gpt-4o-2024-08-06
2025-04-01 07:17:04,542 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-04-01 07:17:04,553 - INFO - Wrapper: Completed Call, calling success_handler
2025-04-01 07:17:04,553 - INFO - selected model name for cost calculation: openai/gpt-4o-2024-08-06
2025-04-01 07:17:04,554 - INFO - selected model name for cost calculation: openai/gpt-4o-2024-08-06
2025-04-01 07:17:04,555 - INFO - Processing summary 19/100
2025-04-01 07:17:04,556 - INFO - selected model name for cost calculation: openai/gpt-4o-2024-08-06
2025-04-01 07:17:04,558 - INFO -
LiteLLM completion() model= gpt-4o; provider = openai
2025-04-01 07:17:13,556 - INFO - HTTP Request: POST https://api.openai.com/v1/chat/completions "HTTP/1.1 200 OK"
2025-04-01 07:17:13,588 - INFO - Wrapper: Completed Call, calling success_handler
2025-04-01 07:17:13,588 - INFO - selected model name for cost calculation: openai/gpt-4o-2024-08-06
2025-04-01 07:17:13,589 - INFO - Processing summary 20/100
2025-04-01 07:17:13,590 - INFO - selected model name for cost calculation: openai/gpt-4o-2024-08-06
2025-04-01 07:17:13,592 - INFO -
LiteLLM completion() model= gpt-4o; provider = openai
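
The timestamps show each LiteLLM completion starting only after the previous one returns, which matches the sequential calls above. If the installed DSPy version exposes dspy.asyncify (an assumption worth verifying for 2.6.16), a sketch of an awaitable predictor would look like this:

# Hypothetical usage; dspy.asyncify availability in 2.6.16 is an assumption.
async_conversation_generator = dspy.asyncify(conversation_generator)

async def generate_one(summary):
    # Awaiting here yields control so other requests can run concurrently.
    return await async_conversation_generator(discharge_summary=summary)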

DSPy version

2.6.16
