import json
import tempfile
import time
from collections import OrderedDict
from typing import Dict, List, Optional, Tuple, Union
import numpy as np
import pandas as pd
import requests
from litellm import token_counter
from tqdm import tqdm
from notdiamond.exceptions import ApiError
from notdiamond.llms.client import NotDiamond
from notdiamond.llms.config import LLMConfig
from notdiamond.settings import NOTDIAMOND_API_KEY, NOTDIAMOND_API_URL, VERSION
from notdiamond.types import NDApiKeyValidator
class CustomRouter:
"""
Implementation of CustomRouter class, used to train custom routers using custom datasets.
Attributes:
language (str): The language of the dataset in lowercase. Defaults to "english".
maximize (bool): Whether higher score is better. Defaults to true.
api_key (Optional[str], optional): The NotDiamond API key. If not specified, will try to
find it in the environment variable NOTDIAMOND_API_KEY.
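
    Example:
        A minimal sketch showing construction with the default settings::

            router = CustomRouter(language="english", maximize=True)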
"""
def __init__(
self,
language: str = "english",
maximize: bool = True,
api_key: Optional[str] = None,
):
if api_key is None:
api_key = NOTDIAMOND_API_KEY
NDApiKeyValidator(api_key=api_key)
self.api_key = api_key
self.language = language
self.maximize = maximize
def _request_train_router(
self,
prompt_column: str,
dataset_file: str,
llm_configs: List[LLMConfig],
preference_id: Optional[str],
nd_api_url: str,
) -> str:
url = f"{nd_api_url}/v2/pzn/trainCustomRouter"
files = {"dataset_file": open(dataset_file, "rb")}
payload = {
"language": self.language,
"llm_providers": json.dumps(
[provider.prepare_for_request() for provider in llm_configs]
),
"prompt_column": prompt_column,
"maximize": self.maximize,
"preference_id": preference_id,
}
headers = {
"Authorization": f"Bearer {self.api_key}",
"User-Agent": f"Python-SDK/{VERSION}",
}
        # Stream the dataset file and close the handle once the upload completes.
        with open(dataset_file, "rb") as dataset_fp:
            response = requests.post(
                url=url,
                headers=headers,
                data=payload,
                files={"dataset_file": dataset_fp},
            )
if response.status_code != 200:
raise ApiError(
f"ND backend error status code: {response.status_code}, {response.text}"
)
preference_id = response.json()["preference_id"]
return preference_id
def _prepare_joint_dataset(
self,
dataset: Dict[Union[str, LLMConfig], pd.DataFrame],
prompt_column: str,
response_column: str,
score_column: str,
) -> Tuple[pd.DataFrame, List[LLMConfig]]:
        # Use the first provider's DataFrame as the source of prompts.
        a_provider = next(iter(dataset))
        prompts = dataset[a_provider].get(prompt_column, None)
        if prompts is None:
            raise ValueError(f"Prompt column {prompt_column} not found in df.")
        prompts = prompts.to_list()
llm_configs = []
joint_dataset = {prompt_column: prompts}
for provider, df in dataset.items():
llm_configs.append(provider)
responses = df.get(response_column, None)
if responses is None:
raise ValueError(
f"Response column {response_column} not found in df."
)
responses = responses.to_list()
joint_dataset[f"{str(provider)}/response"] = responses
scores = df.get(score_column, None)
if scores is None:
raise ValueError(
f"Score column {score_column} not found in df."
)
scores = scores.to_list()
joint_dataset[f"{str(provider)}/score"] = scores
joint_df = pd.DataFrame(joint_dataset)
llm_configs = NotDiamond._parse_llm_configs_data(llm_configs)
return joint_df, llm_configs
def fit(
self,
dataset: Dict[Union[str, LLMConfig], pd.DataFrame],
prompt_column: str,
response_column: str,
score_column: str,
preference_id: Optional[str] = None,
nd_api_url: Optional[str] = NOTDIAMOND_API_URL,
) -> str:
"""
Method to train a custom router using provided dataset.
Parameters:
dataset (Dict[str, pandas.DataFrame]): The dataset to train a custom router.
Each key in the dictionary should be in the form of <provider>/<model>.
prompt_column (str): The column name in each DataFrame corresponding
to the prompts used to evaluate the LLM.
response_column (str): The column name in each DataFrame corresponding
to the response given by the LLM for a given prompt.
score_column (str): The column name in each DataFrame corresponding
to the score given to the response from the LLM.
preference_id (Optional[str], optional): If specified, the custom router
associated with the preference_id will be updated with the provided dataset.
nd_api_url (Optional[str], optional): The URL of the NotDiamond API. Defaults to prod.
Raises:
ApiError: When the NotDiamond API fails
ValueError: When parsing the provided dataset fails
UnsupportedLLMProvider: When a provider specified in the dataset is not supported.
Returns:
str:
preference_id: the preference_id associated with the custom router.
Use this preference_id in your routing calls to use the custom router.
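
        Example:
            A minimal sketch; the provider keys, column names, and DataFrames shown
            here are illustrative, not part of the SDK. Each DataFrame must contain
            the prompt, response, and score columns::

                router = CustomRouter()
                dataset = {
                    "openai/gpt-4o": gpt4o_df,
                    "anthropic/claude-3-opus-20240229": claude_df,
                }
                preference_id = router.fit(
                    dataset,
                    prompt_column="prompt",
                    response_column="response",
                    score_column="score",
                )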
"""
joint_df, llm_configs = self._prepare_joint_dataset(
dataset, prompt_column, response_column, score_column
)
        # Write the merged dataset to a temporary CSV; it is deleted when the
        # context manager exits, after the upload completes.
        with tempfile.NamedTemporaryFile(suffix=".csv") as joint_csv:
            joint_df.to_csv(joint_csv.name, index=False)
preference_id = self._request_train_router(
prompt_column,
joint_csv.name,
llm_configs,
preference_id,
nd_api_url,
)
return preference_id
def _get_latency(self, llm_config: LLMConfig, prompt: str) -> float:
llm = NotDiamond._llm_from_config(llm_config)
start_time = time.time()
_ = llm.invoke([("human", prompt)])
end_time = time.time()
return (end_time - start_time) * 1000 # ms
def _get_cost(
self, llm_config: LLMConfig, prompt: str, response: str
) -> float:
        # Approximate token counts with the gpt-4o tokenizer for all models.
        n_input_tokens = token_counter(model="gpt-4o", text=prompt)
        n_output_tokens = token_counter(model="gpt-4o", text=response)
input_price = (
llm_config.default_input_price
if llm_config.input_price is None
else llm_config.input_price
)
output_price = (
llm_config.default_output_price
if llm_config.output_price is None
else llm_config.output_price
)
        # Prices are quoted per million tokens, hence the 1e6 divisor.
        return (
            n_input_tokens * input_price + n_output_tokens * output_price
        ) / 1e6
def _eval_custom_router(
self,
client: NotDiamond,
llm_configs: List[LLMConfig],
joint_df: pd.DataFrame,
prompt_column: str,
include_latency: bool,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
eval_results = OrderedDict()
eval_results[prompt_column] = []
eval_results["session_id"] = []
eval_results["notdiamond/score"] = []
eval_results["notdiamond/cost"] = []
eval_results["notdiamond/response"] = []
eval_results["notdiamond/recommended_provider"] = []
if include_latency:
eval_results["notdiamond/latency"] = []
for provider in llm_configs:
provider_score_column = (
f"{provider.provider}/{provider.model}/score"
)
eval_results[provider_score_column] = []
provider_response_column = (
f"{provider.provider}/{provider.model}/response"
)
eval_results[provider_response_column] = []
provider_cost_column = f"{provider.provider}/{provider.model}/cost"
eval_results[provider_cost_column] = []
if include_latency:
provider_latency_column = (
f"{provider.provider}/{provider.model}/latency"
)
eval_results[provider_latency_column] = []
        for _, row in tqdm(joint_df.iterrows(), total=len(joint_df)):
            prompt = row[prompt_column]
            session_id, nd_provider = client.chat.completions.model_select(
                messages=[{"role": "user", "content": prompt}], timeout=60
            )
            # Skip rows the router could not handle; appending the prompt before
            # this check would leave the result columns with unequal lengths.
            if nd_provider is None:
                continue
            eval_results[prompt_column].append(prompt)
            eval_results["session_id"].append(session_id)
provider_matched = False
for provider in llm_configs:
provider_score = row[
f"{provider.provider}/{provider.model}/score"
]
eval_results[
f"{provider.provider}/{provider.model}/score"
].append(provider_score)
provider_response = row[
f"{provider.provider}/{provider.model}/response"
]
eval_results[
f"{provider.provider}/{provider.model}/response"
].append(provider_response)
provider_cost = self._get_cost(
provider, prompt, provider_response
)
eval_results[
f"{provider.provider}/{provider.model}/cost"
].append(provider_cost)
if include_latency:
provider_latency = self._get_latency(provider, prompt)
eval_results[
f"{provider.provider}/{provider.model}/latency"
].append(provider_latency)
if (
not provider_matched
and provider.provider == nd_provider.provider
and provider.model == nd_provider.model
):
provider_matched = True
eval_results["notdiamond/score"].append(provider_score)
eval_results["notdiamond/cost"].append(provider_cost)
eval_results["notdiamond/response"].append(
provider_response
)
eval_results["notdiamond/recommended_provider"].append(
f"{nd_provider.provider}/{nd_provider.model}"
)
if include_latency:
eval_results["notdiamond/latency"].append(
provider_latency
)
            if not provider_matched:
                raise ValueError(
                    f"Custom router returned {nd_provider.provider}/{nd_provider.model}, "
                    "which is not in the set of models in the test dataset."
                )
eval_results_df = pd.DataFrame(eval_results)
eval_stats = OrderedDict()
        best_average_provider = None
        # Start from -inf when maximizing (+inf when minimizing) so the first
        # provider's average always becomes the initial best.
        best_average_score = -(2 * int(self.maximize) - 1) * np.inf
nd_average_score = eval_results_df["notdiamond/score"].mean()
eval_stats["Not Diamond Average Score"] = [nd_average_score]
nd_average_cost = eval_results_df["notdiamond/cost"].mean()
eval_stats["Not Diamond Average Cost"] = [nd_average_cost]
if include_latency:
nd_average_latency = eval_results_df["notdiamond/latency"].mean()
eval_stats["Not Diamond Average Latency"] = [nd_average_latency]
for provider in llm_configs:
provider_avg_score = eval_results_df[
f"{provider.provider}/{provider.model}/score"
].mean()
eval_stats[f"{provider.provider}/{provider.model}/avg_score"] = [
provider_avg_score
]
provider_avg_cost = eval_results_df[
f"{provider.provider}/{provider.model}/cost"
].mean()
eval_stats[f"{provider.provider}/{provider.model}/avg_cost"] = [
provider_avg_cost
]
if include_latency:
provider_avg_latency = eval_results_df[
f"{provider.provider}/{provider.model}/latency"
].mean()
eval_stats[
f"{provider.provider}/{provider.model}/avg_latency"
] = [provider_avg_latency]
            # Track the single best provider by average score (higher is better
            # when maximize=True, lower otherwise).
            is_better = (
                provider_avg_score > best_average_score
                if self.maximize
                else provider_avg_score < best_average_score
            )
            if is_better:
                best_average_score = provider_avg_score
                best_average_cost = provider_avg_cost
                best_average_provider = f"{provider.provider}/{provider.model}"
                if include_latency:
                    best_average_latency = provider_avg_latency
eval_stats["Best Average Provider"] = [best_average_provider]
eval_stats["Best Provider Average Score"] = [best_average_score]
eval_stats["Best Provider Average Cost"] = [best_average_cost]
if include_latency:
eval_stats["Best Provider Average Latency"] = [
best_average_latency
]
first_columns = [
"Best Average Provider",
"Best Provider Average Score",
"Best Provider Average Cost",
"Best Provider Average Latency",
"Not Diamond Average Score",
"Not Diamond Average Cost",
"Not Diamond Average Latency",
]
else:
first_columns = [
"Best Average Provider",
"Best Provider Average Score",
"Best Provider Average Cost",
"Not Diamond Average Score",
"Not Diamond Average Cost",
]
column_order = first_columns + [
col for col in eval_stats.keys() if col not in first_columns
]
ordered_eval_stats = OrderedDict()
for col in column_order:
ordered_eval_stats[col] = eval_stats[col]
eval_stats_df = pd.DataFrame(ordered_eval_stats)
return eval_results_df, eval_stats_df
def eval(
self,
dataset: Dict[Union[str, LLMConfig], pd.DataFrame],
prompt_column: str,
response_column: str,
score_column: str,
preference_id: str,
include_latency: bool = False,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""
Method to evaluate a custom router using provided dataset.
Parameters:
dataset (Dict[str, pandas.DataFrame]): The dataset to train a custom router.
Each key in the dictionary should be in the form of <provider>/<model>.
prompt_column (str): The column name in each DataFrame corresponding
to the prompts used to evaluate the LLM.
response_column (str): The column name in each DataFrame corresponding
to the response given by the LLM for a given prompt.
score_column (str): The column name in each DataFrame corresponding
to the score given to the response from the LLM.
            preference_id (str): The preference_id associated with the custom router,
                as returned from .fit().
            include_latency (bool, optional): Whether to invoke each provider per prompt
                to measure response latency and include it in the results. Defaults to False.
Raises:
ApiError: When the NotDiamond API fails
ValueError: When parsing the provided dataset fails
UnsupportedLLMProvider: When a provider specified in the dataset is not supported.
Returns:
Tuple[pandas.DataFrame, pandas.DataFrame]:
eval_results_df: A DataFrame containing all the prompts, responses of each provider
(indicated by column <provider>/<model>/response), scores of each provider
(indicated by column <provider>/<model>/score), and notdiamond custom router
response and score (indicated by column notdiamond/response and notdiamond/score).
            eval_stats_df: A DataFrame containing the "Best Average Provider" computed from the
                provided dataset, the "Best Provider Average Score" achieved by that provider,
                and the "Not Diamond Average Score" achieved by the custom router.
"""
joint_df, llm_configs = self._prepare_joint_dataset(
dataset, prompt_column, response_column, score_column
)
client = NotDiamond(
llm_configs=llm_configs,
api_key=self.api_key,
preference_id=preference_id,
)
eval_results_df, eval_stats_df = self._eval_custom_router(
client, llm_configs, joint_df, prompt_column, include_latency
)
return eval_results_df, eval_stats_df