rejoignez-nous sur ? (Twitter), Discorde et WeChat
Installer à partir des sources :
git clone https://github.com/InternLM/lagent.git
cd lagent
pip install -e .
Lagent s'inspire de la philosophie de conception de PyTorch. Nous espérons que l'analogie avec les couches de réseau neuronal rendra le flux de travail plus clair et plus intuitif, de sorte que les utilisateurs n'auront qu'à se concentrer sur la création de couches et la définition des messages transmis entre elles de manière pythonique. Il s'agit d'un didacticiel simple pour vous permettre de démarrer rapidement dans la création d'applications multi-agents.
Les agents utilisent AgentMessage
pour la communication.
from typing import Dict , List
from lagent . agents import Agent
from lagent . schema import AgentMessage
from lagent . llms import VllmModel , INTERNLM2_META
llm = VllmModel (
path = 'Qwen/Qwen2-7B-Instruct' ,
meta_template = INTERNLM2_META ,
tp = 1 ,
top_k = 1 ,
temperature = 1.0 ,
stop_words = [ '<|im_end|>' ],
max_new_tokens = 1024 ,
)
system_prompt = '你的回答只能从“典”、“孝”、“急”三个字中选一个。'
agent = Agent ( llm , system_prompt )
user_msg = AgentMessage ( sender = 'user' , content = '今天天气情况' )
bot_msg = agent ( user_msg )
print ( bot_msg )
content='急' sender='Agent' formatted=None extra_info=None type=None receiver=None stream_state=<AgentStatusCode.END: 0>
Les messages d'entrée et de sortie seront ajoutés à la mémoire de Agent
à chaque passage direct. Ceci est effectué dans __call__
plutôt que forward
. Voir le pseudo-code suivant
def __call__ ( self , * message ):
message = pre_hooks ( message )
add_memory ( message )
message = self . forward ( * message )
add_memory ( message )
message = post_hooks ( message )
return message
Inspectez la mémoire de deux manières
memory : List [ AgentMessage ] = agent . memory . get_memory ()
print ( memory )
print ( '-' * 120 )
dumped_memory : Dict [ str , List [ dict ]] = agent . state_dict ()
print ( dumped_memory [ 'memory' ])
[AgentMessage(content='今天天气情况', sender='user', formatted=None, extra_info=None, type=None, receiver=None, stream_state=<AgentStatusCode.END: 0>), AgentMessage(content='急', sender='Agent', formatted=None, extra_info=None, type=None, receiver=None, stream_state=<AgentStatusCode.END: 0>)]
------------------------------------------------------------------------------------------------------------------------
[{'content': '今天天气情况', 'sender': 'user', 'formatted': None, 'extra_info': None, 'type': None, 'receiver': None, 'stream_state': <AgentStatusCode.END: 0>}, {'content': '急', 'sender': 'Agent', 'formatted': None, 'extra_info': None, 'type': None, 'receiver': None, 'stream_state': <AgentStatusCode.END: 0>}]
Effacer la mémoire de cette session ( session_id=0
par défaut) :
agent . reset ()
DefaultAggregator
est appelé sous le capot pour assembler et convertir AgentMessage
au format de message OpenAI.
def forward ( self , * message : AgentMessage , session_id = 0 , ** kwargs ) -> Union [ AgentMessage , str ]:
formatted_messages = self . aggregator . aggregate (
self . memory . get ( session_id ),
self . name ,
self . output_format ,
self . template ,
)
llm_response = self . llm . chat ( formatted_messages , ** kwargs )
...
Implémenter un agrégateur simple pouvant recevoir quelques clichés
from typing import List , Union
from lagent . memory import Memory
from lagent . prompts import StrParser
from lagent . agents . aggregator import DefaultAggregator
class FewshotAggregator ( DefaultAggregator ):
def __init__ ( self , few_shot : List [ dict ] = None ):
self . few_shot = few_shot or []
def aggregate ( self ,
messages : Memory ,
name : str ,
parser : StrParser = None ,
system_instruction : Union [ str , dict , List [ dict ]] = None ) -> List [ dict ]:
_message = []
if system_instruction :
_message . extend (
self . aggregate_system_intruction ( system_instruction ))
_message . extend ( self . few_shot )
messages = messages . get_memory ()
for message in messages :
if message . sender == name :
_message . append (
dict ( role = 'assistant' , content = str ( message . content )))
else :
user_message = message . content
if len ( _message ) > 0 and _message [ - 1 ][ 'role' ] == 'user' :
_message [ - 1 ][ 'content' ] += user_message
else :
_message . append ( dict ( role = 'user' , content = user_message ))
return _message
agent = Agent (
llm ,
aggregator = FewshotAggregator (
[
{ "role" : "user" , "content" : "今天天气" },
{ "role" : "assistant" , "content" : "【晴】" },
]
)
)
user_msg = AgentMessage ( sender = 'user' , content = '昨天天气' )
bot_msg = agent ( user_msg )
print ( bot_msg )
content='【多云转晴,夜间有轻微降温】' sender='Agent' formatted=None extra_info=None type=None receiver=None stream_state=<AgentStatusCode.END: 0>
Dans AgentMessage
, formatted
est réservé pour stocker les informations analysées par output_format
à partir de la sortie du modèle.
def forward ( self , * message : AgentMessage , session_id = 0 , ** kwargs ) -> Union [ AgentMessage , str ]:
...
llm_response = self . llm . chat ( formatted_messages , ** kwargs )
if self . output_format :
formatted_messages = self . output_format . parse_response ( llm_response )
return AgentMessage (
sender = self . name ,
content = llm_response ,
formatted = formatted_messages ,
)
...
Utilisez un analyseur d'outils comme suit
from lagent . prompts . parsers import ToolParser
system_prompt = "逐步分析并编写Python代码解决以下问题。"
parser = ToolParser ( tool_type = 'code interpreter' , begin = '```python n ' , end = ' n ``` n ' )
llm . gen_params [ 'stop_words' ]. append ( ' n ``` n ' )
agent = Agent ( llm , system_prompt , output_format = parser )
user_msg = AgentMessage (
sender = 'user' ,
content = 'Marie is thinking of a multiple of 63, while Jay is thinking of a '
'factor of 63. They happen to be thinking of the same number. There are '
'two possibilities for the number that each of them is thinking of, one '
'positive and one negative. Find the product of these two numbers.' )
bot_msg = agent ( user_msg )
print ( bot_msg . model_dump_json ( indent = 4 ))
{
"content": "首先,我们需要找出63的所有正因数和负因数。63的正因数可以通过分解63的质因数来找出,即\(63 = 3^2 \times 7\)。因此,63的正因数包括1, 3, 7, 9, 21, 和 63。对于负因数,我们只需将上述正因数乘以-1。nn接下来,我们需要找出与63的正因数相乘的结果为63的数,以及与63的负因数相乘的结果为63的数。这可以通过将63除以每个正因数和负因数来实现。nn最后,我们将找到的两个数相乘得到最终答案。nn下面是Python代码实现:nn```pythonndef find_numbers():n # 正因数n positive_factors = [1, 3, 7, 9, 21, 63]n # 负因数n negative_factors = [-1, -3, -7, -9, -21, -63]n n # 找到与正因数相乘的结果为63的数n positive_numbers = [63 / factor for factor in positive_factors]n # 找到与负因数相乘的结果为63的数n negative_numbers = [-63 / factor for factor in negative_factors]n n # 计算两个数的乘积n product = positive_numbers[0] * negative_numbers[0]n n return productnnresult = find_numbers()nprint(result)",
"sender": "Agent",
"formatted": {
"tool_type": "code interpreter",
"thought": "首先,我们需要找出63的所有正因数和负因数。63的正因数可以通过分解63的质因数来找出,即\(63 = 3^2 \times 7\)。因此,63的正因数包括1, 3, 7, 9, 21, 和 63。对于负因数,我们只需将上述正因数乘以-1。nn接下来,我们需要找出与63的正因数相乘的结果为63的数,以及与63的负因数相乘的结果为63的数。这可以通过将63除以每个正因数和负因数来实现。nn最后,我们将找到的两个数相乘得到最终答案。nn下面是Python代码实现:nn",
"action": "def find_numbers():n # 正因数n positive_factors = [1, 3, 7, 9, 21, 63]n # 负因数n negative_factors = [-1, -3, -7, -9, -21, -63]n n # 找到与正因数相乘的结果为63的数n positive_numbers = [63 / factor for factor in positive_factors]n # 找到与负因数相乘的结果为63的数n negative_numbers = [-63 / factor for factor in negative_factors]n n # 计算两个数的乘积n product = positive_numbers[0] * negative_numbers[0]n n return productnnresult = find_numbers()nprint(result)",
"status": 1
},
"extra_info": null,
"type": null,
"receiver": null,
"stream_state": 0
}
ActionExecutor
utilise la même structure de données de communication que Agent
, mais nécessite que le contenu de l'entrée AgentMessage
soit un dict contenant :
name
: nom de l'outil, par exemple 'IPythonInterpreter'
, 'WebBrowser.search'
.parameters
: arguments de mots-clés de l'API de l'outil, par exemple {'command': 'import math;math.sqrt(2)'}
, {'query': ['recent progress in AI']}
.Vous pouvez enregistrer des hooks personnalisés pour la conversion des messages.
from lagent . hooks import Hook
from lagent . schema import ActionReturn , ActionStatusCode , AgentMessage
from lagent . actions import ActionExecutor , IPythonInteractive
class CodeProcessor ( Hook ):
def before_action ( self , executor , message , session_id ):
message = message . copy ( deep = True )
message . content = dict (
name = 'IPythonInteractive' , parameters = { 'command' : message . formatted [ 'action' ]}
)
return message
def after_action ( self , executor , message , session_id ):
action_return = message . content
if isinstance ( action_return , ActionReturn ):
if action_return . state == ActionStatusCode . SUCCESS :
response = action_return . format_result ()
else :
response = action_return . errmsg
else :
response = action_return
message . content = response
return message
executor = ActionExecutor ( actions = [ IPythonInteractive ()], hooks = [ CodeProcessor ()])
bot_msg = AgentMessage (
sender = 'Agent' ,
content = '首先,我们需要...' ,
formatted = {
'tool_type' : 'code interpreter' ,
'thought' : '首先,我们需要...' ,
'action' : 'def find_numbers(): n # 正因数n positive_factors = [1, 3, 7, 9, 21, 63] n # 负因数n negative_factors = [-1, -3, -7, -9, -21, -63] n n # 找到与正因数相乘的结果为63的数n positive_numbers = [63 / factor for factor in positive_factors] n # 找到与负因数相乘的结果为63的数n negative_numbers = [-63 / factor for factor in negative_factors] n n # 计算两个数的乘积n product = positive_numbers[0] * negative_numbers[0] n n return product n n result = find_numbers() n print(result)' ,
'status' : 1
})
executor_msg = executor ( bot_msg )
print ( executor_msg )
content='3969.0' sender='ActionExecutor' formatted=None extra_info=None type=None receiver=None stream_state=<AgentStatusCode.END: 0>
Pour plus de commodité, Lagent fournit InternLMActionProcessor
qui est adapté aux messages formatés par ToolParser
comme mentionné ci-dessus.
Lagent adopte une conception à double interface, où presque tous les composants (LLM, actions, exécuteurs d'actions...) ont la variante asynchrone correspondante en préfixant son identifiant par « Async ». Il est recommandé d'utiliser des agents synchrones pour le débogage et des agents asynchrones pour l'inférence à grande échelle afin de tirer le meilleur parti des ressources CPU et GPU inutilisées.
Cependant, assurez-vous de la cohérence interne des agents, c'est-à-dire que les agents asynchrones doivent être équipés de LLM asynchrones et d'exécuteurs d'actions asynchrones qui pilotent des outils asynchrones.
from lagent . llms import VllmModel , AsyncVllmModel , LMDeployPipeline , AsyncLMDeployPipeline
from lagent . actions import ActionExecutor , AsyncActionExecutor , WebBrowser , AsyncWebBrowser
from lagent . agents import Agent , AsyncAgent , AgentForInternLM , AsyncAgentForInternLM
forward
au lieu de __call__
des sous-classes, sauf si nécessaire.session_id
, qui est conçu pour l'isolation de la mémoire, les requêtes LLM et l'invocation d'outils (par exemple, maintenir plusieurs environnements IPython indépendants) en simultanéité.Agents mathématiques qui résolvent des problèmes par programmation
from lagent . agents . aggregator import InternLMToolAggregator
class Coder ( Agent ):
def __init__ ( self , model_path , system_prompt , max_turn = 3 ):
super (). __init__ ()
llm = VllmModel (
path = model_path ,
meta_template = INTERNLM2_META ,
tp = 1 ,
top_k = 1 ,
temperature = 1.0 ,
stop_words = [ ' n ``` n ' , '<|im_end|>' ],
max_new_tokens = 1024 ,
)
self . agent = Agent (
llm ,
system_prompt ,
output_format = ToolParser (
tool_type = 'code interpreter' , begin = '```python n ' , end = ' n ``` n '
),
# `InternLMToolAggregator` is adapted to `ToolParser` for aggregating
# messages with tool invocations and execution results
aggregator = InternLMToolAggregator (),
)
self . executor = ActionExecutor ([ IPythonInteractive ()], hooks = [ CodeProcessor ()])
self . max_turn = max_turn
def forward ( self , message : AgentMessage , session_id = 0 ) -> AgentMessage :
for _ in range ( self . max_turn ):
message = self . agent ( message , session_id = session_id )
if message . formatted [ 'tool_type' ] is None :
return message
message = self . executor ( message , session_id = session_id )
return message
coder = Coder ( 'Qwen/Qwen2-7B-Instruct' , 'Solve the problem step by step with assistance of Python code' )
query = AgentMessage (
sender = 'user' ,
content = 'Find the projection of $ \ mathbf{a}$ onto $ \ mathbf{b} = '
' \ begin{pmatrix} 1 \ \ -3 \ end{pmatrix}$ if $ \ mathbf{a} \ cdot \ mathbf{b} = 2.$'
)
answer = coder ( query )
print ( answer . content )
print ( '-' * 120 )
for msg in coder . state_dict ()[ 'agent.memory' ]:
print ( '*' * 80 )
print ( f' { msg [ "sender" ] } : n n { msg [ "content" ] } ' )
Agents de blogs asynchrones qui améliorent la qualité d'écriture par auto-raffinement (exemple AutoGen original)
import asyncio
import os
from lagent . llms import AsyncGPTAPI
from lagent . agents import AsyncAgent
os . environ [ 'OPENAI_API_KEY' ] = 'YOUR_API_KEY'
class PrefixedMessageHook ( Hook ):
def __init__ ( self , prefix : str , senders : list = None ):
self . prefix = prefix
self . senders = senders or []
def before_agent ( self , agent , messages , session_id ):
for message in messages :
if message . sender in self . senders :
message . content = self . prefix + message . content
class AsyncBlogger ( AsyncAgent ):
def __init__ ( self , model_path , writer_prompt , critic_prompt , critic_prefix = '' , max_turn = 3 ):
super (). __init__ ()
llm = AsyncGPTAPI ( model_type = model_path , retry = 5 , max_new_tokens = 2048 )
self . writer = AsyncAgent ( llm , writer_prompt , name = 'writer' )
self . critic = AsyncAgent (
llm , critic_prompt , name = 'critic' , hooks = [ PrefixedMessageHook ( critic_prefix , [ 'writer' ])]
)
self . max_turn = max_turn
async def forward ( self , message : AgentMessage , session_id = 0 ) -> AgentMessage :
for _ in range ( self . max_turn ):
message = await self . writer ( message , session_id = session_id )
message = await self . critic ( message , session_id = session_id )
return await self . writer ( message , session_id = session_id )
blogger = AsyncBlogger (
'gpt-4o-2024-05-13' ,
writer_prompt = "You are an writing assistant tasked to write engaging blogpost. You try to generate the best blogpost possible for the user's request. "
"If the user provides critique, then respond with a revised version of your previous attempts" ,
critic_prompt = "Generate critique and recommendations on the writing. Provide detailed recommendations, including requests for length, depth, style, etc.." ,
critic_prefix = 'Reflect and provide critique on the following writing. n n ' ,
)
user_prompt = (
"Write an engaging blogpost on the recent updates in {topic}. "
"The blogpost should be engaging and understandable for general audience. "
"Should have more than 3 paragraphes but no longer than 1000 words." )
bot_msgs = asyncio . get_event_loop (). run_until_complete (
asyncio . gather (
* [
blogger ( AgentMessage ( sender = 'user' , content = user_prompt . format ( topic = topic )), session_id = i )
for i , topic in enumerate ([ 'AI' , 'Biotechnology' , 'New Energy' , 'Video Games' , 'Pop Music' ])
]
)
)
print ( bot_msgs [ 0 ]. content )
print ( '-' * 120 )
for msg in blogger . state_dict ( session_id = 0 )[ 'writer.memory' ]:
print ( '*' * 80 )
print ( f' { msg [ "sender" ] } : n n { msg [ "content" ] } ' )
print ( '-' * 120 )
for msg in blogger . state_dict ( session_id = 0 )[ 'critic.memory' ]:
print ( '*' * 80 )
print ( f' { msg [ "sender" ] } : n n { msg [ "content" ] } ' )
Un flux de travail multi-agents qui effectue la récupération d'informations, la collecte de données et le traçage de graphiques (exemple LangGraph original)