How can I stream a response from LangChain's OpenAI using Flask API?
Background:
I am using a Python Flask app for chat over data. In the console I get a streamed response directly from OpenAI, since I can enable streaming with the flag `streaming=True`.
The problem is that I can't "forward" or "show" the stream in my API call.
The code that sets up OpenAI and the chain is:
def askQuestion(self, collection_id, question):
    collection_name = "collection-" + str(collection_id)
    self.llm = ChatOpenAI(model_name=self.model_name, temperature=self.temperature, openai_api_key=os.environ.get('OPENAI_API_KEY'), streaming=True, callback_manager=CallbackManager([StreamingStdOutCallbackHandler()]))
    self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True, output_key='answer')

    chroma_Vectorstore = Chroma(collection_name=collection_name, embedding_function=self.embeddingsOpenAi, client=self.chroma_client)

    self.chain = ConversationalRetrievalChain.from_llm(self.llm, chroma_Vectorstore.as_retriever(similarity_search_with_score=True),
                                                       return_source_documents=True, verbose=VERBOSE,
                                                       memory=self.memory)

    result = self.chain({"question": question})

    res_dict = {
        "answer": result["answer"],
    }

    res_dict["source_documents"] = []

    for source in result["source_documents"]:
        res_dict["source_documents"].append({
            "page_content": source.page_content,
            "metadata": source.metadata
        })

    return res_dict
And the API route code:
@app.route("/collection/<int:collection_id>/ask_question", methods=["POST"])
def ask_question(collection_id):
    question = request.form["question"]
    # response_generator = document_thread.askQuestion(collection_id, question)
    # return jsonify(response_generator)

    def stream(question):
        completion = document_thread.askQuestion(collection_id, question)
        for line in completion['answer']:
            yield line
I am testing my endpoint with curl and passing the `-N` flag, so I should get a streamed response if it is possible.
When I make the API call, the endpoint first waits while the data is processed (I can see the streamed answer in my VS Code terminal), and when it has finished, I get everything displayed in one go.
Solution:
By using `threading` and a callback, we can get a streaming response from the Flask API.
In the Flask API, you can create a queue that collects tokens through LangChain's callback.
class StreamingHandler(BaseCallbackHandler):
    ...

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        self.queue.put(token)
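The handler above leaves its constructor and end-of-stream handling elided. One possible way to fill it in is sketched below, assuming a hypothetical sentinel object `STREAM_END` (not part of LangChain) that is pushed in `on_llm_end` and `on_llm_error` so the consumer knows when to stop reading. The import path `langchain.callbacks.base` matches older LangChain releases and may differ in your version; the stub fallback is there only so the sketch runs without LangChain installed.

```python
from queue import Queue

try:
    # Import path used by older LangChain releases; adjust for your version.
    from langchain.callbacks.base import BaseCallbackHandler
except ImportError:
    # Stub so this sketch runs without LangChain installed (illustration only).
    class BaseCallbackHandler:
        pass

# Hypothetical sentinel marking the end of the stream (not part of LangChain).
STREAM_END = object()


class StreamingHandler(BaseCallbackHandler):
    """Pushes each generated token onto a queue shared with the Flask route."""

    def __init__(self, queue: Queue) -> None:
        self.queue = queue

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        # Called once per token when streaming=True.
        self.queue.put(token)

    def on_llm_end(self, response, **kwargs) -> None:
        # Tell the consumer that this LLM call has finished.
        self.queue.put(STREAM_END)

    def on_llm_error(self, error, **kwargs) -> None:
        # Also unblock the consumer if the LLM call fails.
        self.queue.put(STREAM_END)
```

If the chain calls the model more than once per question, `on_llm_end` fires after the first call, so in that case it may be safer to push the sentinel from the worker thread once the chain has returned.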
You can then read tokens from the same queue in your Flask route.
from queue import Queue
import threading

from flask import Response, stream_with_context

@app.route(....)
def stream_output():
    q = Queue()

    def generate(rq: Queue):
        ...
        # add your logic to prevent the while loop
        # from running indefinitely
        while( ...):
            yield rq.get()

    callback_fn = StreamingHandler(q)
    # run the chain in a background thread so the route can start streaming immediately
    threading.Thread(target=askQuestion, args=(collection_id, question, callback_fn)).start()
    return Response(stream_with_context(generate(q)))
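The loop condition in `generate` is left open above. Below is a minimal, framework-free sketch of one way to end the loop: the worker thread puts a sentinel on the queue once it is done, and the generator stops when it sees it. `fake_ask_question`, `STREAM_END`, and `stream_tokens` are hypothetical names standing in for the real chain call; in the Flask route, the returned generator would be what gets wrapped in `Response(stream_with_context(...))`.

```python
import threading
from queue import Queue
from typing import Iterable, Iterator

STREAM_END = object()  # hypothetical end-of-stream marker


def fake_ask_question(tokens: Iterable[str], q: Queue) -> None:
    """Stand-in for askQuestion: puts tokens on the queue as a callback would."""
    try:
        for token in tokens:
            q.put(token)
    finally:
        # Always signal completion, even if the producer raised.
        q.put(STREAM_END)


def generate(q: Queue) -> Iterator[str]:
    """Yield tokens until the sentinel arrives."""
    while True:
        item = q.get()  # blocks until the worker has produced something
        if item is STREAM_END:
            break
        yield item


def stream_tokens(tokens: Iterable[str]) -> Iterator[str]:
    """Start the producer in a background thread and return the token generator."""
    q: Queue = Queue()
    worker = threading.Thread(target=fake_ask_question, args=(tokens, q), daemon=True)
    worker.start()
    return generate(q)
```

Putting the sentinel in a `finally` block means the generator does not hang if the producer fails partway through. Adding a timeout to `q.get()` would be another possible safeguard.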
In your LangChain `ChatOpenAI`, add the custom callback `StreamingHandler` defined above.
self.llm = ChatOpenAI(
    model_name=self.model_name,
    temperature=self.temperature,
    openai_api_key=os.environ.get('OPENAI_API_KEY'),
    streaming=True,
    callbacks=[callback_fn],
)
For reference:
- https://python.langchain.com/en/latest/modules/callbacks/getting_started.html#creating-a-custom-handler
- Streaming Contents — Flask Documentation (2.3.x)