Module pynimomodem.modem

Class for a Non-IP Modem using Orbcomm network protocols.

This module encapsulates specific AT commands from the following supported NIMO modem manufacturers:

  • Quectel

Key concepts include modem properties, configuration, and satellite acquisition details including signal level.

Expand source code
"""Class for a Non-IP Modem using Orbcomm network protocols.

This module encapsulates specific AT commands from the following supported
NIMO modem manufacturers:

* **Quectel**

Key concepts include modem properties, configuration, and satellite acquisition
details including signal level.

import base64
import logging
import time
from dataclasses import dataclass
from enum import IntEnum

from serial import Serial

from .atcommandbuffer import DEFAULT_AT_TIMEOUT, AtCommandBuffer
from .constants import (
from .location import (
from .message import MoMessage, MtMessage, NimoMessage
from .nimoutils import iso_to_ts, vlog

VLOG_TAG = 'nimomodem'

_log = logging.getLogger(__name__)

class Manufacturer(IntEnum):
    """Supported NIMO modem implementations."""
    NONE = 0
    ORBCOMM = 1
    QUECTEL = 2

class AcquisitionInfo:
    """Details about the satellite acquisition state of the modem.
        ctrl_state (ControlState): Primary network acquisition state.
        beam_state (BeamState): Secondary beam acquistion state.
        rssi (float): Signal indicator Carrier to Noise ratio (dB-Hz).
        vcid (int): Virtual carrier identifier for low-level sanity check.
    ctrl_state: ControlState = ControlState.STOPPED
    beam_state: BeamState = BeamState.IDLE
    rssi: float = 0.0
    vcid: int = 0

class ModemError(Exception):
    """Base class for NIMO modem errors."""

class ModemTimeout(ModemError):
    """Serial response timeout."""

class ModemCrc(ModemError):
    """Error calculating response CRC."""

class ModemCrcConfig(ModemError):
    """Request/response mismatch of CRC presence."""

class ModemAtError(ModemError):
    """AT command related errors with error_code property."""
    def error_code(self) -> AtErrorCode:
        return AtErrorCode[self.args[0]]

class NimoModem:
    """A class for NIMO satellite IoT modem interaction."""
    # __slots__ = ('_modem', '_mobile_id',
    #              '_mfr_code', '_modem_booted',
    #              )
    def __init__(self, serial_port: str, **kwargs) -> None:
        """Instantiate the NimoModem object.
        For additional kwargs see PySerial API:
            serial_port (str): The path to the modem's serial port.
        Keyword Args:
            baudrate (int): The baud rate of the modem (default 9600)
            `ConnectionError` if unable to connect to the serial port.
            self._serial = Serial(serial_port, **kwargs)
        except Exception as exc:
            raise ConnectionError('Unable to connect to serial') from exc
        self._modem: AtCommandBuffer = AtCommandBuffer(self._serial)
        self._baudrate: int = self._serial.baudrate
        self._is_connected: bool = False
        self._modem_booted: bool = False
        self._mobile_id: str = ''
        self._manufacturer: Manufacturer = Manufacturer.NONE
    def is_ready(self) -> bool:
        return not self._modem._lock.locked()
    def crc_enabled(self) -> bool:
        return self._modem.crc
    def modem_booted(self) -> bool:
        return self._modem_booted
    def _mfr(self) -> Manufacturer:
        """Used internally to support different manufacturer commands."""
        if not self._manufacturer:
            _log.debug('Using %s manufacturer commands', self._manufacturer)
        return self._manufacturer
    def _mo_msg_name_len_max(self) -> int:
        """Used internally to restrict the length of the MO message name."""
        maxlen = MSG_MO_NAME_MAX_LEN
        if self._mfr == Manufacturer.QUECTEL:
            maxlen = MSG_MO_NAME_QMAX_LEN
        return maxlen
    def _at_command_response(self,
                             command: str,
                             prefix: str = '',
                             timeout: int = DEFAULT_AT_TIMEOUT) -> str:
        """Send a command and return the response.
        Blocks until response has been received.
            command (str): The AT command to send.
            prefix (str): Optional prefix to remove from response.
            timeout (int): Maximum time in seconds to wait for response.
            `ModemTimeout` if no response is received.
            `ModemCrcConfig` if CRC is not used on both request/response.
            `ModemAtError` for other cases of errored response.
        err = self._modem.read_at_response(prefix, timeout)
        if err == AtErrorCode.OK:
            return self._modem.get_response()
        elif err == AtErrorCode.TIMEOUT:
            raise ModemTimeout(f'AT response timed out after {timeout} seconds')
        elif err == AtErrorCode.CRC_CONFIG_MISMATCH:
            raise ModemCrcConfig('Reponse checksum {}expected'.format(
                                 '' if self.crc_enabled else 'un'))
        elif err == AtErrorCode.INVALID_RESPONSE_CRC:
            raise ModemCrc(
            err = self.get_last_error_code()
            if err == AtErrorCode.INVALID_CRC:
                raise ModemCrc(
            raise ModemAtError(
    def connect(self) -> None:
        """Attach to the modem via serial communications.
        Provided for backward compatibility. Connection is usually automatic
        when instantiating `NimoModem`.
        if not self._serial.is_open:
    def disconnect(self) -> None:
        """Detach from the modem serial communications.
        Provided for backward compatibility. Not typically required.
        self._is_connected = False
        self._modem_booted = False
        if self._serial.is_open:
    def is_connected(self) -> bool:
        """Indicates if the modem is responding to a basic AT query."""
            self._is_connected = True
            self._modem_booted = True
            return True
        except ModemError:
            self._is_connected = False
            self._modem_booted = False
            return False
    def baudrate(self) -> int:
        """The baudrate of the serial connection."""
        return self._modem.serial.baudrate
    def baudrate(self, baudrate: int):
        """Set the baud rate of the modem and adjust the serial rate."""
        if baudrate not in [9600, 115200]:
            raise ValueError('Invalid baudrate')
        self._modem.serial.baudrate = baudrate
    def retry_baudrate(self) -> bool:
        for baud in BAUDRATES:
            self._modem.serial.baudrate = baud
            if self.is_connected():
                return True
        return False
    def await_boot(self, boot_timeout: int = 10) -> bool:
        """Indicates if a boot string is received within a timeout window.
        Use `is_connected` before waiting for boot.
            boot_timeout (int): The maximum time to wait in seconds.
            True if a valid boot string was received inside the timeout.
        boot_strings = ['ST Version', 'RDY']
        _log.debug('Awaiting modem boot string for %d seconds...', boot_timeout)
        rx_data = ''
        started = time.time()
        while time.time() - started < boot_timeout and not self._modem_booted:
            while self._modem.is_data_waiting():
                rx_data += self._modem.read_rx_buffer()
            if rx_data and any(b in rx_data for b in boot_strings):
                self._modem_booted = True
                _log.debug('Found boot string - clearing Rx buffer')
                while self._modem.is_data_waiting():
                    rx_data += self._modem.read_rx_buffer()
        return self._modem_booted
    def get_last_error_code(self) -> AtErrorCode:
        """Get the last error code from the modem."""
        return AtErrorCode(int(self._at_command_response('ATS80?')))
    def initialize(self,
                   echo: bool = True,
                   verbose: bool = True,
                   ) -> bool:
        """Initialize the modem AT configuration for Echo and Verbose."""
        at_command = (f'ATZ;E{int(echo)};V{int(verbose)}')
            return True
        except ModemCrcConfig:
  'Attempting re-initialize with CRC enabled')
            return True
    def set_crc(self, enable: bool = False) -> bool:
        """Enable or disable CRC error checking on the modem serial port."""
            return True
        except ModemCrcConfig:
            if ((self._modem.crc and enable) or
                (not self._modem.crc and not enable)):
                return True
            return False
    def reset_factory_config(self) -> None:
        """Reset the modem's factory default configuration."""
    def save_config(self) -> None:
        """Store the current configuration to modem non-volatile memory."""
    def get_mobile_id(self) -> str:
        """Get the modem's globally unique identifier."""
        if not self._mobile_id:
                self._mobile_id = self._at_command_response('AT+GSN', '+GSN:')
                if vlog(VLOG_TAG):
                    _log.debug('Cached Mobile ID %s', self._mobile_id)
            except ModemError:
                self._mobile_id = ''
        return self._mobile_id
    def _is_simulator(self) -> bool:
        return self.get_mobile_id().startswith('00000000')
    def get_manufacturer(self) -> str:
        """Get the manufacturer name."""
        if not self._manufacturer:
                mfr = self._at_command_response('ATI')
                if 'quectel' in mfr.lower():
                    self._manufacturer = Manufacturer.QUECTEL
                    if not any(m in mfr.lower()
                               for m in ['orbcomm', 'skywave']):
                        _log.warning('Unsupported manufacturer %s', mfr)
                    self._manufacturer = Manufacturer.ORBCOMM
                if vlog(VLOG_TAG):
                    _log.debug('Caching manufacturer: %s',
            except ModemError:
                self._manufacturer = Manufacturer.NONE
    def get_model(self) -> str:
        """Get the manufacturer model name."""
        cmd = 'ATI4'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'ATI'
            response = self._at_command_response(cmd)
            if response:
                if self._mfr == Manufacturer.QUECTEL:
                    response = response.split('\n')[1]
            return response
        except ModemError:
            return ''
    def get_firmware_version(self) -> str:
        """Get the modem's firmware version."""
        # TODO: Firmware structure with hardware, firmware, software?
        return self._at_command_response('AT+GMR', '+GMR:')
    def get_system_time(self) -> int:
        """Get the system/GNSS time from the modem."""
            nimo_time = self._at_command_response('AT%UTC', '%UTC:')
            iso_time = nimo_time.replace(' ', 'T') + 'Z'
            return iso_to_ts(iso_time)
        except ModemError:
            return 0
    def get_temperature(self) -> 'int|None':
        """Get the processor temperature in Celsius."""
            return int(int(self._at_command_response('ATS85?')) / 10)
        except ValueError:
            return None
    def is_transmit_allowed(self) -> bool:
        """Indicates if the modem is able to transmit data."""
        return self.get_network_status() == 5
    def is_blocked(self) -> bool:
        """Indicates if line-of-sight to the satellite is blocked."""
        return self.get_network_status() == 8
    def is_muted(self) -> bool:
        """Indicates if the modem has been muted (disallowed to transmit data).
        return self.get_network_status() == 7
    def is_updating_network(self) -> bool:
        """Indicates if the modem is updating network information.
        The modem should not be powered down during a network update.
        return self.get_network_status() == 4
    def get_network_status(self) -> NetworkStatus:
        """Get the current satellite acquisition status."""
        cmd = 'ATS54?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QREG?'
            prefix = '+QREG:'
        return NetworkStatus(int(self._at_command_response(cmd, prefix)))
    def get_rssi(self) -> float:
        """Get the current Received Signal Strength Indicator.
        Also referred to as SNR or C/N0 (dB-Hz)
        cmd = 'ATS90=3 S91=1 S92=1 S116?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QSCN'
            prefix = '+QSCN:'
            return int(self._at_command_response(cmd, prefix)) / 100
        except ValueError:
            return 0
    def get_signal_quality(self) -> SignalQuality:
        """Get a qualitative indicator from 0..5 of the satellite signal."""
        snr = self.get_rssi()
        if snr >= SignalLevelRegional.INVALID.value:
            return SignalQuality.WARNING
        if snr >= SignalLevelRegional.BARS_5.value:
            return SignalQuality.STRONG
        if snr >= SignalLevelRegional.BARS_4.value:
            return SignalQuality.GOOD
        if snr >= SignalLevelRegional.BARS_3.value:
            return SignalQuality.MID
        if snr >= SignalLevelRegional.BARS_2.value:
            return SignalQuality.LOW
        if snr >= SignalLevelRegional.BARS_1.value:
            return SignalQuality.WEAK
        return SignalQuality.NONE
    def get_acquisition_detail(self) -> AcquisitionInfo:
        """Get the detailed satellite acquisition status.
        Includes `acquisition_state`, `beamsearch_state`, `vcid` and `snr`
        cmd = 'ATS90=3 S91=1 S92=1 S122? S123? S116? S101?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QEVNT=3,1'
            prefix = '+QEVNT:'
        result_str = self._at_command_response(cmd, prefix, timeout=10)
        if self._mfr == Manufacturer.ORBCOMM:
            results = [int(x) for x in result_str.split('\n')]
            ctrl_state = ControlState(results[0])
            beam_state = BeamState(results[1])
            rssi = float(results[2]) / 100
            vcid = results[3]
        elif self._mfr == Manufacturer.QUECTEL:
            # Workaround Quectel 20230731 documentation error says +QEVNT:
            result_str = result_str.replace('+QEVENT:', '').strip()
            results = [int(x) for x in result_str.split(',')]
            # <dataCount>,<signedBitmask>,<MTID>,<timestamp>,
            #   <class>,<subclass>,<priority>,<data0>,...
            data0 = 7   # list index where trace data starts
            ctrl_state = ControlState(results[data0+22])
            beam_state = BeamState(results[data0+23])
            rssi = float(results[data0+16]) / 100
            vcid = results[data0+1]
        return AcquisitionInfo(ctrl_state, beam_state, rssi, vcid)
    def send_data(self, data: bytes, **kwargs) -> 'str|MoMessage':
        """Submits data to send as a mobile-originated message.
        If a `message_name` is not supplied one will be generated using the
        least significant 8 digits of unix timestamp.
            data (bytes): The data to send.
        Keyword Args:
            message_name (str): Optional handle for message in Tx queue. Max 8
                characters for Orbcomm modem or 12 for Quectel.
            priority (int): Optional priority 1 (highest) .. 4 (low, default).
                May use `MessagePriority`.
            codec_sin (int): Optional first byte of payload to add as a codec
                service identifier, must be in range 16..255.
            codec_min (int): Optional second byte of payload to add as a codec
                message identifier, must be in range 0..255.
            return_message (bool): If set, returns a `MoMessage` instead of the
                message handle.
            Message handle (str) or `MoMessage` if `return_message` kwarg is set.
            `ValueError` for various parameter limit violations.
        data_size = len(data)
        msg_payload_sin_min = b''
        message_name = kwargs.get('message_name', '')
        priority = MessagePriority(kwargs.get('priority',
        codec_sin: int = kwargs.get('codec_sin', -1)
        codec_min: int = kwargs.get('codec_min', -1)
        if codec_sin > -1:
            data_size += 1
            msg_payload_sin_min += codec_sin.to_bytes(1, 'big')
        if codec_min > -1:
            data_size += 1
            msg_payload_sin_min += codec_min.to_bytes(1, 'big')
        if not 2 <= data_size <= MSG_MO_MAX_SIZE:
            raise ValueError('Invalid mobile-originated message size')
        if message_name and len(message_name) > self._mo_msg_name_len_max:
            raise ValueError('Message name too long')
        data_index = 0
        if codec_sin <= -1:
            codec_sin = data[0]
            data_index += 1
            data_size -= 1
        if codec_sin not in range(16, 256):
            raise ValueError('Illegal first payload byte SIN must be 16..255')
        if codec_min <= -1:
            codec_min = data[1]
            data_index += 1
            data_size -= 1
        if codec_min > 255:
            raise ValueError('Invalid second payload byte MIN must be 0..255')
        max_name_len = self._mo_msg_name_len_max
        if message_name and len(message_name) > max_name_len:
            raise ValueError(f'Invalid message name longer than {max_name_len}')
        if len(message_name) == 0:
            message_name = f'{int(time.time())}'[-max_name_len:]
        # Convert to base64 string for serial efficiency
        #   no effect on OTA size, modem always decodes and sends raw bytes OTA
        data_format = DataFormat.BASE64
        formatted_data = base64.b64encode(data[data_index:]).decode('utf-8')
        cmd = 'AT%MGRT='
        codec_sep = '.'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QSMGT='
            codec_sep = ','
        cmd = (f'{cmd}"{message_name}",{priority},{codec_sin}{codec_sep}'
        if kwargs.get('return_message', False) is True:
            return MoMessage(message_name, priority, MessageState.TX_READY,
                                payload=(msg_payload_sin_min + data))
        return message_name
    def send_text(self, text: str, **kwargs) -> 'str|MoMessage':
        """Submits a text string to send as data.
        If `codec_sin` kwarg is not provided 128 is prepended as the first byte.
        If `codec_min` kwarg is not provided 1 is prepended as the second byte.
        Other kwargs as per `send_data`.
            text (str): The text message to send.
            (str) The message name assigned or MoMessage if kwarg
                `return_message` is set.
        data = b''
        codec_sin = int(kwargs.get('codec_sin', 128))
        data += codec_sin.to_bytes(1, 'big')
        codec_min = int(kwargs.get('codec_min', 1))
        data += codec_min.to_bytes(1, 'big')
        data += text.encode()
        flowthru = ['message_name', 'priority', 'return_message']
        next_kwargs = { k:v for k, v in kwargs if k in flowthru }
        return self.send_data(data, **next_kwargs)
    def cancel_mo_message(self, message_name: str) -> bool:
        """Attempts to cancel a previously submitted mobile-originated message.
            message_name (str): The mobile-originated message handle to delete.
        _log.debug('Attempting to cancel MO message %s', message_name)
        cmd = 'AT%MGRC'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QSMGC'
        cmd += f'="{message_name}"'
        message_states = self.get_mo_message_states(message_name)
        if len(message_states) > 0:
            state = message_states[0].state
            if state == MessageState.TX_CANCELLED:
                return True
        elif self._is_simulator:
            return True
        _log.warn('Failed to cancel message %s', message_name)
        return False
    def get_mo_message_states(self, message_name: str = '') -> 'list[MoMessage]':
        """Get a list of mobile-originated message states in the modem Tx queue.
            message_name (str): Optional filter on message name.
            A list of `MoMessage` objects including state and metadata.
        cmd = 'AT%MGRS'
        prefix = '%MGRS:'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QSMGS'
            prefix = '+QSMGS:'
        if message_name and not self._is_simulator:
            # Orbcomm Modem Simulator returns ERROR for %MGRS= command
            cmd += f'="{message_name}"'
        response_str = self._at_command_response(cmd, prefix)
        return self._parse_message_states(response_str, is_mo=True)
    def _parse_message_states(self,
                              response_str: str,
                              is_mo: bool,
                              ) -> 'list[NimoMessage]':
        """Parses textual metadata to build a SatelliteMessageState."""
        mo_states = []
        if not response_str:
            return mo_states
        if vlog(VLOG_TAG):
            _log.debug('Parsing %s message states from %s',
                       'MO' if is_mo else 'MT', response_str)
        states_meta = [m for m in response_str.split('\n') if m != '']
        for meta in states_meta:
            message = MoMessage() if is_mo else MtMessage()
            for field_idx, field_data in enumerate(meta.split(',')):
                self._update_message_state(message, field_idx,
                                           field_data, is_mo)
        return mo_states
    def _update_message_state(self,
                              message_state: NimoMessage,
                              field_idx: int,
                              field_data: str,
                              is_mo: bool) -> None:
        """Parse textual metadata to update a message's state."""
        if vlog(VLOG_TAG):
            _log.debug('Parsing %s message state index %d: %s',
                       'MO' if is_mo else 'MT', field_idx, field_data)
        mfr = self._mfr
        if field_idx == 0:
   = field_data.replace('"', '')
            if vlog(VLOG_TAG):
                _log.debug('Message name: %s',
        elif field_idx == 1 and mfr == Manufacturer.ORBCOMM:
            if vlog(VLOG_TAG):
                _log.debug('Ignoring msgNum %s', field_data)
        elif ((field_idx == 2 and mfr == Manufacturer.ORBCOMM) or
              (field_idx == 1 and mfr == Manufacturer.QUECTEL)):
            message_state.priority = MessagePriority(int(field_data))
            if vlog(VLOG_TAG):
                _log.debug('Message priority %s',
        elif ((field_idx == 3 and mfr == Manufacturer.ORBCOMM) or
              (field_idx == 2 and mfr == Manufacturer.QUECTEL)):
            if vlog(VLOG_TAG):
                _log.debug('Ignoring codec SIN %s', field_data)
        elif ((field_idx == 4 and mfr == Manufacturer.ORBCOMM) or
              (field_idx == 3 and mfr == Manufacturer.QUECTEL)):
            message_state.state = MessageState(int(field_data))
            if vlog(VLOG_TAG):
                _log.debug('Message state: %s',
        elif ((field_idx == 5 and mfr == Manufacturer.ORBCOMM) or
              (field_idx == 4 and mfr == Manufacturer.QUECTEL)):
            message_state.length = int(field_data)
            if vlog(VLOG_TAG):
                _log.debug('Message size: %d bytes', message_state.length)
        elif ((field_idx == 6 and mfr == Manufacturer.ORBCOMM) or
              (field_idx == 5 and mfr == Manufacturer.QUECTEL)):
            message_state.bytes_delivered = int(field_data)
            if vlog(VLOG_TAG):
                _log.debug('Bytes delivered: %d', message_state.bytes_delivered)
            _log.warning('Unhandled field index %d (%s) for manufacturer %s',
                         field_idx, 'MO' if is_mo else 'MT',
    def get_mt_message_states(self, message_name: str = '') -> 'list[MtMessage]':
        """Get a list of mobile-terminated message states in the modem Tx queue.
            message_name (str): Optional filter on message name.
            A list of `MtMessage` objects including state and metadata.
        cmd = 'AT%MGFN' if not message_name else 'AT%MGFS'
        prefix = '%MGFN:' if not message_name else 'AT%MGFS:'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QRMGN' if not message_name else 'AT+QRMGS'
            prefix = '+QRMGN:' if not message_name else '+QRMGS:'
        if message_name and not self._is_simulator:
            cmd += f'="{message_name}"'
        response_str = self._at_command_response(cmd, prefix)
        return self._parse_message_states(response_str, is_mo=False)
    def get_mt_message(self, message_name: str) -> 'MtMessage|None':
        """Get a mobile-terminated message from the modem's Rx queue by name."""
        cmd = 'AT%MGFG'
        prefix = '%MGFG:'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+GRMGR'
            prefix = '+GRMGR:'
        data_format = DataFormat.BASE64
        cmd += f'="{message_name}",{data_format}'
        response = self._at_command_response(cmd, prefix)
        if response:
            return self._parse_mt_message(response)
        return None
    def _parse_mt_message(self, meta: str) -> MtMessage:
        """Parse textual metadata to build a MtMessage."""
        if vlog(VLOG_TAG):
            _log.debug('Parsing MT message from meta: %s', meta)
        data_includes_sin = False
        mfr = self._mfr
        message = MtMessage()
        for field_idx, field_data in enumerate(meta.split(',')):
            if field_idx == 0:
       = field_data.replace('"', '')
                if vlog(VLOG_TAG):
                    _log.debug('Message name: %s',
            elif (field_idx == 1 and mfr == Manufacturer.ORBCOMM):
                if vlog(VLOG_TAG):
                    _log.debug('Ignoring msgNum %s', field_data)
            elif (field_idx == 2 and mfr == Manufacturer.ORBCOMM):
                message.priority = MessagePriority(int(field_data))
                if vlog(VLOG_TAG):
                    _log.debug('Message priority %s',
            elif ((field_idx == 3 and mfr == Manufacturer.ORBCOMM) or
                  (field_idx == 1 and mfr == Manufacturer.QUECTEL)):
                codec_sin = int(field_data)
                if not data_includes_sin:
                    message.payload += codec_sin.to_bytes(1, 'big')
                if vlog(VLOG_TAG):
                    _log.debug('Added SIN as first payload byte: %d', codec_sin)
            elif (field_idx == 4 and mfr == Manufacturer.ORBCOMM):
                message.state = MessageState(int(field_data))
                if vlog(VLOG_TAG):
                    _log.debug('Message state %s',
            elif ((field_idx == 5 and mfr == Manufacturer.ORBCOMM) or
                  (field_idx == 2 and mfr == Manufacturer.QUECTEL)):
                message.length = int(field_data)
                if vlog(VLOG_TAG):
                    _log.debug('Message size: %d bytes', message.length)
            elif ((field_idx == 6 and mfr == Manufacturer.ORBCOMM) or
                  (field_idx == 3 and mfr == Manufacturer.QUECTEL)):
                data_format = DataFormat(int(field_data))
                if vlog(VLOG_TAG):
                    _log.debug('Data format %s',
            elif ((field_idx == 7 and mfr == Manufacturer.ORBCOMM) or
                  (field_idx == 4 and mfr == Manufacturer.QUECTEL)):
                if vlog(VLOG_TAG):
                    _log.debug('Decoding payload from: %s', field_data)
                if message.length > 0:
                    if data_format == DataFormat.BASE64:
                        message.payload += base64.b64decode(field_data)
                    elif data_format == DataFormat.HEX:
                        message.payload += bytes.fromhex(field_data)
                    else:   # DataFormat.TEXT
                        message.payload += field_data.encode()
                    if message.length != len(message.payload):
                        _log.warn('Message length mismatch')
        return message
    def delete_mt_message(self, message_name: str) -> bool:
        """Remove a mobile-terminated message from the modem's Rx queue."""
        cmd = 'AT%MGFM'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QRMGM'
        cmd += f'="{message_name}"'
        check = self.get_mt_message_states(message_name)
        if check and check[0].state == MessageState.RX_RETRIEVED:
            return True
        return False
    def receive_data(self, message_name: str) -> 'bytes|None':
        """Get the raw data from a mobile-terminated message."""
        message = self.get_mt_message(message_name)
        if message:
            return message.payload
        return None
    def get_gnss_mode(self) -> GnssMode:
        """Get the modem's GNSS receiver mode."""
        cmd = 'ATS39?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QGNSSMOD?'
            prefix = '+QGNSSMOD:'
        response = self._at_command_response(cmd, prefix)
        if self._mfr == Manufacturer.QUECTEL:
            return GnssModeQuectel(int(response))
        return GnssModeOrbcomm(int(response))
    def set_gnss_mode(self, gnss_mode: GnssMode) -> None:
        """Get the modem's GNSS receiver mode."""
        cmd = f'ATS39={gnss_mode}'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            if not GnssModeQuectel.is_valid(gnss_mode):
                raise ValueError('Invalid GNSS mode')
            cmd = f'AT+QGNSSMOD={gnss_mode}'
            prefix = '+QGNSSMOD:'
            if not GnssModeOrbcomm.is_valid(gnss_mode):
                raise ValueError('Invalid GNSS mode')
        self._at_command_response(cmd, prefix)
    def get_gnss_continuous(self) -> int:
        """Get the modem's GNSS continuous refresh interval in seconds."""
        cmd = 'ATS55?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QGNSSCW?'
            prefix = '+QGNSSCW:'
            return int(self._at_command_response(cmd, prefix))
        except ValueError:
            return 0
    def set_gnss_continuous(self, interval: int) -> None:
        """Set the modem's GNSS continuous refresh interval in seconds.
            interval (int): Automatic update interval 0..30 seconds.
            `True` if successful.
            `ValueError` if invalid interval is specified.
        if interval not in range (0, 31):
            raise ValueError('Invalid GNSS refresh interval')
        cmd = f'ATS55={interval}'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = f'AT+QGNSSCW={interval}'
    def get_nmea_data(self,
                      stale_secs: int = 1,
                      wait_secs: int = 35,
                      rmc: bool = True,
                      gga: bool = True,
                      gsa: bool = True,
                      gsv: bool = False,
                      ) -> str:
        """Get a set of NMEA data detailing the modem's location.
            stale_secs (int): Maximum cached fix age to use in seconds.
            wait_secs (int): Maximum duration to wait for a fix in seconds.
            rmc (bool): Include Recommended Minimum data.
            gga (bool): Include altitude and fix quality data.
            gsa (bool): Include Dilution of Precision data.
            gsv (bool): Include verbose GNSS satellite details.
        cmd = 'AT%GPS'
        prefix = '%GPS:'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QGNSS'
            prefix = '+QGNSS:'
        cmd += f'={stale_secs},{wait_secs}'
        if rmc:
            cmd += ',"RMC"'
        if gga:
            cmd += ',"GGA"'
        if gsa:
            cmd += ',"GSA"'
        if gsv:
            cmd += ',"GSV"'
            response = self._at_command_response(cmd, prefix, wait_secs + 5)
            return response
        except ModemAtError as exc:
            if exc.error_code != AtErrorCode.GNSS_TIMEOUT:
        return ''
    def get_location(self,
                     stale_secs: int = 1,
                     wait_secs: int = 35) -> 'ModemLocation|None':
        """Get the modem's location.
            stale_secs (int): Maximum cached fix age to use in seconds.
            wait_secs (int): Maximum duration to wait for a fix in seconds.
            ModemLocation object if GNSS does not time out waiting for fix.
        nmea_data = self.get_nmea_data(stale_secs, wait_secs)
        if nmea_data:
            return get_location_from_nmea_data(nmea_data)
        return None
    def get_satellite_info(self) -> 'SatelliteLocation|None':
        """Get the satellite's information including azimuth and elevation.
        Derives which satellite/GeoBeam is used from trace class 3 subclass 5.
            `SatelliteLocation` object (azimuth, elevation) if determinable.
        geobeam = None
        modem_location = self.get_location()
        if (modem_location is not None and
            self.get_network_status() > NetworkStatus.RX_SEARCHING):
            # satellite has been found
            cmd = 'ATS90=3 S91=5 S92=1 S102?'
            prefix = ''
            if self._mfr == Manufacturer.QUECTEL:
                cmd = 'AT+QEVNT=3,5'
                prefix = '+QEVNT:'
            response = self._at_command_response(cmd, prefix)
            if self._mfr == Manufacturer.QUECTEL:
                # workaround documentation error
                response = response.replace('+QEVENT:', '').strip()
                response = response.split(',')[9]
            geobeam = GeoBeam(int(response))
            return get_satellite_location(modem_location, geobeam)
        return None
    def get_event_mask(self) -> int:
        """Get the set of monitored events that trigger event notification."""
        if self._mfr != Manufacturer.ORBCOMM:
            raise ModemError('Operation not supported by this modem')
        cmd = 'ATS88?'
            return int(self._at_command_response(cmd))
        except ValueError:
            return 0
    def set_event_mask(self, event_mask: int) -> None:
        """Set monitored events that trigger event notification."""
        if self._mfr != Manufacturer.ORBCOMM:
            raise ModemError('Operation not supported by this modem')
        max_bits = 12
        if not isinstance(event_mask, int) or event_mask > 2**max_bits-1:
            raise ValueError('Invalid event bitmask')
        cmd = f'ATS88={event_mask}'
    def get_events_asserted_mask(self) -> int:
        """Get the set of events that are active following a notification."""
        if self._mfr != Manufacturer.ORBCOMM:
            raise ModemError('Operation not supported by this modem')
        cmd = 'ATS89?'
            return int(self._at_command_response(cmd))
        except ValueError:
            return 0
    def get_trace_event_monitor(self,
                                asserted_only: bool = False,
                                ) -> 'list[tuple[int, int]]':
        """Get the list of monitored Trace Events.
            asserted_only (bool): If True returns only asserted monitored events
            A list of tuples (trace_class, trace_subclass)
            `ModemError` if unsupported by the modem type.
        if self._mfr != Manufacturer.ORBCOMM:
            raise ModemError('Operation not supported by this modem')
        cmd = 'AT%EVMON'
        prefix = '%EVMON:'
        trace_events = []
        events = self._at_command_response(cmd, prefix).split(',')
        for event in events:
            trace_class = int(event.split('.')[0])
            trace_subclass = int(event.split('.')[1].replace('*', ''))
            if not asserted_only or event.endswith('*'):
                trace_events.append((trace_class, trace_subclass))
        return trace_events
    def set_trace_event_monitor(self, events: 'list[tuple[int, int]]') -> None:
        """Set the list of monitored trace events."""
        cmd = 'AT%EVMON='
        for event in events:
            if not cmd.endswith('='):
                cmd += ','
            cmd += f'{event[0]}.{event[1]}'
    def get_trace_events_cached(self) -> 'list[tuple[int, int]]':
        """Get a list of trace events cached."""
        return self.get_trace_event_monitor(True)
    def get_trace_event_data(self,
                             event: 'tuple[int, int]',
                             decode: bool = False,
                             ) -> 'list[int]|dict[str, int]':
        """Get the trace event data.
            event (tuple): The trace (class, subclass)
            decode (bool): Decodes raw data to dictionary (not implemented)
        cmd = f'AT%EVNT={event[0]},{event[1]}'
        prefix = '%EVNT:'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = cmd.replace('%EVNT', '+QEVNT')
            prefix = '+QEVENT:'   # documented as +QEVNT
        trace = self._at_command_response(cmd, prefix)
        if decode:
            raise NotImplementedError
        return [int(i) for i in trace.split(',')]
    def get_urc_ctl(self) -> int:
        """Get the event list that trigger Unsolicited Report Codes."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ValueError('Modem does not support this feature')
        cmd = 'AT+QURCCTL?'
        prefix = '+QURCCTL:'
            return int(self._at_command_response(cmd, prefix), 16)
        except ValueError:
            return 0
    def set_urc_ctl(self, qurc_mask: int) -> None:
        """Set the event list that trigger Unsolicited Report Codes."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ValueError('Modem does not support this feature')
        cmd = f'AT+QURCCTL=0x{qurc_mask:04X}'
    def get_urc(self) -> 'UrcCode|None':
        """Get the pending Unsolicited Result Code if one is present."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ValueError('Modem does not support this feature')
        eol = '\r\n' if self._modem.verbose else '\r'
        result = self._modem.read_rx_buffer(read_until=eol)
        if result:
            result = result.replace('+QURC:', '').strip()
                return UrcCode(int(result))
            except ValueError:
                return UrcCode[result]
        return None
    def get_power_mode(self) -> PowerMode:
        """Get the modem's power mode configuration."""
        cmd = 'ATS50?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QPMD?'
            prefix = '+QPMD:'
        return PowerMode(int(self._at_command_response(cmd, prefix)))
    def set_power_mode(self, power_mode: PowerMode) -> None:
        """Set the modem's power mode configuration."""
        if not PowerMode.is_valid(power_mode):
            raise ValueError('Invalid Power Mode')
        cmd = f'ATS50={power_mode}'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = f'AT+QPMD={power_mode}'
    def get_wakeup_period(self) -> WakeupPeriod:
        """Get the modem's wakeup period configuration."""
        cmd = 'ATS51?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QWKUPCFG?'
            prefix = '+QWKUPCFG:'
        if self._mfr == Manufacturer.QUECTEL:
            return WakeupPeriod(int(
                self._at_command_response(cmd, prefix).split(',')[0]))
        return WakeupPeriod(int(self._at_command_response(cmd, prefix)))
    def set_wakeup_period(self,
                          wakeup_period: WakeupPeriod,
                          wakeup_way: 'WakeupWay|None' = None,
                          ) -> None:
        """Set the modem's wakeup period configuration.
        The configuration does not update until confimed by the network.
        if not WakeupPeriod.is_valid(wakeup_period):
            raise ValueError('Invalid wakeup period')
        cmd = f'ATS51={wakeup_period}'
        if self._mfr == Manufacturer.QUECTEL:
            if wakeup_way is None:
                query = self._at_command_response('AT+QWKUPCFG?', '+QWKUPCFG:')
                wakeup_way = WakeupWay(int(query.split(',')[1]))
            cmd = f'AT+QWKUPCFG={wakeup_period},{wakeup_way}'
    def get_wakeup_way(self) -> WakeupWay:
        """Get the modem wakeup method."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ModemError('Operation not supported by this modem')
        cmd = 'AT+QWKUPCFG?'
        prefix = '+QWKUPCFG:'
        wakeup_way = self._at_command_response(cmd, prefix).split(',')[1]
        return WakeupWay(int(wakeup_way))
    def power_down(self) -> None:
        """Prepare the modem for power-down."""
        cmd = 'AT%OFF'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QPOWD=2'
    def get_workmode(self) -> WorkMode:
        """Get the modem working mode."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ModemError('Operation not supported by this modem')
        cmd = 'AT+QMOD?'
        prefix = '+QMOD:'
        return WorkMode(int(self._at_command_response(cmd, prefix)))
    def set_workmode(self, workmode: WorkMode) -> None:
        """Set the modem working mode."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ModemError('Operation not supported by this modem')
        if not WorkMode.is_valid(workmode):
            raise ValueError('Invalid workmode')
        cmd = f'AT+QMOD={workmode}'
    def get_deepsleep_enable(self) -> bool:
        """Get the deepsleep configuration flag."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ModemError('Operation not supported by this modem')
        cmd = 'AT+QSCLK?'
        prefix = '+QSCLK:'
        return bool(int(self._at_command_response(cmd, prefix)))
    def set_deepsleep_enable(self, enable: bool) -> None:
        """Set the deepsleep configuration flag."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ModemError('Operation not supported by this modem')
    def get_register(self, s_register_number: int) -> 'int|None':
        """Get a modem register value."""
        cmd = f'ATS{s_register_number}?'
            return int(self._at_command_response(cmd))
        except ValueError:
            return None
    def set_register(self, s_register_number: int, value: int) -> None:
        """Set a modem register value."""
        cmd = f'ATS{s_register_number}={value}'
    def get_all_registers(self) -> dict:
        """Get a dictionary of modem register values."""
        raise NotImplementedError


class AcquisitionInfo (ctrl_state: ControlState = ControlState.STOPPED, beam_state: BeamState = BeamState.IDLE, rssi: float = 0.0, vcid: int = 0)

Details about the satellite acquisition state of the modem.


ctrl_state : ControlState
Primary network acquisition state.
beam_state : BeamState
Secondary beam acquistion state.
rssi : float
Signal indicator Carrier to Noise ratio (dB-Hz).
vcid : int
Virtual carrier identifier for low-level sanity check.
Expand source code
class AcquisitionInfo:
    """Details about the satellite acquisition state of the modem.
        ctrl_state (ControlState): Primary network acquisition state.
        beam_state (BeamState): Secondary beam acquistion state.
        rssi (float): Signal indicator Carrier to Noise ratio (dB-Hz).
        vcid (int): Virtual carrier identifier for low-level sanity check.
    ctrl_state: ControlState = ControlState.STOPPED
    beam_state: BeamState = BeamState.IDLE
    rssi: float = 0.0
    vcid: int = 0

Class variables

var beam_stateBeamState
var ctrl_stateControlState
var rssi : float
var vcid : int
class Manufacturer (value, names=None, *, module=None, qualname=None, type=None, start=1)

Supported NIMO modem implementations.

Expand source code
class Manufacturer(IntEnum):
    """Supported NIMO modem implementations."""
    NONE = 0
    ORBCOMM = 1
    QUECTEL = 2


  • enum.IntEnum
  • enum.Enum

Class variables

var NONE
class ModemAtError (*args, **kwargs)

AT command related errors with error_code property.

Expand source code
class ModemAtError(ModemError):
    """AT command related errors with error_code property."""
    def error_code(self) -> AtErrorCode:
        return AtErrorCode[self.args[0]]


  • ModemError
  • builtins.Exception
  • builtins.BaseException

Instance variables

var error_codeAtErrorCode
Expand source code
def error_code(self) -> AtErrorCode:
    return AtErrorCode[self.args[0]]
class ModemCrc (*args, **kwargs)

Error calculating response CRC.

Expand source code
class ModemCrc(ModemError):
    """Error calculating response CRC."""


  • ModemError
  • builtins.Exception
  • builtins.BaseException
class ModemCrcConfig (*args, **kwargs)

Request/response mismatch of CRC presence.

Expand source code
class ModemCrcConfig(ModemError):
    """Request/response mismatch of CRC presence."""


  • ModemError
  • builtins.Exception
  • builtins.BaseException
class ModemError (*args, **kwargs)

Base class for NIMO modem errors.

Expand source code
class ModemError(Exception):
    """Base class for NIMO modem errors."""


  • builtins.Exception
  • builtins.BaseException


class ModemTimeout (*args, **kwargs)

Serial response timeout.

Expand source code
class ModemTimeout(ModemError):
    """Serial response timeout."""


  • ModemError
  • builtins.Exception
  • builtins.BaseException
class NimoModem (serial_port: str, **kwargs)

A class for NIMO satellite IoT modem interaction.

Instantiate the NimoModem object.

For additional kwargs see PySerial API:


serial_port : str
The path to the modem's serial port.

Keyword Args: baudrate (int): The baud rate of the modem (default 9600)


ConnectionError if unable to connect to the serial port.

Expand source code
class NimoModem:
    """A class for NIMO satellite IoT modem interaction."""
    # __slots__ = ('_modem', '_mobile_id',
    #              '_mfr_code', '_modem_booted',
    #              )
    def __init__(self, serial_port: str, **kwargs) -> None:
        """Instantiate the NimoModem object.
        For additional kwargs see PySerial API:
            serial_port (str): The path to the modem's serial port.
        Keyword Args:
            baudrate (int): The baud rate of the modem (default 9600)
            `ConnectionError` if unable to connect to the serial port.
            self._serial = Serial(serial_port, **kwargs)
        except Exception as exc:
            raise ConnectionError('Unable to connect to serial') from exc
        self._modem: AtCommandBuffer = AtCommandBuffer(self._serial)
        self._baudrate: int = self._serial.baudrate
        self._is_connected: bool = False
        self._modem_booted: bool = False
        self._mobile_id: str = ''
        self._manufacturer: Manufacturer = Manufacturer.NONE
    def is_ready(self) -> bool:
        return not self._modem._lock.locked()
    def crc_enabled(self) -> bool:
        return self._modem.crc
    def modem_booted(self) -> bool:
        return self._modem_booted
    def _mfr(self) -> Manufacturer:
        """Used internally to support different manufacturer commands."""
        if not self._manufacturer:
            _log.debug('Using %s manufacturer commands', self._manufacturer)
        return self._manufacturer
    def _mo_msg_name_len_max(self) -> int:
        """Used internally to restrict the length of the MO message name."""
        maxlen = MSG_MO_NAME_MAX_LEN
        if self._mfr == Manufacturer.QUECTEL:
            maxlen = MSG_MO_NAME_QMAX_LEN
        return maxlen
    def _at_command_response(self,
                             command: str,
                             prefix: str = '',
                             timeout: int = DEFAULT_AT_TIMEOUT) -> str:
        """Send a command and return the response.
        Blocks until response has been received.
            command (str): The AT command to send.
            prefix (str): Optional prefix to remove from response.
            timeout (int): Maximum time in seconds to wait for response.
            `ModemTimeout` if no response is received.
            `ModemCrcConfig` if CRC is not used on both request/response.
            `ModemAtError` for other cases of errored response.
        err = self._modem.read_at_response(prefix, timeout)
        if err == AtErrorCode.OK:
            return self._modem.get_response()
        elif err == AtErrorCode.TIMEOUT:
            raise ModemTimeout(f'AT response timed out after {timeout} seconds')
        elif err == AtErrorCode.CRC_CONFIG_MISMATCH:
            raise ModemCrcConfig('Reponse checksum {}expected'.format(
                                 '' if self.crc_enabled else 'un'))
        elif err == AtErrorCode.INVALID_RESPONSE_CRC:
            raise ModemCrc(
            err = self.get_last_error_code()
            if err == AtErrorCode.INVALID_CRC:
                raise ModemCrc(
            raise ModemAtError(
    def connect(self) -> None:
        """Attach to the modem via serial communications.
        Provided for backward compatibility. Connection is usually automatic
        when instantiating `NimoModem`.
        if not self._serial.is_open:
    def disconnect(self) -> None:
        """Detach from the modem serial communications.
        Provided for backward compatibility. Not typically required.
        self._is_connected = False
        self._modem_booted = False
        if self._serial.is_open:
    def is_connected(self) -> bool:
        """Indicates if the modem is responding to a basic AT query."""
            self._is_connected = True
            self._modem_booted = True
            return True
        except ModemError:
            self._is_connected = False
            self._modem_booted = False
            return False
    def baudrate(self) -> int:
        """The baudrate of the serial connection."""
        return self._modem.serial.baudrate
    def baudrate(self, baudrate: int):
        """Set the baud rate of the modem and adjust the serial rate."""
        if baudrate not in [9600, 115200]:
            raise ValueError('Invalid baudrate')
        self._modem.serial.baudrate = baudrate
    def retry_baudrate(self) -> bool:
        for baud in BAUDRATES:
            self._modem.serial.baudrate = baud
            if self.is_connected():
                return True
        return False
    def await_boot(self, boot_timeout: int = 10) -> bool:
        """Indicates if a boot string is received within a timeout window.
        Use `is_connected` before waiting for boot.
            boot_timeout (int): The maximum time to wait in seconds.
            True if a valid boot string was received inside the timeout.
        boot_strings = ['ST Version', 'RDY']
        _log.debug('Awaiting modem boot string for %d seconds...', boot_timeout)
        rx_data = ''
        started = time.time()
        while time.time() - started < boot_timeout and not self._modem_booted:
            while self._modem.is_data_waiting():
                rx_data += self._modem.read_rx_buffer()
            if rx_data and any(b in rx_data for b in boot_strings):
                self._modem_booted = True
                _log.debug('Found boot string - clearing Rx buffer')
                while self._modem.is_data_waiting():
                    rx_data += self._modem.read_rx_buffer()
        return self._modem_booted
    def get_last_error_code(self) -> AtErrorCode:
        """Get the last error code from the modem."""
        return AtErrorCode(int(self._at_command_response('ATS80?')))
    def initialize(self,
                   echo: bool = True,
                   verbose: bool = True,
                   ) -> bool:
        """Initialize the modem AT configuration for Echo and Verbose."""
        at_command = (f'ATZ;E{int(echo)};V{int(verbose)}')
            return True
        except ModemCrcConfig:
  'Attempting re-initialize with CRC enabled')
            return True
    def set_crc(self, enable: bool = False) -> bool:
        """Enable or disable CRC error checking on the modem serial port."""
            return True
        except ModemCrcConfig:
            if ((self._modem.crc and enable) or
                (not self._modem.crc and not enable)):
                return True
            return False
    def reset_factory_config(self) -> None:
        """Reset the modem's factory default configuration."""
    def save_config(self) -> None:
        """Store the current configuration to modem non-volatile memory."""
    def get_mobile_id(self) -> str:
        """Get the modem's globally unique identifier."""
        if not self._mobile_id:
                self._mobile_id = self._at_command_response('AT+GSN', '+GSN:')
                if vlog(VLOG_TAG):
                    _log.debug('Cached Mobile ID %s', self._mobile_id)
            except ModemError:
                self._mobile_id = ''
        return self._mobile_id
    def _is_simulator(self) -> bool:
        return self.get_mobile_id().startswith('00000000')
    def get_manufacturer(self) -> str:
        """Get the manufacturer name."""
        if not self._manufacturer:
                mfr = self._at_command_response('ATI')
                if 'quectel' in mfr.lower():
                    self._manufacturer = Manufacturer.QUECTEL
                    if not any(m in mfr.lower()
                               for m in ['orbcomm', 'skywave']):
                        _log.warning('Unsupported manufacturer %s', mfr)
                    self._manufacturer = Manufacturer.ORBCOMM
                if vlog(VLOG_TAG):
                    _log.debug('Caching manufacturer: %s',
            except ModemError:
                self._manufacturer = Manufacturer.NONE
    def get_model(self) -> str:
        """Get the manufacturer model name."""
        cmd = 'ATI4'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'ATI'
            response = self._at_command_response(cmd)
            if response:
                if self._mfr == Manufacturer.QUECTEL:
                    response = response.split('\n')[1]
            return response
        except ModemError:
            return ''
    def get_firmware_version(self) -> str:
        """Get the modem's firmware version."""
        # TODO: Firmware structure with hardware, firmware, software?
        return self._at_command_response('AT+GMR', '+GMR:')
    def get_system_time(self) -> int:
        """Get the system/GNSS time from the modem."""
            nimo_time = self._at_command_response('AT%UTC', '%UTC:')
            iso_time = nimo_time.replace(' ', 'T') + 'Z'
            return iso_to_ts(iso_time)
        except ModemError:
            return 0
    def get_temperature(self) -> 'int|None':
        """Get the processor temperature in Celsius."""
            return int(int(self._at_command_response('ATS85?')) / 10)
        except ValueError:
            return None
    def is_transmit_allowed(self) -> bool:
        """Indicates if the modem is able to transmit data."""
        return self.get_network_status() == 5
    def is_blocked(self) -> bool:
        """Indicates if line-of-sight to the satellite is blocked."""
        return self.get_network_status() == 8
    def is_muted(self) -> bool:
        """Indicates if the modem has been muted (disallowed to transmit data).
        return self.get_network_status() == 7
    def is_updating_network(self) -> bool:
        """Indicates if the modem is updating network information.
        The modem should not be powered down during a network update.
        return self.get_network_status() == 4
    def get_network_status(self) -> NetworkStatus:
        """Get the current satellite acquisition status."""
        cmd = 'ATS54?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QREG?'
            prefix = '+QREG:'
        return NetworkStatus(int(self._at_command_response(cmd, prefix)))
    def get_rssi(self) -> float:
        """Get the current Received Signal Strength Indicator.
        Also referred to as SNR or C/N0 (dB-Hz)
        cmd = 'ATS90=3 S91=1 S92=1 S116?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QSCN'
            prefix = '+QSCN:'
            return int(self._at_command_response(cmd, prefix)) / 100
        except ValueError:
            return 0
    def get_signal_quality(self) -> SignalQuality:
        """Get a qualitative indicator from 0..5 of the satellite signal."""
        snr = self.get_rssi()
        if snr >= SignalLevelRegional.INVALID.value:
            return SignalQuality.WARNING
        if snr >= SignalLevelRegional.BARS_5.value:
            return SignalQuality.STRONG
        if snr >= SignalLevelRegional.BARS_4.value:
            return SignalQuality.GOOD
        if snr >= SignalLevelRegional.BARS_3.value:
            return SignalQuality.MID
        if snr >= SignalLevelRegional.BARS_2.value:
            return SignalQuality.LOW
        if snr >= SignalLevelRegional.BARS_1.value:
            return SignalQuality.WEAK
        return SignalQuality.NONE
    def get_acquisition_detail(self) -> AcquisitionInfo:
        """Get the detailed satellite acquisition status.
        Includes `acquisition_state`, `beamsearch_state`, `vcid` and `snr`
        cmd = 'ATS90=3 S91=1 S92=1 S122? S123? S116? S101?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QEVNT=3,1'
            prefix = '+QEVNT:'
        result_str = self._at_command_response(cmd, prefix, timeout=10)
        if self._mfr == Manufacturer.ORBCOMM:
            results = [int(x) for x in result_str.split('\n')]
            ctrl_state = ControlState(results[0])
            beam_state = BeamState(results[1])
            rssi = float(results[2]) / 100
            vcid = results[3]
        elif self._mfr == Manufacturer.QUECTEL:
            # Workaround Quectel 20230731 documentation error says +QEVNT:
            result_str = result_str.replace('+QEVENT:', '').strip()
            results = [int(x) for x in result_str.split(',')]
            # <dataCount>,<signedBitmask>,<MTID>,<timestamp>,
            #   <class>,<subclass>,<priority>,<data0>,...
            data0 = 7   # list index where trace data starts
            ctrl_state = ControlState(results[data0+22])
            beam_state = BeamState(results[data0+23])
            rssi = float(results[data0+16]) / 100
            vcid = results[data0+1]
        return AcquisitionInfo(ctrl_state, beam_state, rssi, vcid)
    def send_data(self, data: bytes, **kwargs) -> 'str|MoMessage':
        """Submits data to send as a mobile-originated message.
        If a `message_name` is not supplied one will be generated using the
        least significant 8 digits of unix timestamp.
            data (bytes): The data to send.
        Keyword Args:
            message_name (str): Optional handle for message in Tx queue. Max 8
                characters for Orbcomm modem or 12 for Quectel.
            priority (int): Optional priority 1 (highest) .. 4 (low, default).
                May use `MessagePriority`.
            codec_sin (int): Optional first byte of payload to add as a codec
                service identifier, must be in range 16..255.
            codec_min (int): Optional second byte of payload to add as a codec
                message identifier, must be in range 0..255.
            return_message (bool): If set, returns a `MoMessage` instead of the
                message handle.
            Message handle (str) or `MoMessage` if `return_message` kwarg is set.
            `ValueError` for various parameter limit violations.
        data_size = len(data)
        msg_payload_sin_min = b''
        message_name = kwargs.get('message_name', '')
        priority = MessagePriority(kwargs.get('priority',
        codec_sin: int = kwargs.get('codec_sin', -1)
        codec_min: int = kwargs.get('codec_min', -1)
        if codec_sin > -1:
            data_size += 1
            msg_payload_sin_min += codec_sin.to_bytes(1, 'big')
        if codec_min > -1:
            data_size += 1
            msg_payload_sin_min += codec_min.to_bytes(1, 'big')
        if not 2 <= data_size <= MSG_MO_MAX_SIZE:
            raise ValueError('Invalid mobile-originated message size')
        if message_name and len(message_name) > self._mo_msg_name_len_max:
            raise ValueError('Message name too long')
        data_index = 0
        if codec_sin <= -1:
            codec_sin = data[0]
            data_index += 1
            data_size -= 1
        if codec_sin not in range(16, 256):
            raise ValueError('Illegal first payload byte SIN must be 16..255')
        if codec_min <= -1:
            codec_min = data[1]
            data_index += 1
            data_size -= 1
        if codec_min > 255:
            raise ValueError('Invalid second payload byte MIN must be 0..255')
        max_name_len = self._mo_msg_name_len_max
        if message_name and len(message_name) > max_name_len:
            raise ValueError(f'Invalid message name longer than {max_name_len}')
        if len(message_name) == 0:
            message_name = f'{int(time.time())}'[-max_name_len:]
        # Convert to base64 string for serial efficiency
        #   no effect on OTA size, modem always decodes and sends raw bytes OTA
        data_format = DataFormat.BASE64
        formatted_data = base64.b64encode(data[data_index:]).decode('utf-8')
        cmd = 'AT%MGRT='
        codec_sep = '.'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QSMGT='
            codec_sep = ','
        cmd = (f'{cmd}"{message_name}",{priority},{codec_sin}{codec_sep}'
        if kwargs.get('return_message', False) is True:
            return MoMessage(message_name, priority, MessageState.TX_READY,
                                payload=(msg_payload_sin_min + data))
        return message_name
    def send_text(self, text: str, **kwargs) -> 'str|MoMessage':
        """Submits a text string to send as data.
        If `codec_sin` kwarg is not provided 128 is prepended as the first byte.
        If `codec_min` kwarg is not provided 1 is prepended as the second byte.
        Other kwargs as per `send_data`.
            text (str): The text message to send.
            (str) The message name assigned or MoMessage if kwarg
                `return_message` is set.
        data = b''
        codec_sin = int(kwargs.get('codec_sin', 128))
        data += codec_sin.to_bytes(1, 'big')
        codec_min = int(kwargs.get('codec_min', 1))
        data += codec_min.to_bytes(1, 'big')
        data += text.encode()
        flowthru = ['message_name', 'priority', 'return_message']
        next_kwargs = { k:v for k, v in kwargs if k in flowthru }
        return self.send_data(data, **next_kwargs)
    def cancel_mo_message(self, message_name: str) -> bool:
        """Attempts to cancel a previously submitted mobile-originated message.
            message_name (str): The mobile-originated message handle to delete.
        _log.debug('Attempting to cancel MO message %s', message_name)
        cmd = 'AT%MGRC'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QSMGC'
        cmd += f'="{message_name}"'
        message_states = self.get_mo_message_states(message_name)
        if len(message_states) > 0:
            state = message_states[0].state
            if state == MessageState.TX_CANCELLED:
                return True
        elif self._is_simulator:
            return True
        _log.warn('Failed to cancel message %s', message_name)
        return False
    def get_mo_message_states(self, message_name: str = '') -> 'list[MoMessage]':
        """Get a list of mobile-originated message states in the modem Tx queue.
            message_name (str): Optional filter on message name.
            A list of `MoMessage` objects including state and metadata.
        cmd = 'AT%MGRS'
        prefix = '%MGRS:'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QSMGS'
            prefix = '+QSMGS:'
        if message_name and not self._is_simulator:
            # Orbcomm Modem Simulator returns ERROR for %MGRS= command
            cmd += f'="{message_name}"'
        response_str = self._at_command_response(cmd, prefix)
        return self._parse_message_states(response_str, is_mo=True)
    def _parse_message_states(self,
                              response_str: str,
                              is_mo: bool,
                              ) -> 'list[NimoMessage]':
        """Parses textual metadata to build a SatelliteMessageState."""
        mo_states = []
        if not response_str:
            return mo_states
        if vlog(VLOG_TAG):
            _log.debug('Parsing %s message states from %s',
                       'MO' if is_mo else 'MT', response_str)
        states_meta = [m for m in response_str.split('\n') if m != '']
        for meta in states_meta:
            message = MoMessage() if is_mo else MtMessage()
            for field_idx, field_data in enumerate(meta.split(',')):
                self._update_message_state(message, field_idx,
                                           field_data, is_mo)
        return mo_states
    def _update_message_state(self,
                              message_state: NimoMessage,
                              field_idx: int,
                              field_data: str,
                              is_mo: bool) -> None:
        """Parse textual metadata to update a message's state."""
        if vlog(VLOG_TAG):
            _log.debug('Parsing %s message state index %d: %s',
                       'MO' if is_mo else 'MT', field_idx, field_data)
        mfr = self._mfr
        if field_idx == 0:
   = field_data.replace('"', '')
            if vlog(VLOG_TAG):
                _log.debug('Message name: %s',
        elif field_idx == 1 and mfr == Manufacturer.ORBCOMM:
            if vlog(VLOG_TAG):
                _log.debug('Ignoring msgNum %s', field_data)
        elif ((field_idx == 2 and mfr == Manufacturer.ORBCOMM) or
              (field_idx == 1 and mfr == Manufacturer.QUECTEL)):
            message_state.priority = MessagePriority(int(field_data))
            if vlog(VLOG_TAG):
                _log.debug('Message priority %s',
        elif ((field_idx == 3 and mfr == Manufacturer.ORBCOMM) or
              (field_idx == 2 and mfr == Manufacturer.QUECTEL)):
            if vlog(VLOG_TAG):
                _log.debug('Ignoring codec SIN %s', field_data)
        elif ((field_idx == 4 and mfr == Manufacturer.ORBCOMM) or
              (field_idx == 3 and mfr == Manufacturer.QUECTEL)):
            message_state.state = MessageState(int(field_data))
            if vlog(VLOG_TAG):
                _log.debug('Message state: %s',
        elif ((field_idx == 5 and mfr == Manufacturer.ORBCOMM) or
              (field_idx == 4 and mfr == Manufacturer.QUECTEL)):
            message_state.length = int(field_data)
            if vlog(VLOG_TAG):
                _log.debug('Message size: %d bytes', message_state.length)
        elif ((field_idx == 6 and mfr == Manufacturer.ORBCOMM) or
              (field_idx == 5 and mfr == Manufacturer.QUECTEL)):
            message_state.bytes_delivered = int(field_data)
            if vlog(VLOG_TAG):
                _log.debug('Bytes delivered: %d', message_state.bytes_delivered)
            _log.warning('Unhandled field index %d (%s) for manufacturer %s',
                         field_idx, 'MO' if is_mo else 'MT',
    def get_mt_message_states(self, message_name: str = '') -> 'list[MtMessage]':
        """Get a list of mobile-terminated message states in the modem Tx queue.
            message_name (str): Optional filter on message name.
            A list of `MtMessage` objects including state and metadata.
        cmd = 'AT%MGFN' if not message_name else 'AT%MGFS'
        prefix = '%MGFN:' if not message_name else 'AT%MGFS:'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QRMGN' if not message_name else 'AT+QRMGS'
            prefix = '+QRMGN:' if not message_name else '+QRMGS:'
        if message_name and not self._is_simulator:
            cmd += f'="{message_name}"'
        response_str = self._at_command_response(cmd, prefix)
        return self._parse_message_states(response_str, is_mo=False)
    def get_mt_message(self, message_name: str) -> 'MtMessage|None':
        """Get a mobile-terminated message from the modem's Rx queue by name."""
        cmd = 'AT%MGFG'
        prefix = '%MGFG:'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+GRMGR'
            prefix = '+GRMGR:'
        data_format = DataFormat.BASE64
        cmd += f'="{message_name}",{data_format}'
        response = self._at_command_response(cmd, prefix)
        if response:
            return self._parse_mt_message(response)
        return None
    def _parse_mt_message(self, meta: str) -> MtMessage:
        """Parse textual metadata to build a MtMessage."""
        if vlog(VLOG_TAG):
            _log.debug('Parsing MT message from meta: %s', meta)
        data_includes_sin = False
        mfr = self._mfr
        message = MtMessage()
        for field_idx, field_data in enumerate(meta.split(',')):
            if field_idx == 0:
       = field_data.replace('"', '')
                if vlog(VLOG_TAG):
                    _log.debug('Message name: %s',
            elif (field_idx == 1 and mfr == Manufacturer.ORBCOMM):
                if vlog(VLOG_TAG):
                    _log.debug('Ignoring msgNum %s', field_data)
            elif (field_idx == 2 and mfr == Manufacturer.ORBCOMM):
                message.priority = MessagePriority(int(field_data))
                if vlog(VLOG_TAG):
                    _log.debug('Message priority %s',
            elif ((field_idx == 3 and mfr == Manufacturer.ORBCOMM) or
                  (field_idx == 1 and mfr == Manufacturer.QUECTEL)):
                codec_sin = int(field_data)
                if not data_includes_sin:
                    message.payload += codec_sin.to_bytes(1, 'big')
                if vlog(VLOG_TAG):
                    _log.debug('Added SIN as first payload byte: %d', codec_sin)
            elif (field_idx == 4 and mfr == Manufacturer.ORBCOMM):
                message.state = MessageState(int(field_data))
                if vlog(VLOG_TAG):
                    _log.debug('Message state %s',
            elif ((field_idx == 5 and mfr == Manufacturer.ORBCOMM) or
                  (field_idx == 2 and mfr == Manufacturer.QUECTEL)):
                message.length = int(field_data)
                if vlog(VLOG_TAG):
                    _log.debug('Message size: %d bytes', message.length)
            elif ((field_idx == 6 and mfr == Manufacturer.ORBCOMM) or
                  (field_idx == 3 and mfr == Manufacturer.QUECTEL)):
                data_format = DataFormat(int(field_data))
                if vlog(VLOG_TAG):
                    _log.debug('Data format %s',
            elif ((field_idx == 7 and mfr == Manufacturer.ORBCOMM) or
                  (field_idx == 4 and mfr == Manufacturer.QUECTEL)):
                if vlog(VLOG_TAG):
                    _log.debug('Decoding payload from: %s', field_data)
                if message.length > 0:
                    if data_format == DataFormat.BASE64:
                        message.payload += base64.b64decode(field_data)
                    elif data_format == DataFormat.HEX:
                        message.payload += bytes.fromhex(field_data)
                    else:   # DataFormat.TEXT
                        message.payload += field_data.encode()
                    if message.length != len(message.payload):
                        _log.warn('Message length mismatch')
        return message
    def delete_mt_message(self, message_name: str) -> bool:
        """Remove a mobile-terminated message from the modem's Rx queue."""
        cmd = 'AT%MGFM'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QRMGM'
        cmd += f'="{message_name}"'
        check = self.get_mt_message_states(message_name)
        if check and check[0].state == MessageState.RX_RETRIEVED:
            return True
        return False
    def receive_data(self, message_name: str) -> 'bytes|None':
        """Get the raw data from a mobile-terminated message."""
        message = self.get_mt_message(message_name)
        if message:
            return message.payload
        return None
    def get_gnss_mode(self) -> GnssMode:
        """Get the modem's GNSS receiver mode."""
        cmd = 'ATS39?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QGNSSMOD?'
            prefix = '+QGNSSMOD:'
        response = self._at_command_response(cmd, prefix)
        if self._mfr == Manufacturer.QUECTEL:
            return GnssModeQuectel(int(response))
        return GnssModeOrbcomm(int(response))
    def set_gnss_mode(self, gnss_mode: GnssMode) -> None:
        """Get the modem's GNSS receiver mode."""
        cmd = f'ATS39={gnss_mode}'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            if not GnssModeQuectel.is_valid(gnss_mode):
                raise ValueError('Invalid GNSS mode')
            cmd = f'AT+QGNSSMOD={gnss_mode}'
            prefix = '+QGNSSMOD:'
            if not GnssModeOrbcomm.is_valid(gnss_mode):
                raise ValueError('Invalid GNSS mode')
        self._at_command_response(cmd, prefix)
    def get_gnss_continuous(self) -> int:
        """Get the modem's GNSS continuous refresh interval in seconds."""
        cmd = 'ATS55?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QGNSSCW?'
            prefix = '+QGNSSCW:'
            return int(self._at_command_response(cmd, prefix))
        except ValueError:
            return 0
    def set_gnss_continuous(self, interval: int) -> None:
        """Set the modem's GNSS continuous refresh interval in seconds.
            interval (int): Automatic update interval 0..30 seconds.
            `True` if successful.
            `ValueError` if invalid interval is specified.
        if interval not in range (0, 31):
            raise ValueError('Invalid GNSS refresh interval')
        cmd = f'ATS55={interval}'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = f'AT+QGNSSCW={interval}'
    def get_nmea_data(self,
                      stale_secs: int = 1,
                      wait_secs: int = 35,
                      rmc: bool = True,
                      gga: bool = True,
                      gsa: bool = True,
                      gsv: bool = False,
                      ) -> str:
        """Get a set of NMEA data detailing the modem's location.
            stale_secs (int): Maximum cached fix age to use in seconds.
            wait_secs (int): Maximum duration to wait for a fix in seconds.
            rmc (bool): Include Recommended Minimum data.
            gga (bool): Include altitude and fix quality data.
            gsa (bool): Include Dilution of Precision data.
            gsv (bool): Include verbose GNSS satellite details.
        cmd = 'AT%GPS'
        prefix = '%GPS:'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QGNSS'
            prefix = '+QGNSS:'
        cmd += f'={stale_secs},{wait_secs}'
        if rmc:
            cmd += ',"RMC"'
        if gga:
            cmd += ',"GGA"'
        if gsa:
            cmd += ',"GSA"'
        if gsv:
            cmd += ',"GSV"'
            response = self._at_command_response(cmd, prefix, wait_secs + 5)
            return response
        except ModemAtError as exc:
            if exc.error_code != AtErrorCode.GNSS_TIMEOUT:
        return ''
    def get_location(self,
                     stale_secs: int = 1,
                     wait_secs: int = 35) -> 'ModemLocation|None':
        """Get the modem's location.
            stale_secs (int): Maximum cached fix age to use in seconds.
            wait_secs (int): Maximum duration to wait for a fix in seconds.
            ModemLocation object if GNSS does not time out waiting for fix.
        nmea_data = self.get_nmea_data(stale_secs, wait_secs)
        if nmea_data:
            return get_location_from_nmea_data(nmea_data)
        return None
    def get_satellite_info(self) -> 'SatelliteLocation|None':
        """Get the satellite's information including azimuth and elevation.
        Derives which satellite/GeoBeam is used from trace class 3 subclass 5.
            `SatelliteLocation` object (azimuth, elevation) if determinable.
        geobeam = None
        modem_location = self.get_location()
        if (modem_location is not None and
            self.get_network_status() > NetworkStatus.RX_SEARCHING):
            # satellite has been found
            cmd = 'ATS90=3 S91=5 S92=1 S102?'
            prefix = ''
            if self._mfr == Manufacturer.QUECTEL:
                cmd = 'AT+QEVNT=3,5'
                prefix = '+QEVNT:'
            response = self._at_command_response(cmd, prefix)
            if self._mfr == Manufacturer.QUECTEL:
                # workaround documentation error
                response = response.replace('+QEVENT:', '').strip()
                response = response.split(',')[9]
            geobeam = GeoBeam(int(response))
            return get_satellite_location(modem_location, geobeam)
        return None
    def get_event_mask(self) -> int:
        """Get the set of monitored events that trigger event notification."""
        if self._mfr != Manufacturer.ORBCOMM:
            raise ModemError('Operation not supported by this modem')
        cmd = 'ATS88?'
            return int(self._at_command_response(cmd))
        except ValueError:
            return 0
    def set_event_mask(self, event_mask: int) -> None:
        """Set monitored events that trigger event notification."""
        if self._mfr != Manufacturer.ORBCOMM:
            raise ModemError('Operation not supported by this modem')
        max_bits = 12
        if not isinstance(event_mask, int) or event_mask > 2**max_bits-1:
            raise ValueError('Invalid event bitmask')
        cmd = f'ATS88={event_mask}'
    def get_events_asserted_mask(self) -> int:
        """Get the set of events that are active following a notification."""
        if self._mfr != Manufacturer.ORBCOMM:
            raise ModemError('Operation not supported by this modem')
        cmd = 'ATS89?'
            return int(self._at_command_response(cmd))
        except ValueError:
            return 0
    def get_trace_event_monitor(self,
                                asserted_only: bool = False,
                                ) -> 'list[tuple[int, int]]':
        """Get the list of monitored Trace Events.
            asserted_only (bool): If True returns only asserted monitored events
            A list of tuples (trace_class, trace_subclass)
            `ModemError` if unsupported by the modem type.
        if self._mfr != Manufacturer.ORBCOMM:
            raise ModemError('Operation not supported by this modem')
        cmd = 'AT%EVMON'
        prefix = '%EVMON:'
        trace_events = []
        events = self._at_command_response(cmd, prefix).split(',')
        for event in events:
            trace_class = int(event.split('.')[0])
            trace_subclass = int(event.split('.')[1].replace('*', ''))
            if not asserted_only or event.endswith('*'):
                trace_events.append((trace_class, trace_subclass))
        return trace_events
    def set_trace_event_monitor(self, events: 'list[tuple[int, int]]') -> None:
        """Set the list of monitored trace events."""
        cmd = 'AT%EVMON='
        for event in events:
            if not cmd.endswith('='):
                cmd += ','
            cmd += f'{event[0]}.{event[1]}'
    def get_trace_events_cached(self) -> 'list[tuple[int, int]]':
        """Get a list of trace events cached."""
        return self.get_trace_event_monitor(True)
    def get_trace_event_data(self,
                             event: 'tuple[int, int]',
                             decode: bool = False,
                             ) -> 'list[int]|dict[str, int]':
        """Get the trace event data.
            event (tuple): The trace (class, subclass)
            decode (bool): Decodes raw data to dictionary (not implemented)
        cmd = f'AT%EVNT={event[0]},{event[1]}'
        prefix = '%EVNT:'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = cmd.replace('%EVNT', '+QEVNT')
            prefix = '+QEVENT:'   # documented as +QEVNT
        trace = self._at_command_response(cmd, prefix)
        if decode:
            raise NotImplementedError
        return [int(i) for i in trace.split(',')]
    def get_urc_ctl(self) -> int:
        """Get the event list that trigger Unsolicited Report Codes."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ValueError('Modem does not support this feature')
        cmd = 'AT+QURCCTL?'
        prefix = '+QURCCTL:'
            return int(self._at_command_response(cmd, prefix), 16)
        except ValueError:
            return 0
    def set_urc_ctl(self, qurc_mask: int) -> None:
        """Set the event list that trigger Unsolicited Report Codes."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ValueError('Modem does not support this feature')
        cmd = f'AT+QURCCTL=0x{qurc_mask:04X}'
    def get_urc(self) -> 'UrcCode|None':
        """Get the pending Unsolicited Result Code if one is present."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ValueError('Modem does not support this feature')
        eol = '\r\n' if self._modem.verbose else '\r'
        result = self._modem.read_rx_buffer(read_until=eol)
        if result:
            result = result.replace('+QURC:', '').strip()
                return UrcCode(int(result))
            except ValueError:
                return UrcCode[result]
        return None
    def get_power_mode(self) -> PowerMode:
        """Get the modem's power mode configuration."""
        cmd = 'ATS50?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QPMD?'
            prefix = '+QPMD:'
        return PowerMode(int(self._at_command_response(cmd, prefix)))
    def set_power_mode(self, power_mode: PowerMode) -> None:
        """Set the modem's power mode configuration."""
        if not PowerMode.is_valid(power_mode):
            raise ValueError('Invalid Power Mode')
        cmd = f'ATS50={power_mode}'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = f'AT+QPMD={power_mode}'
    def get_wakeup_period(self) -> WakeupPeriod:
        """Get the modem's wakeup period configuration."""
        cmd = 'ATS51?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QWKUPCFG?'
            prefix = '+QWKUPCFG:'
        if self._mfr == Manufacturer.QUECTEL:
            return WakeupPeriod(int(
                self._at_command_response(cmd, prefix).split(',')[0]))
        return WakeupPeriod(int(self._at_command_response(cmd, prefix)))
    def set_wakeup_period(self,
                          wakeup_period: WakeupPeriod,
                          wakeup_way: 'WakeupWay|None' = None,
                          ) -> None:
        """Set the modem's wakeup period configuration.
        The configuration does not update until confimed by the network.
        if not WakeupPeriod.is_valid(wakeup_period):
            raise ValueError('Invalid wakeup period')
        cmd = f'ATS51={wakeup_period}'
        if self._mfr == Manufacturer.QUECTEL:
            if wakeup_way is None:
                query = self._at_command_response('AT+QWKUPCFG?', '+QWKUPCFG:')
                wakeup_way = WakeupWay(int(query.split(',')[1]))
            cmd = f'AT+QWKUPCFG={wakeup_period},{wakeup_way}'
    def get_wakeup_way(self) -> WakeupWay:
        """Get the modem wakeup method."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ModemError('Operation not supported by this modem')
        cmd = 'AT+QWKUPCFG?'
        prefix = '+QWKUPCFG:'
        wakeup_way = self._at_command_response(cmd, prefix).split(',')[1]
        return WakeupWay(int(wakeup_way))
    def power_down(self) -> None:
        """Prepare the modem for power-down."""
        cmd = 'AT%OFF'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QPOWD=2'
    def get_workmode(self) -> WorkMode:
        """Get the modem working mode."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ModemError('Operation not supported by this modem')
        cmd = 'AT+QMOD?'
        prefix = '+QMOD:'
        return WorkMode(int(self._at_command_response(cmd, prefix)))
    def set_workmode(self, workmode: WorkMode) -> None:
        """Set the modem working mode."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ModemError('Operation not supported by this modem')
        if not WorkMode.is_valid(workmode):
            raise ValueError('Invalid workmode')
        cmd = f'AT+QMOD={workmode}'
    def get_deepsleep_enable(self) -> bool:
        """Get the deepsleep configuration flag."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ModemError('Operation not supported by this modem')
        cmd = 'AT+QSCLK?'
        prefix = '+QSCLK:'
        return bool(int(self._at_command_response(cmd, prefix)))
    def set_deepsleep_enable(self, enable: bool) -> None:
        """Set the deepsleep configuration flag."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ModemError('Operation not supported by this modem')
    def get_register(self, s_register_number: int) -> 'int|None':
        """Get a modem register value."""
        cmd = f'ATS{s_register_number}?'
            return int(self._at_command_response(cmd))
        except ValueError:
            return None
    def set_register(self, s_register_number: int, value: int) -> None:
        """Set a modem register value."""
        cmd = f'ATS{s_register_number}={value}'
    def get_all_registers(self) -> dict:
        """Get a dictionary of modem register values."""
        raise NotImplementedError

Instance variables

var baudrate : int

The baudrate of the serial connection.

Expand source code
def baudrate(self) -> int:
    """The baudrate of the serial connection."""
    return self._modem.serial.baudrate
var crc_enabled : bool
Expand source code
def crc_enabled(self) -> bool:
    return self._modem.crc
var is_ready : bool
Expand source code
def is_ready(self) -> bool:
    return not self._modem._lock.locked()
var modem_booted : bool
Expand source code
def modem_booted(self) -> bool:
    return self._modem_booted


def await_boot(self, boot_timeout: int = 10) ‑> bool

Indicates if a boot string is received within a timeout window.

Use is_connected before waiting for boot.


boot_timeout : int
The maximum time to wait in seconds.


True if a valid boot string was received inside the timeout.

Expand source code
def await_boot(self, boot_timeout: int = 10) -> bool:
    """Indicates if a boot string is received within a timeout window.
    Use `is_connected` before waiting for boot.
        boot_timeout (int): The maximum time to wait in seconds.
        True if a valid boot string was received inside the timeout.
    boot_strings = ['ST Version', 'RDY']
    _log.debug('Awaiting modem boot string for %d seconds...', boot_timeout)
    rx_data = ''
    started = time.time()
    while time.time() - started < boot_timeout and not self._modem_booted:
        while self._modem.is_data_waiting():
            rx_data += self._modem.read_rx_buffer()
        if rx_data and any(b in rx_data for b in boot_strings):
            self._modem_booted = True
            _log.debug('Found boot string - clearing Rx buffer')
            while self._modem.is_data_waiting():
                rx_data += self._modem.read_rx_buffer()
    return self._modem_booted
def cancel_mo_message(self, message_name: str) ‑> bool

Attempts to cancel a previously submitted mobile-originated message.


message_name : str
The mobile-originated message handle to delete.
Expand source code
def cancel_mo_message(self, message_name: str) -> bool:
    """Attempts to cancel a previously submitted mobile-originated message.
        message_name (str): The mobile-originated message handle to delete.
    _log.debug('Attempting to cancel MO message %s', message_name)
    cmd = 'AT%MGRC'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QSMGC'
    cmd += f'="{message_name}"'
    message_states = self.get_mo_message_states(message_name)
    if len(message_states) > 0:
        state = message_states[0].state
        if state == MessageState.TX_CANCELLED:
            return True
    elif self._is_simulator:
        return True
    _log.warn('Failed to cancel message %s', message_name)
    return False
def connect(self) ‑> None

Attach to the modem via serial communications.

Provided for backward compatibility. Connection is usually automatic when instantiating NimoModem.

Expand source code
def connect(self) -> None:
    """Attach to the modem via serial communications.
    Provided for backward compatibility. Connection is usually automatic
    when instantiating `NimoModem`.
    if not self._serial.is_open:
def delete_mt_message(self, message_name: str) ‑> bool

Remove a mobile-terminated message from the modem's Rx queue.

Expand source code
def delete_mt_message(self, message_name: str) -> bool:
    """Remove a mobile-terminated message from the modem's Rx queue."""
    cmd = 'AT%MGFM'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QRMGM'
    cmd += f'="{message_name}"'
    check = self.get_mt_message_states(message_name)
    if check and check[0].state == MessageState.RX_RETRIEVED:
        return True
    return False
def disconnect(self) ‑> None

Detach from the modem serial communications.

Provided for backward compatibility. Not typically required.

Expand source code
def disconnect(self) -> None:
    """Detach from the modem serial communications.
    Provided for backward compatibility. Not typically required.
    self._is_connected = False
    self._modem_booted = False
    if self._serial.is_open:
def get_acquisition_detail(self) ‑> AcquisitionInfo

Get the detailed satellite acquisition status.

Includes acquisition_state, beamsearch_state, vcid and snr indicators.

Expand source code
def get_acquisition_detail(self) -> AcquisitionInfo:
    """Get the detailed satellite acquisition status.
    Includes `acquisition_state`, `beamsearch_state`, `vcid` and `snr`
    cmd = 'ATS90=3 S91=1 S92=1 S122? S123? S116? S101?'
    prefix = ''
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QEVNT=3,1'
        prefix = '+QEVNT:'
    result_str = self._at_command_response(cmd, prefix, timeout=10)
    if self._mfr == Manufacturer.ORBCOMM:
        results = [int(x) for x in result_str.split('\n')]
        ctrl_state = ControlState(results[0])
        beam_state = BeamState(results[1])
        rssi = float(results[2]) / 100
        vcid = results[3]
    elif self._mfr == Manufacturer.QUECTEL:
        # Workaround Quectel 20230731 documentation error says +QEVNT:
        result_str = result_str.replace('+QEVENT:', '').strip()
        results = [int(x) for x in result_str.split(',')]
        # <dataCount>,<signedBitmask>,<MTID>,<timestamp>,
        #   <class>,<subclass>,<priority>,<data0>,...
        data0 = 7   # list index where trace data starts
        ctrl_state = ControlState(results[data0+22])
        beam_state = BeamState(results[data0+23])
        rssi = float(results[data0+16]) / 100
        vcid = results[data0+1]
    return AcquisitionInfo(ctrl_state, beam_state, rssi, vcid)
def get_all_registers(self) ‑> dict

Get a dictionary of modem register values.

Expand source code
def get_all_registers(self) -> dict:
    """Get a dictionary of modem register values."""
    raise NotImplementedError
def get_deepsleep_enable(self) ‑> bool

Get the deepsleep configuration flag.

Expand source code
def get_deepsleep_enable(self) -> bool:
    """Get the deepsleep configuration flag."""
    if self._mfr != Manufacturer.QUECTEL:
        raise ModemError('Operation not supported by this modem')
    cmd = 'AT+QSCLK?'
    prefix = '+QSCLK:'
    return bool(int(self._at_command_response(cmd, prefix)))
def get_event_mask(self) ‑> int

Get the set of monitored events that trigger event notification.

Expand source code
def get_event_mask(self) -> int:
    """Get the set of monitored events that trigger event notification."""
    if self._mfr != Manufacturer.ORBCOMM:
        raise ModemError('Operation not supported by this modem')
    cmd = 'ATS88?'
        return int(self._at_command_response(cmd))
    except ValueError:
        return 0
def get_events_asserted_mask(self) ‑> int

Get the set of events that are active following a notification.

Expand source code
def get_events_asserted_mask(self) -> int:
    """Get the set of events that are active following a notification."""
    if self._mfr != Manufacturer.ORBCOMM:
        raise ModemError('Operation not supported by this modem')
    cmd = 'ATS89?'
        return int(self._at_command_response(cmd))
    except ValueError:
        return 0
def get_firmware_version(self) ‑> str

Get the modem's firmware version.

Expand source code
def get_firmware_version(self) -> str:
    """Get the modem's firmware version."""
    # TODO: Firmware structure with hardware, firmware, software?
    return self._at_command_response('AT+GMR', '+GMR:')
def get_gnss_continuous(self) ‑> int

Get the modem's GNSS continuous refresh interval in seconds.

Expand source code
def get_gnss_continuous(self) -> int:
    """Get the modem's GNSS continuous refresh interval in seconds."""
    cmd = 'ATS55?'
    prefix = ''
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QGNSSCW?'
        prefix = '+QGNSSCW:'
        return int(self._at_command_response(cmd, prefix))
    except ValueError:
        return 0
def get_gnss_mode(self) ‑> GnssMode

Get the modem's GNSS receiver mode.

Expand source code
def get_gnss_mode(self) -> GnssMode:
    """Get the modem's GNSS receiver mode."""
    cmd = 'ATS39?'
    prefix = ''
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QGNSSMOD?'
        prefix = '+QGNSSMOD:'
    response = self._at_command_response(cmd, prefix)
    if self._mfr == Manufacturer.QUECTEL:
        return GnssModeQuectel(int(response))
    return GnssModeOrbcomm(int(response))
def get_last_error_code(self) ‑> AtErrorCode

Get the last error code from the modem.

Expand source code
def get_last_error_code(self) -> AtErrorCode:
    """Get the last error code from the modem."""
    return AtErrorCode(int(self._at_command_response('ATS80?')))
def get_location(self, stale_secs: int = 1, wait_secs: int = 35) ‑> ModemLocation|None

Get the modem's location.


stale_secs : int
Maximum cached fix age to use in seconds.
wait_secs : int
Maximum duration to wait for a fix in seconds.


ModemLocation object if GNSS does not time out waiting for fix.

Expand source code
def get_location(self,
                 stale_secs: int = 1,
                 wait_secs: int = 35) -> 'ModemLocation|None':
    """Get the modem's location.
        stale_secs (int): Maximum cached fix age to use in seconds.
        wait_secs (int): Maximum duration to wait for a fix in seconds.
        ModemLocation object if GNSS does not time out waiting for fix.
    nmea_data = self.get_nmea_data(stale_secs, wait_secs)
    if nmea_data:
        return get_location_from_nmea_data(nmea_data)
    return None
def get_manufacturer(self) ‑> str

Get the manufacturer name.

Expand source code
def get_manufacturer(self) -> str:
    """Get the manufacturer name."""
    if not self._manufacturer:
            mfr = self._at_command_response('ATI')
            if 'quectel' in mfr.lower():
                self._manufacturer = Manufacturer.QUECTEL
                if not any(m in mfr.lower()
                           for m in ['orbcomm', 'skywave']):
                    _log.warning('Unsupported manufacturer %s', mfr)
                self._manufacturer = Manufacturer.ORBCOMM
            if vlog(VLOG_TAG):
                _log.debug('Caching manufacturer: %s',
        except ModemError:
            self._manufacturer = Manufacturer.NONE
def get_mo_message_states(self, message_name: str = '') ‑> list[MoMessage]

Get a list of mobile-originated message states in the modem Tx queue.


message_name : str
Optional filter on message name.


A list of MoMessage objects including state and metadata.

Expand source code
def get_mo_message_states(self, message_name: str = '') -> 'list[MoMessage]':
    """Get a list of mobile-originated message states in the modem Tx queue.
        message_name (str): Optional filter on message name.
        A list of `MoMessage` objects including state and metadata.
    cmd = 'AT%MGRS'
    prefix = '%MGRS:'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QSMGS'
        prefix = '+QSMGS:'
    if message_name and not self._is_simulator:
        # Orbcomm Modem Simulator returns ERROR for %MGRS= command
        cmd += f'="{message_name}"'
    response_str = self._at_command_response(cmd, prefix)
    return self._parse_message_states(response_str, is_mo=True)
def get_mobile_id(self) ‑> str

Get the modem's globally unique identifier.

Expand source code
def get_mobile_id(self) -> str:
    """Get the modem's globally unique identifier."""
    if not self._mobile_id:
            self._mobile_id = self._at_command_response('AT+GSN', '+GSN:')
            if vlog(VLOG_TAG):
                _log.debug('Cached Mobile ID %s', self._mobile_id)
        except ModemError:
            self._mobile_id = ''
    return self._mobile_id
def get_model(self) ‑> str

Get the manufacturer model name.

Expand source code
def get_model(self) -> str:
    """Get the manufacturer model name."""
    cmd = 'ATI4'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'ATI'
        response = self._at_command_response(cmd)
        if response:
            if self._mfr == Manufacturer.QUECTEL:
                response = response.split('\n')[1]
        return response
    except ModemError:
        return ''
def get_mt_message(self, message_name: str) ‑> MtMessage|None

Get a mobile-terminated message from the modem's Rx queue by name.

Expand source code
def get_mt_message(self, message_name: str) -> 'MtMessage|None':
    """Get a mobile-terminated message from the modem's Rx queue by name."""
    cmd = 'AT%MGFG'
    prefix = '%MGFG:'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+GRMGR'
        prefix = '+GRMGR:'
    data_format = DataFormat.BASE64
    cmd += f'="{message_name}",{data_format}'
    response = self._at_command_response(cmd, prefix)
    if response:
        return self._parse_mt_message(response)
    return None
def get_mt_message_states(self, message_name: str = '') ‑> list[MtMessage]

Get a list of mobile-terminated message states in the modem Tx queue.


message_name : str
Optional filter on message name.


A list of MtMessage objects including state and metadata.

Expand source code
def get_mt_message_states(self, message_name: str = '') -> 'list[MtMessage]':
    """Get a list of mobile-terminated message states in the modem Tx queue.
        message_name (str): Optional filter on message name.
        A list of `MtMessage` objects including state and metadata.
    cmd = 'AT%MGFN' if not message_name else 'AT%MGFS'
    prefix = '%MGFN:' if not message_name else 'AT%MGFS:'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QRMGN' if not message_name else 'AT+QRMGS'
        prefix = '+QRMGN:' if not message_name else '+QRMGS:'
    if message_name and not self._is_simulator:
        cmd += f'="{message_name}"'
    response_str = self._at_command_response(cmd, prefix)
    return self._parse_message_states(response_str, is_mo=False)
def get_network_status(self) ‑> NetworkStatus

Get the current satellite acquisition status.

Expand source code
def get_network_status(self) -> NetworkStatus:
    """Get the current satellite acquisition status."""
    cmd = 'ATS54?'
    prefix = ''
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QREG?'
        prefix = '+QREG:'
    return NetworkStatus(int(self._at_command_response(cmd, prefix)))
def get_nmea_data(self, stale_secs: int = 1, wait_secs: int = 35, rmc: bool = True, gga: bool = True, gsa: bool = True, gsv: bool = False) ‑> str

Get a set of NMEA data detailing the modem's location.


stale_secs : int
Maximum cached fix age to use in seconds.
wait_secs : int
Maximum duration to wait for a fix in seconds.
rmc : bool
Include Recommended Minimum data.
gga : bool
Include altitude and fix quality data.
gsa : bool
Include Dilution of Precision data.
gsv : bool
Include verbose GNSS satellite details.
Expand source code
def get_nmea_data(self,
                  stale_secs: int = 1,
                  wait_secs: int = 35,
                  rmc: bool = True,
                  gga: bool = True,
                  gsa: bool = True,
                  gsv: bool = False,
                  ) -> str:
    """Get a set of NMEA data detailing the modem's location.
        stale_secs (int): Maximum cached fix age to use in seconds.
        wait_secs (int): Maximum duration to wait for a fix in seconds.
        rmc (bool): Include Recommended Minimum data.
        gga (bool): Include altitude and fix quality data.
        gsa (bool): Include Dilution of Precision data.
        gsv (bool): Include verbose GNSS satellite details.
    cmd = 'AT%GPS'
    prefix = '%GPS:'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QGNSS'
        prefix = '+QGNSS:'
    cmd += f'={stale_secs},{wait_secs}'
    if rmc:
        cmd += ',"RMC"'
    if gga:
        cmd += ',"GGA"'
    if gsa:
        cmd += ',"GSA"'
    if gsv:
        cmd += ',"GSV"'
        response = self._at_command_response(cmd, prefix, wait_secs + 5)
        return response
    except ModemAtError as exc:
        if exc.error_code != AtErrorCode.GNSS_TIMEOUT:
    return ''
def get_power_mode(self) ‑> PowerMode

Get the modem's power mode configuration.

Expand source code
def get_power_mode(self) -> PowerMode:
    """Get the modem's power mode configuration."""
    cmd = 'ATS50?'
    prefix = ''
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QPMD?'
        prefix = '+QPMD:'
    return PowerMode(int(self._at_command_response(cmd, prefix)))
def get_register(self, s_register_number: int) ‑> int|None

Get a modem register value.

Expand source code
def get_register(self, s_register_number: int) -> 'int|None':
    """Get a modem register value."""
    cmd = f'ATS{s_register_number}?'
        return int(self._at_command_response(cmd))
    except ValueError:
        return None
def get_rssi(self) ‑> float

Get the current Received Signal Strength Indicator.

Also referred to as SNR or C/N0 (dB-Hz)

Expand source code
def get_rssi(self) -> float:
    """Get the current Received Signal Strength Indicator.
    Also referred to as SNR or C/N0 (dB-Hz)
    cmd = 'ATS90=3 S91=1 S92=1 S116?'
    prefix = ''
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QSCN'
        prefix = '+QSCN:'
        return int(self._at_command_response(cmd, prefix)) / 100
    except ValueError:
        return 0
def get_satellite_info(self) ‑> SatelliteLocation|None

Get the satellite's information including azimuth and elevation.

Derives which satellite/GeoBeam is used from trace class 3 subclass 5.


SatelliteLocation object (azimuth, elevation) if determinable.

Expand source code
def get_satellite_info(self) -> 'SatelliteLocation|None':
    """Get the satellite's information including azimuth and elevation.
    Derives which satellite/GeoBeam is used from trace class 3 subclass 5.
        `SatelliteLocation` object (azimuth, elevation) if determinable.
    geobeam = None
    modem_location = self.get_location()
    if (modem_location is not None and
        self.get_network_status() > NetworkStatus.RX_SEARCHING):
        # satellite has been found
        cmd = 'ATS90=3 S91=5 S92=1 S102?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QEVNT=3,5'
            prefix = '+QEVNT:'
        response = self._at_command_response(cmd, prefix)
        if self._mfr == Manufacturer.QUECTEL:
            # workaround documentation error
            response = response.replace('+QEVENT:', '').strip()
            response = response.split(',')[9]
        geobeam = GeoBeam(int(response))
        return get_satellite_location(modem_location, geobeam)
    return None
def get_signal_quality(self) ‑> SignalQuality

Get a qualitative indicator from 0..5 of the satellite signal.

Expand source code
def get_signal_quality(self) -> SignalQuality:
    """Get a qualitative indicator from 0..5 of the satellite signal."""
    snr = self.get_rssi()
    if snr >= SignalLevelRegional.INVALID.value:
        return SignalQuality.WARNING
    if snr >= SignalLevelRegional.BARS_5.value:
        return SignalQuality.STRONG
    if snr >= SignalLevelRegional.BARS_4.value:
        return SignalQuality.GOOD
    if snr >= SignalLevelRegional.BARS_3.value:
        return SignalQuality.MID
    if snr >= SignalLevelRegional.BARS_2.value:
        return SignalQuality.LOW
    if snr >= SignalLevelRegional.BARS_1.value:
        return SignalQuality.WEAK
    return SignalQuality.NONE
def get_system_time(self) ‑> int

Get the system/GNSS time from the modem.

Expand source code
def get_system_time(self) -> int:
    """Get the system/GNSS time from the modem."""
        nimo_time = self._at_command_response('AT%UTC', '%UTC:')
        iso_time = nimo_time.replace(' ', 'T') + 'Z'
        return iso_to_ts(iso_time)
    except ModemError:
        return 0
def get_temperature(self) ‑> int|None

Get the processor temperature in Celsius.

Expand source code
def get_temperature(self) -> 'int|None':
    """Get the processor temperature in Celsius."""
        return int(int(self._at_command_response('ATS85?')) / 10)
    except ValueError:
        return None
def get_trace_event_data(self, event: tuple[int, int], decode: bool = False) ‑> list[int]|dict[str, int]

Get the trace event data.


event : tuple
The trace (class, subclass)
decode : bool
Decodes raw data to dictionary (not implemented)
Expand source code
def get_trace_event_data(self,
                         event: 'tuple[int, int]',
                         decode: bool = False,
                         ) -> 'list[int]|dict[str, int]':
    """Get the trace event data.
        event (tuple): The trace (class, subclass)
        decode (bool): Decodes raw data to dictionary (not implemented)
    cmd = f'AT%EVNT={event[0]},{event[1]}'
    prefix = '%EVNT:'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = cmd.replace('%EVNT', '+QEVNT')
        prefix = '+QEVENT:'   # documented as +QEVNT
    trace = self._at_command_response(cmd, prefix)
    if decode:
        raise NotImplementedError
    return [int(i) for i in trace.split(',')]
def get_trace_event_monitor(self, asserted_only: bool = False) ‑> list[tuple[int, int]]

Get the list of monitored Trace Events.


asserted_only : bool
If True returns only asserted monitored events


A list of tuples (trace_class, trace_subclass)


ModemError if unsupported by the modem type.

Expand source code
def get_trace_event_monitor(self,
                            asserted_only: bool = False,
                            ) -> 'list[tuple[int, int]]':
    """Get the list of monitored Trace Events.
        asserted_only (bool): If True returns only asserted monitored events
        A list of tuples (trace_class, trace_subclass)
        `ModemError` if unsupported by the modem type.
    if self._mfr != Manufacturer.ORBCOMM:
        raise ModemError('Operation not supported by this modem')
    cmd = 'AT%EVMON'
    prefix = '%EVMON:'
    trace_events = []
    events = self._at_command_response(cmd, prefix).split(',')
    for event in events:
        trace_class = int(event.split('.')[0])
        trace_subclass = int(event.split('.')[1].replace('*', ''))
        if not asserted_only or event.endswith('*'):
            trace_events.append((trace_class, trace_subclass))
    return trace_events
def get_trace_events_cached(self) ‑> list[tuple[int, int]]

Get a list of trace events cached.

Expand source code
def get_trace_events_cached(self) -> 'list[tuple[int, int]]':
    """Get a list of trace events cached."""
    return self.get_trace_event_monitor(True)
def get_urc(self) ‑> UrcCode|None

Get the pending Unsolicited Result Code if one is present.

Expand source code
def get_urc(self) -> 'UrcCode|None':
    """Get the pending Unsolicited Result Code if one is present."""
    if self._mfr != Manufacturer.QUECTEL:
        raise ValueError('Modem does not support this feature')
    eol = '\r\n' if self._modem.verbose else '\r'
    result = self._modem.read_rx_buffer(read_until=eol)
    if result:
        result = result.replace('+QURC:', '').strip()
            return UrcCode(int(result))
        except ValueError:
            return UrcCode[result]
    return None
def get_urc_ctl(self) ‑> int

Get the event list that trigger Unsolicited Report Codes.

Expand source code
def get_urc_ctl(self) -> int:
    """Get the event list that trigger Unsolicited Report Codes."""
    if self._mfr != Manufacturer.QUECTEL:
        raise ValueError('Modem does not support this feature')
    cmd = 'AT+QURCCTL?'
    prefix = '+QURCCTL:'
        return int(self._at_command_response(cmd, prefix), 16)
    except ValueError:
        return 0
def get_wakeup_period(self) ‑> WakeupPeriod

Get the modem's wakeup period configuration.

Expand source code
def get_wakeup_period(self) -> WakeupPeriod:
    """Get the modem's wakeup period configuration."""
    cmd = 'ATS51?'
    prefix = ''
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QWKUPCFG?'
        prefix = '+QWKUPCFG:'
    if self._mfr == Manufacturer.QUECTEL:
        return WakeupPeriod(int(
            self._at_command_response(cmd, prefix).split(',')[0]))
    return WakeupPeriod(int(self._at_command_response(cmd, prefix)))
def get_wakeup_way(self) ‑> WakeupWay

Get the modem wakeup method.

Expand source code
def get_wakeup_way(self) -> WakeupWay:
    """Get the modem wakeup method."""
    if self._mfr != Manufacturer.QUECTEL:
        raise ModemError('Operation not supported by this modem')
    cmd = 'AT+QWKUPCFG?'
    prefix = '+QWKUPCFG:'
    wakeup_way = self._at_command_response(cmd, prefix).split(',')[1]
    return WakeupWay(int(wakeup_way))
def get_workmode(self) ‑> WorkMode

Get the modem working mode.

Expand source code
def get_workmode(self) -> WorkMode:
    """Get the modem working mode."""
    if self._mfr != Manufacturer.QUECTEL:
        raise ModemError('Operation not supported by this modem')
    cmd = 'AT+QMOD?'
    prefix = '+QMOD:'
    return WorkMode(int(self._at_command_response(cmd, prefix)))
def initialize(self, echo: bool = True, verbose: bool = True) ‑> bool

Initialize the modem AT configuration for Echo and Verbose.

Expand source code
def initialize(self,
               echo: bool = True,
               verbose: bool = True,
               ) -> bool:
    """Initialize the modem AT configuration for Echo and Verbose."""
    at_command = (f'ATZ;E{int(echo)};V{int(verbose)}')
        return True
    except ModemCrcConfig:'Attempting re-initialize with CRC enabled')
        return True
def is_blocked(self) ‑> bool

Indicates if line-of-sight to the satellite is blocked.

Expand source code
def is_blocked(self) -> bool:
    """Indicates if line-of-sight to the satellite is blocked."""
    return self.get_network_status() == 8
def is_connected(self) ‑> bool

Indicates if the modem is responding to a basic AT query.

Expand source code
def is_connected(self) -> bool:
    """Indicates if the modem is responding to a basic AT query."""
        self._is_connected = True
        self._modem_booted = True
        return True
    except ModemError:
        self._is_connected = False
        self._modem_booted = False
        return False
def is_muted(self) ‑> bool

Indicates if the modem has been muted (disallowed to transmit data).

Expand source code
def is_muted(self) -> bool:
    """Indicates if the modem has been muted (disallowed to transmit data).
    return self.get_network_status() == 7
def is_transmit_allowed(self) ‑> bool

Indicates if the modem is able to transmit data.

Expand source code
def is_transmit_allowed(self) -> bool:
    """Indicates if the modem is able to transmit data."""
    return self.get_network_status() == 5
def is_updating_network(self) ‑> bool

Indicates if the modem is updating network information.

The modem should not be powered down during a network update.

Expand source code
def is_updating_network(self) -> bool:
    """Indicates if the modem is updating network information.
    The modem should not be powered down during a network update.
    return self.get_network_status() == 4
def power_down(self) ‑> None

Prepare the modem for power-down.

Expand source code
def power_down(self) -> None:
    """Prepare the modem for power-down."""
    cmd = 'AT%OFF'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QPOWD=2'
def receive_data(self, message_name: str) ‑> bytes|None

Get the raw data from a mobile-terminated message.

Expand source code
def receive_data(self, message_name: str) -> 'bytes|None':
    """Get the raw data from a mobile-terminated message."""
    message = self.get_mt_message(message_name)
    if message:
        return message.payload
    return None
def reset_factory_config(self) ‑> None

Reset the modem's factory default configuration.

Expand source code
def reset_factory_config(self) -> None:
    """Reset the modem's factory default configuration."""
def retry_baudrate(self) ‑> bool
Expand source code
def retry_baudrate(self) -> bool:
    for baud in BAUDRATES:
        self._modem.serial.baudrate = baud
        if self.is_connected():
            return True
    return False
def save_config(self) ‑> None

Store the current configuration to modem non-volatile memory.

Expand source code
def save_config(self) -> None:
    """Store the current configuration to modem non-volatile memory."""
def send_data(self, data: bytes, **kwargs) ‑> str|MoMessage

Submits data to send as a mobile-originated message.

If a message_name is not supplied one will be generated using the least significant 8 digits of unix timestamp.


data : bytes
The data to send.

Keyword Args: message_name (str): Optional handle for message in Tx queue. Max 8 characters for Orbcomm modem or 12 for Quectel. priority (int): Optional priority 1 (highest) .. 4 (low, default). May use MessagePriority. codec_sin (int): Optional first byte of payload to add as a codec service identifier, must be in range 16..255. codec_min (int): Optional second byte of payload to add as a codec message identifier, must be in range 0..255. return_message (bool): If set, returns a MoMessage instead of the message handle.


Message handle (str) or MoMessage if return_message kwarg is set.


ValueError for various parameter limit violations.

Expand source code
def send_data(self, data: bytes, **kwargs) -> 'str|MoMessage':
    """Submits data to send as a mobile-originated message.
    If a `message_name` is not supplied one will be generated using the
    least significant 8 digits of unix timestamp.
        data (bytes): The data to send.
    Keyword Args:
        message_name (str): Optional handle for message in Tx queue. Max 8
            characters for Orbcomm modem or 12 for Quectel.
        priority (int): Optional priority 1 (highest) .. 4 (low, default).
            May use `MessagePriority`.
        codec_sin (int): Optional first byte of payload to add as a codec
            service identifier, must be in range 16..255.
        codec_min (int): Optional second byte of payload to add as a codec
            message identifier, must be in range 0..255.
        return_message (bool): If set, returns a `MoMessage` instead of the
            message handle.
        Message handle (str) or `MoMessage` if `return_message` kwarg is set.
        `ValueError` for various parameter limit violations.
    data_size = len(data)
    msg_payload_sin_min = b''
    message_name = kwargs.get('message_name', '')
    priority = MessagePriority(kwargs.get('priority',
    codec_sin: int = kwargs.get('codec_sin', -1)
    codec_min: int = kwargs.get('codec_min', -1)
    if codec_sin > -1:
        data_size += 1
        msg_payload_sin_min += codec_sin.to_bytes(1, 'big')
    if codec_min > -1:
        data_size += 1
        msg_payload_sin_min += codec_min.to_bytes(1, 'big')
    if not 2 <= data_size <= MSG_MO_MAX_SIZE:
        raise ValueError('Invalid mobile-originated message size')
    if message_name and len(message_name) > self._mo_msg_name_len_max:
        raise ValueError('Message name too long')
    data_index = 0
    if codec_sin <= -1:
        codec_sin = data[0]
        data_index += 1
        data_size -= 1
    if codec_sin not in range(16, 256):
        raise ValueError('Illegal first payload byte SIN must be 16..255')
    if codec_min <= -1:
        codec_min = data[1]
        data_index += 1
        data_size -= 1
    if codec_min > 255:
        raise ValueError('Invalid second payload byte MIN must be 0..255')
    max_name_len = self._mo_msg_name_len_max
    if message_name and len(message_name) > max_name_len:
        raise ValueError(f'Invalid message name longer than {max_name_len}')
    if len(message_name) == 0:
        message_name = f'{int(time.time())}'[-max_name_len:]
    # Convert to base64 string for serial efficiency
    #   no effect on OTA size, modem always decodes and sends raw bytes OTA
    data_format = DataFormat.BASE64
    formatted_data = base64.b64encode(data[data_index:]).decode('utf-8')
    cmd = 'AT%MGRT='
    codec_sep = '.'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QSMGT='
        codec_sep = ','
    cmd = (f'{cmd}"{message_name}",{priority},{codec_sin}{codec_sep}'
    if kwargs.get('return_message', False) is True:
        return MoMessage(message_name, priority, MessageState.TX_READY,
                            payload=(msg_payload_sin_min + data))
    return message_name
def send_text(self, text: str, **kwargs) ‑> str|MoMessage

Submits a text string to send as data.

If codec_sin kwarg is not provided 128 is prepended as the first byte. If codec_min kwarg is not provided 1 is prepended as the second byte. Other kwargs as per send_data.


text : str
The text message to send.


(str) The message name assigned or MoMessage if kwarg return_message is set.

Expand source code
def send_text(self, text: str, **kwargs) -> 'str|MoMessage':
    """Submits a text string to send as data.
    If `codec_sin` kwarg is not provided 128 is prepended as the first byte.
    If `codec_min` kwarg is not provided 1 is prepended as the second byte.
    Other kwargs as per `send_data`.
        text (str): The text message to send.
        (str) The message name assigned or MoMessage if kwarg
            `return_message` is set.
    data = b''
    codec_sin = int(kwargs.get('codec_sin', 128))
    data += codec_sin.to_bytes(1, 'big')
    codec_min = int(kwargs.get('codec_min', 1))
    data += codec_min.to_bytes(1, 'big')
    data += text.encode()
    flowthru = ['message_name', 'priority', 'return_message']
    next_kwargs = { k:v for k, v in kwargs if k in flowthru }
    return self.send_data(data, **next_kwargs)
def set_crc(self, enable: bool = False) ‑> bool

Enable or disable CRC error checking on the modem serial port.

Expand source code
def set_crc(self, enable: bool = False) -> bool:
    """Enable or disable CRC error checking on the modem serial port."""
        return True
    except ModemCrcConfig:
        if ((self._modem.crc and enable) or
            (not self._modem.crc and not enable)):
            return True
        return False
def set_deepsleep_enable(self, enable: bool) ‑> None

Set the deepsleep configuration flag.

Expand source code
def set_deepsleep_enable(self, enable: bool) -> None:
    """Set the deepsleep configuration flag."""
    if self._mfr != Manufacturer.QUECTEL:
        raise ModemError('Operation not supported by this modem')
def set_event_mask(self, event_mask: int) ‑> None

Set monitored events that trigger event notification.

Expand source code
def set_event_mask(self, event_mask: int) -> None:
    """Set monitored events that trigger event notification."""
    if self._mfr != Manufacturer.ORBCOMM:
        raise ModemError('Operation not supported by this modem')
    max_bits = 12
    if not isinstance(event_mask, int) or event_mask > 2**max_bits-1:
        raise ValueError('Invalid event bitmask')
    cmd = f'ATS88={event_mask}'
def set_gnss_continuous(self, interval: int) ‑> None

Set the modem's GNSS continuous refresh interval in seconds.


interval : int
Automatic update interval 0..30 seconds.


True if successful.


ValueError if invalid interval is specified.

Expand source code
def set_gnss_continuous(self, interval: int) -> None:
    """Set the modem's GNSS continuous refresh interval in seconds.
        interval (int): Automatic update interval 0..30 seconds.
        `True` if successful.
        `ValueError` if invalid interval is specified.
    if interval not in range (0, 31):
        raise ValueError('Invalid GNSS refresh interval')
    cmd = f'ATS55={interval}'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = f'AT+QGNSSCW={interval}'
def set_gnss_mode(self, gnss_mode: GnssMode) ‑> None

Get the modem's GNSS receiver mode.

Expand source code
def set_gnss_mode(self, gnss_mode: GnssMode) -> None:
    """Get the modem's GNSS receiver mode."""
    cmd = f'ATS39={gnss_mode}'
    prefix = ''
    if self._mfr == Manufacturer.QUECTEL:
        if not GnssModeQuectel.is_valid(gnss_mode):
            raise ValueError('Invalid GNSS mode')
        cmd = f'AT+QGNSSMOD={gnss_mode}'
        prefix = '+QGNSSMOD:'
        if not GnssModeOrbcomm.is_valid(gnss_mode):
            raise ValueError('Invalid GNSS mode')
    self._at_command_response(cmd, prefix)
def set_power_mode(self, power_mode: PowerMode) ‑> None

Set the modem's power mode configuration.

Expand source code
def set_power_mode(self, power_mode: PowerMode) -> None:
    """Set the modem's power mode configuration."""
    if not PowerMode.is_valid(power_mode):
        raise ValueError('Invalid Power Mode')
    cmd = f'ATS50={power_mode}'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = f'AT+QPMD={power_mode}'
def set_register(self, s_register_number: int, value: int) ‑> None

Set a modem register value.

Expand source code
def set_register(self, s_register_number: int, value: int) -> None:
    """Set a modem register value."""
    cmd = f'ATS{s_register_number}={value}'
def set_trace_event_monitor(self, events: list[tuple[int, int]]) ‑> None

Set the list of monitored trace events.

Expand source code
def set_trace_event_monitor(self, events: 'list[tuple[int, int]]') -> None:
    """Set the list of monitored trace events."""
    cmd = 'AT%EVMON='
    for event in events:
        if not cmd.endswith('='):
            cmd += ','
        cmd += f'{event[0]}.{event[1]}'
def set_urc_ctl(self, qurc_mask: int) ‑> None

Set the event list that trigger Unsolicited Report Codes.

Expand source code
def set_urc_ctl(self, qurc_mask: int) -> None:
    """Set the event list that trigger Unsolicited Report Codes."""
    if self._mfr != Manufacturer.QUECTEL:
        raise ValueError('Modem does not support this feature')
    cmd = f'AT+QURCCTL=0x{qurc_mask:04X}'
def set_wakeup_period(self, wakeup_period: WakeupPeriod, wakeup_way: WakeupWay|None = None)

Set the modem's wakeup period configuration.

The configuration does not update until confimed by the network.

Expand source code
def set_wakeup_period(self,
                      wakeup_period: WakeupPeriod,
                      wakeup_way: 'WakeupWay|None' = None,
                      ) -> None:
    """Set the modem's wakeup period configuration.
    The configuration does not update until confimed by the network.
    if not WakeupPeriod.is_valid(wakeup_period):
        raise ValueError('Invalid wakeup period')
    cmd = f'ATS51={wakeup_period}'
    if self._mfr == Manufacturer.QUECTEL:
        if wakeup_way is None:
            query = self._at_command_response('AT+QWKUPCFG?', '+QWKUPCFG:')
            wakeup_way = WakeupWay(int(query.split(',')[1]))
        cmd = f'AT+QWKUPCFG={wakeup_period},{wakeup_way}'
def set_workmode(self, workmode: WorkMode) ‑> None

Set the modem working mode.

Expand source code
def set_workmode(self, workmode: WorkMode) -> None:
    """Set the modem working mode."""
    if self._mfr != Manufacturer.QUECTEL:
        raise ModemError('Operation not supported by this modem')
    if not WorkMode.is_valid(workmode):
        raise ValueError('Invalid workmode')
    cmd = f'AT+QMOD={workmode}'