
理解与应对API限流(429错误)
在使用任何api进行大规模数据处理时,遭遇“429 too many requests”错误是常见挑战。这表明您在短时间内发送了过多的请求,超出了api提供商设定的速率限制。最初尝试通过随机延迟来规避限流是正确的方向,但如果延迟不足或策略不当,仍会触发限制。
为何会发生429错误?
API提供商设置速率限制是为了保护其服务免受滥用、确保公平使用并维持服务稳定性。当请求频率过高时,服务器会返回429状态码,拒绝进一步的请求。对于Google Bard这类基于大型语言模型的服务,计算资源消耗较大,因此限流尤为重要。
官方API的优势:Google Generative AI API (原PaLM API)
原问题中提到的是与“Google Bard”的非官方交互,这往往缺乏明确的API文档和限流指导。为了获得更稳定、可控的开发体验,强烈建议使用Google官方提供的Generative AI API(前身为PaLM API)。该API通过MakerSuite进行原型设计,并提供标准的SDK进行编程访问。
使用官方API的好处包括:
- 明确的文档和支持: 官方API通常有详细的文档,包含速率限制、错误码等信息。
- 稳定性与可靠性: 官方API经过优化,更适合生产环境使用。
- 模型一致性: 官方API(如models/text-bison-001)与Bard当前使用的模型保持一致,确保结果可预测。
限流策略:延迟与退避
当面对429错误时,仅仅移除延迟是不明智的。相反,我们需要更智能的延迟策略:
-
固定时间延迟: 在每次API请求之间插入一个固定的短时间延迟。根据经验,对于Google Generative AI API,1到5秒的延迟通常是一个良好的起点,可以显著减少被限流的几率。
import time # ... (API 调用代码) ... time.sleep(3) # 每次请求后暂停3秒
-
指数退避(Exponential Backoff): 这是更健壮的策略。当收到429错误时,不要立即重试,而是等待一段时间,然后重试。如果再次失败,则等待更长的时间,依此类推,呈指数级增长。这可以避免在API服务器过载时进一步加剧其负担。
import time import random def call_api_with_retry(api_call_function, max_retries=5, initial_delay=1): delay = initial_delay for i in range(max_retries): try: response = api_call_function() return response except Exception as e: if "429" in str(e): # 检查是否是429错误 print(f"收到429错误,第 {i+1} 次重试,等待 {delay} 秒...") time.sleep(delay + random.uniform(0, 1)) # 添加随机抖动,避免“惊群效应” delay *= 2 # 延迟翻倍 else: raise e # 其他错误直接抛出 raise Exception("API调用失败,超出最大重试次数")
数据持久化与错误恢复
在长时间运行的循环中调用API时,如果发生错误(如429限流或网络中断),已处理的数据可能会丢失。因此,实时保存进度至关重要。
实时保存已处理数据
最简单有效的方法是在每次成功获取API响应后,立即将结果写入文件。这样,即使程序中断,您也可以从上次保存的点恢复。
- 逐行写入: 对于文本数据,可以将每条响应作为一行写入文本文件。
- 追加模式: 使用文件追加模式('a')确保不会覆盖之前的数据。
- 结构化数据保存: 对于更复杂的响应,可以将其序列化为JSON格式,然后写入文件。
import json
def save_response(data, filename="bard_responses.jsonl"):
"""将单条响应以JSON Lines格式追加到文件"""
with open(filename, 'a', encoding='utf-8') as f:
f.write(json.dumps(data, ensure_ascii=False) + '\n')
# 示例:在循环中保存
# for i, sentence in enumerate(sentences_to_process):
# try:
# response = genai.generate_text(...) # 假设这是API调用
# if response.result:
# save_response({"id": i, "input": sentence, "output": response.result})
# time.sleep(random.uniform(1, 5)) # 加入随机延迟
# except Exception as e:
# print(f"处理第 {i} 条数据时发生错误: {e}")
# # 错误处理逻辑,可以记录错误并跳过,或触发重试
# break # 或者根据需要决定是否中断循环健壮的错误处理
结合try-except块来捕获API调用过程中可能出现的异常,并采取相应的恢复措施。
# 结合错误处理和数据保存的伪代码
# responses = [] # 如果想先收集到内存再统一保存,但不如实时保存健壮
# for i, item in enumerate(data_items):
# try:
# api_response = call_api_with_retry(lambda: genai.generate_text(prompt=item))
# if api_response and api_response.result:
# processed_data = {"input": item, "output": api_response.result}
# save_response(processed_data, "processed_results.jsonl")
# # responses.append(processed_data) # 如果需要内存中的副本
# time.sleep(random.uniform(1, 5)) # 每次请求后延迟
# except Exception as e:
# print(f"处理第 {i} 条数据时发生致命错误: {e}")
# print("程序中断,已处理数据已保存到 processed_results.jsonl")
# break # 遇到无法恢复的错误时中断循环使用Google Generative AI API进行开发
Google Generative AI API提供了一个统一的接口来访问其各种生成式AI模型。
环境搭建
首先,安装Python SDK:
pip install google-generativeai
然后,您需要一个API密钥。可以通过Google Cloud Console或MakerSuite获取。
核心代码示例
以下是一个使用google-generativeai库调用text-bison-001模型生成文本的完整示例,并融入了限流和数据持久化的概念:
import google.generativeai as genai
import time
import random
import json
import os
# 配置您的API密钥
# 建议从环境变量或安全配置中加载,避免硬编码
# genai.configure(api_key=os.environ.get("GOOGLE_API_KEY"))
genai.configure(api_key="YOUR_API_KEY") # 请替换为您的实际API密钥
# 定义默认模型参数
defaults = {
'model': 'models/text-bison-001',
'temperature': 0.7, # 创造性程度,0-1
'candidate_count': 1, # 生成的候选响应数量
'top_k': 40,
'top_p': 0.95,
'max_output_tokens': 1024, # 最大输出长度
'stop_sequences': [], # 停止生成的序列
# 安全设置:根据您的应用场景调整
'safety_settings': [
{"category":"HARM_CATEGORY_DEROGATORY","threshold":"BLOCK_LOW_AND_ABOVE"},
{"category":"HARM_CATEGORY_TOXICITY","threshold":"BLOCK_LOW_AND_ABOVE"},
{"category":"HARM_CATEGORY_VIOLENCE","threshold":"BLOCK_MEDIUM_AND_ABOVE"},
{"category":"HARM_CATEGORY_SEXUAL","threshold":"BLOCK_MEDIUM_AND_ABOVE"},
{"category":"HARM_CATEGORY_MEDICAL","threshold":"BLOCK_MEDIUM_AND_ABOVE"},
{"category":"HARM_CATEGORY_DANGEROUS","threshold":"BLOCK_MEDIUM_AND_ABOVE"}
],
}
# 待处理的句子列表(模拟数据)
sentences_to_process = [
"写一首关于海底城堡的诗歌",
"描述一个在月球上发现的神秘生物",
"创作一个关于时间旅行者的短故事",
"解释量子纠缠的基本原理",
"写一个关于未来城市的科幻场景",
"给我一个关于友谊的励志名言",
"总结一下人工智能的最新发展",
"编写一个关于古代文明的谜语",
"创作一首关于星空的俳句",
"描述一场发生在遥远星球上的战争",
"写一首关于海底城堡的诗歌",
"描述一个在月球上发现的神秘生物",
"创作一个关于时间旅行者的短故事",
"解释量子纠缠的基本原理",
"写一个关于未来城市的科幻场景",
"给我一个关于友谊的励志名言",
"总结一下人工智能的最新发展",
"编写一个关于古代文明的谜语",
"创作一首关于星空的俳句",
"描述一场发生在遥远星球上的战争",
# ... 更多句子,模拟120次请求
]
output_filename = "generative_ai_responses.jsonl"
def generate_text_with_retry(prompt_text, retry_count=5, base_delay=1):
"""
带指数退避和随机抖动的API调用函数。
"""
current_delay = base_delay
for attempt in range(retry_count):
try:
response = genai.generate_text(
**defaults,
prompt=prompt_text
)
if response.result:
return response.result
else:
# API可能成功返回但结果为空,也视为失败
raise Exception("API返回空结果")
except Exception as e:
error_message = str(e)
if "429" in error_message or "Quota exceeded" in error_message:
print(f"尝试 {attempt + 1}/{retry_count}: 收到限流错误 ({error_message}),等待 {current_delay:.2f} 秒后重试...")
time.sleep(current_delay + random.uniform(0, 0.5)) # 添加随机抖动
current_delay *= 2 # 指数退避
else:
print(f"尝试 {attempt + 1}/{retry_count}: 发生其他错误 ({error_message}),等待 {current_delay:.2f} 秒后重试...")
time.sleep(current_delay + random.uniform(0, 0.5))
current_delay *= 2
raise Exception(f"多次重试后仍无法成功调用API: {prompt_text}")
# 检查是否有之前保存的进度,从上次中断处恢复
start_index = 0
if os.path.exists(output_filename):
with open(output_filename, 'r', encoding='utf-8') as f:
for line in f:
start_index += 1
print(f"从文件 {output_filename} 中恢复,已处理 {start_index} 条数据。")
print(f"开始处理,总共 {len(sentences_to_process)} 条数据,从第 {start_index} 条开始。")
for i in range(start_index, len(sentences_to_process)):
subject = sentences_to_process[i]
prompt = f"""{subject}""" # 简化prompt,直接使用句子作为主题
print(f"正在处理第 {i+1}/{len(sentences_to_process)} 条数据: '{subject[:30]}...'")
try:
generated_text = generate_text_with_retry(prompt)
# 实时保存结果
with open(output_filename, 'a', encoding='utf-8') as f:
json.dump({"id": i, "input_prompt": subject, "generated_output": generated_text}, f, ensure_ascii=False)
f.write('\n')
print(f"成功生成并保存第 {i+1} 条数据。")
# 每次成功请求后,添加一个随机延迟,以避免连续触发限流
time.sleep(random.uniform(1, 3)) # 1到3秒的随机延迟
except Exception as e:
print(f"处理第 {i+1} 条数据 '{subject[:30]}...' 时发生致命错误: {e}")
print(f"程序中断。已处理的数据已保存到 '{output_filename}'。")
break # 遇到致命错误,中断整个处理循环
print("所有数据处理完成或程序中断。")参数配置详解
- model: 指定要使用的模型,如models/text-bison-001。
- temperature: 控制生成文本的随机性或创造性。值越高,输出越随机;值越低,输出越集中和确定。
- candidate_count: 请求生成的候选响应数量。通常设置为1。
-
top_k / top_p: 采样策略参数,用于控制模型在生成文本时考虑的词汇范围。
- top_k: 模型在生成下一个词时,会从概率最高的k个词中进行采样。
- top_p: 模型会选择概率累积和达到p的最小词汇集进行采样。
- max_output_tokens: 生成文本的最大长度。
- stop_sequences: 一个字符串列表,当模型生成其中任何一个字符串时,就会停止生成。
- safety_settings: 用于过滤不安全内容的设置。您可以根据应用程序的需求调整阈值。
最佳实践与注意事项
- API密钥安全: 绝不将API密钥硬编码到公开的代码库中。使用环境变量、配置文件或秘密管理服务来存储和访问密钥。
- 监控API使用情况: Google Cloud Console通常提供API使用情况仪表板,您可以监控请求量、错误率等,以便更好地理解和管理限流。
- 保持更新: Google的AI服务发展迅速,API和模型可能会更新。定期查看官方文档以获取最新信息。
- 理解模型限制: 即使是强大的模型也有其局限性,例如事实性错误、偏见或无法理解复杂指令。设计您的应用程序时要考虑到这些。
- 分批处理与异步: 对于需要处理海量数据的场景,除了简单的time.sleep,还可以考虑更高级的分批处理(如果API支持)或异步编程(如asyncio)来提高效率,同时仍需注意限流。对于更复杂的任务调度和容错,Celery等工具也是可选方案。
总结
通过采用Google官方的Generative AI API,并结合智能的限流策略(如固定延迟和指数退避)以及健壮的数据持久化与错误处理机制,开发者可以构建出更加稳定、高效且具备恢复能力的AI应用。始终记住API密钥安全、关注官方文档,并根据实际需求调整您的处理策略,以确保您的AI集成项目顺利进行。










