Source code for niryo_robot_python_ros_wrapper.custom_button_ros_wrapper
from threading import Lock, Event
import rospy
from ros_wrapper_enums import ButtonAction
from end_effector_interface.msg import EEButtonStatus
from niryo_robot_msgs.msg import CommandStatus
class CustomButtonRosWrapperException(Exception):
pass
def check_ned2_version(func):
"""
Decorator that check the robot version
"""
def wrap(*args, **kwargs):
robot_instance = args[0]
if robot_instance.hardware_version != 'ned2':
raise CustomButtonRosWrapperException(
"Error Code : {}\nMessage : Wrong robot hardware version, feature only available on Ned2".format(
CommandStatus.BAD_HARDWARE_VERSION))
return func(*args, **kwargs)
return wrap
[docs]class CustomButtonRosWrapper:
def __init__(self, hardware_version='ned2'):
self.__hardware_version = hardware_version
self.__action_lock = Lock()
self.__action_events = {
ButtonAction.HANDLE_HELD_ACTION: Event(),
ButtonAction.LONG_PUSH_ACTION: Event(),
ButtonAction.SINGLE_PUSH_ACTION: Event(),
ButtonAction.DOUBLE_PUSH_ACTION: Event(),
ButtonAction.NO_ACTION: Event(),
}
self.__custom_button_state = EEButtonStatus.NO_ACTION
self.__custom_button_topic = rospy.Subscriber(
'/niryo_robot_hardware_interface/end_effector_interface/custom_button_status',
EEButtonStatus, self.__callback_custom_pos_button_status)
def __check_ned_2_version(self):
if self.__hardware_version != 'ned2':
raise CustomButtonRosWrapperException(
"Error Code : {}\nMessage : Wrong robot hardware version, feature only available on Ned2".format(
CommandStatus.BAD_HARDWARE_VERSION))
@property
def hardware_version(self):
return self.__hardware_version
@property
def state(self):
"""
Get the button state from the ButtonAction class
:return: int value from the ButtonAction class
:rtype: int
"""
self.__check_ned_2_version()
return self.__custom_button_state
[docs] def is_pressed(self):
"""
Button press state
:rtype: bool
"""
self.__check_ned_2_version()
return self.__custom_button_state != EEButtonStatus.NO_ACTION
[docs] def wait_for_action(self, action, timeout=0):
"""
Waits until a specific action occurs and returns true. Returns false if the timeout is reached.
:param action: int value from the ButtonAction class
:type action: int
:type timeout: float
:return: True if the action has occurred, false otherwise
:rtype: bool
"""
self.__check_ned_2_version()
return self.__wait(action, timeout)
[docs] def wait_for_any_action(self, timeout=0):
"""
Returns the detected action. Returns ButtonAction.NO_ACTION if the timeout is reached without action.
:type timeout: float
:return: Returns the detected action, or ButtonAction.NO_ACTION if the timeout is reached without any action.
:rtype: int
"""
self.__check_ned_2_version()
return self.__wait_any(timeout)
[docs] def get_and_wait_press_duration(self, timeout=0):
"""
Waits for the button to be pressed and returns the press time.
Returns 0 if no press is detected after the timeout duration.
:type timeout: foat
:rtype: float
"""
self.__check_ned_2_version()
return self.__get_press_time(timeout)
def __clear(self):
with self.__action_lock:
for a_event in self.__action_events.values():
a_event.clear()
def __set(self, action):
self.__custom_button_state = action
if action in self.__action_events:
with self.__action_lock:
self.__action_events[action].set()
def __wait(self, action, timeout=0.0):
if action not in self.__action_events:
return False
self.__action_events[action].clear()
start_time = rospy.Time.now()
while not self.__action_events[action].is_set() and not rospy.is_shutdown():
self.__action_events[action].wait(0.1)
if self.__action_events[action].is_set():
return True
elif 0 < timeout < (rospy.Time.now() - start_time).to_sec():
return False
if self.__action_events[action].is_set():
return True
return False
def __wait_any(self, timeout=0.0):
start_time = rospy.Time.now()
self.__clear()
if not self.__wait(ButtonAction.HANDLE_HELD_ACTION, timeout=timeout):
return ButtonAction.NO_ACTION
while not rospy.is_shutdown():
if 0 < timeout < (rospy.Time.now() - start_time).to_sec():
break
for action_name in self.__action_events:
if action_name not in [ButtonAction.NO_ACTION, ButtonAction.HANDLE_HELD_ACTION] \
and self.__action_events[action_name].is_set():
return action_name
rospy.sleep(0.1)
return ButtonAction.HANDLE_HELD_ACTION
def __get_press_time(self, timeout=0.0):
if not self.__wait(ButtonAction.HANDLE_HELD_ACTION, timeout=timeout):
return 0
pressed_time = rospy.Time.now()
if not self.__wait(ButtonAction.NO_ACTION):
return 0
return (rospy.Time.now() - pressed_time).to_sec()
def __callback_custom_pos_button_status(self, custom_button_status):
self.__set(custom_button_status.action)