Das Repository implementiert Chat-Speicher für eine RAG-Pipeline mithilfe von Redis, sodass vergangene Nachrichten gespeichert werden:
Dies erfolgt durch die Erstellung der benutzerdefinierten Komponenten „Redis Message Retrieval Custom Component.json“ und „Redis Message Retrieval Custom Component.json“.
Den Code für die benutzerdefinierten Komponenten finden Sie in der Langflow-GUI:
oder unten:
Redis-Komponente zum Abrufen von Nachrichten
from langflow . base . memory . model import LCChatMemoryComponent
from langflow . inputs import MessageTextInput , SecretStrInput , DictInput , MultilineInput
from langflow . io import MessageTextInput , Output
from langflow . field_typing import BaseChatMessageHistory
import subprocess
import time
from langchain_community . chat_message_histories import RedisChatMessageHistory
import socket
import logging
# Configure the root logger at import time: records of level INFO and above
# are emitted, each rendered as "<timestamp> - <level name> - <message>".
logging . basicConfig ( level = logging . INFO , format = '%(asctime)s - %(levelname)s - %(message)s' )
class RedisChatMemory(LCChatMemoryComponent):
    """Langflow memory component that reads a chat history from Redis.

    When a Redis URL is supplied it is used as-is. Otherwise the component
    targets a Redis instance on ``localhost:6379`` and, if nothing is
    listening there yet, tries to launch a local ``redis-server`` process
    protected by the configured password.
    """

    display_name = "Redis Chat Memory"
    description = "Retrieves chat messages from Redis."
    name = "RedisChatMemory"
    icon = "RedisChatMemory"

    inputs = [
        MessageTextInput(
            name="RedisURL",
            display_name="Redis URL (Optional)",
            info="Redis URL",
            required=False,
        ),
        SecretStrInput(
            name="password",
            display_name="Password (Optional)",
            info="User password for the database.",
            required=False,
        ),
        MessageTextInput(
            name="redis_session_id",
            display_name="Redis Session ID (Optional)",
            info="Redis Session ID.",
            advanced=False,
            required=False,
        ),
        # NOTE: this input is declared but not read by any method of this class.
        DictInput(
            name="cluster_kwargs",
            display_name="Cluster arguments",
            info="Optional dictionary of additional keyword arguments for the Redis cluster.",
            advanced=True,
            is_list=True,
        ),
    ]

    def is_port_in_use(self, port):
        """Return True if a TCP connection to ``localhost:port`` succeeds."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            # connect_ex returns 0 on success instead of raising.
            return s.connect_ex(("localhost", port)) == 0

    def start_redis_server(self, port=6379, password=None, startup_timeout=2.0):
        """Launch a local ``redis-server`` process.

        Args:
            port: TCP port the server should listen on.
            password: If truthy, passed to the server via ``--requirepass``.
            startup_timeout: Maximum number of seconds to wait for the server
                to accept connections before assuming it is up.

        Returns:
            The ``subprocess.Popen`` handle of the running server, or ``None``
            if the port was already taken, the process could not be spawned,
            or it exited during startup.
        """
        if self.is_port_in_use(port):
            logging.error(f"Port {port} is already in use. Please choose a different port.")
            return None

        cmd = ["redis-server", "--port", str(port)]
        if password:
            cmd.extend(["--requirepass", str(password)])

        try:
            redis_process = subprocess.Popen(cmd)
        except (OSError, ValueError) as e:
            # OSError covers a missing or non-executable redis-server binary.
            logging.error(f"Error starting Redis server: {e}")
            return None

        # Poll instead of sleeping a fixed interval: return as soon as the
        # server accepts connections, fail as soon as the process exits.
        deadline = time.monotonic() + startup_timeout
        while True:
            if redis_process.poll() is not None:
                logging.error(f"Failed to start Redis server on port {port}")
                return None
            if self.is_port_in_use(port) or time.monotonic() >= deadline:
                break
            time.sleep(0.1)

        logging.info(f"Redis server started successfully on port {port}")
        return redis_process

    def build_message_history(self) -> BaseChatMessageHistory:
        """Build the Redis-backed message history for the configured session.

        Returns:
            A ``RedisChatMessageHistory`` bound to ``redis_session_id`` (or
            ``'default_redis_user_session'`` when unset) and to either the
            supplied Redis URL or a local ``localhost:6379`` instance.
        """
        from urllib.parse import quote

        session_id = self.redis_session_id or "default_redis_user_session"

        if self.RedisURL:
            url = self.RedisURL
        else:
            port = 6379
            # NOTE(review): 'admin' is a hard-coded fallback password.
            password = self.password or "admin"
            # Percent-encode the password so characters such as '@', ':' or
            # '/' cannot corrupt the URL structure.
            encoded_password = quote(str(password), safe="")
            url = f"redis://default:{encoded_password}@localhost:{port}"
            # Only spawn a server when nothing is listening yet; this avoids
            # a spurious "port in use" error on every subsequent build.
            if not self.is_port_in_use(port):
                self.start_redis_server(port=port, password=password)

        return RedisChatMessageHistory(session_id=session_id, url=url)
Redis-Nachrichtenspeicherkomponente
import subprocess
import time

from langchain_community.chat_message_histories import RedisChatMessageHistory
from langflow.base.memory.model import LCChatMemoryComponent
from langflow.custom import Component
from langflow.field_typing import BaseChatMessageHistory
from langflow.inputs import DictInput, MessageTextInput, MultilineInput, SecretStrInput
from langflow.io import MessageTextInput, Output
from langflow.schema.message import Message
class RedisChatMemory(Component):
    """Langflow component that appends one chat exchange to Redis.

    Each run stores the human message and the AI answer under the configured
    session id and forwards the AI answer as the component's output.
    """

    display_name = "Redis Chat Memory"
    description = "Store chat messages to Redis."
    name = "RedisChatMemory"
    icon = "RedisChatMemory"

    inputs = [
        MessageTextInput(
            name="RedisURL",
            display_name="Redis URL (Optional)",
            info="Redis URL",
            required=False,
        ),
        SecretStrInput(
            name="password",
            display_name="Password (Optional)",
            info="User password for the database.",
            required=False,
        ),
        MessageTextInput(
            name="redis_session_id",
            display_name="Redis Session ID (Optional)",
            info="Redis Session ID.",
            advanced=False,
            required=False,
        ),
        # NOTE: this input is declared but not read by any method of this class.
        DictInput(
            name="cluster_kwargs",
            display_name="Cluster arguments",
            info="Optional dictionary of additional keyword arguments for the Redis cluster.",
            advanced=True,
            is_list=True,
        ),
        MultilineInput(
            name="human_message_input",
            display_name="Human Messages",
        ),
        MessageTextInput(
            name="ai_message_output",
            display_name="AI Message",
            info="RAG Generated answer",
        ),
    ]

    outputs = [
        Output(display_name="Message", name="message", method="build_message_history"),
    ]

    def build_message_history(self) -> Message:
        """Persist the current human/AI message pair and return the AI answer.

        Returns:
            A ``Message`` whose text is the value of ``ai_message_output``.
        """
        from urllib.parse import quote

        session_id = self.redis_session_id or "default_redis_user_session"

        if self.RedisURL:
            url = self.RedisURL
        else:
            port = 6379
            # NOTE(review): 'admin' is a hard-coded fallback password.
            password = self.password or "admin"
            # Percent-encode the password so characters such as '@', ':' or
            # '/' cannot corrupt the URL structure.
            encoded_password = quote(str(password), safe="")
            url = f"redis://default:{encoded_password}@localhost:{port}"

        memory = RedisChatMessageHistory(session_id=session_id, url=url)
        # Store the exchange in conversation order: user first, then AI.
        memory.add_user_message(self.human_message_input)
        memory.add_ai_message(self.ai_message_output)

        return Message(text=self.ai_message_output)
Klonen Sie das Repository
git clone https://github.com/paulomuraroferreira/langflow_redis.git
Installieren Sie Langflow und Redis:
!pip install langflow
!pip install redis
Auf dem Terminal ausführen
langflow run
Laden Sie den JSON „RAG_pipeline_with_Redis_chat_memory.json“ hoch.
Kopieren Sie die PDF-Dokumente in den Ordner pdf_documents oder ändern Sie den Pfad in der Document Loader-Komponente:
Geben Sie Ihren OpenAI-API-Schlüssel in beiden Embeddings-Komponenten sowie in der OpenAI-Modellkomponente ein.
Führen Sie die Chunking-Pipeline aus, indem Sie die ChromaDB-Komponente ausführen:
Öffnen Sie den Playground und stellen Sie Fragen:
Dass der Chat-Verlauf an das LLM übergeben wird, lässt sich überprüfen, indem man die Prompt-Komponente untersucht: