Source code for pyScienceMode.utils

import crccheck
from .enums import ErrorCode, Rehastim2Commands

"""
This code provides utility functions for working with the Rehastim device, including packet construction and data 
validation. It also includes functions to convert and check various parameters used in Rehastim communication.
"""


[docs] def signed_int(packet: bytes) -> int: """ Converts a signed int from the packet received from the Rehastim. Parameters ---------- packet: bytes Packet received from the Rehastim. Returns ------- signed_int: int Signed int converted. """ return int.from_bytes(packet, "big", signed=True)
[docs] def check_stimulation_interval(stimulation_interval: int = None): """ Checks if the stimulation interval is within limits. Parameters ---------- stimulation_interval: int Stimulation interval to check. If out of limits, raise a ValueError. """ if stimulation_interval: if stimulation_interval < 8 or stimulation_interval > 1025: raise ValueError("Error : Stimulation interval [8,1025]. Stimulation given : %s" % stimulation_interval)
[docs] def check_inter_pulse_interval(inter_pulse_interval: int = None): """ Checks if the "inter pulse interval" is within limits. Parameters ---------- inter_pulse_interval: int, optional Inter pulse interval to check. If out of limits, raise a ValueError. """ if inter_pulse_interval: if inter_pulse_interval < 2 or inter_pulse_interval > 129: raise ValueError("Error : Inter pulse interval [2,129], given : %s" % inter_pulse_interval)
[docs] def check_low_frequency_factor(low_frequency_factor: int = None): """ Checks if the low frequency factor is within limits. Parameters ---------- low_frequency_factor: int, optional Low frequency factor to check. If out of limits, raise a ValueError. """ if low_frequency_factor: if low_frequency_factor < 0 or low_frequency_factor > 7: raise ValueError("Error : Low frequency factor [0,7], given : %s" % low_frequency_factor)
[docs] def check_unique_channel(list_channels: list = None) -> bool: """ Checks if there is not 2 times the same channel in the list given. Parameters ---------- list_channels: list[Channel], optional Contains a list of channel. Returns ------- True if all channels are unique, False and print a warning if not. """ if list_channels: active_channel = [] for i in range(len(list_channels)): if list_channels[i].get_no_channel() in active_channel: print( "Warning : 2 channel no%s" % list_channels[i].get_no_channel() + " in list_channels given. The first one given will be used." ) list_channels.pop(i) return False else: active_channel.append(list_channels[i].get_no_channel()) return True
[docs] def check_list_channel_order(list_channels): """ Checks if the channels in the list_channels given are ordered. Parameters ---------- list_channels: list[Channel] Contains the channels. Raises a RuntimeError if not ordered. """ number_previous_channel = 0 for i in range(len(list_channels)): if list_channels[i].get_no_channel() < number_previous_channel: raise RuntimeError("Error: channels in list_channels given are not in order.") number_previous_channel = list_channels[i].get_no_channel()
[docs] def calc_electrode_number(list_channels: list, enable_low_frequency: bool = False) -> int: """ When enable_low_frequency = False : Calculates the number corresponding to which electrode is activated. During the initialisation, the computer needs to tell the Rehastim which channel needs to be activated. It is done through the addition of 2 pow the number of the channel. For example if the channel 1 and 4 needs to be activated, electrode_number = 2**1 + 2**4 When enable_low_frequency = True : Calculates the number corresponding to which electrode has the low_frequency_factor enabled. Parameters ---------- list_channels: list[Channel] Contains the channels that will be activated. enable_low_frequency: bool Choose whether the number correspond to active channel (False) or correspond to channel with low frequency factor enabled (True). Returns ------- electrode_number: int Electrode number calculated. """ electrode_number = 0 for i in range(len(list_channels)): if enable_low_frequency: if list_channels[i].get_enable_low_frequency(): electrode_number += 2 ** (list_channels[i].get_no_channel() - 1) if not enable_low_frequency: electrode_number += 2 ** (list_channels[i].get_no_channel() - 1) return electrode_number
[docs] def packet_construction(packet_count: int, packet_type: str, packet_data: list = None) -> bytes: """ Constructs the packet which will be sent to the Rehastim. Parameters ---------- packet_count: int Correspond to the number of packet sent to the Rehastim. packet_type: str Correspond to the command that will be sent. packet_data: list Contain the data of the packet. Returns ------- packet_construct: bytes Packet constructed which will be sent. """ start_byte = 0xF0 stop_byte = 0x0F stuffing_byte = 0x81 packet = [start_byte] packet_command = Rehastim2Commands[packet_type].value packet_payload = [packet_count, packet_command] packet_payload = _stuff_packet_byte(packet_payload) if packet_data is not None: packet_data = _stuff_packet_byte(packet_data, command_data=True) packet_payload += packet_data checksum = crccheck.crc.Crc8.calc(packet_payload) checksum = _stuff_byte(checksum) data_length = _stuff_byte(len(packet_payload)) packet = packet + [stuffing_byte] + [checksum] + [stuffing_byte] + [data_length] + packet_payload + [stop_byte] return b"".join([byte.to_bytes(1, "little") for byte in packet])
def _stuff_packet_byte(packet: list, command_data: bool = False) -> list: """ Stuffs each byte if necessary of the packet. Parameters ---------- packet: list Packet containing the bytes that will be checked and stuffed if necessary. command_data: bool True if the packet is a command data, False if not. Returns ------- packet: list Packet returned with stuffed byte. """ stuffed_bytes = [240, 15, 129, 85, 10] stuffing_byte = 0x81 if command_data: packet_tmp = [] for i in range(len(packet)): if packet[i] in stuffed_bytes: packet_tmp = packet_tmp + [stuffing_byte, _stuff_byte(packet[i])] else: packet_tmp.append(packet[i]) return packet_tmp else: for i in range(len(packet)): if packet[i] in stuffed_bytes: packet[i] = _stuff_byte(packet[i]) return packet def _stuff_byte(byte: int) -> int: """ Stuffs the byte given. Parameters ---------- byte: int Byte which needs to be stuffed. Returns ------- byte_stuffed: int The byte stuffed. """ stuffing_key = 0x55 return (byte & ~stuffing_key) | (~byte & stuffing_key)
[docs] def generic_error_check(ack_object): error_code = ErrorCode(ack_object.result) if error_code.message: raise ValueError(error_code.message)