Source code for Env.Client

# !/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
# @FileName       : Client.py
# @Time           : 2024-08-03 15:02:08
# @Author         : yk
# @Email          : yangkui1127@gmail.com
# @Description:   : pybullet client 
"""


import json
import math
import time
from datetime import datetime

import numpy as np
import pybullet as p
import pybullet_data

from RoboticsToolBox import Pose
from Visualization.blender_render import PyBulletRecorder


[docs]class Client: """ Pybullet client, Provides some interfaces for interacting with simulation scenes """
[docs] def __init__(self, cfg): """ Initialize a new PyBullet Client object. Args: cfg (Config): Configuration object with various settings for the simulation. """ if cfg.enable_GUI: if cfg.enable_capture: width, height = cfg.width, cfg.height self.client_id = p.connect( p.GUI, options=f"--width={width} --height={height}" ) p.configureDebugVisualizer(p.COV_ENABLE_SHADOWS, 0) else: self.client_id = p.connect(p.GUI) else: self.client_id = p.connect(p.DIRECT) if not cfg.enable_Debug: p.configureDebugVisualizer(p.COV_ENABLE_GUI, 0) if cfg.shadows: p.configureDebugVisualizer(p.COV_ENABLE_SHADOWS, 1) # enable shadows p.configureDebugVisualizer( p.COV_ENABLE_SEGMENTATION_MARK_PREVIEW, 0 ) # close segment mark preview # p.configureDebugVisualizer(p.COV_ENABLE_PLANAR_REFLECTION, 1) # enable planar reflection p.setAdditionalSearchPath(pybullet_data.getDataPath()) p.setGravity(cfg.Gravity[0], cfg.Gravity[1], cfg.Gravity[2]) p.setPhysicsEngineParameter(numSolverIterations=cfg.numSolverIterations) # p.setRealTimeSimulation(1) # set up real-time simulation # pybullet recorder for blender show self.blender = cfg.blender if self.blender: self.recorder = PyBulletRecorder() # parameters self.timestep = 1.0 / cfg.frequency self.timeout = cfg.timeout # maximum time for planning # Enable caching of graphic shapes when loading URDF files self.enable_cache = p.URDF_ENABLE_CACHED_GRAPHICS_SHAPES planeId = p.loadURDF(cfg.plane_urdf_path, flags=self.enable_cache)
# plane_path = os.path.join(pybullet_data.getDataPath(), cfg.plane_urdf_path) # planeId = self.load_object( # obj_name='plane', # model_path=plane_path, # scale=0.5, # fixed_base=True # ) # ---------------------------------------------------------------- # A few basic functions # ----------------------------------------------------------------
[docs] def get_client_id(self): """ Get the PyBullet client ID. Returns: int: The PyBullet client ID. """ return self.client_id
[docs] def get_datapath(self): """ Get the path to the PyBullet data files. Returns: str: The path to the PyBullet data files. """ return pybullet_data.getDataPath()
[docs] def disconnect(self): """ Disconnect from the PyBullet simulation. Save the recorder data if Blender is enabled. """ if self.blender: self.record_save() p.disconnect(physicsClientId=self.client_id)
[docs] def wait(self, x): # seconds """ Pause the execution for a given number of seconds. Args: x (float): Number of seconds to wait. """ time.sleep(x)
[docs] def run(self, x=1): # steps """ Step the simulation for a given number of steps. Args: x (int): Number of simulation steps to run. """ for _ in range(x): p.stepSimulation(physicsClientId=self.client_id) time.sleep(self.timestep) if self.blender: self.recorder.add_keyframe()
# ---------------------------------------------------------------- # Load scene # ----------------------------------------------------------------
[docs] def load_object( self, obj_name, model_path, object_position=[0, 0, 0], object_orientation=[0, 0, 0], scale=1, fixed_base=False, ): """ Load a given object into the PyBullet simulation environment. Args: obj_name (str): The name of the object. model_path (str): The path to the URDF file for the object. object_position (list): The initial position of the object. object_orientation (list): The initial orientation of the object. scale (float): The scale of the object. fixed_base (bool): If True, the object is fixed in place. Returns: int: The ID of the loaded object in the PyBullet simulation. """ if ( isinstance(object_orientation, (tuple, list, np.ndarray)) and len(object_orientation) == 3 ): object_orientation = p.getQuaternionFromEuler( object_orientation, physicsClientId=self.client_id ) object_id = p.loadURDF( fileName=model_path, basePosition=object_position, baseOrientation=object_orientation, globalScaling=scale, useFixedBase=fixed_base, physicsClientId=self.client_id, flags=self.enable_cache, ) if self.blender: self.register_object(object_id, model_path, scale) setattr(self, obj_name, object_id) # self.run(240) return object_id
[docs] def create_scene(self, json_path): """ Import the complete environment from the environment file based on the basic environment. Args: json_path (str): Path to the scene JSON file. """ with open(json_path, "r") as f: scene_data = json.load(f) for object in scene_data: object_orientation = [ eval(i) if isinstance(i, str) else i for i in object["object_orientation"] ] self.load_object( object["obj_name"], object["model_path"], object["object_position"], object_orientation, object["scale"], object["fixed_base"], ) self.run(480) print(f"[Client] \033[34mInfo\033[0m: success load scene from {json_path}!")
# ---------------------------------------------------------------- # object joint info / operate # ----------------------------------------------------------------
[docs] def remove_object(self, object): """ Remove an object from the scene. Args: object (int / str): The ID or name of the object to remove. """ object_id = self.resolve_object_id(object) p.removeBody(object_id) self.run(10)
[docs] def change_object_joint_angle( self, object, joint_index, target_position, max_force=5 ): """ Change the state of a specific joint of the object. Args: object (int / str): The ID or name of the object. joint_index (int): The index of the joint to be changed. target_position (float): The target position of the joint in radians. max_force (float): The maximum force to be applied to achieve the target position. """ object_id = self.resolve_object_id(object) p.setJointMotorControl2( bodyUniqueId=object_id, jointIndex=joint_index, controlMode=p.POSITION_CONTROL, targetPosition=target_position, force=max_force, physicsClientId=self.client_id, ) self.run(10)
# ---------------------------------------------------------------- # Get info from environment # ----------------------------------------------------------------
[docs] def get_object_id(self, object_name): """ Get the ID of an object by its name. Args: object_name (str): The name of the object. Returns: int: The ID of the object. """ return getattr(self, object_name)
[docs] def resolve_object_id(self, object): """ Resolve the object ID from either an ID or name. Args: object (int / str): The ID or name of the object. Returns: int: The ID of the object. Raises: AssertionError: If the object name does not exist or the input type is incorrect. """ if isinstance(object, str): assert hasattr(self, object), f"scene has not object named {object}!" object_id = self.get_object_id(object) elif isinstance(object, int): object_id = object else: assert (0, "error input type") return object_id
[docs] def print_object_joint_info(self, object): """ Print information about the links and joints of an object. Args: object (int / str): The ID or name of the object. """ object_id = self.resolve_object_id(object) num_joints = p.getNumJoints(object_id, physicsClientId=self.client_id) print( "[Client] \033[34mInfo\033[0m: The object {} has {} joints".format( object_id, num_joints ) ) for i in range(num_joints): joint_info = p.getJointInfo(object_id, i, physicsClientId=self.client_id) joint_name = joint_info[1] joint_state = p.getJointState(object_id, i, physicsClientId=self.client_id) joint_angle = joint_state[0] print( "[Client] \033[34mInfo\033[0m: Joint index:{}, name:{}, angle:{}".format( i, joint_name, joint_angle ) )
[docs] def get_object_pose(self, object): """ Retrieve the pose of a object. Args: object (int / str): The ID or name of the object. Returns: Pose: The pose of the object. """ object_id = self.resolve_object_id(object) position, orientation = p.getBasePositionAndOrientation(object_id) return Pose(position, orientation)
[docs] def get_bounding_box(self, object): """ Retrieve the bounding box of a given object in the PyBullet simulation environment. Args: object (int / str): The ID or name of the object. Returns: list: The minimum and maximum x, y, z coordinates of the bounding box. """ object_id = self.resolve_object_id(object) link_ids = [ i for i in range( -1, p.getNumJoints(object_id, physicsClientId=self.client_id) ) ] aabb_bounds = np.array( [ p.getAABB(object_id, link_id, physicsClientId=self.client_id) for link_id in link_ids ] ) min_bounds = aabb_bounds[:, 0, :] max_bounds = aabb_bounds[:, 1, :] min_x, min_y, min_z = np.min(min_bounds, axis=0) max_x, max_y, max_z = np.max(max_bounds, axis=0) return [min_x, min_y, min_z, max_x, max_y, max_z]
# ---------------------------------------------------------------- # For blender # ----------------------------------------------------------------
[docs] def register_object(self, object_id, model_path, scale=1.0): """ Register an object with the PyBullet-blender recorder. Args: object_id (int): The ID of the object. model_path (str): The path to the model file of the object. scale (float): The scale of the object. """ self.recorder.register_object(object_id, model_path, scale)
[docs] def record_save(self): """ Save the current pybullet-blender recording to a file with a timestamped name. """ current_time = datetime.now().strftime("%Y%m%d_%H%M%S") self.recorder.save(f"../Examples/record/{current_time}.pkl")