Machen Sie mit? (Twitter), Discord und WeChat
Von der Quelle installieren:
git clone https://github.com/InternLM/lagent.git
cd lagent
pip install -e .
Lagent ist von der Designphilosophie von PyTorch inspiriert. Wir gehen davon aus, dass die Analogie der neuronalen Netzwerkschichten den Arbeitsablauf klarer und intuitiver macht, sodass sich Benutzer nur auf die Erstellung von Schichten und die Definition der Nachrichtenübermittlung zwischen ihnen auf pythonische Weise konzentrieren müssen. Dies ist ein einfaches Tutorial, das Ihnen den schnellen Einstieg in die Erstellung von Multi-Agent-Anwendungen erleichtert.
Agenten verwenden AgentMessage
für die Kommunikation.
from typing import Dict , List
from lagent . agents import Agent
from lagent . schema import AgentMessage
from lagent . llms import VllmModel , INTERNLM2_META
llm = VllmModel (
path = 'Qwen/Qwen2-7B-Instruct' ,
meta_template = INTERNLM2_META ,
tp = 1 ,
top_k = 1 ,
temperature = 1.0 ,
stop_words = [ '<|im_end|>' ],
max_new_tokens = 1024 ,
)
system_prompt = '你的回答只能从“典”、“孝”、“急”三个字中选一个。'
agent = Agent ( llm , system_prompt )
user_msg = AgentMessage ( sender = 'user' , content = '今天天气情况' )
bot_msg = agent ( user_msg )
print ( bot_msg )
content='急' sender='Agent' formatted=None extra_info=None type=None receiver=None stream_state=<AgentStatusCode.END: 0>
Sowohl Eingabe- als auch Ausgabenachrichten werden bei jedem Weiterleitungsdurchlauf zum Speicher des Agent
hinzugefügt. Dies erfolgt in __call__
und nicht in forward
. Siehe den folgenden Pseudocode
def __call__ ( self , * message ):
message = pre_hooks ( message )
add_memory ( message )
message = self . forward ( * message )
add_memory ( message )
message = post_hooks ( message )
return message
Überprüfen Sie den Speicher auf zwei Arten
memory : List [ AgentMessage ] = agent . memory . get_memory ()
print ( memory )
print ( '-' * 120 )
dumped_memory : Dict [ str , List [ dict ]] = agent . state_dict ()
print ( dumped_memory [ 'memory' ])
[AgentMessage(content='今天天气情况', sender='user', formatted=None, extra_info=None, type=None, receiver=None, stream_state=<AgentStatusCode.END: 0>), AgentMessage(content='急', sender='Agent', formatted=None, extra_info=None, type=None, receiver=None, stream_state=<AgentStatusCode.END: 0>)]
------------------------------------------------------------------------------------------------------------------------
[{'content': '今天天气情况', 'sender': 'user', 'formatted': None, 'extra_info': None, 'type': None, 'receiver': None, 'stream_state': <AgentStatusCode.END: 0>}, {'content': '急', 'sender': 'Agent', 'formatted': None, 'extra_info': None, 'type': None, 'receiver': None, 'stream_state': <AgentStatusCode.END: 0>}]
Löschen Sie den Speicher dieser Sitzung (standardmäßig session_id=0
):
agent . reset ()
DefaultAggregator
wird unter der Haube aufgerufen, um AgentMessage
zusammenzustellen und in das OpenAI-Nachrichtenformat zu konvertieren.
def forward ( self , * message : AgentMessage , session_id = 0 , ** kwargs ) -> Union [ AgentMessage , str ]:
formatted_messages = self . aggregator . aggregate (
self . memory . get ( session_id ),
self . name ,
self . output_format ,
self . template ,
)
llm_response = self . llm . chat ( formatted_messages , ** kwargs )
...
Implementieren Sie einen einfachen Aggregator, der wenige Schüsse empfangen kann
from typing import List , Union
from lagent . memory import Memory
from lagent . prompts import StrParser
from lagent . agents . aggregator import DefaultAggregator
class FewshotAggregator ( DefaultAggregator ):
def __init__ ( self , few_shot : List [ dict ] = None ):
self . few_shot = few_shot or []
def aggregate ( self ,
messages : Memory ,
name : str ,
parser : StrParser = None ,
system_instruction : Union [ str , dict , List [ dict ]] = None ) -> List [ dict ]:
_message = []
if system_instruction :
_message . extend (
self . aggregate_system_intruction ( system_instruction ))
_message . extend ( self . few_shot )
messages = messages . get_memory ()
for message in messages :
if message . sender == name :
_message . append (
dict ( role = 'assistant' , content = str ( message . content )))
else :
user_message = message . content
if len ( _message ) > 0 and _message [ - 1 ][ 'role' ] == 'user' :
_message [ - 1 ][ 'content' ] += user_message
else :
_message . append ( dict ( role = 'user' , content = user_message ))
return _message
agent = Agent (
llm ,
aggregator = FewshotAggregator (
[
{ "role" : "user" , "content" : "今天天气" },
{ "role" : "assistant" , "content" : "【晴】" },
]
)
)
user_msg = AgentMessage ( sender = 'user' , content = '昨天天气' )
bot_msg = agent ( user_msg )
print ( bot_msg )
content='【多云转晴,夜间有轻微降温】' sender='Agent' formatted=None extra_info=None type=None receiver=None stream_state=<AgentStatusCode.END: 0>
In AgentMessage
ist formatted
zum Speichern von Informationen reserviert, die von output_format
aus der Modellausgabe analysiert werden.
def forward ( self , * message : AgentMessage , session_id = 0 , ** kwargs ) -> Union [ AgentMessage , str ]:
...
llm_response = self . llm . chat ( formatted_messages , ** kwargs )
if self . output_format :
formatted_messages = self . output_format . parse_response ( llm_response )
return AgentMessage (
sender = self . name ,
content = llm_response ,
formatted = formatted_messages ,
)
...
Verwenden Sie einen Tool-Parser wie folgt
from lagent . prompts . parsers import ToolParser
system_prompt = "逐步分析并编写Python代码解决以下问题。"
parser = ToolParser ( tool_type = 'code interpreter' , begin = '```python n ' , end = ' n ``` n ' )
llm . gen_params [ 'stop_words' ]. append ( ' n ``` n ' )
agent = Agent ( llm , system_prompt , output_format = parser )
user_msg = AgentMessage (
sender = 'user' ,
content = 'Marie is thinking of a multiple of 63, while Jay is thinking of a '
'factor of 63. They happen to be thinking of the same number. There are '
'two possibilities for the number that each of them is thinking of, one '
'positive and one negative. Find the product of these two numbers.' )
bot_msg = agent ( user_msg )
print ( bot_msg . model_dump_json ( indent = 4 ))
{
"content": "首先,我们需要找出63的所有正因数和负因数。63的正因数可以通过分解63的质因数来找出,即\(63 = 3^2 \times 7\)。因此,63的正因数包括1, 3, 7, 9, 21, 和 63。对于负因数,我们只需将上述正因数乘以-1。nn接下来,我们需要找出与63的正因数相乘的结果为63的数,以及与63的负因数相乘的结果为63的数。这可以通过将63除以每个正因数和负因数来实现。nn最后,我们将找到的两个数相乘得到最终答案。nn下面是Python代码实现:nn```pythonndef find_numbers():n # 正因数n positive_factors = [1, 3, 7, 9, 21, 63]n # 负因数n negative_factors = [-1, -3, -7, -9, -21, -63]n n # 找到与正因数相乘的结果为63的数n positive_numbers = [63 / factor for factor in positive_factors]n # 找到与负因数相乘的结果为63的数n negative_numbers = [-63 / factor for factor in negative_factors]n n # 计算两个数的乘积n product = positive_numbers[0] * negative_numbers[0]n n return productnnresult = find_numbers()nprint(result)",
"sender": "Agent",
"formatted": {
"tool_type": "code interpreter",
"thought": "首先,我们需要找出63的所有正因数和负因数。63的正因数可以通过分解63的质因数来找出,即\(63 = 3^2 \times 7\)。因此,63的正因数包括1, 3, 7, 9, 21, 和 63。对于负因数,我们只需将上述正因数乘以-1。nn接下来,我们需要找出与63的正因数相乘的结果为63的数,以及与63的负因数相乘的结果为63的数。这可以通过将63除以每个正因数和负因数来实现。nn最后,我们将找到的两个数相乘得到最终答案。nn下面是Python代码实现:nn",
"action": "def find_numbers():n # 正因数n positive_factors = [1, 3, 7, 9, 21, 63]n # 负因数n negative_factors = [-1, -3, -7, -9, -21, -63]n n # 找到与正因数相乘的结果为63的数n positive_numbers = [63 / factor for factor in positive_factors]n # 找到与负因数相乘的结果为63的数n negative_numbers = [-63 / factor for factor in negative_factors]n n # 计算两个数的乘积n product = positive_numbers[0] * negative_numbers[0]n n return productnnresult = find_numbers()nprint(result)",
"status": 1
},
"extra_info": null,
"type": null,
"receiver": null,
"stream_state": 0
}
ActionExecutor
verwendet dieselbe Kommunikationsdatenstruktur wie Agent
, erfordert jedoch, dass der Inhalt der Eingabe AgentMessage
ein Diktat ist, das Folgendes enthält:
name
: Toolname, z. B. 'IPythonInterpreter'
, 'WebBrowser.search'
.parameters
: Schlüsselwortargumente der Tool-API, z. B. {'command': 'import math;math.sqrt(2)'}
, {'query': ['recent progress in AI']}
.Sie können benutzerdefinierte Hooks für die Nachrichtenkonvertierung registrieren.
from lagent . hooks import Hook
from lagent . schema import ActionReturn , ActionStatusCode , AgentMessage
from lagent . actions import ActionExecutor , IPythonInteractive
class CodeProcessor ( Hook ):
def before_action ( self , executor , message , session_id ):
message = message . copy ( deep = True )
message . content = dict (
name = 'IPythonInteractive' , parameters = { 'command' : message . formatted [ 'action' ]}
)
return message
def after_action ( self , executor , message , session_id ):
action_return = message . content
if isinstance ( action_return , ActionReturn ):
if action_return . state == ActionStatusCode . SUCCESS :
response = action_return . format_result ()
else :
response = action_return . errmsg
else :
response = action_return
message . content = response
return message
executor = ActionExecutor ( actions = [ IPythonInteractive ()], hooks = [ CodeProcessor ()])
bot_msg = AgentMessage (
sender = 'Agent' ,
content = '首先,我们需要...' ,
formatted = {
'tool_type' : 'code interpreter' ,
'thought' : '首先,我们需要...' ,
'action' : 'def find_numbers(): n # 正因数n positive_factors = [1, 3, 7, 9, 21, 63] n # 负因数n negative_factors = [-1, -3, -7, -9, -21, -63] n n # 找到与正因数相乘的结果为63的数n positive_numbers = [63 / factor for factor in positive_factors] n # 找到与负因数相乘的结果为63的数n negative_numbers = [-63 / factor for factor in negative_factors] n n # 计算两个数的乘积n product = positive_numbers[0] * negative_numbers[0] n n return product n n result = find_numbers() n print(result)' ,
'status' : 1
})
executor_msg = executor ( bot_msg )
print ( executor_msg )
content='3969.0' sender='ActionExecutor' formatted=None extra_info=None type=None receiver=None stream_state=<AgentStatusCode.END: 0>
Der Einfachheit halber stellt Lagent InternLMActionProcessor
zur Verfügung, der wie oben erwähnt an von ToolParser
formatierte Nachrichten angepasst ist.
Lagent übernimmt das Dual-Interface-Design, bei dem fast jede Komponente (LLMs, Aktionen, Aktionsausführer ...) über die entsprechende asynchrone Variante verfügt, indem ihrer Kennung „Async“ vorangestellt wird. Es wird empfohlen, synchrone Agenten zum Debuggen und asynchrone Agenten für groß angelegte Inferenzen zu verwenden, um ungenutzte CPU- und GPU-Ressourcen optimal zu nutzen.
Stellen Sie jedoch die interne Konsistenz der Agenten sicher, d. h. asynchrone Agenten sollten mit asynchronen LLMs und asynchronen Aktionsausführern ausgestattet sein, die asynchrone Tools steuern.
from lagent . llms import VllmModel , AsyncVllmModel , LMDeployPipeline , AsyncLMDeployPipeline
from lagent . actions import ActionExecutor , AsyncActionExecutor , WebBrowser , AsyncWebBrowser
from lagent . agents import Agent , AsyncAgent , AgentForInternLM , AsyncAgentForInternLM
forward
anstelle von __call__
von Unterklassen zu implementieren, sofern dies nicht erforderlich ist.session_id
ein, das für die Isolierung von Speicher, LLM-Anfragen und Tool-Aufrufen (z. B. Pflege mehrerer unabhängiger IPython-Umgebungen) im Parallelbetrieb gedacht ist.Mathe-Agenten, die Probleme durch Programmierung lösen
from lagent . agents . aggregator import InternLMToolAggregator
class Coder ( Agent ):
def __init__ ( self , model_path , system_prompt , max_turn = 3 ):
super (). __init__ ()
llm = VllmModel (
path = model_path ,
meta_template = INTERNLM2_META ,
tp = 1 ,
top_k = 1 ,
temperature = 1.0 ,
stop_words = [ ' n ``` n ' , '<|im_end|>' ],
max_new_tokens = 1024 ,
)
self . agent = Agent (
llm ,
system_prompt ,
output_format = ToolParser (
tool_type = 'code interpreter' , begin = '```python n ' , end = ' n ``` n '
),
# `InternLMToolAggregator` is adapted to `ToolParser` for aggregating
# messages with tool invocations and execution results
aggregator = InternLMToolAggregator (),
)
self . executor = ActionExecutor ([ IPythonInteractive ()], hooks = [ CodeProcessor ()])
self . max_turn = max_turn
def forward ( self , message : AgentMessage , session_id = 0 ) -> AgentMessage :
for _ in range ( self . max_turn ):
message = self . agent ( message , session_id = session_id )
if message . formatted [ 'tool_type' ] is None :
return message
message = self . executor ( message , session_id = session_id )
return message
coder = Coder ( 'Qwen/Qwen2-7B-Instruct' , 'Solve the problem step by step with assistance of Python code' )
query = AgentMessage (
sender = 'user' ,
content = 'Find the projection of $ \ mathbf{a}$ onto $ \ mathbf{b} = '
' \ begin{pmatrix} 1 \ \ -3 \ end{pmatrix}$ if $ \ mathbf{a} \ cdot \ mathbf{b} = 2.$'
)
answer = coder ( query )
print ( answer . content )
print ( '-' * 120 )
for msg in coder . state_dict ()[ 'agent.memory' ]:
print ( '*' * 80 )
print ( f' { msg [ "sender" ] } : n n { msg [ "content" ] } ' )
Asynchrone Blogging-Agenten, die die Schreibqualität durch Selbstverfeinerung verbessern (ursprüngliches AutoGen-Beispiel)
import asyncio
import os
from lagent . llms import AsyncGPTAPI
from lagent . agents import AsyncAgent
os . environ [ 'OPENAI_API_KEY' ] = 'YOUR_API_KEY'
class PrefixedMessageHook ( Hook ):
def __init__ ( self , prefix : str , senders : list = None ):
self . prefix = prefix
self . senders = senders or []
def before_agent ( self , agent , messages , session_id ):
for message in messages :
if message . sender in self . senders :
message . content = self . prefix + message . content
class AsyncBlogger ( AsyncAgent ):
def __init__ ( self , model_path , writer_prompt , critic_prompt , critic_prefix = '' , max_turn = 3 ):
super (). __init__ ()
llm = AsyncGPTAPI ( model_type = model_path , retry = 5 , max_new_tokens = 2048 )
self . writer = AsyncAgent ( llm , writer_prompt , name = 'writer' )
self . critic = AsyncAgent (
llm , critic_prompt , name = 'critic' , hooks = [ PrefixedMessageHook ( critic_prefix , [ 'writer' ])]
)
self . max_turn = max_turn
async def forward ( self , message : AgentMessage , session_id = 0 ) -> AgentMessage :
for _ in range ( self . max_turn ):
message = await self . writer ( message , session_id = session_id )
message = await self . critic ( message , session_id = session_id )
return await self . writer ( message , session_id = session_id )
blogger = AsyncBlogger (
'gpt-4o-2024-05-13' ,
writer_prompt = "You are an writing assistant tasked to write engaging blogpost. You try to generate the best blogpost possible for the user's request. "
"If the user provides critique, then respond with a revised version of your previous attempts" ,
critic_prompt = "Generate critique and recommendations on the writing. Provide detailed recommendations, including requests for length, depth, style, etc.." ,
critic_prefix = 'Reflect and provide critique on the following writing. n n ' ,
)
user_prompt = (
"Write an engaging blogpost on the recent updates in {topic}. "
"The blogpost should be engaging and understandable for general audience. "
"Should have more than 3 paragraphes but no longer than 1000 words." )
bot_msgs = asyncio . get_event_loop (). run_until_complete (
asyncio . gather (
* [
blogger ( AgentMessage ( sender = 'user' , content = user_prompt . format ( topic = topic )), session_id = i )
for i , topic in enumerate ([ 'AI' , 'Biotechnology' , 'New Energy' , 'Video Games' , 'Pop Music' ])
]
)
)
print ( bot_msgs [ 0 ]. content )
print ( '-' * 120 )
for msg in blogger . state_dict ( session_id = 0 )[ 'writer.memory' ]:
print ( '*' * 80 )
print ( f' { msg [ "sender" ] } : n n { msg [ "content" ] } ' )
print ( '-' * 120 )
for msg in blogger . state_dict ( session_id = 0 )[ 'critic.memory' ]:
print ( '*' * 80 )
print ( f' { msg [ "sender" ] } : n n { msg [ "content" ] } ' )
Ein Multi-Agent-Workflow, der den Informationsabruf, die Datenerfassung und die Diagrammerstellung durchführt (ursprüngliches LangGraph-Beispiel)