The repository implements chat memory for a RAG pipeline using Redis, so that previous messages are remembered across turns.
This is done with two custom components, provided as JSON files: one that retrieves chat messages from Redis and one that stores chat messages to Redis.
The code for the custom components can be found in the Langflow GUI, or below:
Redis message retrieval component
from langflow.base.memory.model import LCChatMemoryComponent
from langflow.inputs import MessageTextInput, SecretStrInput, DictInput, MultilineInput
from langflow.io import MessageTextInput, Output
from langflow.field_typing import BaseChatMessageHistory
import subprocess
import time
from langchain_community.chat_message_histories import RedisChatMessageHistory
import socket
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


class RedisChatMemory(LCChatMemoryComponent):
    display_name = "Redis Chat Memory"
    description = "Retrieves chat messages from Redis."
    name = "RedisChatMemory"
    icon = "RedisChatMemory"

    inputs = [
        MessageTextInput(
            name="RedisURL",
            display_name="Redis URL (Optional)",
            info="Redis URL",
            required=False,
        ),
        SecretStrInput(
            name="password",
            display_name="Password (Optional)",
            info="User password for the database.",
            required=False,
        ),
        MessageTextInput(
            name="redis_session_id",
            display_name="Redis Session ID (Optional)",
            info="Redis Session ID.",
            advanced=False,
            required=False,
        ),
        DictInput(
            name="cluster_kwargs",
            display_name="Cluster arguments",
            info="Optional dictionary of additional keyword arguments for the Redis cluster.",
            advanced=True,
            is_list=True,
        ),
    ]

    def is_port_in_use(self, port):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            return s.connect_ex(('localhost', port)) == 0

    def start_redis_server(self, port=6379, password=None):
        if self.is_port_in_use(port):
            logging.error(f"Port {port} is already in use. Please choose a different port.")
            return None
        cmd = ['redis-server', '--port', str(port)]
        if password:
            cmd.extend(['--requirepass', password])
        try:
            redis_process = subprocess.Popen(cmd)
            time.sleep(2)  # Wait for Redis server to start
            # Check if the process is still running
            if redis_process.poll() is None:
                logging.info(f"Redis server started successfully on port {port}")
                return redis_process
            else:
                logging.error(f"Failed to start Redis server on port {port}")
                return None
        except Exception as e:
            logging.error(f"Error starting Redis server: {e}")
            return None

    def build_message_history(self) -> BaseChatMessageHistory:
        session_id = self.redis_session_id or 'default_redis_user_session'
        if self.RedisURL:
            url = self.RedisURL
        else:
            port = 6379
            password = self.password or 'admin'
            url = f"redis://default:{password}@localhost:{port}"
            self.start_redis_server(password=password)
        memory = RedisChatMessageHistory(session_id=session_id, url=url)
        return memory
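For reference, the fallback that build_message_history uses above can be exercised outside Langflow. A minimal sketch, assuming a local Redis server is already reachable on port 6379 with the component's default password 'admin':

from langchain_community.chat_message_histories import RedisChatMessageHistory

# Same defaults the retrieval component falls back to when no Redis URL is given
history = RedisChatMessageHistory(
    session_id="default_redis_user_session",
    url="redis://default:admin@localhost:6379",
)
print(history.messages)  # previously stored turns; empty on a fresh session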
Redis message storage component
from langflow.base.memory.model import LCChatMemoryComponent
from langflow.inputs import MessageTextInput, SecretStrInput, DictInput, MultilineInput
from langflow.io import MessageTextInput, Output
from langflow.field_typing import BaseChatMessageHistory
from langflow.custom import Component
import subprocess
import time
from langchain_community.chat_message_histories import RedisChatMessageHistory
from langflow.schema.message import Message


class RedisChatMemory(Component):
    display_name = "Redis Chat Memory"
    description = "Store chat messages to Redis."
    name = "RedisChatMemory"
    icon = "RedisChatMemory"

    inputs = [
        MessageTextInput(
            name="RedisURL",
            display_name="Redis URL (Optional)",
            info="Redis URL",
            required=False,
        ),
        SecretStrInput(
            name="password",
            display_name="Password (Optional)",
            info="User password for the database.",
            required=False,
        ),
        MessageTextInput(
            name="redis_session_id",
            display_name="Redis Session ID (Optional)",
            info="Redis Session ID.",
            advanced=False,
            required=False,
        ),
        DictInput(
            name="cluster_kwargs",
            display_name="Cluster arguments",
            info="Optional dictionary of additional keyword arguments for the Redis cluster.",
            advanced=True,
            is_list=True,
        ),
        MultilineInput(
            name="human_message_input",
            display_name="Human Messages",
        ),
        MessageTextInput(
            name="ai_message_output",
            display_name="AI Message",
            info="RAG Generated answer",
        ),
    ]

    outputs = [
        Output(display_name="Message", name="message", method="build_message_history"),
    ]

    def build_message_history(self) -> Message:
        session_id = self.redis_session_id or 'default_redis_user_session'
        if self.RedisURL:
            url = self.RedisURL
        else:
            port = 6379
            password = self.password or 'admin'
            url = f"redis://default:{password}@localhost:{port}"
        memory = RedisChatMessageHistory(session_id=session_id, url=url)
        # Append the current human/AI turn to the session history
        memory.add_user_message(self.human_message_input)
        memory.add_ai_message(self.ai_message_output)
        return Message(text=self.ai_message_output)
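Together, the two components implement a simple store/retrieve round trip: the storage component appends the current human/AI pair after each answer, and the retrieval component reads the same session back on the next question. A minimal sketch of that round trip, with hypothetical messages and the same default URL as above:

from langchain_community.chat_message_histories import RedisChatMessageHistory

url = "redis://default:admin@localhost:6379"  # the components' fallback URL
store = RedisChatMessageHistory(session_id="default_redis_user_session", url=url)
store.add_user_message("What does the PDF say about pricing?")  # hypothetical question
store.add_ai_message("It lists three pricing tiers.")           # hypothetical RAG answer

# A new handle on the same session sees the messages stored above
reloaded = RedisChatMessageHistory(session_id="default_redis_user_session", url=url)
for msg in reloaded.messages:
    print(f"{msg.type}: {msg.content}")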
Clone the repository:
git clone https://github.com/paulomuraroferreira/langflow_redis.git
Install langflow and redis:
pip install langflow
pip install redis
In the terminal, run:
langflow run
Load the 'RAG_pipeline_with_Redis_chat_memory.json' flow.
Copy your PDF documents into the pdf_documents folder, or change the path in the Document Loader component.
Enter your OpenAI API key in both Embeddings components and in the OpenAI model component.
Run the chunking pipeline by executing the ChromaDB component.
Open the playground and ask questions.
We can see the chat history passed to the LLM by inspecting the Prompt component.
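Besides inspecting the Prompt component, the stored history can also be checked directly in Redis. A sketch using the redis-py client, assuming the local server started by the retrieval component with the default password 'admin'; RedisChatMessageHistory keeps each session in a Redis list under its default 'message_store:' key prefix:

import redis

r = redis.Redis(host="localhost", port=6379, password="admin", decode_responses=True)
for key in r.keys("message_store:*"):  # one key per chat session
    print(key, "->", r.llen(key), "messages")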