文档:文档
简单的机器人代理示例:
使用 SimplerEnv 的模拟示例:
?使用 OpenVLA 的汽车代理:
⏺️在机器人上记录数据集
?支持、讨论和操作方法:
更新:
2024 年 8 月 28 日,体现代理 v1.2
mbodied
来尝试它们。2024 年 6 月 30 日,embodied-agents v1.0 :
mbodied
。embodied Agents是一个工具包,只需几行代码即可将大型多模态模型集成到现有的机器人堆栈中。它提供一致性、可靠性、可扩展性,并且可配置到任何观察和操作空间。
Sample 类是用于序列化、记录和操作任意数据的基本模型。它被设计为可扩展、灵活且强类型。通过将观察或操作对象包装在 Sample 类中,您将能够轻松地在以下对象之间进行转换:
要了解有关具体代理的所有可能性的更多信息,请查看文档
Sample
或 Dict 列表pack
到单个Sample
或Dict
中并相应地unpack
吗?unflatten
为Sample
类吗? 创建 Sample 只需要使用Sample
类包装一个 Python 字典。此外,它们还可以由 kwargs、Gym Spaces 和 Tensors 等制成。
from mbodied . types . sample import Sample
# Creating a Sample instance
sample = Sample ( observation = [ 1 , 2 , 3 ], action = [ 4 , 5 , 6 ])
# Flattening the Sample instance
flat_list = sample . flatten ()
print ( flat_list ) # Output: [1, 2, 3, 4, 5, 6]
# Generating a simplified JSON schema
>> > schema = sample . schema ()
{ 'type' : 'object' , 'properties' : { 'observation' : { 'type' : 'array' , 'items' : { 'type' : 'integer' }}, 'action' : { 'type' : 'array' , 'items' : { 'type' : 'integer' }}}}
# Unflattening a list into a Sample instance
Sample . unflatten ( flat_list , schema )
>> > Sample ( observation = [ 1 , 2 , 3 ], action = [ 4 , 5 , 6 ])
Sample 类利用 Pydantic 强大的序列化和反序列化功能,使您可以轻松地在 Sample 实例和 JSON 之间进行转换。
# Serialize the Sample instance to JSON
sample = Sample ( observation = [ 1 , 2 , 3 ], action = [ 4 , 5 , 6 ])
json_data = sample . model_dump_json ()
print ( json_data ) # Output: '{"observation": [1, 2, 3], "action": [4, 5, 6]}'
# Deserialize the JSON data back into a Sample instance
json_data = '{"observation": [1, 2, 3], "action": [4, 5, 6]}'
sample = Sample . model_validate ( from_json ( json_data ))
print ( sample ) # Output: Sample(observation=[1, 2, 3], action=[4, 5, 6])
# Converting to a dictionary
sample_dict = sample . to ( "dict" )
print ( sample_dict ) # Output: {'observation': [1, 2, 3], 'action': [4, 5, 6]}
# Converting to a NumPy array
sample_np = sample . to ( "np" )
print ( sample_np ) # Output: array([1, 2, 3, 4, 5, 6])
# Converting to a PyTorch tensor
sample_pt = sample . to ( "pt" )
print ( sample_pt ) # Output: tensor([1, 2, 3, 4, 5, 6])
gym_space = sample . space ()
print ( gym_space )
# Output: Dict('action': Box(-inf, inf, (3,), float64), 'observation': Box(-inf, inf, (3,), float64))
有关更多详细信息,请参阅sample.py。
Message 类代表单个完成样本空间。它可以是文本、图像、文本/图像列表、样本或其他形式。 Message 类旨在处理各种类型的内容并支持不同的角色,例如用户、助理或系统。
您可以通过多种方式创建Message
。它们都可以被 mbodi 的后端理解。
from mbodied . types . message import Message
Message ( role = "user" , content = "example text" )
Message ( role = "user" , content = [ "example text" , Image ( "example.jpg" ), Image ( "example2.jpg" )])
Message ( role = "user" , content = [ Sample ( "Hello" )])
Backend 类是 Backend 实现的抽象基类。它提供了与不同后端服务交互所需的基本结构和方法,例如用于根据给定消息生成完成的 API 调用。有关如何实现各种后端的信息,请参阅后端目录。
Agent 是下面列出的各种代理的基类。它提供了一个用于创建代理的模板,该代理可以与远程后端/服务器通信并可选择记录其操作和观察结果。
语言代理可以连接到您选择的不同后端或转换器。它包括记录对话、管理上下文、查找消息、忘记消息、存储上下文以及根据指令和图像采取行动的方法。
原生支持 API 服务:OpenAI、Anthropic、vLLM、Ollama、HTTPX 或任何 gradio 端点。更多即将推出!
要将 OpenAI 用于您的机器人后端:
from mbodied . agents . language import LanguageAgent
agent = LanguageAgent ( context = "You are a robot agent." , model_src = "openai" )
执行指令:
instruction = "pick up the fork"
response = robot_agent . act ( instruction , image )
语言代理也可以连接到 vLLM。例如,假设您正在 1.2.3.4:1234 上运行 vLLM 服务器 Mistral-7B。您需要做的就是:
agent = LanguageAgent (
context = context ,
model_src = "openai" ,
model_kwargs = { "api_key" : "EMPTY" , "base_url" : "http://1.2.3.4:1234/v1" },
)
response = agent . act ( "Hello, how are you?" , model = "mistralai/Mistral-7B-Instruct-v0.3" )
使用 Ollama 的示例:
agent = LanguageAgent (
context = "You are a robot agent." , model_src = "ollama" ,
model_kwargs = { "endpoint" : "http://localhost:11434/api/chat" }
)
response = agent . act ( "Hello, how are you?" , model = "llama3.1" )
Motor Agent 与 Language Agent 类似,但它不返回字符串,而是始终返回Motion
。 Motor Agent 通常由机器人变压器模型提供支持,即 OpenVLA、RT1、Octo 等。一些小型模型(如 RT1)可以在边缘设备上运行。然而,有些(例如 OpenVLA)在没有量化的情况下运行可能会很困难。请参阅 OpenVLA 代理和示例 OpenVLA 服务器
这些代理与环境交互以收集传感器数据。它们始终返回SensorReading
,它可以是各种形式的处理后的感官输入,例如图像、深度数据或音频信号。
目前,我们有:
处理机器人传感器信息的代理。
自动代理根据任务和模型动态选择并初始化正确的代理。
from mbodied . agents . auto . auto_agent import AutoAgent
# This makes it a LanguageAgent
agent = AutoAgent ( task = "language" , model_src = "openai" )
response = agent . act ( "What is the capital of France?" )
# This makes it a motor agent: OpenVlaAgent
auto_agent = AutoAgent ( task = "motion-openvla" , model_src = "https://api.mbodi.ai/community-models/" )
action = auto_agent . act ( "move hand forward" , Image ( size = ( 224 , 224 )))
# This makes it a sensory agent: DepthEstimationAgent
auto_agent = AutoAgent ( task = "sense-depth-estimation" , model_src = "https://api.mbodi.ai/sense/" )
depth = auto_agent . act ( image = Image ( size = ( 224 , 224 )))
或者,您也可以使用 auto_agent 中的get_agent
方法。
language_agent = get_agent ( task = "language" , model_src = "openai" )
Motion_controls 模块定义了各种运动来控制机器人作为 Pydantic 模型。它们也是Sample
的子类,因此拥有上述Sample
的所有功能。这些控制涵盖了一系列动作,从简单的关节运动到复杂的姿势和完整的机器人控制。
您可以通过对 Robot 进行子类化来非常轻松地集成您的自定义机器人硬件。您只需要实现do()
函数即可执行操作(如果您想在机器人上记录数据集,还需要一些附加方法)。在我们的示例中,我们使用模拟机器人。我们还有一个 XArm 机器人作为示例。
在机器人上记录数据集非常简单!您需要做的就是为您的机器人实现get_observation()
、 get_state()
和prepare_action()
方法。之后,您可以随时在机器人上记录数据集。请参阅 example/5_teach_robot_record_dataset.py 和此 colab:了解更多详细信息。
from mbodied . robots import SimRobot
from mbodied . types . motion . control import HandControl , Pose
robot = SimRobot ()
robot . init_recorder ( frequency_hz = 5 )
with robot . record ( "pick up the fork" ):
motion = HandControl ( pose = Pose ( x = 0.1 , y = 0.2 , z = 0.3 , roll = 0.1 , pitch = 0.2 , yaw = 0.3 ))
robot . do ( motion )
数据集记录器是一个较低级别的记录器,用于在您与机器人交互/教导机器人时将您的对话和机器人的动作记录到数据集中。您可以为记录器定义任何观察空间和动作空间。有关空间的更多详细信息,请参阅体育馆。
from mbodied . data . recording import Recorder
from mbodied . types . motion . control import HandControl
from mbodied . types . sense . vision import Image
from gymnasium import spaces
observation_space = spaces . Dict ({
'image' : Image ( size = ( 224 , 224 )). space (),
'instruction' : spaces . Text ( 1000 )
})
action_space = HandControl (). space ()
recorder = Recorder ( 'example_recorder' , out_dir = 'saved_datasets' , observation_space = observation_space , action_space = action_space )
# Every time robot makes a conversation or performs an action:
recorder . record ( observation = { 'image' : image , 'instruction' : instruction ,}, action = hand_control )
数据集保存到./saved_datasets
。
Replayer 类旨在处理和管理Recorder
生成的 HDF5 文件中存储的数据。它提供了多种功能,包括读取样本、生成统计数据、提取独特的项目以及转换数据集以与 HuggingFace 一起使用。 Replayer 还支持在处理过程中保存特定图像,并提供用于各种操作的命令行界面。
使用 Replayer 迭代 Recorder 中的数据集的示例:
from mbodied . data . replaying import Replayer
replayer = Replayer ( path = str ( "path/to/dataset.h5" ))
for observation , action in replayer :
...
├─ assets/ ............. Images, icons, and other static assets
├─ examples/ ........... Example scripts and usage demonstrations
├─ resources/ .......... Additional resources for examples
├─ src/
│ └─ mbodied/
│ ├─ agents/ ....... Modules for robot agents
│ │ ├─ backends/ .. Backend implementations for different services for agents
│ │ ├─ language/ .. Language based agents modules
│ │ ├─ motion/ .... Motion based agents modules
│ │ └─ sense/ ..... Sensory, e.g. audio, processing modules
│ ├─ data/ ......... Data handling and processing
│ ├─ hardware/ ..... Hardware modules, i.e. camera
│ ├─ robot/ ........ Robot interface and interaction
│ └─ types/ ........ Common types and definitions
└─ tests/ .............. Unit tests
我们欢迎提出问题、疑问和 PR。请参阅贡献指南以获取更多信息。