langflow_redis
1.0.0
リポジトリは、過去のメッセージが記憶されるように、Redis を使用して RAG パイプラインのチャット メモリを実装します。
これは、カスタム コンポーネント「Redis メッセージ取得カスタムコンポーネント.json」および「Redis メッセージ取得カスタムコンポーネント.json」を作成することによって行われます。
カスタム コンポーネントのコードは、Langflow GUI にあります。
以下:
Redis メッセージ取得コンポーネント
from langflow . base . memory . model import LCChatMemoryComponent
from langflow . inputs import MessageTextInput , SecretStrInput , DictInput , MultilineInput
from langflow . io import MessageTextInput , Output
from langflow . field_typing import BaseChatMessageHistory
import subprocess
import time
from langchain_community . chat_message_histories import RedisChatMessageHistory
import socket
import logging
# Configure logging
logging . basicConfig ( level = logging . INFO , format = '%(asctime)s - %(levelname)s - %(message)s' )
class RedisChatMemory ( LCChatMemoryComponent ):
display_name = "Redis Chat Memory"
description = "Retrieves chat messages from Redis."
name = "RedisChatMemory"
icon = "RedisChatMemory"
inputs = [
MessageTextInput (
name = "RedisURL" ,
display_name = "Redis URL (Optional)" ,
info = "Redis URL" ,
required = False ,
),
SecretStrInput (
name = "password" ,
display_name = "Password (Optional)" ,
info = "User password for the database." ,
required = False ,
),
MessageTextInput (
name = "redis_session_id" ,
display_name = "Redis Session ID (Optional)" ,
info = "Redis Session ID." ,
advanced = False ,
required = False
),
DictInput (
name = "cluster_kwargs" ,
display_name = "Cluster arguments" ,
info = "Optional dictionary of additional keyword arguments for the Redis cluster." ,
advanced = True ,
is_list = True ,
),
]
def is_port_in_use ( self , port ):
with socket . socket ( socket . AF_INET , socket . SOCK_STREAM ) as s :
return s . connect_ex (( 'localhost' , port )) == 0
def start_redis_server ( self , port = 6379 , password = None ):
if self . is_port_in_use ( port ):
logging . error ( f"Port { port } is already in use. Please choose a different port." )
return None
cmd = [ 'redis-server' , '--port' , str ( port )]
if password :
cmd . extend ([ '--requirepass' , password ])
try :
redis_process = subprocess . Popen ( cmd )
time . sleep ( 2 ) # Wait for Redis server to start
# Check if the process is still running
if redis_process . poll () is None :
logging . info ( f"Redis server started successfully on port { port } " )
return redis_process
else :
logging . error ( f"Failed to start Redis server on port { port } " )
return None
except Exception as e :
logging . error ( f"Error starting Redis server: { e } " )
return None
def build_message_history ( self ) -> BaseChatMessageHistory :
session_id = self . redis_session_id or 'default_redis_user_session'
if self . RedisURL :
url = self . RedisURL
else :
port = 6379
password = self . password or 'admin'
url = f"redis://default: { password } @localhost: { port } "
self . start_redis_server ( password = password )
memory = RedisChatMessageHistory ( session_id = session_id , url = url )
return memory
Redis メッセージ ストレージ コンポーネント
from langflow . base . memory . model import LCChatMemoryComponent
from langflow . inputs import MessageTextInput , SecretStrInput , DictInput , MultilineInput
from langflow . io import MessageTextInput , Output
from langflow . field_typing import BaseChatMessageHistory
import subprocess
import time
from langchain_community . chat_message_histories import RedisChatMessageHistory
from langflow . schema . message import Message
class RedisChatMemory ( Component ):
display_name = "Redis Chat Memory"
description = "Store chat messages to Redis."
name = "RedisChatMemory"
icon = "RedisChatMemory"
inputs = [
MessageTextInput (
name = "RedisURL" ,
display_name = "Redis URL (Optional)" ,
info = "Redis URL" ,
required = False ,
),
SecretStrInput (
name = "password" ,
display_name = "Password (Optional)" ,
info = "User password for the database." ,
required = False ,
),
MessageTextInput (
name = "redis_session_id" ,
display_name = "Redis Session ID (Optional)" ,
info = "Redis Session ID." ,
advanced = False ,
required = False
),
DictInput (
name = "cluster_kwargs" ,
display_name = "Cluster arguments" ,
info = "Optional dictionary of additional keyword arguments for the Redis cluster." ,
advanced = True ,
is_list = True ,
),
MultilineInput (
name = "human_message_input" ,
display_name = "Human Messages"
),
MessageTextInput (
name = "ai_message_output" ,
display_name = "AI Message" ,
info = "RAG Generated answer" ,
),
]
outputs = [
Output ( display_name = "Message" , name = "message" , method = "build_message_history" ),
]
def build_message_history ( self ) -> Message :
session_id = self . redis_session_id or 'default_redis_user_session'
if self . RedisURL :
url = self . RedisURL
else :
port = 6379
password = self . password or 'admin'
url = f"redis://default: { password } @localhost: { port } "
memory = RedisChatMessageHistory ( session_id = session_id , url = url )
memory . add_user_message ( self . human_message_input )
memory . add_ai_message ( self . ai_message_output )
return Message ( text = self . ai_message_output )
リポジトリのクローンを作成する
git clone https : // github . com / paulomuraroferreira / langflow_redis . git
langflow と redis をインストールします。
!p ip install langflow
!p ip install redis
ターミナル上で実行します
langflow run
json「RAG_pipeline_with_Redis_chat_memory.json」をアップロードします。
PDF ドキュメントを pdf_documents フォルダーにコピーするか、ドキュメント ローダー コンポーネントのパスを変更します。
両方の Embeddings コンポーネントに OpenAI API キーを入力します。
OpenAI モデル コンポーネント上で、
ChromaDB コンポーネントを実行して、チャンク パイプラインを実行します。
遊び場に入って質問してください。
プロンプト コンポーネントを調べることで、LLM に渡された履歴を確認できます。