Junte-se a nós no X (Twitter), Discord e WeChat
Instalar da fonte:
git clone https://github.com/InternLM/lagent.git
cd lagent
pip install -e .
Lagent é inspirado na filosofia de design do PyTorch. Esperamos que a analogia das camadas da rede neural torne o fluxo de trabalho mais claro e intuitivo, de modo que os usuários só precisem se concentrar na criação de camadas e na definição da passagem de mensagens entre elas de forma Pythonica. Este é um tutorial simples para você começar rapidamente a criar aplicativos multiagentes.
Os agentes usam AgentMessage
para comunicação.
from typing import Dict , List
from lagent . agents import Agent
from lagent . schema import AgentMessage
from lagent . llms import VllmModel , INTERNLM2_META
# Build the language-model backend used by every agent in this tutorial.
llm = VllmModel(
    path='Qwen/Qwen2-7B-Instruct',
    meta_template=INTERNLM2_META,
    tp=1,
    top_k=1,
    temperature=1.0,
    stop_words=['<|im_end|>'],
    max_new_tokens=1024,
)
# System prompt (Chinese): "your answer may only be one of the three
# characters 典, 孝, 急".
system_prompt = '你的回答只能从“典”、“孝”、“急”三个字中选一个。'
agent = Agent(llm, system_prompt)

# Agents exchange AgentMessage objects; calling the agent runs one turn and
# returns the reply as another AgentMessage.
user_msg = AgentMessage(sender='user', content='今天天气情况')
bot_msg = agent(user_msg)
print(bot_msg)
content='急' sender='Agent' formatted=None extra_info=None type=None receiver=None stream_state=<AgentStatusCode.END: 0>
As mensagens de entrada e saída serão adicionadas à memória do Agent
em cada passagem de encaminhamento. Isso é executado em __call__
em vez de forward
. Veja o seguinte pseudocódigo
def __call__(self, *message):
    """Pseudocode: wrap ``forward`` with hooks and memory bookkeeping."""
    message = pre_hooks(message)
    add_memory(message)  # record the incoming message(s)
    message = self.forward(*message)
    add_memory(message)  # record the produced response as well
    message = post_hooks(message)
    return message
Inspecione a memória de duas maneiras
# Way 1: read the stored AgentMessage objects directly from the memory.
memory: List[AgentMessage] = agent.memory.get_memory()
print(memory)
print('-' * 120)
# Way 2: dump the agent state; the 'memory' entry holds the same messages
# as plain dicts.
dumped_memory: Dict[str, List[dict]] = agent.state_dict()
print(dumped_memory['memory'])
[AgentMessage(content='今天天气情况', sender='user', formatted=None, extra_info=None, type=None, receiver=None, stream_state=<AgentStatusCode.END: 0>), AgentMessage(content='急', sender='Agent', formatted=None, extra_info=None, type=None, receiver=None, stream_state=<AgentStatusCode.END: 0>)]
------------------------------------------------------------------------------------------------------------------------
[{'content': '今天天气情况', 'sender': 'user', 'formatted': None, 'extra_info': None, 'type': None, 'receiver': None, 'stream_state': <AgentStatusCode.END: 0>}, {'content': '急', 'sender': 'Agent', 'formatted': None, 'extra_info': None, 'type': None, 'receiver': None, 'stream_state': <AgentStatusCode.END: 0>}]
Limpe a memória desta sessão (session_id=0 por padrão):
# Clear the memory of the current session (session_id=0 unless specified).
agent.reset()
DefaultAggregator
é chamado nos bastidores para montar e converter AgentMessage
para o formato de mensagem OpenAI.
def forward(self, *message: AgentMessage, session_id=0, **kwargs) -> Union[AgentMessage, str]:
    # Excerpt: the aggregator assembles this session's memory into the
    # message list that is handed to the LLM.
    formatted_messages = self.aggregator.aggregate(
        self.memory.get(session_id),
        self.name,
        self.output_format,
        self.template,
    )
    llm_response = self.llm.chat(formatted_messages, **kwargs)
    ...
Implemente um agregador simples que possa receber exemplos few-shot
from typing import List , Union
from lagent . memory import Memory
from lagent . prompts import StrParser
from lagent . agents . aggregator import DefaultAggregator
class FewshotAggregator(DefaultAggregator):
    """Aggregator that injects fixed few-shot turns ahead of the conversation.

    The produced prompt is: the system instruction (if any), then the
    few-shot messages, then the memory converted to ``user``/``assistant``
    turns.
    """

    def __init__(self, few_shot: List[dict] = None):
        """Store the few-shot examples.

        Args:
            few_shot: message dicts with ``role`` and ``content`` keys that
                are prepended to every prompt. ``None`` means no examples.
        """
        self.few_shot = few_shot or []

    def aggregate(self,
                  messages: Memory,
                  name: str,
                  parser: StrParser = None,
                  system_instruction: Union[str, dict, List[dict]] = None) -> List[dict]:
        """Build the chat message list for one LLM call.

        Args:
            messages: memory whose ``get_memory()`` yields the stored messages.
            name: name of the owning agent; messages sent by it become
                ``assistant`` turns, all others become ``user`` turns.
            parser: unused here; kept for signature compatibility.
            system_instruction: optional system prompt, forwarded to
                ``aggregate_system_intruction``.

        Returns:
            A list of ``{'role': ..., 'content': ...}`` dicts.
        """
        _message = []
        if system_instruction:
            # Method name kept exactly as the original snippet spells it.
            _message.extend(
                self.aggregate_system_intruction(system_instruction))
        # Copy every example: the merge below edits the last dict in place,
        # and without a copy a trailing ``user`` example stored on the
        # aggregator would grow on every call.
        _message.extend(dict(shot) for shot in self.few_shot)
        for message in messages.get_memory():
            if message.sender == name:
                _message.append(
                    dict(role='assistant', content=str(message.content)))
                continue
            user_message = message.content
            if _message and _message[-1]['role'] == 'user':
                # Consecutive non-agent messages are merged into one user turn.
                _message[-1]['content'] += user_message
            else:
                _message.append(dict(role='user', content=user_message))
        return _message
# Agent without a system prompt; the aggregator prepends one worked example
# (a user turn and the assistant's bracketed answer) to every prompt.
agent = Agent(
    llm,
    aggregator=FewshotAggregator(
        [
            {"role": "user", "content": "今天天气"},
            {"role": "assistant", "content": "【晴】"},
        ]
    )
)
user_msg = AgentMessage(sender='user', content='昨天天气')
bot_msg = agent(user_msg)
print(bot_msg)
content='【多云转晴,夜间有轻微降温】' sender='Agent' formatted=None extra_info=None type=None receiver=None stream_state=<AgentStatusCode.END: 0>
Em AgentMessage
, formatted
é reservado para armazenar informações analisadas por output_format
da saída do modelo.
def forward(self, *message: AgentMessage, session_id=0, **kwargs) -> Union[AgentMessage, str]:
    ...
    llm_response = self.llm.chat(formatted_messages, **kwargs)
    if self.output_format:
        # Excerpt: the raw LLM text stays in `content`; the parser's result is
        # carried separately in `formatted`.
        formatted_messages = self.output_format.parse_response(llm_response)
        return AgentMessage(
            sender=self.name,
            content=llm_response,
            formatted=formatted_messages,
        )
    ...
Use um analisador de ferramentas da seguinte maneira
from lagent . prompts . parsers import ToolParser
# System prompt (Chinese): "analyse step by step and write Python code to
# solve the following problem".
system_prompt = "逐步分析并编写Python代码解决以下问题。"
# The tool call is the text between the opening ```python fence and the
# closing fence; the delimiters need real newline escapes.
parser = ToolParser(tool_type='code interpreter', begin='```python\n', end='\n```\n')
# Stop generation at the closing fence so the reply ends with the code.
llm.gen_params['stop_words'].append('\n```\n')
agent = Agent(llm, system_prompt, output_format=parser)

user_msg = AgentMessage(
    sender='user',
    content='Marie is thinking of a multiple of 63, while Jay is thinking of a '
    'factor of 63. They happen to be thinking of the same number. There are '
    'two possibilities for the number that each of them is thinking of, one '
    'positive and one negative. Find the product of these two numbers.')
bot_msg = agent(user_msg)
# Dump the message to see both the raw `content` and the parsed `formatted`.
print(bot_msg.model_dump_json(indent=4))
{
"content": "首先,我们需要找出63的所有正因数和负因数。63的正因数可以通过分解63的质因数来找出,即\(63 = 3^2 \times 7\)。因此,63的正因数包括1, 3, 7, 9, 21, 和 63。对于负因数,我们只需将上述正因数乘以-1。nn接下来,我们需要找出与63的正因数相乘的结果为63的数,以及与63的负因数相乘的结果为63的数。这可以通过将63除以每个正因数和负因数来实现。nn最后,我们将找到的两个数相乘得到最终答案。nn下面是Python代码实现:nn```pythonndef find_numbers():n # 正因数n positive_factors = [1, 3, 7, 9, 21, 63]n # 负因数n negative_factors = [-1, -3, -7, -9, -21, -63]n n # 找到与正因数相乘的结果为63的数n positive_numbers = [63 / factor for factor in positive_factors]n # 找到与负因数相乘的结果为63的数n negative_numbers = [-63 / factor for factor in negative_factors]n n # 计算两个数的乘积n product = positive_numbers[0] * negative_numbers[0]n n return productnnresult = find_numbers()nprint(result)",
"sender": "Agent",
"formatted": {
"tool_type": "code interpreter",
"thought": "首先,我们需要找出63的所有正因数和负因数。63的正因数可以通过分解63的质因数来找出,即\(63 = 3^2 \times 7\)。因此,63的正因数包括1, 3, 7, 9, 21, 和 63。对于负因数,我们只需将上述正因数乘以-1。nn接下来,我们需要找出与63的正因数相乘的结果为63的数,以及与63的负因数相乘的结果为63的数。这可以通过将63除以每个正因数和负因数来实现。nn最后,我们将找到的两个数相乘得到最终答案。nn下面是Python代码实现:nn",
"action": "def find_numbers():n # 正因数n positive_factors = [1, 3, 7, 9, 21, 63]n # 负因数n negative_factors = [-1, -3, -7, -9, -21, -63]n n # 找到与正因数相乘的结果为63的数n positive_numbers = [63 / factor for factor in positive_factors]n # 找到与负因数相乘的结果为63的数n negative_numbers = [-63 / factor for factor in negative_factors]n n # 计算两个数的乘积n product = positive_numbers[0] * negative_numbers[0]n n return productnnresult = find_numbers()nprint(result)",
"status": 1
},
"extra_info": null,
"type": null,
"receiver": null,
"stream_state": 0
}
ActionExecutor usa a mesma estrutura de dados de comunicação que Agent, mas requer que o conteúdo da AgentMessage de entrada seja um dict contendo:
- name: nome da ferramenta, por exemplo 'IPythonInterpreter', 'WebBrowser.search'.
- parameters: argumentos de palavras-chave da API da ferramenta, por exemplo {'command': 'import math;math.sqrt(2)'}, {'query': ['recent progress in AI']}.

Você pode registrar ganchos (hooks) personalizados para a conversão de mensagens.
from lagent . hooks import Hook
from lagent . schema import ActionReturn , ActionStatusCode , AgentMessage
from lagent . actions import ActionExecutor , IPythonInteractive
class CodeProcessor(Hook):
    """Hook that adapts parsed agent messages to and from the action executor."""

    def before_action(self, executor, message, session_id):
        """Turn the parsed code in ``message.formatted['action']`` into a tool call.

        Returns a deep copy whose ``content`` is the ``name``/``parameters``
        dict expected by the executor; the incoming message is left untouched.
        """
        # `model_copy` is the pydantic v2 replacement for the deprecated
        # `copy`; the tutorial already uses the v2 API (`model_dump_json`).
        message = message.model_copy(deep=True)
        message.content = dict(
            name='IPythonInteractive', parameters={'command': message.formatted['action']}
        )
        return message

    def after_action(self, executor, message, session_id):
        """Replace an ``ActionReturn`` content with its printable result.

        On success the formatted result is used, otherwise the error message.
        Content that is not an ``ActionReturn`` is passed through unchanged.
        The message is modified in place and returned.
        """
        action_return = message.content
        if isinstance(action_return, ActionReturn):
            if action_return.state == ActionStatusCode.SUCCESS:
                response = action_return.format_result()
            else:
                response = action_return.errmsg
        else:
            response = action_return
        message.content = response
        return message
# Executor with a single tool; CodeProcessor converts messages on the way in
# and out.
executor = ActionExecutor(actions=[IPythonInteractive()], hooks=[CodeProcessor()])
bot_msg = AgentMessage(
    sender='Agent',
    content='首先,我们需要...',
    formatted={
        'tool_type': 'code interpreter',
        'thought': '首先,我们需要...',
        # The code to run must be valid Python, so line breaks and
        # indentation are spelled out with explicit escapes.
        'action': (
            'def find_numbers():\n'
            '    # 正因数\n'
            '    positive_factors = [1, 3, 7, 9, 21, 63]\n'
            '    # 负因数\n'
            '    negative_factors = [-1, -3, -7, -9, -21, -63]\n'
            '\n'
            '    # 找到与正因数相乘的结果为63的数\n'
            '    positive_numbers = [63 / factor for factor in positive_factors]\n'
            '    # 找到与负因数相乘的结果为63的数\n'
            '    negative_numbers = [-63 / factor for factor in negative_factors]\n'
            '\n'
            '    # 计算两个数的乘积\n'
            '    product = positive_numbers[0] * negative_numbers[0]\n'
            '\n'
            '    return product\n'
            '\n'
            'result = find_numbers()\n'
            'print(result)'
        ),
        'status': 1
    })
executor_msg = executor(bot_msg)
print(executor_msg)
content='3969.0' sender='ActionExecutor' formatted=None extra_info=None type=None receiver=None stream_state=<AgentStatusCode.END: 0>
Por conveniência, Lagent fornece InternLMActionProcessor
que é adaptado para mensagens formatadas pelo ToolParser
conforme mencionado acima.
Lagent adota design de interface dupla, onde quase todos os componentes (LLMs, ações, executores de ações...) possuem a variante assíncrona correspondente prefixando seu identificador com 'Async'. Recomenda-se o uso de agentes síncronos para depuração e assíncronos para inferência em larga escala para aproveitar ao máximo os recursos ociosos de CPU e GPU.
No entanto, certifique-se da consistência interna dos agentes, ou seja, os agentes assíncronos devem ser equipados com LLMs assíncronos e executores de ações assíncronas que acionam ferramentas assíncronas.
from lagent . llms import VllmModel , AsyncVllmModel , LMDeployPipeline , AsyncLMDeployPipeline
from lagent . actions import ActionExecutor , AsyncActionExecutor , WebBrowser , AsyncWebBrowser
from lagent . agents import Agent , AsyncAgent , AgentForInternLM , AsyncAgentForInternLM
- Prefira implementar forward em vez de __call__ nas subclasses, a menos que seja necessário.
- Inclua sempre explicitamente o argumento session_id, que é projetado para isolamento de memória, solicitações ao LLM e invocação de ferramentas (por exemplo, manter vários ambientes IPython independentes) em cenários de concorrência.

Agentes matemáticos que resolvem problemas por programação
from lagent . agents . aggregator import InternLMToolAggregator
class Coder(Agent):
    """Agent that solves a problem by alternating LLM turns and code execution."""

    def __init__(self, model_path, system_prompt, max_turn=3):
        """Create the inner LLM agent and the code executor.

        Args:
            model_path: model path passed to ``VllmModel``.
            system_prompt: system prompt of the inner agent.
            max_turn: maximum number of generate/execute rounds in ``forward``.
        """
        super().__init__()
        llm = VllmModel(
            path=model_path,
            meta_template=INTERNLM2_META,
            tp=1,
            top_k=1,
            temperature=1.0,
            # Stop at the closing code fence (needs real newlines) or at the
            # end-of-turn token.
            stop_words=['\n```\n', '<|im_end|>'],
            max_new_tokens=1024,
        )
        self.agent = Agent(
            llm,
            system_prompt,
            output_format=ToolParser(
                tool_type='code interpreter', begin='```python\n', end='\n```\n'
            ),
            # `InternLMToolAggregator` is adapted to `ToolParser` for aggregating
            # messages with tool invocations and execution results
            aggregator=InternLMToolAggregator(),
        )
        self.executor = ActionExecutor([IPythonInteractive()], hooks=[CodeProcessor()])
        self.max_turn = max_turn

    def forward(self, message: AgentMessage, session_id=0) -> AgentMessage:
        """Run up to ``max_turn`` rounds of generation followed by execution.

        Returns the agent's message as soon as it contains no tool call
        (``formatted['tool_type'] is None``); otherwise the executor's output
        is fed back to the agent. When the turn budget runs out, the last
        executor message is returned.
        """
        for _ in range(self.max_turn):
            message = self.agent(message, session_id=session_id)
            if message.formatted['tool_type'] is None:
                return message
            message = self.executor(message, session_id=session_id)
        return message
coder = Coder('Qwen/Qwen2-7B-Instruct', 'Solve the problem step by step with assistance of Python code')
# LaTeX backslashes are doubled so the model receives literal `\mathbf`,
# `\begin`, `\\`, ... sequences.
query = AgentMessage(
    sender='user',
    content='Find the projection of $\\mathbf{a}$ onto $\\mathbf{b} = '
    '\\begin{pmatrix} 1 \\\\ -3 \\end{pmatrix}$ if $\\mathbf{a} \\cdot \\mathbf{b} = 2.$'
)
answer = coder(query)
print(answer.content)
print('-' * 120)
# Replay the inner agent's memory: one "sender:" header per message, then a
# blank line and the content.
for msg in coder.state_dict()['agent.memory']:
    print('*' * 80)
    print(f'{msg["sender"]}:\n\n{msg["content"]}')
Agentes de blog assíncronos que melhoram a qualidade da escrita por meio de auto-refinamento (exemplo original do AutoGen)
import asyncio
import os
from lagent . llms import AsyncGPTAPI
from lagent . agents import AsyncAgent
# Placeholder: substitute a real key before running, and never commit a real
# key to version control.
os.environ['OPENAI_API_KEY'] = 'YOUR_API_KEY'
class PrefixedMessageHook(Hook):
    """Hook that prepends a fixed prefix to messages from selected senders."""

    def __init__(self, prefix: str, senders: list = None):
        """Remember the prefix and the sender names it applies to.

        Args:
            prefix: text inserted in front of the message content.
            senders: sender names whose messages get the prefix; ``None``
                means no sender is affected.
        """
        self.prefix = prefix
        self.senders = senders if senders else []

    def before_agent(self, agent, messages, session_id):
        """Prefix, in place, the content of every message from a listed sender."""
        targeted = (msg for msg in messages if msg.sender in self.senders)
        for msg in targeted:
            msg.content = self.prefix + msg.content
class AsyncBlogger(AsyncAgent):
    """Writer/critic pair that refines a blog post over several rounds."""

    def __init__(self, model_path, writer_prompt, critic_prompt, critic_prefix='', max_turn=3):
        """Create the writer and critic agents on one shared LLM client.

        Args:
            model_path: model identifier passed to ``AsyncGPTAPI``.
            writer_prompt: system prompt of the writer agent.
            critic_prompt: system prompt of the critic agent.
            critic_prefix: text prepended to the writer's messages before the
                critic sees them.
            max_turn: number of write/critique rounds before the final rewrite.
        """
        super().__init__()
        llm = AsyncGPTAPI(model_type=model_path, retry=5, max_new_tokens=2048)
        prefix_writer_output = PrefixedMessageHook(critic_prefix, ['writer'])
        self.writer = AsyncAgent(llm, writer_prompt, name='writer')
        self.critic = AsyncAgent(
            llm, critic_prompt, name='critic', hooks=[prefix_writer_output]
        )
        self.max_turn = max_turn

    async def forward(self, message: AgentMessage, session_id=0) -> AgentMessage:
        """Alternate writer and critic ``max_turn`` times, then return a final draft."""
        for _ in range(self.max_turn):
            draft = await self.writer(message, session_id=session_id)
            message = await self.critic(draft, session_id=session_id)
        # The last critique (or the original request when max_turn is 0) goes
        # to the writer one more time.
        return await self.writer(message, session_id=session_id)
blogger = AsyncBlogger(
    'gpt-4o-2024-05-13',
    writer_prompt="You are an writing assistant tasked to write engaging blogpost. You try to generate the best blogpost possible for the user's request. "
    "If the user provides critique, then respond with a revised version of your previous attempts",
    critic_prompt="Generate critique and recommendations on the writing. Provide detailed recommendations, including requests for length, depth, style, etc..",
    # Two real newlines separate the instruction from the writer's text.
    critic_prefix='Reflect and provide critique on the following writing.\n\n',
)
user_prompt = (
    "Write an engaging blogpost on the recent updates in {topic}. "
    "The blogpost should be engaging and understandable for general audience. "
    "Should have more than 3 paragraphes but no longer than 1000 words.")


async def _write_all_posts():
    """Run one blogger session per topic concurrently and collect the results."""
    topics = ['AI', 'Biotechnology', 'New Energy', 'Video Games', 'Pop Music']
    return await asyncio.gather(
        *[
            # A distinct session_id per topic keeps the conversations isolated.
            blogger(AgentMessage(sender='user', content=user_prompt.format(topic=topic)), session_id=i)
            for i, topic in enumerate(topics)
        ]
    )


# `asyncio.run` replaces `get_event_loop().run_until_complete(...)`, which is
# deprecated when no event loop is running.
bot_msgs = asyncio.run(_write_all_posts())
print(bot_msgs[0].content)
print('-' * 120)
for msg in blogger.state_dict(session_id=0)['writer.memory']:
    print('*' * 80)
    print(f'{msg["sender"]}:\n\n{msg["content"]}')
print('-' * 120)
for msg in blogger.state_dict(session_id=0)['critic.memory']:
    print('*' * 80)
    print(f'{msg["sender"]}:\n\n{msg["content"]}')
Um fluxo de trabalho multiagente que realiza recuperação de informações, coleta de dados e plotagem de gráficos (exemplo original do LangGraph)