2026年Python调用ChatGPT API实现流式输出教程

Python调用ChatGPT API实现流式输出教程关键词 Python ChatGPT 流式输出 ChatGPT API stream openai 流式 你有没有注意到 ChatGPT 网页版的 打字机效果 回复不是一次性出现 而是一个字一个字地显示出来 这种效果背后用的就是流式输出 Streaming 技术 在实际应用中 流式输出有重要价值 用户体验 更好 用户无需等待完整响应

大家好,我是讯享网,很高兴认识大家。这里提供最前沿的Ai技术和互联网信息。



关键词:Python ChatGPT流式输出, ChatGPT API stream, openai流式

你有没有注意到ChatGPT网页版的“打字机效果”——回复不是一次性出现,而是一个字一个字地显示出来?这种效果背后用的就是流式输出(Streaming)技术。

在实际应用中,流式输出有重要价值:

  • 用户体验更好:用户无需等待完整响应,立即看到内容开始出现
  • 长文本不超时:生成长文章时,避免等待超时问题
  • 实时反馈:用户可以提前判断输出方向,必要时提前终止

本文将完整讲解如何在Python中实现ChatGPT API的流式输出,从基础到高级。


普通请求 vs 流式请求

普通请求(一次性返回):

 

用户发送请求 → 等待… → 一次性收到完整回复

流式请求(逐块返回):

 

用户发送请求 → 立即收到第一个chunk → 收到第二个chunk → … → 收到完整回复

底层实现上,流式输出使用的是HTTP的 Server-Sent Events (SSE) 协议,服务端持续推送数据块,直到生成完毕。


 

from openai import OpenAI client = OpenAI( api_key=“your-api-key”, base_url=“https://api.jiekou.ai/v1"; # 国内用户推荐 ) # 关键:设置 stream=True stream = client.chat.completions.create( model=”gpt-4o“, messages=[ {”role“: ”user“, ”content“: ”写一篇关于Python异步编程的介绍,大约300字“} ], stream=True # 开启流式输出 ) # 遍历数据块 for chunk in stream: content = chunk.choices[0].delta.content if content is not None: print(content, end=”“, flush=True) # flush=True确保立即输出 print() # 最后换行

关键点解析:

  • stream=True:开启流式模式
  • chunk.choices[0].delta.content:每个数据块中的文本片段
  • end=”“:不换行,内容连续拼接
  • flush=True:立即刷新输出缓冲区,确保实时显示

流式输出时,需要手动拼接所有chunk来获取完整内容:

 

from openai import OpenAI client = OpenAI( api_key=”your-api-key“, base_url=”https://api.jiekou.ai/v1"; ) def stream_and_collect(prompt: str, model: str = “gpt-4o”) -> str: “”“ 流式输出的同时收集完整响应 :return: 完整的响应文本 ”“” stream = client.chat.completions.create( model=model, messages=[{“role”: “user”, “content”: prompt}], stream=True ) full_content = [] print(“AI回复:”, end=“”, flush=True) for chunk in stream: delta = chunk.choices[0].delta # 提取文本内容 if delta.content is not None: print(delta.content, end=“”, flush=True) full_content.append(delta.content) # 检查结束原因 if chunk.choices[0].finish_reason == “stop”: print() # 换行 break return “”.join(full_content) # 使用示例 full_response = stream_and_collect(“解释一下Python的asyncio模块”) print(f“ — 完整响应(共{len(full_response)}字符)—”) print(f“前100字符:{full_response[:100]}…”)


除了文本内容,还可以获取其他元信息:

 

from openai import OpenAI client = OpenAI( api_key=“your-api-key”, base_url=“https://api.jiekou.ai/v1"; ) def stream_with_metadata(prompt: str): stream = client.chat.completions.create( model=”gpt-4o“, messages=[{”role“: ”user“, ”content“: prompt}], stream=True, stream_options={”include_usage“: True} # 在最后一个chunk包含token用量 ) full_content = [] finish_reason = None usage = None for chunk in stream: # 获取文本内容 if chunk.choices and chunk.choices[0].delta.content: content = chunk.choices[0].delta.content full_content.append(content) print(content, end=”“, flush=True) # 获取结束原因 if chunk.choices and chunk.choices[0].finish_reason: finish_reason = chunk.choices[0].finish_reason # 获取用量统计(最后一个chunk) if chunk.usage: usage = chunk.usage print() # 换行 print(f” — 统计信息 —“) print(f”结束原因: {finish_reason}“) if usage: print(f”输入tokens: {usage.prompt_tokens}“) print(f”输出tokens: {usage.completion_tokens}“) print(f”总tokens: {usage.total_tokens}“) return ”“.join(full_content) stream_with_metadata(”用3点总结Python的优缺点“)


在Web框架(FastAPI/Django等)中,通常需要使用异步流式输出:

 

import asyncio from openai import AsyncOpenAI async_client = AsyncOpenAI( api_key=”your-api-key“, base_url=”https://api.jiekou.ai/v1"; ) async def async_stream(prompt: str): “”“异步流式输出”“” stream = await async_client.chat.completions.create( model=“gpt-4o”, messages=[{“role”: “user”, “content”: prompt}], stream=True ) async for chunk in stream: if chunk.choices[0].delta.content: print(chunk.choices[0].delta.content, end=“”, flush=True) print() # 运行异步函数 asyncio.run(async_stream(“写一个Python异步爬虫的示例”))


构建一个支持流式输出的API服务:

 

from fastapi import FastAPI from fastapi.responses import StreamingResponse from openai import AsyncOpenAI import asyncio app = FastAPI() async_client = AsyncOpenAI( api_key=“your-api-key”, base_url=“https://api.jiekou.ai/v1"; ) @app.post(”/chat/stream“) async def stream_chat(user_message: str): ”“” 流式聊天接口 前端可以用 fetch + ReadableStream 接收 “”“ async def generate(): stream = await async_client.chat.completions.create( model=”gpt-4o“, messages=[{”role“: ”user“, ”content“: user_message}], stream=True ) async for chunk in stream: content = chunk.choices[0].delta.content if content: # SSE格式:data: {内容}

yield f”data: {content}

“ yield ”data: [DONE]

“ # 结束信号 return StreamingResponse( generate(), media_type=”text/event-stream“, headers={ ”Cache-Control“: ”no-cache“, ”Connection“: ”keep-alive“, ”X-Accel-Buffering“: ”no“ # 禁用Nginx缓冲 } ) # 启动:uvicorn main:app –reload

前端对应的JavaScript代码:

 

async function streamChat(message) = await reader.read(); if (done) break; const text = decoder.decode(value); const lines = text.split(‘ ’); for (const line of lines) } } }


 

import asyncio from openai import AsyncOpenAI async_client = AsyncOpenAI( api_key=”your-api-key“, base_url=”https://api.jiekou.ai/v1"; ) async def stream_with_timeout(prompt: str, timeout_seconds: int = 30): “”“ 带超时控制的流式输出 :param prompt: 用户输入 :param timeout_seconds: 超时时间(秒) ”“” try: async with asyncio.timeout(timeout_seconds): stream = await async_client.chat.completions.create( model=“gpt-4o”, messages=[{“role”: “user”, “content”: prompt}], stream=True ) chunks = [] async for chunk in stream: content = chunk.choices[0].delta.content if content: chunks.append(content) print(content, end=“”, flush=True) print() return “”.join(chunks) except asyncio.TimeoutError: print(f“ ⚠️ 超时!已超过{timeout_seconds}秒”) return None asyncio.run(stream_with_timeout(“写一部长篇小说…”))


允许用户提前中断生成:

 

from openai import OpenAI import threading client = OpenAI( api_key=“your-api-key”, base_url=“https://api.jiekou.ai/v1"; ) # 中断标志 should_stop = threading.Event() def stream_with_interrupt(prompt: str): ”“”支持中断的流式输出“”“ stream = client.chat.completions.create( model=”gpt-4o“, messages=[{”role“: ”user“, ”content“: prompt}], stream=True ) content = [] try: for chunk in stream: # 检查是否需要中断 if should_stop.is_set(): print(” [已中断]“) stream.close() # 关闭流连接 break if chunk.choices[0].delta.content: text = chunk.choices[0].delta.content print(text, end=”“, flush=True) content.append(text) except Exception as e: print(f” 错误: {e}“) return ”“.join(content) # 使用示例(在另一线程中设置中断信号) def user_input_listener(): input(”按回车键中断生成… “) should_stop.set() interrupt_thread = threading.Thread(target=user_input_listener, daemon=True) interrupt_thread.start() result = stream_with_interrupt(”写一篇非常长的文章…“)


流式输出对网络稳定性要求更高——连接中断意味着整个流需要重新开始。这也是推荐使用 jiekou.ai 的重要原因:

  • 国内直连:无代理转发,网络更稳定,流式输出更顺畅
  • 低延迟:减少每个chunk的传输延迟,打字机效果更流畅
  • 断线恢复:稳定的连接减少流式传输中断的概率
  • 多模型支持:同一接口,流式调用GPT-4o/Claude/Gemini
 

# 一行设置,解决所有网络问题 client = OpenAI( api_key=”your-jiekou-api-key“, base_url=”https://api.jiekou.ai/v1"; )


Q: 流式输出比普通输出贵吗? A: Token计费完全一样,不因为是流式输出而有额外费用。

Q: 流式输出时如何计算Token用量? A: 需要在请求中设置 stream_options={“include_usage”: True},最后一个chunk会包含用量信息。

Q: 流式输出中途断了怎么办? A: 需要从头重新请求。建议做好错误处理和断点续传(记录已生成内容,断线时提示用户重新生成)。

Q: 能同时进行多个流式请求吗? A: 可以,使用异步编程(asyncio)可以并发处理多个流式请求。


流式输出是AI应用必备的技术,本文覆盖了从基础到生产环境的所有场景:

场景

实现方式

命令行打字机效果

同步stream + print flush

收集完整响应

拼接所有chunk

Web API服务

FastAPI + StreamingResponse

高并发场景

AsyncOpenAI + asyncio

超时控制

asyncio.timeout

用户中断

threading.Event

将这些技术应用到你的项目中,配合 jiekou.ai 的稳定网络,打造丝滑的AI用户体验。

小讯
上一篇 2026-04-17 22:53
下一篇 2026-04-17 22:51

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/267967.html