langflow_redis
1.0.0
该存储库使用 Redis 为 RAG 管道实现聊天记忆,以便记住过去的消息:
这是通过创建自定义组件“Redis 消息检索自定义组件.json”和“Redis 消息检索自定义组件.json”来完成的。
自定义组件的代码可以在 Langflow GUI 上找到:
或以下:
Redis消息检索组件
from langflow . base . memory . model import LCChatMemoryComponent
from langflow . inputs import MessageTextInput , SecretStrInput , DictInput , MultilineInput
from langflow . io import MessageTextInput , Output
from langflow . field_typing import BaseChatMessageHistory
import subprocess
import time
from langchain_community . chat_message_histories import RedisChatMessageHistory
import socket
import logging
# Configure logging
logging . basicConfig ( level = logging . INFO , format = '%(asctime)s - %(levelname)s - %(message)s' )
class RedisChatMemory ( LCChatMemoryComponent ):
display_name = "Redis Chat Memory"
description = "Retrieves chat messages from Redis."
name = "RedisChatMemory"
icon = "RedisChatMemory"
inputs = [
MessageTextInput (
name = "RedisURL" ,
display_name = "Redis URL (Optional)" ,
info = "Redis URL" ,
required = False ,
),
SecretStrInput (
name = "password" ,
display_name = "Password (Optional)" ,
info = "User password for the database." ,
required = False ,
),
MessageTextInput (
name = "redis_session_id" ,
display_name = "Redis Session ID (Optional)" ,
info = "Redis Session ID." ,
advanced = False ,
required = False
),
DictInput (
name = "cluster_kwargs" ,
display_name = "Cluster arguments" ,
info = "Optional dictionary of additional keyword arguments for the Redis cluster." ,
advanced = True ,
is_list = True ,
),
]
def is_port_in_use ( self , port ):
with socket . socket ( socket . AF_INET , socket . SOCK_STREAM ) as s :
return s . connect_ex (( 'localhost' , port )) == 0
def start_redis_server ( self , port = 6379 , password = None ):
if self . is_port_in_use ( port ):
logging . error ( f"Port { port } is already in use. Please choose a different port." )
return None
cmd = [ 'redis-server' , '--port' , str ( port )]
if password :
cmd . extend ([ '--requirepass' , password ])
try :
redis_process = subprocess . Popen ( cmd )
time . sleep ( 2 ) # Wait for Redis server to start
# Check if the process is still running
if redis_process . poll () is None :
logging . info ( f"Redis server started successfully on port { port } " )
return redis_process
else :
logging . error ( f"Failed to start Redis server on port { port } " )
return None
except Exception as e :
logging . error ( f"Error starting Redis server: { e } " )
return None
def build_message_history ( self ) -> BaseChatMessageHistory :
session_id = self . redis_session_id or 'default_redis_user_session'
if self . RedisURL :
url = self . RedisURL
else :
port = 6379
password = self . password or 'admin'
url = f"redis://default: { password } @localhost: { port } "
self . start_redis_server ( password = password )
memory = RedisChatMessageHistory ( session_id = session_id , url = url )
return memory
Redis消息存储组件
from langflow . base . memory . model import LCChatMemoryComponent
from langflow . inputs import MessageTextInput , SecretStrInput , DictInput , MultilineInput
from langflow . io import MessageTextInput , Output
from langflow . field_typing import BaseChatMessageHistory
import subprocess
import time
from langchain_community . chat_message_histories import RedisChatMessageHistory
from langflow . schema . message import Message
class RedisChatMemory ( Component ):
display_name = "Redis Chat Memory"
description = "Store chat messages to Redis."
name = "RedisChatMemory"
icon = "RedisChatMemory"
inputs = [
MessageTextInput (
name = "RedisURL" ,
display_name = "Redis URL (Optional)" ,
info = "Redis URL" ,
required = False ,
),
SecretStrInput (
name = "password" ,
display_name = "Password (Optional)" ,
info = "User password for the database." ,
required = False ,
),
MessageTextInput (
name = "redis_session_id" ,
display_name = "Redis Session ID (Optional)" ,
info = "Redis Session ID." ,
advanced = False ,
required = False
),
DictInput (
name = "cluster_kwargs" ,
display_name = "Cluster arguments" ,
info = "Optional dictionary of additional keyword arguments for the Redis cluster." ,
advanced = True ,
is_list = True ,
),
MultilineInput (
name = "human_message_input" ,
display_name = "Human Messages"
),
MessageTextInput (
name = "ai_message_output" ,
display_name = "AI Message" ,
info = "RAG Generated answer" ,
),
]
outputs = [
Output ( display_name = "Message" , name = "message" , method = "build_message_history" ),
]
def build_message_history ( self ) -> Message :
session_id = self . redis_session_id or 'default_redis_user_session'
if self . RedisURL :
url = self . RedisURL
else :
port = 6379
password = self . password or 'admin'
url = f"redis://default: { password } @localhost: { port } "
memory = RedisChatMessageHistory ( session_id = session_id , url = url )
memory . add_user_message ( self . human_message_input )
memory . add_ai_message ( self . ai_message_output )
return Message ( text = self . ai_message_output )
克隆存储库
git clone https : // github . com / paulomuraroferreira / langflow_redis . git
安装 langflow 和 redis:
!p ip install langflow
!p ip install redis
在终端上执行
langflow run
上传 json 'RAG_pipeline_with_Redis_chat_memory.json'
将 pdf 文档复制到 pdf_documents 文件夹,或更改文档加载器组件中的路径:
在两个嵌入组件上输入您的 OpenAI API 密钥,
在 OpenAI 模型组件上,
通过执行 ChromaDB 组件来运行分块管道:
进入游乐场提问:
我们可以通过检查提示组件来查看传递给 LLM 的历史记录: