Source code for niryo_robot_sound.api.sound_ros_wrapper

import rospy

# - Messages
from niryo_robot_sound.msg import SoundList

# - Services
from niryo_robot_sound.srv import PlaySound, PlaySoundRequest
from niryo_robot_msgs.srv import SetInt, Trigger
from niryo_robot_sound.srv import ManageSound, ManageSoundRequest
from niryo_robot_sound.srv import TextToSpeech

# Command Status
from niryo_robot_msgs.msg import CommandStatus


class SoundRosWrapperException(Exception):
    pass


def check_ned2_version(func):
    """
    Decorator that check the robot version
    """

    def wrap(*args, **kwargs):
        robot_instance = args[0]
        if robot_instance.hardware_version != 'ned2':
            raise SoundRosWrapperException(
                "Error Code : {}\nMessage : Wrong robot hardware version, feature only available on Ned2".format(
                    CommandStatus.BAD_HARDWARE_VERSION))

        return func(*args, **kwargs)

    return wrap


[docs]class SoundRosWrapper(object): def __init__(self, hardware_version='ned2', service_timeout=1): self.__service_timeout = service_timeout self.__hardware_version = hardware_version self.__sounds = [] self.__sound_duration = {} if hardware_version == 'ned2': rospy.Subscriber('/niryo_robot_sound/sound_database', SoundList, self.__sound_database_callback) @property def hardware_version(self): return self.__hardware_version def __check_ned_2_version(self): if self.__hardware_version != 'ned2': raise SoundRosWrapperException( "Error Code : {}\nMessage : Wrong robot hardware version, feature only available on Ned2".format( CommandStatus.BAD_HARDWARE_VERSION)) @property def sounds(self): """ Get sound name list :return: list of the sounds of the robot :rtype: list[string] """ return self.__sounds def __sound_database_callback(self, msg): self.__sound_duration = {sound.name: sound.duration for sound in msg.sounds} self.__sounds = list(self.__sound_duration.keys()) # @check_ned2_version
[docs] def play(self, sound_name, wait_end=True, start_time_sec=0, end_time_sec=0): """ Play a sound from the robot If failed, raise NiryoRosWrapperException :param sound_name: Name of the sound to play :type sound_name: str :param start_time_sec: start the sound from this value in seconds :type start_time_sec: float :param end_time_sec: end the sound at this value in seconds :type end_time_sec: float :param wait_end: wait for the end of the sound before exiting the function :type wait_end: bool :return: status, message :rtype: (int, str) """ self.__check_ned_2_version() result = self.__call_service('/niryo_robot_sound/play', PlaySound, PlaySoundRequest(sound_name=sound_name, start_time_sec=start_time_sec, end_time_sec=end_time_sec, wait_end=wait_end)) rospy.sleep(0.1) return self.__classic_return_w_check(result)
# @check_ned2_version
[docs] def set_volume(self, sound_volume): """ Set the volume percentage of the robot. If failed, raise NiryoRosWrapperException :param sound_volume: volume percentage of the sound (0: no sound, 100: max sound) :type sound_volume: int :return: status, message :rtype: (int, str) """ self.__check_ned_2_version() result = self.__call_service('/niryo_robot_sound/set_volume', SetInt, sound_volume) rospy.sleep(0.1) return self.__classic_return_w_check(result)
# @check_ned2_version
[docs] def stop(self): """ Stop a sound being played. If failed, raise NiryoRosWrapperException :return: status, message :rtype: (int, str) """ self.__check_ned_2_version() result = self.__call_service('/niryo_robot_sound/stop', Trigger) rospy.sleep(0.1) return self.__classic_return_w_check(result)
# @check_ned2_version
[docs] def delete_sound(self, sound_name): """ Delete a sound on the RaspberryPi of the robot. If failed, raise NiryoRosWrapperException :param sound_name: name of the sound which needs to be deleted :type sound_name: str :return: status, message :rtype: (int, str) """ self.__check_ned_2_version() req = ManageSoundRequest(sound_name=sound_name, action=ManageSoundRequest.DELETE) result = self.__call_service('/niryo_robot_sound/manage', ManageSound, req) return self.__classic_return_w_check(result)
# @check_ned2_version
[docs] def import_sound(self, sound_name, sound_data): """ Delete a sound on the RaspberryPi of the robot. If failed, raise NiryoRosWrapperException :param sound_name: name of the sound which needs to be deleted :type sound_name: str :param sound_data: String containing the encoded data of the sound file (wav or mp3) :type sound_data: str :return: status, message :rtype: (int, str) """ self.__check_ned_2_version() import base64 sound_data_b64 = base64.b64encode(sound_data) req = ManageSoundRequest(sound_name=sound_name, action=ManageSoundRequest.DELETE, sound_data=sound_data_b64) result = self.__call_service('/niryo_robot_sound/send_sound', ManageSound, req) return self.__classic_return_w_check(result)
# @check_ned2_version
[docs] def get_sound_duration(self, sound_name): """ Returns the duration in seconds of a sound stored in the robot database raise SoundRosWrapperException if the sound doesn't exists :param sound_name: name of sound :type sound_name: str :return: sound duration in seconds :rtype: float """ self.__check_ned_2_version() if sound_name not in self.__sound_duration: raise SoundRosWrapperException("Sound name: {} not found".format(sound_name)) return self.__sound_duration[sound_name]
# @check_ned2_version
[docs] def say(self, text, language=0): """ Use gtts (Google Text To Speech) to interpret a string as sound Languages available are: - English: 0 - French: 1 - Spanish: 2 - Mandarin: 3 - Portuguese: 4 :param text: text to speek < 100 char :type text: string :param language: language of the text :type language: int :return: status, message :rtype: (int, str) """ self.__check_ned_2_version() result = self.__call_service('/niryo_robot_sound/text_to_speech', TextToSpeech, text, language) if not result.success: raise SoundRosWrapperException("Message : {}".format(result.message)) return result.success, result.message
# --- Functions interface def __call_service(self, service_name, service_msg_type, *args): """ Wait for the service called service_name Then call the service with args :param service_name: :param service_msg_type: :param args: Tuple of arguments :raises NiryoRosWrapperException: Timeout during waiting of services :return: Response """ # Connect to service try: rospy.wait_for_service(service_name, self.__service_timeout) except rospy.ROSException as e: raise SoundRosWrapperException(e) # Call service try: service = rospy.ServiceProxy(service_name, service_msg_type) response = service(*args) return response except rospy.ServiceException as e: raise SoundRosWrapperException(e) def __classic_return_w_check(self, result): self.__check_result_status(result) return result.status, result.message @staticmethod def __check_result_status(result): if result.status < 0: raise SoundRosWrapperException("Error Code : {}\nMessage : {}".format(result.status, result.message))