우리와 함께 하시겠습니까? (트위터), 디스코드, 위챗
소스에서 설치:
git clone https://github.com/InternLM/lagent.git
cd lagent
pip install -e .
Lagent는 PyTorch의 디자인 철학에서 영감을 받았습니다. 우리는 신경망 레이어의 비유가 워크플로를 더 명확하고 직관적으로 만들 것으로 기대하므로 사용자는 Python 방식으로 레이어를 생성하고 레이어 사이에 전달되는 메시지를 정의하는 데만 집중하면 됩니다. 이는 다중 에이전트 애플리케이션 구축을 빠르게 시작할 수 있는 간단한 튜토리얼입니다.
에이전트는 통신을 위해 AgentMessage
사용합니다.
from typing import Dict , List
from lagent . agents import Agent
from lagent . schema import AgentMessage
from lagent . llms import VllmModel , INTERNLM2_META
llm = VllmModel (
path = 'Qwen/Qwen2-7B-Instruct' ,
meta_template = INTERNLM2_META ,
tp = 1 ,
top_k = 1 ,
temperature = 1.0 ,
stop_words = [ '<|im_end|>' ],
max_new_tokens = 1024 ,
)
system_prompt = '你的回答只能从“典”、“孝”、“急”三个字中选一个。'
agent = Agent ( llm , system_prompt )
user_msg = AgentMessage ( sender = 'user' , content = '今天天气情况' )
bot_msg = agent ( user_msg )
print ( bot_msg )
content='急' sender='Agent' formatted=None extra_info=None type=None receiver=None stream_state=<AgentStatusCode.END: 0>
입력 및 출력 메시지는 각 정방향 전달에서 Agent
의 메모리에 추가됩니다. 이는 forward
이 아닌 __call__
에서 수행됩니다. 다음 의사 코드를 참조하세요.
def __call__ ( self , * message ):
message = pre_hooks ( message )
add_memory ( message )
message = self . forward ( * message )
add_memory ( message )
message = post_hooks ( message )
return message
두 가지 방법으로 메모리를 검사합니다.
memory : List [ AgentMessage ] = agent . memory . get_memory ()
print ( memory )
print ( '-' * 120 )
dumped_memory : Dict [ str , List [ dict ]] = agent . state_dict ()
print ( dumped_memory [ 'memory' ])
[AgentMessage(content='今天天气情况', sender='user', formatted=None, extra_info=None, type=None, receiver=None, stream_state=<AgentStatusCode.END: 0>), AgentMessage(content='急', sender='Agent', formatted=None, extra_info=None, type=None, receiver=None, stream_state=<AgentStatusCode.END: 0>)]
------------------------------------------------------------------------------------------------------------------------
[{'content': '今天天气情况', 'sender': 'user', 'formatted': None, 'extra_info': None, 'type': None, 'receiver': None, 'stream_state': <AgentStatusCode.END: 0>}, {'content': '急', 'sender': 'Agent', 'formatted': None, 'extra_info': None, 'type': None, 'receiver': None, 'stream_state': <AgentStatusCode.END: 0>}]
이 세션의 메모리를 지웁니다(기본적으로 session_id=0
).
agent . reset ()
DefaultAggregator
AgentMessage
OpenAI 메시지 형식으로 어셈블하고 변환하기 위해 내부적으로 호출됩니다.
def forward ( self , * message : AgentMessage , session_id = 0 , ** kwargs ) -> Union [ AgentMessage , str ]:
formatted_messages = self . aggregator . aggregate (
self . memory . get ( session_id ),
self . name ,
self . output_format ,
self . template ,
)
llm_response = self . llm . chat ( formatted_messages , ** kwargs )
...
몇 장의 샷을 수신할 수 있는 간단한 수집기 구현
from typing import List , Union
from lagent . memory import Memory
from lagent . prompts import StrParser
from lagent . agents . aggregator import DefaultAggregator
class FewshotAggregator ( DefaultAggregator ):
def __init__ ( self , few_shot : List [ dict ] = None ):
self . few_shot = few_shot or []
def aggregate ( self ,
messages : Memory ,
name : str ,
parser : StrParser = None ,
system_instruction : Union [ str , dict , List [ dict ]] = None ) -> List [ dict ]:
_message = []
if system_instruction :
_message . extend (
self . aggregate_system_intruction ( system_instruction ))
_message . extend ( self . few_shot )
messages = messages . get_memory ()
for message in messages :
if message . sender == name :
_message . append (
dict ( role = 'assistant' , content = str ( message . content )))
else :
user_message = message . content
if len ( _message ) > 0 and _message [ - 1 ][ 'role' ] == 'user' :
_message [ - 1 ][ 'content' ] += user_message
else :
_message . append ( dict ( role = 'user' , content = user_message ))
return _message
agent = Agent (
llm ,
aggregator = FewshotAggregator (
[
{ "role" : "user" , "content" : "今天天气" },
{ "role" : "assistant" , "content" : "【晴】" },
]
)
)
user_msg = AgentMessage ( sender = 'user' , content = '昨天天气' )
bot_msg = agent ( user_msg )
print ( bot_msg )
content='【多云转晴,夜间有轻微降温】' sender='Agent' formatted=None extra_info=None type=None receiver=None stream_state=<AgentStatusCode.END: 0>
AgentMessage
에서 formatted
모델 출력의 output_format
으로 구문 분석된 정보를 저장하기 위해 예약되어 있습니다.
def forward ( self , * message : AgentMessage , session_id = 0 , ** kwargs ) -> Union [ AgentMessage , str ]:
...
llm_response = self . llm . chat ( formatted_messages , ** kwargs )
if self . output_format :
formatted_messages = self . output_format . parse_response ( llm_response )
return AgentMessage (
sender = self . name ,
content = llm_response ,
formatted = formatted_messages ,
)
...
다음과 같이 도구 파서를 사용하십시오.
from lagent . prompts . parsers import ToolParser
system_prompt = "逐步分析并编写Python代码解决以下问题。"
parser = ToolParser ( tool_type = 'code interpreter' , begin = '```python n ' , end = ' n ``` n ' )
llm . gen_params [ 'stop_words' ]. append ( ' n ``` n ' )
agent = Agent ( llm , system_prompt , output_format = parser )
user_msg = AgentMessage (
sender = 'user' ,
content = 'Marie is thinking of a multiple of 63, while Jay is thinking of a '
'factor of 63. They happen to be thinking of the same number. There are '
'two possibilities for the number that each of them is thinking of, one '
'positive and one negative. Find the product of these two numbers.' )
bot_msg = agent ( user_msg )
print ( bot_msg . model_dump_json ( indent = 4 ))
{
"content": "首先,我们需要找出63的所有正因数和负因数。63的正因数可以通过分解63的质因数来找出,即\(63 = 3^2 \times 7\)。因此,63的正因数包括1, 3, 7, 9, 21, 和 63。对于负因数,我们只需将上述正因数乘以-1。nn接下来,我们需要找出与63的正因数相乘的结果为63的数,以及与63的负因数相乘的结果为63的数。这可以通过将63除以每个正因数和负因数来实现。nn最后,我们将找到的两个数相乘得到最终答案。nn下面是Python代码实现:nn```pythonndef find_numbers():n # 正因数n positive_factors = [1, 3, 7, 9, 21, 63]n # 负因数n negative_factors = [-1, -3, -7, -9, -21, -63]n n # 找到与正因数相乘的结果为63的数n positive_numbers = [63 / factor for factor in positive_factors]n # 找到与负因数相乘的结果为63的数n negative_numbers = [-63 / factor for factor in negative_factors]n n # 计算两个数的乘积n product = positive_numbers[0] * negative_numbers[0]n n return productnnresult = find_numbers()nprint(result)",
"sender": "Agent",
"formatted": {
"tool_type": "code interpreter",
"thought": "首先,我们需要找出63的所有正因数和负因数。63的正因数可以通过分解63的质因数来找出,即\(63 = 3^2 \times 7\)。因此,63的正因数包括1, 3, 7, 9, 21, 和 63。对于负因数,我们只需将上述正因数乘以-1。nn接下来,我们需要找出与63的正因数相乘的结果为63的数,以及与63的负因数相乘的结果为63的数。这可以通过将63除以每个正因数和负因数来实现。nn最后,我们将找到的两个数相乘得到最终答案。nn下面是Python代码实现:nn",
"action": "def find_numbers():n # 正因数n positive_factors = [1, 3, 7, 9, 21, 63]n # 负因数n negative_factors = [-1, -3, -7, -9, -21, -63]n n # 找到与正因数相乘的结果为63的数n positive_numbers = [63 / factor for factor in positive_factors]n # 找到与负因数相乘的结果为63的数n negative_numbers = [-63 / factor for factor in negative_factors]n n # 计算两个数的乘积n product = positive_numbers[0] * negative_numbers[0]n n return productnnresult = find_numbers()nprint(result)",
"status": 1
},
"extra_info": null,
"type": null,
"receiver": null,
"stream_state": 0
}
ActionExecutor
Agent
와 동일한 통신 데이터 구조를 사용하지만 입력 AgentMessage
의 내용은 다음을 포함하는 사전이어야 합니다.
name
: 도구 이름, 예: 'IPythonInterpreter'
, 'WebBrowser.search'
.parameters
: 도구 API의 키워드 인수(예 {'command': 'import math;math.sqrt(2)'}
, {'query': ['recent progress in AI']}
메시지 변환을 위한 사용자 정의 후크를 등록할 수 있습니다.
from lagent . hooks import Hook
from lagent . schema import ActionReturn , ActionStatusCode , AgentMessage
from lagent . actions import ActionExecutor , IPythonInteractive
class CodeProcessor ( Hook ):
def before_action ( self , executor , message , session_id ):
message = message . copy ( deep = True )
message . content = dict (
name = 'IPythonInteractive' , parameters = { 'command' : message . formatted [ 'action' ]}
)
return message
def after_action ( self , executor , message , session_id ):
action_return = message . content
if isinstance ( action_return , ActionReturn ):
if action_return . state == ActionStatusCode . SUCCESS :
response = action_return . format_result ()
else :
response = action_return . errmsg
else :
response = action_return
message . content = response
return message
executor = ActionExecutor ( actions = [ IPythonInteractive ()], hooks = [ CodeProcessor ()])
bot_msg = AgentMessage (
sender = 'Agent' ,
content = '首先,我们需要...' ,
formatted = {
'tool_type' : 'code interpreter' ,
'thought' : '首先,我们需要...' ,
'action' : 'def find_numbers(): n # 正因数n positive_factors = [1, 3, 7, 9, 21, 63] n # 负因数n negative_factors = [-1, -3, -7, -9, -21, -63] n n # 找到与正因数相乘的结果为63的数n positive_numbers = [63 / factor for factor in positive_factors] n # 找到与负因数相乘的结果为63的数n negative_numbers = [-63 / factor for factor in negative_factors] n n # 计算两个数的乘积n product = positive_numbers[0] * negative_numbers[0] n n return product n n result = find_numbers() n print(result)' ,
'status' : 1
})
executor_msg = executor ( bot_msg )
print ( executor_msg )
content='3969.0' sender='ActionExecutor' formatted=None extra_info=None type=None receiver=None stream_state=<AgentStatusCode.END: 0>
편의를 위해 Lagent는 위에서 언급한 ToolParser
로 형식화된 메시지에 맞게 조정된 InternLMActionProcessor
제공합니다.
Lagent는 거의 모든 구성 요소(LLM, 작업, 작업 실행기...)에 식별자 앞에 'Async'를 붙여 해당 비동기 변형을 갖는 이중 인터페이스 디자인을 채택합니다. 유휴 CPU 및 GPU 리소스를 최대한 활용하려면 디버깅용 동기 에이전트와 대규모 추론용 비동기 에이전트를 사용하는 것이 좋습니다.
그러나 에이전트의 내부 일관성을 확인하십시오. 즉, 비동기 에이전트에는 비동기 도구를 구동하는 비동기 LLM 및 비동기 작업 실행기가 장착되어야 합니다.
from lagent . llms import VllmModel , AsyncVllmModel , LMDeployPipeline , AsyncLMDeployPipeline
from lagent . actions import ActionExecutor , AsyncActionExecutor , WebBrowser , AsyncWebBrowser
from lagent . agents import Agent , AsyncAgent , AgentForInternLM , AsyncAgentForInternLM
__call__
대신 forward
구현해 보세요.session_id
인수를 항상 명시적으로 포함하십시오.프로그래밍으로 문제를 해결하는 수학 에이전트
from lagent . agents . aggregator import InternLMToolAggregator
class Coder ( Agent ):
def __init__ ( self , model_path , system_prompt , max_turn = 3 ):
super (). __init__ ()
llm = VllmModel (
path = model_path ,
meta_template = INTERNLM2_META ,
tp = 1 ,
top_k = 1 ,
temperature = 1.0 ,
stop_words = [ ' n ``` n ' , '<|im_end|>' ],
max_new_tokens = 1024 ,
)
self . agent = Agent (
llm ,
system_prompt ,
output_format = ToolParser (
tool_type = 'code interpreter' , begin = '```python n ' , end = ' n ``` n '
),
# `InternLMToolAggregator` is adapted to `ToolParser` for aggregating
# messages with tool invocations and execution results
aggregator = InternLMToolAggregator (),
)
self . executor = ActionExecutor ([ IPythonInteractive ()], hooks = [ CodeProcessor ()])
self . max_turn = max_turn
def forward ( self , message : AgentMessage , session_id = 0 ) -> AgentMessage :
for _ in range ( self . max_turn ):
message = self . agent ( message , session_id = session_id )
if message . formatted [ 'tool_type' ] is None :
return message
message = self . executor ( message , session_id = session_id )
return message
coder = Coder ( 'Qwen/Qwen2-7B-Instruct' , 'Solve the problem step by step with assistance of Python code' )
query = AgentMessage (
sender = 'user' ,
content = 'Find the projection of $ \ mathbf{a}$ onto $ \ mathbf{b} = '
' \ begin{pmatrix} 1 \ \ -3 \ end{pmatrix}$ if $ \ mathbf{a} \ cdot \ mathbf{b} = 2.$'
)
answer = coder ( query )
print ( answer . content )
print ( '-' * 120 )
for msg in coder . state_dict ()[ 'agent.memory' ]:
print ( '*' * 80 )
print ( f' { msg [ "sender" ] } : n n { msg [ "content" ] } ' )
자체 개선을 통해 글쓰기 품질을 향상시키는 비동기 블로깅 에이전트(원본 AutoGen 예)
import asyncio
import os
from lagent . llms import AsyncGPTAPI
from lagent . agents import AsyncAgent
os . environ [ 'OPENAI_API_KEY' ] = 'YOUR_API_KEY'
class PrefixedMessageHook ( Hook ):
def __init__ ( self , prefix : str , senders : list = None ):
self . prefix = prefix
self . senders = senders or []
def before_agent ( self , agent , messages , session_id ):
for message in messages :
if message . sender in self . senders :
message . content = self . prefix + message . content
class AsyncBlogger ( AsyncAgent ):
def __init__ ( self , model_path , writer_prompt , critic_prompt , critic_prefix = '' , max_turn = 3 ):
super (). __init__ ()
llm = AsyncGPTAPI ( model_type = model_path , retry = 5 , max_new_tokens = 2048 )
self . writer = AsyncAgent ( llm , writer_prompt , name = 'writer' )
self . critic = AsyncAgent (
llm , critic_prompt , name = 'critic' , hooks = [ PrefixedMessageHook ( critic_prefix , [ 'writer' ])]
)
self . max_turn = max_turn
async def forward ( self , message : AgentMessage , session_id = 0 ) -> AgentMessage :
for _ in range ( self . max_turn ):
message = await self . writer ( message , session_id = session_id )
message = await self . critic ( message , session_id = session_id )
return await self . writer ( message , session_id = session_id )
blogger = AsyncBlogger (
'gpt-4o-2024-05-13' ,
writer_prompt = "You are an writing assistant tasked to write engaging blogpost. You try to generate the best blogpost possible for the user's request. "
"If the user provides critique, then respond with a revised version of your previous attempts" ,
critic_prompt = "Generate critique and recommendations on the writing. Provide detailed recommendations, including requests for length, depth, style, etc.." ,
critic_prefix = 'Reflect and provide critique on the following writing. n n ' ,
)
user_prompt = (
"Write an engaging blogpost on the recent updates in {topic}. "
"The blogpost should be engaging and understandable for general audience. "
"Should have more than 3 paragraphes but no longer than 1000 words." )
bot_msgs = asyncio . get_event_loop (). run_until_complete (
asyncio . gather (
* [
blogger ( AgentMessage ( sender = 'user' , content = user_prompt . format ( topic = topic )), session_id = i )
for i , topic in enumerate ([ 'AI' , 'Biotechnology' , 'New Energy' , 'Video Games' , 'Pop Music' ])
]
)
)
print ( bot_msgs [ 0 ]. content )
print ( '-' * 120 )
for msg in blogger . state_dict ( session_id = 0 )[ 'writer.memory' ]:
print ( '*' * 80 )
print ( f' { msg [ "sender" ] } : n n { msg [ "content" ] } ' )
print ( '-' * 120 )
for msg in blogger . state_dict ( session_id = 0 )[ 'critic.memory' ]:
print ( '*' * 80 )
print ( f' { msg [ "sender" ] } : n n { msg [ "content" ] } ' )
정보 검색, 데이터 수집 및 차트 작성을 수행하는 다중 에이전트 워크플로우(원본 LangGraph 예)