Package idpmodem

Library for interfacing with an IsatData Pro modem for satellite IoT.

Expand source code
"""Library for interfacing with an IsatData Pro modem for satellite IoT."""

# Workaround for legacy async client
from idpmodem.asyncio import atcommand_async
from idpmodem.asyncio.atcommand_async import IdpModemAsyncioClient

__all__ = ['atcommand_async', 'IdpModemAsyncioClient']

Sub-modules

idpmodem.asyncio

EXPERIMENTAL asyncio modem client.

idpmodem.aterror

Base classes for AT command errors.

idpmodem.codecs

Encoder/decoder utilities for efficient satellite IoT messages.

idpmodem.constants

IsatData Pro modem constants …

idpmodem.crcxmodem

Calculates CRC-16-CCITT checksum for xmodem …

idpmodem.helpers

Helpers for working with serial ports and encoding conversions.

idpmodem.location

Utilities for validating and parsing NMEA data into a Location object.

idpmodem.s_registers

IDP modem S-register definitions …

idpmodem.threaded

A threaded modem client with abstracted properties.

Classes

class IdpModemAsyncioClient (port: str = '/dev/ttyUSB0', baudrate: int = 9600, loop: asyncio.events.AbstractEventLoop = None, log_verbose: bool = False)

A satellite IoT messaging modem on Inmarsat's IsatData Pro service.

WARNING: Deprecated

Attributes

port
The serial port name e.g. /dev/ttyUSB0
baudrate
The baudrate of the serial port e.g. 9600
crc
A boolean used if CRC-16 is enabled for long serial cables
loop
The asyncio event loop (uses default if not provided)

Initializes the class.

Args

port
The serial port name e.g. /dev/ttyUSB0
baudrate
The serial port baudrate
crc
enables CRC-16 for long serial cables
loop
(optional) external asyncio event loop to use
logger
(optional) external logger to use
log_level
Level for the logger to record
Expand source code
class IdpModemAsyncioClient:
    """A satellite IoT messaging modem on Inmarsat's IsatData Pro service.

    **WARNING**: Deprecated

    Attributes:
        port: The serial port name e.g. `/dev/ttyUSB0`
        baudrate: The baudrate of the serial port e.g. `9600`
        crc: A boolean used if CRC-16 is enabled for long serial cables
        loop: The asyncio event loop (uses default if not provided)

    """

    def __init__(self,
                 port: str = '/dev/ttyUSB0',
                 baudrate: int = 9600,
                 loop: AbstractEventLoop = None,
                 log_verbose: bool = False,
                 ):
        """Initializes the class.
        
        Args:
            port: The serial port name e.g. `/dev/ttyUSB0`
            baudrate: The serial port baudrate
            crc: enables CRC-16 for long serial cables
            loop: (optional) external asyncio event loop to use
            logger: (optional) external logger to use
            log_level: Level for the logger to record

        """
        self._verbose = log_verbose
        self.port = port
        self.baudrate = baudrate
        self.crc = None
        self.loop = loop
        self._thread = current_thread()
        self._event = Event()
        self._serial = None
        self._pending_command = None
        self._pending_command_time = None
        self._retry_count = 0
        self._serial_async_error_count = 0

    @property
    def port(self):
        return self._port
    
    @port.setter
    def port(self, value):
        valid = len(glob(value)) == 1
        if not valid:
            err_msg = 'Serial port {} not found'.format(value)
            _log.error(err_msg)
            raise ValueError(err_msg)
        self._port = value

    @property
    def baudrate(self):
        return self._baudrate
    
    @baudrate.setter
    def baudrate(self, value):
        if value not in BAUDRATES:
            raise ValueError('Unsupported baudrate {}'.format(value))
        self._baudrate = value

    def _handle_at_error(self,
                         at_command: str,
                         err_code: Union[str, int],
                         return_value: any = None) -> any:
        """Manages log and/or raising errors.
        
        Args:
            at_command: The command that experienced an error
            err_code: The error code received
            return_value: The value to return after logging
        
        Raises:
            Re-raises the exceptions

        """
        error_str = AT_ERROR_CODES[int(err_code)]
        _log.error("{} Exception: {}".format(at_command, error_str))
        if return_value is None:
            raise AtException(error_str)
        return return_value

    async def _send(self, data: str) -> str:
        """Coroutine encodes and sends an AT command.
        
        Args:
            writer: A serial_asyncio writer
            data: An AT command string
        
        Returns:
            A string with the original data.
        """
        if self.crc:
            data = get_crc(data)
        self._pending_command = data
        to_send = self._pending_command + '\r'
        if self._verbose:
            _log.debug('Sending {}'.format(_printable(to_send)))
        self._pending_command_time = time()
        await self._serial.write_async(to_send.encode())
        return data

    async def _recv(self, timeout: int = 5) -> list:
        """Coroutine receives and decodes data from the serial port.

        Parsing stops when 'OK' or 'ERROR' is found.
        
        Args:
            reader: A serial_asyncio reader

        Returns:
            A list of response strings with empty lines removed.
        
        Raises:
            AtTimeout if the response timed out.

        """
        CRC_DELAY = 1   #: seconds after response body
        response = []
        verbose_response = ''
        msg = ''
        try:
            while True:
                chars = (await wait_for(
                    self._serial.read_until_async(b'\r\n'),
                    timeout=timeout)).decode()
                msg += chars
                verbose_response += chars
                if msg.endswith('\r\n'):
                    if self._verbose:
                        _log.debug('Processing {}'.format(_printable(msg)))
                    msg = msg.strip()
                    if msg != self._pending_command:
                        if msg != '':
                            # empty lines are not included in response list
                            # but are preserved in verbose_response for CRC
                            response.append(msg)
                    else:
                        # remove echo for possible CRC calculation
                        echo = self._pending_command + '\r'
                        if self._verbose:
                            _log.debug(f'Removing echo {_printable(echo)}')
                        verbose_response = verbose_response.replace(echo, '')
                    if msg in ['OK', 'ERROR']:
                        try:
                            response_crc = (await wait_for(
                                self._serial.read_until_async(b'\r\n'),
                                timeout=CRC_DELAY)).decode()
                            if response_crc:
                                response_crc = response_crc.strip()
                                if _serial_asyncio_lost_bytes(verbose_response):
                                    self._serial_async_error_count += 1
                                if not validate_crc(response=verbose_response,
                                                    candidate=response_crc):
                                    err_msg = '{} CRC error for {}'.format(
                                        response_crc,
                                        _printable(verbose_response))
                                    _log.error(err_msg)
                                    raise AtCrcError(err_msg)
                                elif self._verbose:
                                    _log.debug('CRC {} ok for {}'.format(
                                        response_crc,
                                        _printable(verbose_response)))
                                if not self.crc:
                                    # raise AtCrcConfigError('CRC found but unexpected') #: new <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
                                    self.crc = True
                        except TimeoutError:
                            if self.crc:
                                raise AtCrcConfigError('CRC expected but not found')   #: new <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
                            self.crc = False
                        break
                    msg = ''
        except TimeoutError:
            timeout_time = time() - self._pending_command_time
            err = ('AT timeout {} after {} seconds ({}s after command)'.format(
                self._pending_command, timeout, timeout_time))
            raise AtTimeout(err)
        return response

    async def command(self,
                      at_command: str,
                      timeout: int = 5,
                      retries: int = 0) -> list:
        """Submits an AT command and returns the response asynchronously.
        
        Proxies a private function to allow for multi-threaded operation.

        Args:
            at_command: The AT command string
            timeout: The maximum time in seconds to await a response.
            retries: Optional number of additional attempts on failure.
        
        Returns:
            A list of response strings finishing with 'OK', or 
                ['ERROR', '<error_code>']
        
        Raises:
            AtException if no response was received.
            AtException if bad CRC response count exceeds retries

        """
        if current_thread() != self._thread:
            _log.warning('Call from external thread may crash or hang')
            loop = get_running_loop()
            set_event_loop(loop)
            await sleep(1)   #: add a slight delay to mitigate race condition
            while self._event.is_set():
                pass
            concurrentfuture = run_coroutine_threadsafe(
                self._command(at_command, timeout, retries), loop)
            asyncfuture = wrap_future(concurrentfuture)
            return await asyncfuture
        else:
            return await self._command(at_command, timeout, retries)

    async def _command(self,
                       at_command: str,
                       timeout: int,
                       retries: int) -> list:
        """Submits an AT command and returns the response asynchronously.
        
        Args:
            at_command: The AT command string
            timeout: The maximum time in seconds to await a response.
            retries: Optional number of additional attempts on failure.
        
        Returns:
            A list of response strings finishing with 'OK', or 
                ['ERROR', '<error_code>']
        
        Raises:
            AtException if no response was received.
            AtException if bad CRC response count exceeds retries

        """
        try:
            self._event.set()
            try:
                if self._verbose:
                    _log.debug('Opening serial port {}'.format(self.port))
                self._serial = AioSerial(port=self.port,
                                            baudrate=self.baudrate,
                                            loop=self.loop)
            except Exception as e:
                _log.error('Error connecting to aioserial: {}'.format(e))
            try:
                if self._verbose:
                    _log.debug('Checking unsolicited data'
                               f' prior to {at_command}')
                self._pending_command_time = time()
                unsolicited = await self._recv(timeout=0.25)
                if unsolicited:
                    _log.warning('Unsolicited data: {}'.format(unsolicited))
                    # raise AtUnsolicited('Unsolicited data: {}'.format(unsolicited))
            except AtTimeout:
                if self._verbose:
                    _log.debug('No unsolicited data found')
            tasks = [self._send(at_command),
                self._recv(timeout=timeout)]
            echo, response = await gather(*tasks)
            if echo in response:
                response.remove(echo)
            if len(response) > 0:
                self._retry_count = 0
                if response[0] == 'ERROR':
                    _log.debug('AT error detected - getting reason')
                    error_code = await self.command('ATS80?')
                    if error_code is not None:
                        response.append(error_code[0])
                    else:
                        _log.error('Failed to get error_code from S80')
                return response
            raise AtException('No response received for {}'.format(at_command))
        except AtCrcError:
            self._retry_count += 1
            if self._retry_count < retries:
                _log.error('CRC error retrying')
                return await self.command(
                    at_command, timeout=timeout, retries=retries)
            else:
                error_message = 'Too many failed CRC ({})'.format(
                    self._retry_count)
                self._retry_count = 0
                raise AtException(error_message)
        finally:
            if self._serial:
                if self._verbose:
                    _log.debug('Closing serial port {}'.format(self.port))
                self._serial.close()
                self._serial = None
            self._event.clear()
    
    async def initialize(self, crc: bool = False) -> bool:
        """Initializes the modem using ATZ and sets up CRC.

        Args:
            crc: desired initial CRC enabled if True

        Returns:
            True if successful
        
        Raises:
            AtException on errors other than CRC enabled

        """
        _log.debug('Initializing modem{}'.format(
            ' (CRC enabled)' if crc else ''))
        cmd = 'ATZ;E1;V1'
        cmd += ';%CRC=1' if crc else ''
        success = await self.command(cmd)
        if success[0] == 'ERROR':
            if int(success[1]) == 100:
                if crc and self.crc:
                    _log.debug('CRC already enabled')
                    return True
                else:
                    self.crc = True
                    await self.initialize(crc)
            else:
                return self._handle_at_error(cmd, success[1], return_value=False)
        self.crc = crc
        return True
    
    async def config_restore_nvm(self) -> bool:
        """Sends the ATZ command to restore from non-volatile memory.
        
        Returns:
            Boolean success.
        """
        _log.debug('Restoring non-volatile configuration')
        cmd = 'ATZ'
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1], return_value=False)
        return True

    async def config_restore_factory(self) -> bool:
        """Sends the AT&F command and returns True on success."""
        _log.debug('Restoring factory defaults')
        cmd = 'AT&F'
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1], return_value=False)
        return True
    
    async def config_report(self) -> Tuple[dict, dict]:
        """Sends the AT&V command to retrive S-register settings.
        
        Returns:
            A tuple with two dictionaries or both None if failed
            at_config with booleans crc, echo, quiet and verbose
            reg_config with S-register tags and integer values

        """
        _log.debug('Querying configuration')
        cmd = 'AT&V'
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1], (None, None))
        at_config = response[1]
        s_regs = response[2]
        echo, quiet, verbose, crc = at_config.split(' ')
        at_config = {
            "crc": bool(int(crc[4])),
            "echo": bool(int(echo[1])),
            "quiet": bool(int(quiet[1])),
            "verbose": bool(int(verbose[1])),
        }
        reg_config = {}
        for reg in s_regs.split(' '):
            name, value = reg.split(':')
            reg_config[name] = int(value)
        return (at_config, reg_config)

    async def config_save(self) -> bool:
        """Sends the AT&W command and returns True if successful."""
        _log.debug('Saving S-registers to non-volatile memory')
        cmd = 'AT&W'
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1], False)
        return True

    async def config_crc_enable(self, crc: bool) -> bool:
        """Enables or disables CRC error checking (for long serial cable).
        
        Args:
            crc: enable CRC if true
        """
        _log.debug('{} CRC'.format('Enabling' if crc else 'Disabling'))
        cmd = 'AT%CRC={}'.format(1 if crc else 0)
        response = await self.command(cmd)
        if response[0] == 'ERROR' and self.crc != crc:
            return self._handle_at_error(cmd, response[1], False)
        self.crc = crc
        return True
    
    async def device_mobile_id(self) -> str:
        """Returns the unique Mobile ID (Inmarsat serial number).
        
        Returns:
            MobileID string.
        
        Raises:
            AtException

        """
        _log.debug('Querying device Mobile ID')
        cmd = 'AT+GSN'
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            self._handle_at_error(cmd, response[1])
        return response[0].replace('+GSN:', '').strip()

    async def device_version(self) -> Tuple[str, str, str]:
        """Returns the hardware, firmware and AT versions.
        
        Returns:
            Dict with hardware, firmware, at version.
        
        Raises:
            AtException

        """
        _log.debug('Querying device version info')
        cmd = 'AT+GMR'
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            self._handle_at_error(cmd, response[1])
        versions = response[0].replace('+GMR:', '').strip()
        fw_ver, hw_ver, at_ver = versions.split(',')
        return {'hardware': hw_ver, 'firmware': fw_ver, 'at': at_ver}

    async def gnss_continuous_set(self,
                                  interval: int=0,
                                  doppler: bool=True) -> bool:
        """Sets the GNSS continous mode (0 = on-demand).
        
        Args:
            interval: Seconds between GNSS refresh.
            doppler: Often required for moving assets.
        
        Returns:
            True if successful setting.

        """
        _log.debug('Setting GNSS refresh to {} seconds'.format(interval))
        cmd = 'AT%TRK={}{}'.format(interval, ',{}'.format(1 if doppler else 0))
        if interval < 0 or interval > 30:
            raise ValueError('GNSS continuous interval must be in range 0..30')
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1], False)
        return True

    async def gnss_nmea_get(self,
                            stale_secs: int = 1,
                            wait_secs: int = 35,
                            sentences: list = ['RMC', 'GSA', 'GGA', 'GSV']
                            ) -> Union[list, str]:
        """Returns a list of NMEA-formatted sentences from GNSS.

        Args:
            stale_secs: Maximum age of fix in seconds (1..600)
            wait_secs: Maximum time to wait for fix (1..600)
            sentences: Optional list of NMEA sentence types to get

        Returns:
            List of NMEA sentences

        Raises:
            ValueError if parameter out of range
            AtGnssTimeout if no response from GNSS
            AtException

        """
        _log.debug('Requesting GNSS fix information')
        NMEA_SUPPORTED = ['RMC', 'GGA', 'GSA', 'GSV']
        BUFFER_SECONDS = 5
        if (stale_secs not in range(1, 600+1) or
            wait_secs not in range(1, 600+1)):
            raise ValueError('stale_secs and wait_secs must be 1..600')
        sentence_list = ''
        for sentence in sentences:
            sentence = sentence.upper()
            if sentence in NMEA_SUPPORTED:
                if len(sentence_list) > 0:
                    sentence_list += ','
                sentence_list += '"{}"'.format(sentence)
            else:
                raise ValueError('Unsupported NMEA sentence: {}'
                                 .format(sentence))
        cmd = 'AT%GPS={},{},{}'.format(stale_secs, wait_secs, sentence_list)
        response = await self.command(cmd, timeout=wait_secs + BUFFER_SECONDS)
        if response[0] == 'ERROR':
            if int(response[1]) == 108:
                raise AtGnssTimeout('Timed out waiting for GNSS fix')
            else:
                return self._handle_at_error(cmd, response[1], None)
        if 'OK' in response:
            response.remove('OK')
        response[0] = response[0].replace('%GPS: ', '')
        return response

    async def location(self,
                       stale_secs: int = 1,
                       wait_secs: int = 35) -> Location:
        """Returns a location object.
        
        Args:
            stale_secs: the maximum fix age to accept
            wait_secs: the maximum time to wait for a new fix
        
        Returns:
            nmea.Location object
        
        Raises:
            AtGnssTimeout if no location data is available
        
        """
        _log.debug('Querying location')
        nmea_sentences = await self.gnss_nmea_get(stale_secs, wait_secs)
        return location_from_nmea(nmea_sentences)

    async def lowpower_mode_set(self, power_mode: int) -> bool:
        """Sets the modem power mode (for blockage recovery).

        Args:
            power_mode (int): The new power mode

        Returns:
            True if successful
        
        Raises:
            ValueError on invalid power_mode
        """
        if power_mode not in POWER_MODES:
            raise ValueError('Invalid power mode {}'.format(power_mode))
        _log.debug('Setting power mode {}'.format(
            POWER_MODES[power_mode]))
        cmd = 'ATS50={}'.format(power_mode)
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1], False)
        return True

    async def lowpower_mode_get(self) -> int:
        """Gets the modem power mode.

        Returns:
            The integer value of the power mode
        
        Raises:
            AtException if an error was returned

        """
        _log.debug('Getting power mode')
        cmd = 'ATS50?'
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1], None)
        return int(response[0])

    async def lowpower_wakeup_set(self, wakeup_period: int) -> bool:
        """Sets the modem wakeup period.

        Args:
            wakeup_period (int): The new wakeup period

        Returns:
            True if successful
        
        Raises:
            ValueError on invalid wakeup_period

        """
        if wakeup_period not in WAKEUP_PERIODS:
            raise ValueError('Invalid wakeup period {}'.format(wakeup_period))
        _log.debug('Setting wakeup period {}'.format(
            WAKEUP_PERIODS[wakeup_period]))
        cmd = 'ATS51={}'.format(wakeup_period)
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1], False)
        return True

    async def lowpower_wakeup_get(self) -> int:
        """Gets the modem wakeup period.

        Returns:
            The integer value of the wakeup period
        
        Raises:
            AtException if an error was returned

        """
        _log.debug('Getting wakeup period')
        cmd = 'ATS51?'
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1], None)
        return int(response[0])

    async def lowpower_notifications_enable(self) -> bool:
        """Configures low power satellite status and notification assertion.

        The following events trigger assertion of the notification output:
        - New Forward Message received
        - Return Message completed (success or failure)
        - Trace event update (satellite status change)

        Returns:
            True if successful
        """
        _log.debug('Enabling low power notifications')
        cmd = 'AT%EVMON=3.1;S88=1030'
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1], False)
        return True

    async def lowpower_notifications_check(self) -> list:
        """Returns a list of relevant events."""
        _log.debug('Querying low power notifications')
        relevant = []
        try:
            reason = await self.notification_check()
            if reason is not None:
                if reason['event_cached'] == True:
                    relevant.append('event_cached')
                if reason['message_mt_received'] == True:
                    relevant.append('message_mt_received')
                if reason['message_mo_complete'] == True:
                    relevant.append('message_mo_complete')
        except AtException:
            _log.warning('Notification check returned AT exception')
        finally:
            return relevant

    async def message_mo_send(self,
                              data: str,
                              data_format: int,
                              sin: int,
                              min: int = None,
                              name: str = None,
                              priority: int = 4) -> str:
        """Submits a mobile-originated message to send.
        
        Args:
            data: The data to be sent formatted as base64, hex or text according
                to `data_format`.
            data_format: 1: Text, 2: ASCII-Hex, 3: Base64 (MIME)
            name: (Optional) A unique name for the message, if none is provided
                a name based on unix timestamp will be assigned
            priority: 1: High .. 4: Low (default)
            sin: Service Identification Number (15..255) becomes the first byte
                of message payload
            min: (Optional) Message Identification Number (0..255) becomes the
                second byte of message payload if specified

        Returns:
            Name of the message if successful, or the error string
        """
        _log.debug('Submitting message named {}'.format(name))
        if name is None:
            # Use the 8 least-signficant numbers of unix timestamp as unique
            name = str(int(time()))[-8:]
            _log.debug('Assigned name {}'.format(name))
        elif len(name) > 8:
            name = name[0:8]   # risk duplicates create an ERROR resposne
            _log.warning('Truncated name to {}'.format(name))
        _min = '.{}'.format(min) if min is not None else ''
        if data_format == 1:
            data = '"{}"'.format(data)
        cmd = ('AT%MGRT="{}",{},{}{},{},{}'.format(name,
                                                    priority,
                                                    sin,
                                                    _min,
                                                    data_format,
                                                    data))
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1], None)
        return name

    async def message_mo_state(self, name: str = None) -> list:
        """Returns the message state(s) requested.
        
        If no name filter is passed in, all available messages states
        are returned.  Returns False is the request failed.

        Args:
            name: The unique message name in the modem queue

        Returns:
            `list` of `dict` with `name`, `state`, `size` and `sent`

        Raises:
            AtException

        """
        _log.debug('Querying transmit message state{}'.format(
            ' ={}'.format(name) if name else 's'))
        cmd = 'AT%MGRS{}'.format('="{}"'.format(name) if name else '')
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1], None)
        # %MGRS: "<name>",<msg_no>,<priority>,<sin>,<state>,<size>,<sent_bytes>
        if 'OK' in response:
            response.remove('OK')
        states = []
        for res in response:
            res = res.replace('%MGRS:', '').strip()
            if len(res) > 0:
                name, number, priority, sin, state, size, sent = res.split(',')
                del number
                del priority
                del sin
                states.append({
                    'name': name.replace('"', ''),
                    'state': int(state),
                    'size': int(size),
                    'bytes_sent': int(sent),
                    })
        return states
    
    @staticmethod
    def message_state_name(state: int):
        return MessageState(state).name

    async def message_mo_cancel(self, name: str) -> bool:
        """Cancels a mobile-originated message in the Tx ready state."""
        _log.debug('Cancelling message {}'.format(name))
        cmd = 'AT%MGRC="{}"'.format(name)
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1], False)
        return True

    async def message_mo_clear(self) -> int:
        """Clears the modem transmit queue.
        
        Returns:
            Count of messages deleted
        
        Raises:
            AtException

        """
        _log.debug('Clearing transmit queue of return messages')
        cancelled_count = 0
        open_count = 0
        cmd = 'AT%MGRS'
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1])
        if 'OK' in response:
            response.remove('OK')
        if '%MGRS:' in response:
            response.remove('%MGRS:')
        for message in response:
            if '%MGRS:' in message:
                message = message.replace('%MGRS:', '').strip()
            parts = message.split(',')
            status = int(parts[4])
            name = parts[0].replace('"', '')
            if status < 6:
                cancel_explicit = await self.message_mo_cancel(name)
                if not cancel_explicit:
                    open_count += 1
                else:
                    cancelled_count += 1
        if open_count > 0:
            _log.warning('{} messages still in transmit queue'.format(
                open_count))
        return cancelled_count

    async def message_mt_waiting(self) -> list:
        """Returns a list of received mobile-terminated message information.
        
        Returns:
            List of (name, number, priority, sin, state, length, received)
        
        Raises:
            AtException

        """
        _log.debug('Checking receive queue for forward messages')
        cmd = 'AT%MGFN'
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1])
        if 'OK' in response:
            response.remove('OK')
        waiting = []
        #: %MGFN: name, number, priority, sin, state, length, bytes_received
        for res in response:
            msg = res.replace('%MGFN:', '').strip()
            if msg.startswith('"FM'):
                parts = msg.split(',')
                name, number, priority, sin, state, length, received = parts
                del number   #: unused
                waiting.append({'name': name.replace('"', ''),
                                'sin': int(sin),
                                'priority': int(priority),
                                'state': int(state),
                                'length': int(length),
                                'received': int(received)})
        return waiting

    @staticmethod
    def _message_mt_parse(mgfg_response: str,
                          data_format: int) -> dict:
        #:%MGFG:"<msgName>",<msgNum>,<priority>,<sin>,<state>,<length>,<data_format>,<data>
        parts = mgfg_response.replace('%MGFG:', '').strip().split(',')
        sys_msg_num, sys_msg_seq = parts[1].split('.')
        msg_sin = int(parts[3])
        data_str_no_sin = parts[7]
        if data_format == DataFormat.HEX:
            data = '{:02X}'.format(msg_sin) + data_str_no_sin
            databytes = bytes.fromhex(data)
        elif data_format == DataFormat.BASE64:
            databytes = bytes([msg_sin]) + b64decode(data_str_no_sin)
            data = b64encode(databytes).decode('ascii')
        elif data_format == DataFormat.TEXT:
            data_str_no_sin = data_str_no_sin[1:len(data_str_no_sin) - 1]
            data = '\\{:02x}'.format(msg_sin) + data_str_no_sin
            databytes = bytes([msg_sin])
            i = 0
            while i < len(data_str_no_sin):
                if data_str_no_sin[i] == '\\' and i < len(data_str_no_sin) - 1:
                    if data_str_no_sin[i + 1] in '0123456789ABCDEF':
                        databytes += bytes([int(data_str_no_sin[i+1:i+3], 16)])
                        i += 3
                else:
                    databytes += data_str_no_sin[i].encode('utf-8')
                    i += 1
        return {
            'name': parts[0].replace('"', ''),
            'system_message_number': int(sys_msg_num),
            'system_message_sequence': int(sys_msg_seq),
            'priority': int(parts[2]),
            'sin': msg_sin,
            'min': databytes[1],
            'state': int(parts[4]),
            'length': int(parts[5]),
            'data_format': data_format,
            'raw_payload': data,
            'bytes': databytes,
        }

    async def message_mt_get(self,
                             name: str,
                             data_format: int = DataFormat.BASE64,
                             verbose: bool = True) -> Union[dict, bytes]:
        """Returns the payload of a specified mobile-terminated message.
        
        Payload is presented as a string with encoding based on data_format. 

        Args:
            name: The unique name in the modem queue e.g. FM01.01
            data_format: text=1, hex=2, base64=3 (default)
            verbose: if True returns a dictionary, otherwise raw payload bytes

        Returns:
            The encoded data as a string
        
        Raises:
            AtException

        """
        _log.debug('Retrieving forward message {}'.format(name))
        cmd = 'AT%MGFG="{}",{}'.format(name, data_format)
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1])
        message = self._message_mt_parse(response[0], data_format=data_format)
        return message if verbose else message['bytes']

    async def message_mt_delete(self, name: str) -> bool:
        """Marks a Return message for deletion by the modem.
        
        Args:
            name: The unique mobile-terminated name in the queue

        Returns:
            True if the operation succeeded

        """
        _log.debug('Marking forward message {} for deletion'.format(name))
        cmd = 'AT%MGFM="{}"'.format(name)
        try:
            response = await self.command(cmd)
            if response[0] == 'ERROR':
                return self._handle_at_error(cmd, response[1], False)
            return True
        except:
            return False

    async def event_monitor_get(self) -> list:
        """Returns a list of monitored/cached events.
        As a list of <class.subclass> strings which includes an asterisk
        for each new event that can be retrieved.

        Returns:
            list of strings <class.subclass[*]> or None
        
        Raises:
            AtException

        """
        _log.debug('Querying monitored events')
        cmd = 'AT%EVMON'
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1])
        events = response[0].replace('%EVMON: ', '').split(',')
        '''
        for i in range(len(events)):
            c, s = events[i].strip().split('.')
            if s[-1] == '*':
                s = s.replace('*', '')
                # TODO flag change for retrieval
            events[i] = (int(c), int(s))
        '''
        return [event for event in events if event != '']

    async def event_monitor_set(self, eventlist: list) -> bool:
        """Sets trace events to monitor.

        Args:
            eventlist: list of tuples (class, subclass)

        Returns:
            True if successfully set

        """
        _log.debug('Setting event monitors: {}'.format(eventlist))
        #: AT%EVMON{ = <c1.s1>[, <c2.s2> ..]}
        cmd = 'AT%EVMON='
        if eventlist is not None:
            for monitor in eventlist:
                if isinstance(monitor, tuple):
                    if len(cmd) > 9:
                        cmd += ','
                    cmd += '{}.{}'.format(monitor[0], monitor[1])
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1], False)
        return True

    async def event_get(self,
                        event: tuple,
                        raw: bool = True) -> Union[str, dict]:
        """Gets the cached event by class/subclass.

        Args:
            event: tuple of (class, subclass)
            raw: Returns the raw text string if True
        
        Returns:
            String if raw=True, dictionary if raw=False
        
        Raises:
            AtException

        """
        _log.debug('Querying events: {}'.format(event))
        #: AT%EVNT=c,s
        #: res %EVNT: <dataCount>,<signedBitmask>,<MTID>,<timestamp>,
        # <class>,<subclass>,<priority>,<data0>,<data1>,..,<dataN>
        if not (isinstance(event, tuple) and len(event) == 2):
            raise AtException('event_get expects (class, subclass)')
        cmd = 'AT%EVNT={},{}'.format(event[0], event[1])
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1])
        eventdata = response[0].replace('%EVNT: ', '').split(',')
        event = {
            'data_count': int(eventdata[0]),
            'signed_bitmask': bin(int(eventdata[1]))[2:],
            'mobile_id': eventdata[2],
            'timestamp': eventdata[3],
            'class': eventdata[4],
            'subclass': eventdata[5],
            'priority': eventdata[6],
            'data': eventdata[7:]
        }
        bitmask = event['signed_bitmask']
        while len(bitmask) < event['data_count']:
            bitmask = '0' + bitmask
        i = 0
        for bit in reversed(bitmask):
            #: 32-bit signed conversion redundant since response is string
            if bit == '1':
                event['data'][i] = _to_signed32(int(event['data'][i]))
            else:
                event['data'][i] = int(event['data'][i])
            i += 1
        # TODO lookup class/subclass definitions
        return response[0] if raw else event

    async def notification_control_set(self, event_map: list) -> bool:
        """Sets the event notification bitmask.

        Args:
            event_map: list of tuples (event_name, bool)
        
        Returns:
            True if successful.
            
        """
        _log.debug('Setting event notifications: {}'.format(event_map))
        #: ATS88=bitmask
        notifications_changed = False
        old_notifications = await self.notification_control_get()
        if old_notifications is None:
            return False
        bitmask = list('0' * len(old_notifications))
        i = 0
        for event in event_map:
            if event[0] not in NOTIFICATION_BITMASK:
                raise ValueError('Invalid event {}'.format(event[0]))
            i = 0
            for key in reversed(old_notifications):
                bit = '1' if old_notifications[key] or bitmask[i] == '1' else '0'
                if key == event[0]:
                    notify = event[1]
                    if old_notifications[key] != notify:
                        bit = '1' if notify else '0'
                        notifications_changed = True
                        # self.notifications[key] = notify
                bitmask[i] = bit
                i += 1
        if notifications_changed:
            cmd = 'ATS88={}'.format(int('0b' + ''.join(bitmask), 2))
            response = await self.command(cmd)
            if response[0] == 'ERROR':
                return self._handle_at_error(cmd, response[1], False)
        return True
    
    async def notification_control_get(self) -> OrderedDict:
        """Returns the current notification configuration bitmask.
        
        Returns:
            OrderedDict
        
        Raises:
            AtException

        """
        _log.debug('Querying event notification controls')
        cmd =  'ATS88?'
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1])
        return _notifications_dict(int(response[0]))

    async def notification_check(self) -> OrderedDict:
        """Returns the current active event notification bitmask (S89).
        
        The value of S89 register is cleared upon reading.

        Returns:
            OrderedDict
        
        Raises:
            AtException

        """
        _log.debug('Querying event notification triggers')
        cmd = 'ATS89?'
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1])
        return _notifications_dict(int(response[0]))

    async def satellite_status(self) -> dict:
        """Returns the control state and C/No.
        
        Returns:
            Dictionary with state (int), snr (float), beamsearch (int),
                state_name (str), beamsearch_name (str), or None if error.

        Raises:
            AtException

        """
        _log.debug('Querying satellite status/SNR')
        cmd = 'ATS90=3 S91=1 S92=1 S116? S122? S123?'
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1])
        if 'OK' in response:
            response.remove('OK')
        cn_0, ctrl_state, beamsearch_state = response
        cn_0 = int(cn_0) / 100.0
        ctrl_state = int(ctrl_state)
        beamsearch_state = int(beamsearch_state)
        return {
            'state': ctrl_state,
            'state_name': CONTROL_STATES[ctrl_state],
            'snr': cn_0,
            'beamsearch': beamsearch_state,
            'beamsearch_name': BeamSearchState(beamsearch_state).name,
        }

    @staticmethod
    def sat_status_name(ctrl_state: int) -> str:
        """Returns human-readable definition of a control state value.
        
        Raises:
            ValueError if ctrl_state is not found.
        """
        if ctrl_state not in CONTROL_STATES:
            raise ValueError('Control state {} not found'.format(ctrl_state))
        return CONTROL_STATES[ctrl_state]

    @staticmethod
    def sat_beamsearch_name(beamsearch_state: int) -> str:
        return BeamSearchState(beamsearch_state).name

    async def transmit_status(self) -> dict:
        """Returns the transmitter status.
        
        Returns:
            Transmit status (5 = OK)

        Raises:
            AtException if error returned by modem

        """
        _log.debug('Querying transmitter status')
        cmd = 'ATS54?'
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1])
        status = int(response[0])
        return TransmitterStatus(status)

    async def shutdown(self) -> bool:
        """Tell the modem to prepare for power-down."""
        _log.debug('Requesting power down')
        cmd = 'AT%OFF'
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1], False)
        return True

    async def time_utc(self) -> str:
        """Returns current UTC time of the modem in ISO format.
        
        Returns:
            UTC as ISO-formatted string
        
        Raises:
            AtException

        """
        _log.debug('Requesting UTC network time')
        cmd = 'AT%UTC'
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1])
        return response[0].replace('%UTC: ', '').replace(' ', 'T') + 'Z'

    async def s_register_get(self, register: int) -> Union[int, None]:
        """Returns the value of the S-register requested.

        Args:
            register: The S-register number

        Returns:
            integer value of register
        
        Raises:
            AtException

        """
        _log.debug('Querying register value S{}'.format(register))
        cmd = 'ATS{}?'.format(register)
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1])
        return int(response[0])

    async def s_register_get_all(self) -> list:
        """Returns a list of S-register definitions.
        R=read-only, S=signed, V=volatile
        
        Returns:
            tuple(register, RSV, current, default, minimum, maximum)
        
        Raises:
            AtException

        """
        _log.debug('Querying S-register values')
        cmd = 'AT%SREG'
        #: Sreg, RSV, CurrentVal, DefaultVal, MinimumVal, MaximumVal
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[0])
        if 'OK' in response:
            response.remove('OK')
        reg_defs = response[2:]
        registers = []
        for row in reg_defs:
            reg_def = row.split(' ')
            reg_def = tuple(filter(None, reg_def))
            registers.append(reg_def)
        return registers

Static methods

def message_state_name(state: int)
Expand source code
@staticmethod
def message_state_name(state: int):
    return MessageState(state).name
def sat_beamsearch_name(beamsearch_state: int) ‑> str
Expand source code
@staticmethod
def sat_beamsearch_name(beamsearch_state: int) -> str:
    return BeamSearchState(beamsearch_state).name
def sat_status_name(ctrl_state: int) ‑> str

Returns human-readable definition of a control state value.

Raises

ValueError if ctrl_state is not found.

Expand source code
@staticmethod
def sat_status_name(ctrl_state: int) -> str:
    """Returns human-readable definition of a control state value.
    
    Raises:
        ValueError if ctrl_state is not found.
    """
    if ctrl_state not in CONTROL_STATES:
        raise ValueError('Control state {} not found'.format(ctrl_state))
    return CONTROL_STATES[ctrl_state]

Instance variables

var baudrate
Expand source code
@property
def baudrate(self):
    return self._baudrate
var port
Expand source code
@property
def port(self):
    return self._port

Methods

async def command(self, at_command: str, timeout: int = 5, retries: int = 0) ‑> list

Submits an AT command and returns the response asynchronously.

Proxies a private function to allow for multi-threaded operation.

Args

at_command
The AT command string
timeout
The maximum time in seconds to await a response.
retries
Optional number of additional attempts on failure.

Returns

A list of response strings finishing with 'OK', or ['ERROR', '']

Raises

AtException if no response was received. AtException if bad CRC response count exceeds retries

Expand source code
async def command(self,
                  at_command: str,
                  timeout: int = 5,
                  retries: int = 0) -> list:
    """Submits an AT command and returns the response asynchronously.
    
    Proxies a private function to allow for multi-threaded operation.

    Args:
        at_command: The AT command string
        timeout: The maximum time in seconds to await a response.
        retries: Optional number of additional attempts on failure.
    
    Returns:
        A list of response strings finishing with 'OK', or 
            ['ERROR', '<error_code>']
    
    Raises:
        AtException if no response was received.
        AtException if bad CRC response count exceeds retries

    """
    if current_thread() != self._thread:
        _log.warning('Call from external thread may crash or hang')
        loop = get_running_loop()
        set_event_loop(loop)
        await sleep(1)   #: add a slight delay to mitigate race condition
        while self._event.is_set():
            pass
        concurrentfuture = run_coroutine_threadsafe(
            self._command(at_command, timeout, retries), loop)
        asyncfuture = wrap_future(concurrentfuture)
        return await asyncfuture
    else:
        return await self._command(at_command, timeout, retries)
async def config_crc_enable(self, crc: bool) ‑> bool

Enables or disables CRC error checking (for long serial cable).

Args

crc
enable CRC if true
Expand source code
async def config_crc_enable(self, crc: bool) -> bool:
    """Enables or disables CRC error checking (for long serial cable).
    
    Args:
        crc: enable CRC if true
    """
    _log.debug('{} CRC'.format('Enabling' if crc else 'Disabling'))
    cmd = 'AT%CRC={}'.format(1 if crc else 0)
    response = await self.command(cmd)
    if response[0] == 'ERROR' and self.crc != crc:
        return self._handle_at_error(cmd, response[1], False)
    self.crc = crc
    return True
async def config_report(self) ‑> Tuple[dict, dict]

Sends the AT&V command to retrive S-register settings.

Returns

A tuple with two dictionaries or both None if failed at_config with booleans crc, echo, quiet and verbose reg_config with S-register tags and integer values

Expand source code
async def config_report(self) -> Tuple[dict, dict]:
    """Sends the AT&V command to retrive S-register settings.
    
    Returns:
        A tuple with two dictionaries or both None if failed
        at_config with booleans crc, echo, quiet and verbose
        reg_config with S-register tags and integer values

    """
    _log.debug('Querying configuration')
    cmd = 'AT&V'
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1], (None, None))
    at_config = response[1]
    s_regs = response[2]
    echo, quiet, verbose, crc = at_config.split(' ')
    at_config = {
        "crc": bool(int(crc[4])),
        "echo": bool(int(echo[1])),
        "quiet": bool(int(quiet[1])),
        "verbose": bool(int(verbose[1])),
    }
    reg_config = {}
    for reg in s_regs.split(' '):
        name, value = reg.split(':')
        reg_config[name] = int(value)
    return (at_config, reg_config)
async def config_restore_factory(self) ‑> bool

Sends the AT&F command and returns True on success.

Expand source code
async def config_restore_factory(self) -> bool:
    """Sends the AT&F command and returns True on success."""
    _log.debug('Restoring factory defaults')
    cmd = 'AT&F'
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1], return_value=False)
    return True
async def config_restore_nvm(self) ‑> bool

Sends the ATZ command to restore from non-volatile memory.

Returns

Boolean success.

Expand source code
async def config_restore_nvm(self) -> bool:
    """Sends the ATZ command to restore from non-volatile memory.
    
    Returns:
        Boolean success.
    """
    _log.debug('Restoring non-volatile configuration')
    cmd = 'ATZ'
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1], return_value=False)
    return True
async def config_save(self) ‑> bool

Sends the AT&W command and returns True if successful.

Expand source code
async def config_save(self) -> bool:
    """Sends the AT&W command and returns True if successful."""
    _log.debug('Saving S-registers to non-volatile memory')
    cmd = 'AT&W'
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1], False)
    return True
async def device_mobile_id(self) ‑> str

Returns the unique Mobile ID (Inmarsat serial number).

Returns

MobileID string.

Raises

AtException

Expand source code
async def device_mobile_id(self) -> str:
    """Returns the unique Mobile ID (Inmarsat serial number).
    
    Returns:
        MobileID string.
    
    Raises:
        AtException

    """
    _log.debug('Querying device Mobile ID')
    cmd = 'AT+GSN'
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        self._handle_at_error(cmd, response[1])
    return response[0].replace('+GSN:', '').strip()
async def device_version(self) ‑> Tuple[str, str, str]

Returns the hardware, firmware and AT versions.

Returns

Dict with hardware, firmware, at version.

Raises

AtException

Expand source code
async def device_version(self) -> Tuple[str, str, str]:
    """Returns the hardware, firmware and AT versions.
    
    Returns:
        Dict with hardware, firmware, at version.
    
    Raises:
        AtException

    """
    _log.debug('Querying device version info')
    cmd = 'AT+GMR'
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        self._handle_at_error(cmd, response[1])
    versions = response[0].replace('+GMR:', '').strip()
    fw_ver, hw_ver, at_ver = versions.split(',')
    return {'hardware': hw_ver, 'firmware': fw_ver, 'at': at_ver}
async def event_get(self, event: tuple, raw: bool = True) ‑> Union[str, dict]

Gets the cached event by class/subclass.

Args

event
tuple of (class, subclass)
raw
Returns the raw text string if True

Returns

String if raw=True, dictionary if raw=False

Raises

AtException

Expand source code
async def event_get(self,
                    event: tuple,
                    raw: bool = True) -> Union[str, dict]:
    """Gets the cached event by class/subclass.

    Args:
        event: tuple of (class, subclass)
        raw: Returns the raw text string if True
    
    Returns:
        String if raw=True, dictionary if raw=False
    
    Raises:
        AtException

    """
    _log.debug('Querying events: {}'.format(event))
    #: AT%EVNT=c,s
    #: res %EVNT: <dataCount>,<signedBitmask>,<MTID>,<timestamp>,
    # <class>,<subclass>,<priority>,<data0>,<data1>,..,<dataN>
    if not (isinstance(event, tuple) and len(event) == 2):
        raise AtException('event_get expects (class, subclass)')
    cmd = 'AT%EVNT={},{}'.format(event[0], event[1])
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1])
    eventdata = response[0].replace('%EVNT: ', '').split(',')
    event = {
        'data_count': int(eventdata[0]),
        'signed_bitmask': bin(int(eventdata[1]))[2:],
        'mobile_id': eventdata[2],
        'timestamp': eventdata[3],
        'class': eventdata[4],
        'subclass': eventdata[5],
        'priority': eventdata[6],
        'data': eventdata[7:]
    }
    bitmask = event['signed_bitmask']
    while len(bitmask) < event['data_count']:
        bitmask = '0' + bitmask
    i = 0
    for bit in reversed(bitmask):
        #: 32-bit signed conversion redundant since response is string
        if bit == '1':
            event['data'][i] = _to_signed32(int(event['data'][i]))
        else:
            event['data'][i] = int(event['data'][i])
        i += 1
    # TODO lookup class/subclass definitions
    return response[0] if raw else event
async def event_monitor_get(self) ‑> list

Returns a list of monitored/cached events. As a list of strings which includes an asterisk for each new event that can be retrieved.

Returns

list of strings or None

Raises

AtException

Expand source code
async def event_monitor_get(self) -> list:
    """Returns a list of monitored/cached events.
    As a list of <class.subclass> strings which includes an asterisk
    for each new event that can be retrieved.

    Returns:
        list of strings <class.subclass[*]> or None
    
    Raises:
        AtException

    """
    _log.debug('Querying monitored events')
    cmd = 'AT%EVMON'
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1])
    events = response[0].replace('%EVMON: ', '').split(',')
    '''
    for i in range(len(events)):
        c, s = events[i].strip().split('.')
        if s[-1] == '*':
            s = s.replace('*', '')
            # TODO flag change for retrieval
        events[i] = (int(c), int(s))
    '''
    return [event for event in events if event != '']
async def event_monitor_set(self, eventlist: list) ‑> bool

Sets trace events to monitor.

Args

eventlist
list of tuples (class, subclass)

Returns

True if successfully set

Expand source code
async def event_monitor_set(self, eventlist: list) -> bool:
    """Sets trace events to monitor.

    Args:
        eventlist: list of tuples (class, subclass)

    Returns:
        True if successfully set

    """
    _log.debug('Setting event monitors: {}'.format(eventlist))
    #: AT%EVMON{ = <c1.s1>[, <c2.s2> ..]}
    cmd = 'AT%EVMON='
    if eventlist is not None:
        for monitor in eventlist:
            if isinstance(monitor, tuple):
                if len(cmd) > 9:
                    cmd += ','
                cmd += '{}.{}'.format(monitor[0], monitor[1])
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1], False)
    return True
async def gnss_continuous_set(self, interval: int = 0, doppler: bool = True) ‑> bool

Sets the GNSS continous mode (0 = on-demand).

Args

interval
Seconds between GNSS refresh.
doppler
Often required for moving assets.

Returns

True if successful setting.

Expand source code
async def gnss_continuous_set(self,
                              interval: int=0,
                              doppler: bool=True) -> bool:
    """Sets the GNSS continous mode (0 = on-demand).
    
    Args:
        interval: Seconds between GNSS refresh.
        doppler: Often required for moving assets.
    
    Returns:
        True if successful setting.

    """
    _log.debug('Setting GNSS refresh to {} seconds'.format(interval))
    cmd = 'AT%TRK={}{}'.format(interval, ',{}'.format(1 if doppler else 0))
    if interval < 0 or interval > 30:
        raise ValueError('GNSS continuous interval must be in range 0..30')
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1], False)
    return True
async def gnss_nmea_get(self, stale_secs: int = 1, wait_secs: int = 35, sentences: list = ['RMC', 'GSA', 'GGA', 'GSV']) ‑> Union[list, str]

Returns a list of NMEA-formatted sentences from GNSS.

Args

stale_secs
Maximum age of fix in seconds (1..600)
wait_secs
Maximum time to wait for fix (1..600)
sentences
Optional list of NMEA sentence types to get

Returns

List of NMEA sentences

Raises

ValueError if parameter out of range AtGnssTimeout if no response from GNSS AtException

Expand source code
async def gnss_nmea_get(self,
                        stale_secs: int = 1,
                        wait_secs: int = 35,
                        sentences: list = ['RMC', 'GSA', 'GGA', 'GSV']
                        ) -> Union[list, str]:
    """Returns a list of NMEA-formatted sentences from GNSS.

    Args:
        stale_secs: Maximum age of fix in seconds (1..600)
        wait_secs: Maximum time to wait for fix (1..600)
        sentences: Optional list of NMEA sentence types to get

    Returns:
        List of NMEA sentences

    Raises:
        ValueError if parameter out of range
        AtGnssTimeout if no response from GNSS
        AtException

    """
    _log.debug('Requesting GNSS fix information')
    NMEA_SUPPORTED = ['RMC', 'GGA', 'GSA', 'GSV']
    BUFFER_SECONDS = 5
    if (stale_secs not in range(1, 600+1) or
        wait_secs not in range(1, 600+1)):
        raise ValueError('stale_secs and wait_secs must be 1..600')
    sentence_list = ''
    for sentence in sentences:
        sentence = sentence.upper()
        if sentence in NMEA_SUPPORTED:
            if len(sentence_list) > 0:
                sentence_list += ','
            sentence_list += '"{}"'.format(sentence)
        else:
            raise ValueError('Unsupported NMEA sentence: {}'
                             .format(sentence))
    cmd = 'AT%GPS={},{},{}'.format(stale_secs, wait_secs, sentence_list)
    response = await self.command(cmd, timeout=wait_secs + BUFFER_SECONDS)
    if response[0] == 'ERROR':
        if int(response[1]) == 108:
            raise AtGnssTimeout('Timed out waiting for GNSS fix')
        else:
            return self._handle_at_error(cmd, response[1], None)
    if 'OK' in response:
        response.remove('OK')
    response[0] = response[0].replace('%GPS: ', '')
    return response
async def initialize(self, crc: bool = False) ‑> bool

Initializes the modem using ATZ and sets up CRC.

Args

crc
desired initial CRC enabled if True

Returns

True if successful

Raises

AtException on errors other than CRC enabled

Expand source code
async def initialize(self, crc: bool = False) -> bool:
    """Initializes the modem using ATZ and sets up CRC.

    Args:
        crc: desired initial CRC enabled if True

    Returns:
        True if successful
    
    Raises:
        AtException on errors other than CRC enabled

    """
    _log.debug('Initializing modem{}'.format(
        ' (CRC enabled)' if crc else ''))
    cmd = 'ATZ;E1;V1'
    cmd += ';%CRC=1' if crc else ''
    success = await self.command(cmd)
    if success[0] == 'ERROR':
        if int(success[1]) == 100:
            if crc and self.crc:
                _log.debug('CRC already enabled')
                return True
            else:
                self.crc = True
                await self.initialize(crc)
        else:
            return self._handle_at_error(cmd, success[1], return_value=False)
    self.crc = crc
    return True
async def location(self, stale_secs: int = 1, wait_secs: int = 35) ‑> Location

Returns a location object.

Args

stale_secs
the maximum fix age to accept
wait_secs
the maximum time to wait for a new fix

Returns

nmea.Location object

Raises

AtGnssTimeout if no location data is available

Expand source code
async def location(self,
                   stale_secs: int = 1,
                   wait_secs: int = 35) -> Location:
    """Returns a location object.
    
    Args:
        stale_secs: the maximum fix age to accept
        wait_secs: the maximum time to wait for a new fix
    
    Returns:
        nmea.Location object
    
    Raises:
        AtGnssTimeout if no location data is available
    
    """
    _log.debug('Querying location')
    nmea_sentences = await self.gnss_nmea_get(stale_secs, wait_secs)
    return location_from_nmea(nmea_sentences)
async def lowpower_mode_get(self) ‑> int

Gets the modem power mode.

Returns

The integer value of the power mode

Raises

AtException if an error was returned

Expand source code
async def lowpower_mode_get(self) -> int:
    """Gets the modem power mode.

    Returns:
        The integer value of the power mode
    
    Raises:
        AtException if an error was returned

    """
    _log.debug('Getting power mode')
    cmd = 'ATS50?'
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1], None)
    return int(response[0])
async def lowpower_mode_set(self, power_mode: int) ‑> bool

Sets the modem power mode (for blockage recovery).

Args

power_mode : int
The new power mode

Returns

True if successful

Raises

ValueError on invalid power_mode

Expand source code
async def lowpower_mode_set(self, power_mode: int) -> bool:
    """Sets the modem power mode (for blockage recovery).

    Args:
        power_mode (int): The new power mode

    Returns:
        True if successful
    
    Raises:
        ValueError on invalid power_mode
    """
    if power_mode not in POWER_MODES:
        raise ValueError('Invalid power mode {}'.format(power_mode))
    _log.debug('Setting power mode {}'.format(
        POWER_MODES[power_mode]))
    cmd = 'ATS50={}'.format(power_mode)
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1], False)
    return True
async def lowpower_notifications_check(self) ‑> list

Returns a list of relevant events.

Expand source code
async def lowpower_notifications_check(self) -> list:
    """Returns a list of relevant events."""
    _log.debug('Querying low power notifications')
    relevant = []
    try:
        reason = await self.notification_check()
        if reason is not None:
            if reason['event_cached'] == True:
                relevant.append('event_cached')
            if reason['message_mt_received'] == True:
                relevant.append('message_mt_received')
            if reason['message_mo_complete'] == True:
                relevant.append('message_mo_complete')
    except AtException:
        _log.warning('Notification check returned AT exception')
    finally:
        return relevant
async def lowpower_notifications_enable(self) ‑> bool

Configures low power satellite status and notification assertion.

The following events trigger assertion of the notification output: - New Forward Message received - Return Message completed (success or failure) - Trace event update (satellite status change)

Returns

True if successful

Expand source code
async def lowpower_notifications_enable(self) -> bool:
    """Configures low power satellite status and notification assertion.

    The following events trigger assertion of the notification output:
    - New Forward Message received
    - Return Message completed (success or failure)
    - Trace event update (satellite status change)

    Returns:
        True if successful
    """
    _log.debug('Enabling low power notifications')
    cmd = 'AT%EVMON=3.1;S88=1030'
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1], False)
    return True
async def lowpower_wakeup_get(self) ‑> int

Gets the modem wakeup period.

Returns

The integer value of the wakeup period

Raises

AtException if an error was returned

Expand source code
async def lowpower_wakeup_get(self) -> int:
    """Gets the modem wakeup period.

    Returns:
        The integer value of the wakeup period
    
    Raises:
        AtException if an error was returned

    """
    _log.debug('Getting wakeup period')
    cmd = 'ATS51?'
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1], None)
    return int(response[0])
async def lowpower_wakeup_set(self, wakeup_period: int) ‑> bool

Sets the modem wakeup period.

Args

wakeup_period : int
The new wakeup period

Returns

True if successful

Raises

ValueError on invalid wakeup_period

Expand source code
async def lowpower_wakeup_set(self, wakeup_period: int) -> bool:
    """Sets the modem wakeup period.

    Args:
        wakeup_period (int): The new wakeup period

    Returns:
        True if successful
    
    Raises:
        ValueError on invalid wakeup_period

    """
    if wakeup_period not in WAKEUP_PERIODS:
        raise ValueError('Invalid wakeup period {}'.format(wakeup_period))
    _log.debug('Setting wakeup period {}'.format(
        WAKEUP_PERIODS[wakeup_period]))
    cmd = 'ATS51={}'.format(wakeup_period)
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1], False)
    return True
async def message_mo_cancel(self, name: str) ‑> bool

Cancels a mobile-originated message in the Tx ready state.

Expand source code
async def message_mo_cancel(self, name: str) -> bool:
    """Cancels a mobile-originated message in the Tx ready state."""
    _log.debug('Cancelling message {}'.format(name))
    cmd = 'AT%MGRC="{}"'.format(name)
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1], False)
    return True
async def message_mo_clear(self) ‑> int

Clears the modem transmit queue.

Returns

Count of messages deleted

Raises

AtException

Expand source code
async def message_mo_clear(self) -> int:
    """Clears the modem transmit queue.
    
    Returns:
        Count of messages deleted
    
    Raises:
        AtException

    """
    _log.debug('Clearing transmit queue of return messages')
    cancelled_count = 0
    open_count = 0
    cmd = 'AT%MGRS'
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1])
    if 'OK' in response:
        response.remove('OK')
    if '%MGRS:' in response:
        response.remove('%MGRS:')
    for message in response:
        if '%MGRS:' in message:
            message = message.replace('%MGRS:', '').strip()
        parts = message.split(',')
        status = int(parts[4])
        name = parts[0].replace('"', '')
        if status < 6:
            cancel_explicit = await self.message_mo_cancel(name)
            if not cancel_explicit:
                open_count += 1
            else:
                cancelled_count += 1
    if open_count > 0:
        _log.warning('{} messages still in transmit queue'.format(
            open_count))
    return cancelled_count
async def message_mo_send(self, data: str, data_format: int, sin: int, min: int = None, name: str = None, priority: int = 4) ‑> str

Submits a mobile-originated message to send.

Args

data
The data to be sent formatted as base64, hex or text according to data_format.
data_format
1: Text, 2: ASCII-Hex, 3: Base64 (MIME)
name
(Optional) A unique name for the message, if none is provided a name based on unix timestamp will be assigned
priority
1: High .. 4: Low (default)
sin
Service Identification Number (15..255) becomes the first byte of message payload
min
(Optional) Message Identification Number (0..255) becomes the second byte of message payload if specified

Returns

Name of the message if successful, or the error string

Expand source code
async def message_mo_send(self,
                          data: str,
                          data_format: int,
                          sin: int,
                          min: int = None,
                          name: str = None,
                          priority: int = 4) -> str:
    """Submits a mobile-originated message to send.
    
    Args:
        data: The data to be sent formatted as base64, hex or text according
            to `data_format`.
        data_format: 1: Text, 2: ASCII-Hex, 3: Base64 (MIME)
        name: (Optional) A unique name for the message, if none is provided
            a name based on unix timestamp will be assigned
        priority: 1: High .. 4: Low (default)
        sin: Service Identification Number (15..255) becomes the first byte
            of message payload
        min: (Optional) Message Identification Number (0..255) becomes the
            second byte of message payload if specified

    Returns:
        Name of the message if successful, or the error string
    """
    _log.debug('Submitting message named {}'.format(name))
    if name is None:
        # Use the 8 least-signficant numbers of unix timestamp as unique
        name = str(int(time()))[-8:]
        _log.debug('Assigned name {}'.format(name))
    elif len(name) > 8:
        name = name[0:8]   # risk duplicates create an ERROR resposne
        _log.warning('Truncated name to {}'.format(name))
    _min = '.{}'.format(min) if min is not None else ''
    if data_format == 1:
        data = '"{}"'.format(data)
    cmd = ('AT%MGRT="{}",{},{}{},{},{}'.format(name,
                                                priority,
                                                sin,
                                                _min,
                                                data_format,
                                                data))
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1], None)
    return name
async def message_mo_state(self, name: str = None) ‑> list

Returns the message state(s) requested.

If no name filter is passed in, all available messages states are returned. Returns False is the request failed.

Args

name
The unique message name in the modem queue

Returns

list of dict with name, state, size and sent

Raises

AtException

Expand source code
async def message_mo_state(self, name: str = None) -> list:
    """Returns the message state(s) requested.
    
    If no name filter is passed in, all available messages states
    are returned.  Returns False is the request failed.

    Args:
        name: The unique message name in the modem queue

    Returns:
        `list` of `dict` with `name`, `state`, `size` and `sent`

    Raises:
        AtException

    """
    _log.debug('Querying transmit message state{}'.format(
        ' ={}'.format(name) if name else 's'))
    cmd = 'AT%MGRS{}'.format('="{}"'.format(name) if name else '')
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1], None)
    # %MGRS: "<name>",<msg_no>,<priority>,<sin>,<state>,<size>,<sent_bytes>
    if 'OK' in response:
        response.remove('OK')
    states = []
    for res in response:
        res = res.replace('%MGRS:', '').strip()
        if len(res) > 0:
            name, number, priority, sin, state, size, sent = res.split(',')
            del number
            del priority
            del sin
            states.append({
                'name': name.replace('"', ''),
                'state': int(state),
                'size': int(size),
                'bytes_sent': int(sent),
                })
    return states
async def message_mt_delete(self, name: str) ‑> bool

Marks a Return message for deletion by the modem.

Args

name
The unique mobile-terminated name in the queue

Returns

True if the operation succeeded

Expand source code
async def message_mt_delete(self, name: str) -> bool:
    """Marks a Return message for deletion by the modem.
    
    Args:
        name: The unique mobile-terminated name in the queue

    Returns:
        True if the operation succeeded

    """
    _log.debug('Marking forward message {} for deletion'.format(name))
    cmd = 'AT%MGFM="{}"'.format(name)
    try:
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1], False)
        return True
    except:
        return False
async def message_mt_get(self, name: str, data_format: int = DataFormat.BASE64, verbose: bool = True) ‑> Union[dict, bytes]

Returns the payload of a specified mobile-terminated message.

Payload is presented as a string with encoding based on data_format.

Args

name
The unique name in the modem queue e.g. FM01.01
data_format
text=1, hex=2, base64=3 (default)
verbose
if True returns a dictionary, otherwise raw payload bytes

Returns

The encoded data as a string

Raises

AtException

Expand source code
async def message_mt_get(self,
                         name: str,
                         data_format: int = DataFormat.BASE64,
                         verbose: bool = True) -> Union[dict, bytes]:
    """Returns the payload of a specified mobile-terminated message.
    
    Payload is presented as a string with encoding based on data_format. 

    Args:
        name: The unique name in the modem queue e.g. FM01.01
        data_format: text=1, hex=2, base64=3 (default)
        verbose: if True returns a dictionary, otherwise raw payload bytes

    Returns:
        The encoded data as a string
    
    Raises:
        AtException

    """
    _log.debug('Retrieving forward message {}'.format(name))
    cmd = 'AT%MGFG="{}",{}'.format(name, data_format)
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1])
    message = self._message_mt_parse(response[0], data_format=data_format)
    return message if verbose else message['bytes']
async def message_mt_waiting(self) ‑> list

Returns a list of received mobile-terminated message information.

Returns

List of (name, number, priority, sin, state, length, received)

Raises

AtException

Expand source code
async def message_mt_waiting(self) -> list:
    """Returns a list of received mobile-terminated message information.
    
    Returns:
        List of (name, number, priority, sin, state, length, received)
    
    Raises:
        AtException

    """
    _log.debug('Checking receive queue for forward messages')
    cmd = 'AT%MGFN'
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1])
    if 'OK' in response:
        response.remove('OK')
    waiting = []
    #: %MGFN: name, number, priority, sin, state, length, bytes_received
    for res in response:
        msg = res.replace('%MGFN:', '').strip()
        if msg.startswith('"FM'):
            parts = msg.split(',')
            name, number, priority, sin, state, length, received = parts
            del number   #: unused
            waiting.append({'name': name.replace('"', ''),
                            'sin': int(sin),
                            'priority': int(priority),
                            'state': int(state),
                            'length': int(length),
                            'received': int(received)})
    return waiting
async def notification_check(self) ‑> collections.OrderedDict

Returns the current active event notification bitmask (S89).

The value of S89 register is cleared upon reading.

Returns

OrderedDict

Raises

AtException

Expand source code
async def notification_check(self) -> OrderedDict:
    """Returns the current active event notification bitmask (S89).
    
    The value of S89 register is cleared upon reading.

    Returns:
        OrderedDict
    
    Raises:
        AtException

    """
    _log.debug('Querying event notification triggers')
    cmd = 'ATS89?'
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1])
    return _notifications_dict(int(response[0]))
async def notification_control_get(self) ‑> collections.OrderedDict

Returns the current notification configuration bitmask.

Returns

OrderedDict

Raises

AtException

Expand source code
async def notification_control_get(self) -> OrderedDict:
    """Returns the current notification configuration bitmask.
    
    Returns:
        OrderedDict
    
    Raises:
        AtException

    """
    _log.debug('Querying event notification controls')
    cmd =  'ATS88?'
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1])
    return _notifications_dict(int(response[0]))
async def notification_control_set(self, event_map: list) ‑> bool

Sets the event notification bitmask.

Args

event_map
list of tuples (event_name, bool)

Returns

True if successful.

Expand source code
async def notification_control_set(self, event_map: list) -> bool:
    """Sets the event notification bitmask.

    Args:
        event_map: list of tuples (event_name, bool)
    
    Returns:
        True if successful.
        
    """
    _log.debug('Setting event notifications: {}'.format(event_map))
    #: ATS88=bitmask
    notifications_changed = False
    old_notifications = await self.notification_control_get()
    if old_notifications is None:
        return False
    bitmask = list('0' * len(old_notifications))
    i = 0
    for event in event_map:
        if event[0] not in NOTIFICATION_BITMASK:
            raise ValueError('Invalid event {}'.format(event[0]))
        i = 0
        for key in reversed(old_notifications):
            bit = '1' if old_notifications[key] or bitmask[i] == '1' else '0'
            if key == event[0]:
                notify = event[1]
                if old_notifications[key] != notify:
                    bit = '1' if notify else '0'
                    notifications_changed = True
                    # self.notifications[key] = notify
            bitmask[i] = bit
            i += 1
    if notifications_changed:
        cmd = 'ATS88={}'.format(int('0b' + ''.join(bitmask), 2))
        response = await self.command(cmd)
        if response[0] == 'ERROR':
            return self._handle_at_error(cmd, response[1], False)
    return True
async def s_register_get(self, register: int) ‑> Optional[int]

Returns the value of the S-register requested.

Args

register
The S-register number

Returns

integer value of register

Raises

AtException

Expand source code
async def s_register_get(self, register: int) -> Union[int, None]:
    """Returns the value of the S-register requested.

    Args:
        register: The S-register number

    Returns:
        integer value of register
    
    Raises:
        AtException

    """
    _log.debug('Querying register value S{}'.format(register))
    cmd = 'ATS{}?'.format(register)
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1])
    return int(response[0])
async def s_register_get_all(self) ‑> list

Returns a list of S-register definitions. R=read-only, S=signed, V=volatile

Returns

tuple(register, RSV, current, default, minimum, maximum)

Raises

AtException

Expand source code
async def s_register_get_all(self) -> list:
    """Returns a list of S-register definitions.
    R=read-only, S=signed, V=volatile
    
    Returns:
        tuple(register, RSV, current, default, minimum, maximum)
    
    Raises:
        AtException

    """
    _log.debug('Querying S-register values')
    cmd = 'AT%SREG'
    #: Sreg, RSV, CurrentVal, DefaultVal, MinimumVal, MaximumVal
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[0])
    if 'OK' in response:
        response.remove('OK')
    reg_defs = response[2:]
    registers = []
    for row in reg_defs:
        reg_def = row.split(' ')
        reg_def = tuple(filter(None, reg_def))
        registers.append(reg_def)
    return registers
async def satellite_status(self) ‑> dict

Returns the control state and C/No.

Returns

Dictionary with state (int), snr (float), beamsearch (int), state_name (str), beamsearch_name (str), or None if error.

Raises

AtException

Expand source code
async def satellite_status(self) -> dict:
    """Returns the control state and C/No.
    
    Returns:
        Dictionary with state (int), snr (float), beamsearch (int),
            state_name (str), beamsearch_name (str), or None if error.

    Raises:
        AtException

    """
    _log.debug('Querying satellite status/SNR')
    cmd = 'ATS90=3 S91=1 S92=1 S116? S122? S123?'
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1])
    if 'OK' in response:
        response.remove('OK')
    cn_0, ctrl_state, beamsearch_state = response
    cn_0 = int(cn_0) / 100.0
    ctrl_state = int(ctrl_state)
    beamsearch_state = int(beamsearch_state)
    return {
        'state': ctrl_state,
        'state_name': CONTROL_STATES[ctrl_state],
        'snr': cn_0,
        'beamsearch': beamsearch_state,
        'beamsearch_name': BeamSearchState(beamsearch_state).name,
    }
async def shutdown(self) ‑> bool

Tell the modem to prepare for power-down.

Expand source code
async def shutdown(self) -> bool:
    """Tell the modem to prepare for power-down."""
    _log.debug('Requesting power down')
    cmd = 'AT%OFF'
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1], False)
    return True
async def time_utc(self) ‑> str

Returns current UTC time of the modem in ISO format.

Returns

UTC as ISO-formatted string

Raises

AtException

Expand source code
async def time_utc(self) -> str:
    """Returns current UTC time of the modem in ISO format.
    
    Returns:
        UTC as ISO-formatted string
    
    Raises:
        AtException

    """
    _log.debug('Requesting UTC network time')
    cmd = 'AT%UTC'
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1])
    return response[0].replace('%UTC: ', '').replace(' ', 'T') + 'Z'
async def transmit_status(self) ‑> dict

Returns the transmitter status.

Returns

Transmit status (5 = OK)

Raises

AtException if error returned by modem

Expand source code
async def transmit_status(self) -> dict:
    """Returns the transmitter status.
    
    Returns:
        Transmit status (5 = OK)

    Raises:
        AtException if error returned by modem

    """
    _log.debug('Querying transmitter status')
    cmd = 'ATS54?'
    response = await self.command(cmd)
    if response[0] == 'ERROR':
        return self._handle_at_error(cmd, response[1])
    status = int(response[0])
    return TransmitterStatus(status)