Module inovopy.rosbridge
Ros Bridge Module
This module contain function and class for using rosbridge api to command and get state from inovo arm
Classes
class InovoRos (host: str, logger: logging.Logger | str | None = None)
-
Expand source code
class InovoRos(inovopy.util.Loggable): """ # RosBridge A class managing rosbridge api communication ## Usage ```python from inovopy.rosbridge import InovoRos ros = InovoRos("192.168.1.1") ros.estop_reset() ``` """ # Ros ros: roslibpy.Ros """connection manager to ROS server (`roslibpy.Ros`)""" # Tcp Speed tcp_speed_lin: float = 0 tcp_speed_ang: float = 0 # Tcp Pose tcp_pose_vec: tuple[float, float, float] = (0, 0, 0) tcp_pose_quat: tuple[float, float, float, float] = (0, 0, 0, 0) # Joint State joint_pos: list[float] = [] joint_vel: list[float] = [] joint_eff: list[float] = [] # Power Status voltage: float = 0 current: float = 0 power_status: str = 0 # Robot Status driver_state: str = "" drive_powered: bool = False # EStop Status estop_active: bool = False estop_circuit: bool = False # Safe Stop Status safe_stop_active: bool = False safe_stop_circuit: bool = False # Runtime State active_blocks: list[str] = [] current_block_progress: float = 0 runtime_status: RuntimeState = 0 variables: list[Variable] = [] # Arm State enable: bool = False state: int = 0 joint_states: list[JointState] = [] def __init__(self, host: str, logger: logging.Logger | str | None = None): """ initalize `InovoRos` ## Args - `host : str` : host of psu, preferably in form of `192.168.x.x` - `logger: logging.Logger | str | None` : - if `logger` is instance of `logging.Logger`, it will log with it; - if `logger` is `str`, new logger will be created with it as name - otherwise, no log """ super().__init__(logger) self.host: str = host self.info(f"ros bridge initalized with host : {self.host}") if not "192.168" in self.host: self.warning("host is not in form of 192.168.x.x") self.warning("this might cause networking latency") self.ros = roslibpy.Ros(host, 9090) self.ros.run() build_list = [ (self.__tcp_speed, TOPIC_TCP_SPEED, TYPE_TCP_SPEED), (self.__tcp_pose, TOPIC_TCP_POSE, TYPE_TCP_POSE), (self.__joint_state, TOPIC_JOINT_STATE, TYPE_JOINT_STATE), (self.__power_status, TOPIC_POWER_STATUS, TYPE_POWER_STATUS), (self.__robot_status, TOPIC_ROBOT_STATUS, TYPE_ROBOT_STATUS), (self.__estop_status, TOPIC_ESTOP_STATUS, TYPE_ESTOP_STATUS), (self.__safe_stop_status, TOPIC_SAFE_STOP_STATUS, TYPE_SAFE_STOP_STATUS), (self.__runtime_state, TOPIC_RUNTIME_STATE, TYPE_RUNTIME_STATE), (self.__arm_state, TOPIC_ARM_STATE, TYPE_ARM_STATE), ] self.subscribers: dict[str, roslibpy.Topic] = {} for cb, name, message_type in build_list: self.info(f"Subscribing to topic: {name}, type : {message_type}") sub = roslibpy.Topic(self.ros, name, message_type) sub.subscribe(cb) self.subscribers[name] = sub # Topic Call Back def __tcp_speed(self, message): self.tcp_speed_lin = message["speed"]["linear"] self.tcp_speed_ang = message["speed"]["angular"] def __tcp_pose(self, message): vec = message["pose"]["position"] self.tcp_pose_vec = (vec["x"], vec["y"], vec["z"]) quat = message["pose"]["orientation"] self.tcp_pose_quat = ( quat["x"], quat["y"], quat["z"], quat["w"], ) def __joint_state(self, message): self.joint_pos = message["position"] self.joint_vel = message["velocity"] self.joint_eff = message["effort"] def __power_status(self, message): self.power_status = message["state"] self.voltage = message["voltage"] self.current = message["current"] def __robot_status(self, message): self.driver_state = message["driver_state"] self.drives_powered = message["drives_powered"] def __estop_status(self, message): self.estop_active = message["active"] self.estop_circuit = message["circuit_complete"] def __safe_stop_status(self, message): self.safe_stop_active = message["active"] self.safe_stop_circuit = message["circuit_complete"] def __runtime_state(self, message): self.active_blocks = message["active_blocks"] self.current_block_progress = message["current_block_progress"] self.runtime_status = RuntimeState(message["state"]) self.variables = [] for v in message["variables"]: self.variables.append(Variable.from_message(v)) def __arm_state(self, message): self.enabled = message["enabled"] self.state = message["state"] self.joint_states = [] for j in message["joint_states"]: self.joint_states.append(JointState.from_message(j)) # Service # PSU/Robot def safe_stop_reset(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_SAFE_STOP_RESET, TYPE_TRIGGER).call( req ) return InovoServiceResponse.from_message(res) def estop_reset(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_ESTOP_RESET, TYPE_TRIGGER).call(req) return InovoServiceResponse.from_message(res) def power_on(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_POWER_ON, TYPE_TRIGGER).call(req) return InovoServiceResponse.from_message(res) def power_off(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_POWER_OFF, TYPE_TRIGGER).call(req) return InovoServiceResponse.from_message(res) def robot_enable(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_ROBOT_ENABLE, TYPE_TRIGGER).call(req) return InovoServiceResponse.from_message(res) def robot_disable(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_ROBOT_DISABLE, TYPE_TRIGGER).call(req) return InovoServiceResponse.from_message(res) # Runtime def runtime_start(self, procedure_name: str | None = None) -> InovoServiceResponse: """ start a sequence in runtime ## Args: - `procedure_name : str | None`: - if `None`, will start from start block - if `str`, will try to start function with respected name """ payload = {} if procedure_name: payload["procedure_name"] = procedure_name req = roslibpy.ServiceRequest(payload) res = roslibpy.Service( self.ros, SERVICE_SEQUENCE_START, TYPE_RUN_SEQUENCE ).call(req) return InovoServiceResponse.from_message(res) def runtime_stop(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_SEQUENCE_STOP, TYPE_TRIGGER).call(req) return InovoServiceResponse.from_message(res) def runtime_pause(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_SEQUENCE_PAUSE, TYPE_TRIGGER).call(req) return InovoServiceResponse.from_message(res) def runtime_step(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_SEQUENCE_STEP, TYPE_TRIGGER).call(req) return InovoServiceResponse.from_message(res) def runtime_debug(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_SEQUENCE_DEBUG, TYPE_TRIGGER).call(req) return InovoServiceResponse.from_message(res) def runtime_continue(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_SEQUENCE_CONTINUE, TYPE_TRIGGER).call( req ) return InovoServiceResponse.from_message(res) def runtime_get_var(self, name: str) -> InovoServiceResponse: req = roslibpy.ServiceRequest({"name": name}) res = roslibpy.Service(self.ros, SERVICE_SEQUENCE_GET_VAR, TYPE_GET_VAR).call( req ) return InovoServiceResponse.from_message(res) def runtime_set_var(self, name: str, value: typing.Any) -> InovoServiceResponse: req = roslibpy.ServiceRequest({"name": name, "value": str(value)}) res = roslibpy.Service(self.ros, SERVICE_SEQUENCE_SET_VAR, TYPE_SET_VAR).call( req ) return InovoServiceResponse.from_message(res) @contextmanager def jog(self): """ Enter a context of jog command mode. Advertise when enter context and automatically unadvertise when exit. ## Yields: - `JogPublisher`: api for jog command ## Example: ```python import time from inovopy.rosbridge import InovoRos ros = InovoRos("192.168.1.122", "Jog") with ros.jog() as j: for _ in range(1000): j.jog(z=-0.01) time.sleep(0.01) for _ in range(1000): j.jog(z=0.01) time.sleep(0.01) ``` """ top = roslibpy.Topic(self.ros, TOPIC_CARTESIAN_JOG, TYPE_CARTESIAN_JOG_DEMAND) top.advertise() publisher = JogPublisher(top) yield publisher publisher.publisher.unadvertise() del publisher # Destruction def unsubscribe_all(self): for n, sub in self.subscribers.items(): self.info(f"unsubscribing topic : {n}") sub.unsubscribe() def resubscribe_all(self): for n, sub in self.subscribers.items(): self.info(f"resubscribing topic : {n}") sub.subscribe() def __del__(self): self.ros.close()
RosBridge
A class managing rosbridge api communication
Usage
from inovopy.rosbridge import InovoRos ros = InovoRos("192.168.1.1") ros.estop_reset()
initalize
InovoRos
Args
host : str
: host of psu, preferably in form of192.168.x.x
logger: logging.Logger | str | None
:- if
logger
is instance oflogging.Logger
, it will log with it; - if
logger
isstr
, new logger will be created with it as name - otherwise, no log
- if
Ancestors
- Loggable
- abc.ABC
Class variables
var active_blocks : list[str]
-
The type of the None singleton.
var current : float
-
The type of the None singleton.
var current_block_progress : float
-
The type of the None singleton.
var drive_powered : bool
-
The type of the None singleton.
var driver_state : str
-
The type of the None singleton.
var enable : bool
-
The type of the None singleton.
var estop_active : bool
-
The type of the None singleton.
var estop_circuit : bool
-
The type of the None singleton.
var joint_eff : list[float]
-
The type of the None singleton.
var joint_pos : list[float]
-
The type of the None singleton.
var joint_states : list[JointState]
-
The type of the None singleton.
var joint_vel : list[float]
-
The type of the None singleton.
var power_status : str
-
The type of the None singleton.
var ros : roslibpy.ros.Ros
-
connection manager to ROS server (
roslibpy.Ros
) var runtime_status : RuntimeState
-
The type of the None singleton.
var safe_stop_active : bool
-
The type of the None singleton.
var safe_stop_circuit : bool
-
The type of the None singleton.
var state : int
-
The type of the None singleton.
var tcp_pose_quat : tuple[float, float, float, float]
-
The type of the None singleton.
var tcp_pose_vec : tuple[float, float, float]
-
The type of the None singleton.
var tcp_speed_ang : float
-
The type of the None singleton.
var tcp_speed_lin : float
-
The type of the None singleton.
var variables : list[Variable]
-
The type of the None singleton.
var voltage : float
-
The type of the None singleton.
Methods
def estop_reset(self) ‑> InovoServiceResponse
-
Expand source code
def estop_reset(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_ESTOP_RESET, TYPE_TRIGGER).call(req) return InovoServiceResponse.from_message(res)
def jog(self)
-
Expand source code
@contextmanager def jog(self): """ Enter a context of jog command mode. Advertise when enter context and automatically unadvertise when exit. ## Yields: - `JogPublisher`: api for jog command ## Example: ```python import time from inovopy.rosbridge import InovoRos ros = InovoRos("192.168.1.122", "Jog") with ros.jog() as j: for _ in range(1000): j.jog(z=-0.01) time.sleep(0.01) for _ in range(1000): j.jog(z=0.01) time.sleep(0.01) ``` """ top = roslibpy.Topic(self.ros, TOPIC_CARTESIAN_JOG, TYPE_CARTESIAN_JOG_DEMAND) top.advertise() publisher = JogPublisher(top) yield publisher publisher.publisher.unadvertise() del publisher
Enter a context of jog command mode. Advertise when enter context and automatically unadvertise when exit.
Yields:
JogPublisher
: api for jog command
Example:
import time from inovopy.rosbridge import InovoRos ros = InovoRos("192.168.1.122", "Jog") with ros.jog() as j: for _ in range(1000): j.jog(z=-0.01) time.sleep(0.01) for _ in range(1000): j.jog(z=0.01) time.sleep(0.01)
def power_off(self) ‑> InovoServiceResponse
-
Expand source code
def power_off(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_POWER_OFF, TYPE_TRIGGER).call(req) return InovoServiceResponse.from_message(res)
def power_on(self) ‑> InovoServiceResponse
-
Expand source code
def power_on(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_POWER_ON, TYPE_TRIGGER).call(req) return InovoServiceResponse.from_message(res)
def resubscribe_all(self)
-
Expand source code
def resubscribe_all(self): for n, sub in self.subscribers.items(): self.info(f"resubscribing topic : {n}") sub.subscribe()
def robot_disable(self) ‑> InovoServiceResponse
-
Expand source code
def robot_disable(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_ROBOT_DISABLE, TYPE_TRIGGER).call(req) return InovoServiceResponse.from_message(res)
def robot_enable(self) ‑> InovoServiceResponse
-
Expand source code
def robot_enable(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_ROBOT_ENABLE, TYPE_TRIGGER).call(req) return InovoServiceResponse.from_message(res)
def runtime_continue(self) ‑> InovoServiceResponse
-
Expand source code
def runtime_continue(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_SEQUENCE_CONTINUE, TYPE_TRIGGER).call( req ) return InovoServiceResponse.from_message(res)
def runtime_debug(self) ‑> InovoServiceResponse
-
Expand source code
def runtime_debug(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_SEQUENCE_DEBUG, TYPE_TRIGGER).call(req) return InovoServiceResponse.from_message(res)
def runtime_get_var(self, name: str) ‑> InovoServiceResponse
-
Expand source code
def runtime_get_var(self, name: str) -> InovoServiceResponse: req = roslibpy.ServiceRequest({"name": name}) res = roslibpy.Service(self.ros, SERVICE_SEQUENCE_GET_VAR, TYPE_GET_VAR).call( req ) return InovoServiceResponse.from_message(res)
def runtime_pause(self) ‑> InovoServiceResponse
-
Expand source code
def runtime_pause(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_SEQUENCE_PAUSE, TYPE_TRIGGER).call(req) return InovoServiceResponse.from_message(res)
def runtime_set_var(self, name: str, value: Any) ‑> InovoServiceResponse
-
Expand source code
def runtime_set_var(self, name: str, value: typing.Any) -> InovoServiceResponse: req = roslibpy.ServiceRequest({"name": name, "value": str(value)}) res = roslibpy.Service(self.ros, SERVICE_SEQUENCE_SET_VAR, TYPE_SET_VAR).call( req ) return InovoServiceResponse.from_message(res)
def runtime_start(self, procedure_name: str | None = None) ‑> InovoServiceResponse
-
Expand source code
def runtime_start(self, procedure_name: str | None = None) -> InovoServiceResponse: """ start a sequence in runtime ## Args: - `procedure_name : str | None`: - if `None`, will start from start block - if `str`, will try to start function with respected name """ payload = {} if procedure_name: payload["procedure_name"] = procedure_name req = roslibpy.ServiceRequest(payload) res = roslibpy.Service( self.ros, SERVICE_SEQUENCE_START, TYPE_RUN_SEQUENCE ).call(req) return InovoServiceResponse.from_message(res)
start a sequence in runtime
Args:
procedure_name : str | None
:- if
None
, will start from start block - if
str
, will try to start function with respected name
- if
def runtime_step(self) ‑> InovoServiceResponse
-
Expand source code
def runtime_step(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_SEQUENCE_STEP, TYPE_TRIGGER).call(req) return InovoServiceResponse.from_message(res)
def runtime_stop(self) ‑> InovoServiceResponse
-
Expand source code
def runtime_stop(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_SEQUENCE_STOP, TYPE_TRIGGER).call(req) return InovoServiceResponse.from_message(res)
def safe_stop_reset(self) ‑> InovoServiceResponse
-
Expand source code
def safe_stop_reset(self) -> InovoServiceResponse: req = roslibpy.ServiceRequest() res = roslibpy.Service(self.ros, SERVICE_SAFE_STOP_RESET, TYPE_TRIGGER).call( req ) return InovoServiceResponse.from_message(res)
def unsubscribe_all(self)
-
Expand source code
def unsubscribe_all(self): for n, sub in self.subscribers.items(): self.info(f"unsubscribing topic : {n}") sub.unsubscribe()
Inherited members
class InovoServiceResponse (message: dict)
-
Expand source code
class InovoServiceResponse: """ Response when calling service Field: - `success : bool` : whether the service call success - `message : str` : message from service call, usually error message - `value : str | None` : extra information from response, used in `runtime_get_var` """ success: bool message: str value: str | None def __init__(self, message: dict): self.success = message["success"] self.message = message["message"] self.value = message.get("value") def __str__(self): return str( {"success": self.success, "message": self.message, "value": self.value} ) @classmethod def from_message(cls, message) -> "InovoServiceResponse": return InovoServiceResponse(message)
Response when calling service
Field: -
success : bool
: whether the service call success -message : str
: message from service call, usually error message -value : str | None
: extra information from response, used inruntime_get_var
Class variables
var message : str
-
The type of the None singleton.
var success : bool
-
The type of the None singleton.
var value : str | None
-
The type of the None singleton.
Static methods
def from_message(message) ‑> InovoServiceResponse
class JogPublisher (publisher: roslibpy.core.Topic)
-
Expand source code
class JogPublisher: """ context manager for jog automatically unadvertise when `del` """ publisher: roslibpy.Topic def __init__(self, publisher: roslibpy.Topic): self.publisher = publisher self.publisher.advertise() def jog( self, x: float = 0, y: float = 0, z: float = 0, rx: float = 0, ry: float = 0, rz: float = 0, ): """ Send a cartesian jog twist demand Args: - `x : float`: target linear velocity x. Defaults to 0. - `y : float`: target linear velocity y. Defaults to 0. - `z : float`: target linear velocity z. Defaults to 0. - `rx : float`: target angular velocity rx. Defaults to 0. - `ry : float`: target angular velocity ry. Defaults to 0. - `rz : float`: target angular velocity ry. Defaults to 0. """ self.publisher.publish( roslibpy.Message( { "twist": { "linear": { "x": x, "y": y, "z": z, }, "angular": { "x": rx, "y": ry, "z": rz, }, } } ) ) def __del__(self): self.publisher.unadvertise()
context manager for jog
automatically unadvertise when
del
Class variables
var publisher : roslibpy.core.Topic
-
The type of the None singleton.
Methods
def jog(self,
x: float = 0,
y: float = 0,
z: float = 0,
rx: float = 0,
ry: float = 0,
rz: float = 0)-
Expand source code
def jog( self, x: float = 0, y: float = 0, z: float = 0, rx: float = 0, ry: float = 0, rz: float = 0, ): """ Send a cartesian jog twist demand Args: - `x : float`: target linear velocity x. Defaults to 0. - `y : float`: target linear velocity y. Defaults to 0. - `z : float`: target linear velocity z. Defaults to 0. - `rx : float`: target angular velocity rx. Defaults to 0. - `ry : float`: target angular velocity ry. Defaults to 0. - `rz : float`: target angular velocity ry. Defaults to 0. """ self.publisher.publish( roslibpy.Message( { "twist": { "linear": { "x": x, "y": y, "z": z, }, "angular": { "x": rx, "y": ry, "z": rz, }, } } ) )
Send a cartesian jog twist demand
Args: -
x : float
: target linear velocity x. Defaults to 0. -y : float
: target linear velocity y. Defaults to 0. -z : float
: target linear velocity z. Defaults to 0. -rx : float
: target angular velocity rx. Defaults to 0. -ry : float
: target angular velocity ry. Defaults to 0. -rz : float
: target angular velocity ry. Defaults to 0.
class JointState
-
Expand source code
class JointState: age: int = 0 current: float = 0 drive_temp: float = 0 ff_torque: float = 0 joint_temp: float = 0 motor_temp: float = 0 output_gain: float = 0 position: float = 0 state: int = 0 status: int = 0 target_position: float = 0 torque: float = 0 velocity: float = 0 @classmethod def attr_list(cls) -> list[str]: return [ "age", "current", "drive_temp", "ff_torque", "joint_temp", "motor_temp", "output_gain", "position", "state", "status", "target_position", "torque", "velocity", ] def __init__(self): pass @classmethod def from_message(cls, message) -> "JointState": js = JointState() for a in JointState.attr_list(): setattr(js, a, message[a]) return js def __str__(self): return str(dict((a, getattr(self, a)) for a in JointState.attr_list()))
Class variables
var age : int
-
The type of the None singleton.
var current : float
-
The type of the None singleton.
var drive_temp : float
-
The type of the None singleton.
var ff_torque : float
-
The type of the None singleton.
var joint_temp : float
-
The type of the None singleton.
var motor_temp : float
-
The type of the None singleton.
var output_gain : float
-
The type of the None singleton.
var position : float
-
The type of the None singleton.
var state : int
-
The type of the None singleton.
var status : int
-
The type of the None singleton.
var target_position : float
-
The type of the None singleton.
var torque : float
-
The type of the None singleton.
var velocity : float
-
The type of the None singleton.
Static methods
def attr_list() ‑> list[str]
def from_message(message) ‑> JointState
class RuntimeState (*args, **kwds)
-
Expand source code
class RuntimeState(Enum): Idle = 0 Running = 1 Paused = 2 PausedOnError = 3
Create a collection of name/value pairs.
Example enumeration:
>>> class Color(Enum): ... RED = 1 ... BLUE = 2 ... GREEN = 3
Access them by:
- attribute access:
Color.RED
- value lookup:
Color(1)
- name lookup:
Color['RED']
Enumerations can be iterated over, and know how many members they have:
>>> len(Color) 3
>>> list(Color) [<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]
Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.
Ancestors
- enum.Enum
Class variables
var Idle
-
The type of the None singleton.
var Paused
-
The type of the None singleton.
var PausedOnError
-
The type of the None singleton.
var Running
-
The type of the None singleton.
class Variable (name: str, dtype: str, value: str)
-
Expand source code
class Variable: name: str dtype: str value: str def __init__(self, name: str, dtype: str, value: str): self.name = name self.dtype = dtype self.value = value @classmethod def from_message(cls, message) -> "Variable": return Variable(message["name"], message["type"], message["value"])
Class variables
var dtype : str
-
The type of the None singleton.
var name : str
-
The type of the None singleton.
var value : str
-
The type of the None singleton.
Static methods
def from_message(message) ‑> Variable