paint-brush
OpenAI 的速率限制:LLM 评估指数退避指南经过@abram
2,150 讀數
2,150 讀數

OpenAI 的速率限制:LLM 评估指数退避指南

经过 Abram11m2024/01/29
Read on Terminal Reader

太長; 讀書

本文将教您如何使用任何 LLM 模型运行评估,而不会屈服于可怕的“OpenAI 速率限制”异常。我们将从以下开始: - 定义速率限制的含义并使用类比 - 了解 OpenAI 速率限制是什么 - 解释如何制定速率限制流程 - 解释代码实现
featured image - OpenAI 的速率限制:LLM 评估指数退避指南
Abram HackerNoon profile picture
0-item
1-item


介绍

本文将教您如何使用任何 LLM 模型运行评估,而不会屈服于可怕的“OpenAI 速率限制”异常。我们从以下开始:


  • 定义速率限制的含义并使用类比
  • 了解 OpenAI 速率限制是什么
  • 解释如何制定速率限制流程
  • 解释代码实现
  • 总结代码实现中使用的策略


什么是速率限制(以及类比解释)?

到目前为止,Cloudflare 的解释是我见过的最好的:速率限制是限制网络流量的策略。它限制了某人在特定时间范围内重复操作的频率 - 例如,尝试登录帐户。


简而言之,想象一下你是四个孩子的母亲,他们都喜欢蜂蜜。上次蜂蜜用完的时间比预期的要早。现在,您已经设置了一个计时器,数到一万,并让每个孩子轮流吃一些蜂蜜。计时器代表速率限制,因为它会强制执行特定的等待时间,然后才能获得更多蜂蜜。


解释了这个概念后,让我们了解 OpenAI 速率限制并讨论如何使用 Python 实现速率限制逻辑来管理 OpenAI 的 R/TPM(每分钟请求/令牌)。


了解 OpenAI 速率限制

OpenAI 对一分钟内对其 AI 模型发出的请求数量设置了一定的限制。对于 OpenAI 提供的每个 AI 模型,这些限制都不同。


对于免费版本:

  • 对于gpt-3.5-turbo模型,用户每天可以发出40,000个令牌请求或每分钟3个API调用。


对于 1 级:

  • 对于 gpt-3.5-turbo 模型,每分钟允许用户 60,000 个令牌请求或 3,500 个 API 调用。
  • 对于 gpt-4 模型,限制为每分钟 10,000 个令牌请求或 500 个 API 调用。
  • 对于 gpt-4-turbo-preview,用户每分钟可以发出 150,000 个令牌请求或 500 个 API 调用。
  • 对于 gpt-4-vision-preview,用户每分钟可以发出 10,000 个令牌请求和/或 500 个 API 调用。


有关其他层速率限制的更多信息,请参阅文档


这些限制的原因包括:

  1. 确保服务平稳且快速响应,因为人工智能模型执行的复杂任务需要大量资源。
  2. 管理所有用户的需求,并确保可用的基础设施(例如服务器和 GPU)可以处理请求而不会过载。
  3. 为使用量激增做好准备,并在高需求下保持高效运行。


预计这些限制在可预见的未来将保持一致。


解释速率限制过程

该过程(见下图)涉及使用户能够从 UI 运行 LLM 评估并为其 LLM 应用程序配置速率限制参数,而无需自己编写逻辑。


这是通过准备和调用批处理的函数来实现的。批处理中的每个调用都会调用run_with_retry函数,该函数又会使用重试机制调用最终函数 ( invoke_app )。


Agenta.ai 使用批量退避指数策略评估 LLM 评估的速率限制流程


我相信在看完上述过程后,您可以用您选择的任何语言编写代码逻辑。不管怎样,我会向你展示我是如何做到的。有关更多背景和背景,我主要在 Agenta 担任后端软件工程师。


Agenta是一个开源端到端 LLM 开发者平台,为您提供用于快速工程和管理、⚖️ 评估、人工注释和 🚀 部署的工具。所有这些都不会对您选择的框架、库或模型施加任何限制。 Agenta允许开发人员和产品团队在更短的时间内协作构建由 LLM 驱动的生产级应用程序。


我们希望让用户能够从 UI 配置其 LLM 评估速率限制配置,以便他们可以绕过其 LLM 提供商速率限制例外。

用于在 agenta.ai 上创建新评估的 ui(用户界面)


查看流程图,首先要实现的是准备和调用批处理(LLM 调用)的逻辑。验证速率限制配置并使用数据验证模型来定义 LLM 运行速率限制非常重要。下面的模型处理批量调用运行所需的rate_limit_config参数。


 from pydantic import BaseModel, Field class LLMRunRateLimit(BaseModel): batch_size: int = Field(default=10) max_retries: int = Field(default=3) retry_delay: int = Field(default=3) delay_between_batches: int = Field(default=5)


batch_invoke函数采用以下参数:

  • uri:您的 LLM 申请的 URL。
  • testset_data:您的LLM申请需要处理的测试集数据。
  • 参数:您的 LLM 申请的参数。
  • rate_limit_config:速率限制配置(如上面创建新评估的界面所示)。


 async def batch_invoke( uri: str, testset_data: List[Dict], parameters: Dict, rate_limit_config: Dict ) -> List[AppOutput]: """ Invokes the LLm app in batches, processing the testset data. Args: uri (str): The URI of the LLm app. testset_data (List[Dict]): The testset data to be processed. parameters (Dict): The parameters for the LLm app. rate_limit_config (Dict): The rate limit configuration. Returns: List[AppOutput]: The list of app outputs after running all batches. """ batch_size = rate_limit_config[ "batch_size" ] # Number of testset to make in each batch max_retries = rate_limit_config[ "max_retries" ] # Maximum number of times to retry the failed llm call retry_delay = rate_limit_config[ "retry_delay" ] # Delay before retrying the failed llm call (in seconds) delay_between_batches = rate_limit_config[ "delay_between_batches" ] # Delay between batches (in seconds) list_of_app_outputs: List[AppOutput] = [] # Outputs after running all batches openapi_parameters = await get_parameters_from_openapi(uri + "/openapi.json") async def run_batch(start_idx: int): print(f"Preparing {start_idx} batch...") end_idx = min(start_idx + batch_size, len(testset_data)) for index in range(start_idx, end_idx): try: batch_output: AppOutput = await run_with_retry( uri, testset_data[index], parameters, max_retries, retry_delay, openapi_parameters, ) list_of_app_outputs.append(batch_output) print(f"Adding outputs to batch {start_idx}") except Exception as exc: import traceback traceback.print_exc() print( f"Error processing batch[{start_idx}]:[{end_idx}] ==> {str(exc)}" ) # Schedule the next batch with a delay next_batch_start_idx = end_idx if next_batch_start_idx < len(testset_data): await asyncio.sleep(delay_between_batches) await run_batch(next_batch_start_idx) # Start the first batch await run_batch(0) return list_of_app_outputs


准备并调用批处理后,下一步涉及执行run_with_retry逻辑。此自定义实现包括速率限制功能并管理 llm 应用程序的调用,在达到设置的延迟后重试。指数退避是一种以指数增加的等待时间重试操作的技术,直到达到最大重试计数为止。


 async def run_with_retry( uri: str, input_data: Any, parameters: Dict, max_retry_count: int, retry_delay: int, openapi_parameters: List[Dict], ) -> AppOutput: """ Runs the specified app with retry mechanism. Args: uri (str): The URI of the app. input_data (Any): The input data for the app. parameters (Dict): The parameters for the app. max_retry_count (int): The maximum number of retries. retry_delay (int): The delay between retries in seconds. openapi_parameters (List[Dict]): The OpenAPI parameters for the app. Returns: AppOutput: The output of the app. """ retries = 0 last_exception = None while retries < max_retry_count: try: result = await invoke_app(uri, input_data, parameters, openapi_parameters) return result except (httpx.TimeoutException, httpx.ConnectTimeout, httpx.ConnectError) as e: last_exception = e print(f"Error in evaluation. Retrying in {retry_delay} seconds:", e) await asyncio.sleep(retry_delay) retries += 1 # If max retries reached, return the last exception return AppOutput(output=None, status=str(last_exception))


AppOutput的使用:即使在已用完最大重试次数后处理异常也很重要。这样,您可以允许您尝试处理的所有数据运行,然后您可以确定哪些数据失败了,哪些数据通过了。


最后一步是调用应用程序,使用 LLM 应用程序的openapi_parameters来确定如何使用单个数据点调用它。


make_payload 函数不应该让您担心。它根据其OpenAPI参数构造用于调用 LLM 应用程序的有效负载。


 async def invoke_app( uri: str, datapoint: Any, parameters: Dict, openapi_parameters: List[Dict] ) -> AppOutput: """ Invokes an app for one datapoint using the openapi_parameters to determine how to invoke the app. Args: uri (str): The URI of the app to invoke. datapoint (Any): The data to be sent to the app. parameters (Dict): The parameters required by the app taken from the db. openapi_parameters (List[Dict]): The OpenAPI parameters of the app. Returns: AppOutput: The output of the app. Raises: httpx.HTTPError: If the POST request fails. """ url = f"{uri}/generate" payload = await make_payload(datapoint, parameters, openapi_parameters) async with httpx.AsyncClient() as client: try: logger.debug(f"Invoking app {uri} with payload {payload}") response = await client.post( url, json=payload, timeout=httpx.Timeout(timeout=5, read=None, write=5) ) response.raise_for_status() llm_app_response = response.json() app_output = ( llm_app_response["message"] if isinstance(llm_app_response, dict) else llm_app_response ) return AppOutput(output=app_output, status="success") except: return AppOutput(output="Error", status="error")


整个过程就这样结束了。


概括

代码中的退避指数策略的工作原理如下:


  • 批处理: batch_invoke 函数将测试集数据拆分为具有可配置大小的较小批次。每个批次均按顺序处理。


  • 单独调用重试:在每个批次中,每个数据点都由run_with_retry函数处理。此函数尝试调用数据点的应用程序。如果由于特定网络错误(超时、连接问题)导致调用失败,该函数会延迟重试。此延迟最初设置为可配置值 ( retry_delay ),并针对同一批次中的每次后续重试尝试加倍。


此方法有助于避免失败后重复请求导致应用程序服务器过载。它为服务器提供了恢复时间,并允许在重试之前清除待处理请求的队列。


该策略还包括每个数据点的可配置最大重试次数,以防止无限循环。还包括批次之间的延迟 ( delay_between_batches ),以避免超出应用程序服务器设置的速率限制。


我希望这总结了您在今天的文章中学到的所有内容。请让我知道,如果你有任何问题!