Package pynimomodem

Library to interface with a Viasat-approved NIMO modem for satellite IoT.

This library abstracts various low-level AT command operations useful for interacting with a NIMO modem to send and receive data, check network status and get location-based information.

Most get methods will raise a ModemAtError if a valid response is not received to a command/query.

ModemTimeout will be raised if no response is received to a command within the default or specified timeout.

AT command errors will raise ModemAtError with a property error_code to provide further details with the AtErrorCode.

Expand source code
"""Library to interface with a Viasat-approved NIMO modem for satellite IoT.

This library abstracts various low-level AT command operations useful for
interacting with a NIMO modem to send and receive data, check network status
and get location-based information.

Most `get` methods will raise a `ModemAtError` if a valid response is not
received to a command/query.

`ModemTimeout` will be raised if no response is received to a command within
the default or specified timeout.

AT command errors will raise `ModemAtError` with a property `error_code` to
provide further details with the `AtErrorCode`.

"""

from .constants import (
    AtErrorCode,
    BeamState,
    ControlState,
    DataFormat,
    EventNotification,
    GeoBeam,
    GeoSatellite,
    GnssMode,
    GnssModeOrbcomm,
    GnssModeQuectel,
    MessagePriority,
    MessageState,
    NetworkStatus,
    PowerMode,
    SignalQuality,
    UrcCode,
    UrcControl,
    WakeupPeriod,
    WakeupWay,
    WorkMode,
)
from .modem import (
    Manufacturer,
    ModemLocation,
    MoMessage,
    MtMessage,
    NimoModem,
    ModemAtError,
    ModemCrcConfig,
    ModemCrc,
    ModemError,
    ModemTimeout,
    AcquisitionInfo,
    SatelliteLocation,
)

__all__ = [
    'AtErrorCode',
    'BeamState',
    'ControlState',
    'DataFormat',
    'GeoBeam',
    'GeoSatellite',
    'GnssMode',
    'GnssModeOrbcomm',
    'GnssModeQuectel',
    'Manufacturer',
    'MessagePriority',
    'MessageState',
    'ModemLocation',
    'MoMessage',
    'MtMessage',
    'NetworkStatus',
    'NimoModem',
    'ModemAtError',
    'ModemCrcConfig',
    'ModemCrc',
    'ModemError',
    'ModemTimeout',
    'PowerMode',
    'AcquisitionInfo',
    'SatelliteLocation',
    'SignalQuality',
    'WakeupPeriod',
    'WakeupWay',
    'WorkMode',
    'UrcCode',
    'UrcControl',
    'EventNotification',
]

Sub-modules

pynimomodem.atcommandbuffer

AT command buffer parsing for a NIMO modem …

pynimomodem.constants

NIMO modem constants …

pynimomodem.crcxmodem

Implementation of CCIT-16-CRC for use with NIMO modems …

pynimomodem.location

Classes and methods for location, elevation and azimuth for NIMO modems …

pynimomodem.message

Class and methods for managing messages submitted/retrieved to a NIMO modem …

pynimomodem.modem

Class for a Non-IP Modem using Orbcomm network protocols …

pynimomodem.nimoutils

Various utilities/helpers for NIMO modem interaction and debugging.

pynimomodem.s_registers

IDP modem S-register definitions …

Classes

class AcquisitionInfo (ctrl_state: ControlState = ControlState.STOPPED, beam_state: BeamState = BeamState.IDLE, rssi: float = 0.0, vcid: int = 0)

Details about the satellite acquisition state of the modem.

Attributes

ctrl_state : ControlState
Primary network acquisition state.
beam_state : BeamState
Secondary beam acquistion state.
rssi : float
Signal indicator Carrier to Noise ratio (dB-Hz).
vcid : int
Virtual carrier identifier for low-level sanity check.
Expand source code
@dataclass
class AcquisitionInfo:
    """Details about the satellite acquisition state of the modem.
    
    Attributes:
        ctrl_state (ControlState): Primary network acquisition state.
        beam_state (BeamState): Secondary beam acquistion state.
        rssi (float): Signal indicator Carrier to Noise ratio (dB-Hz).
        vcid (int): Virtual carrier identifier for low-level sanity check.
    
    """
    ctrl_state: ControlState = ControlState.STOPPED
    beam_state: BeamState = BeamState.IDLE
    rssi: float = 0.0
    vcid: int = 0

Class variables

var beam_stateBeamState
var ctrl_stateControlState
var rssi : float
var vcid : int
class AtErrorCode (value, names=None, *, module=None, qualname=None, type=None, start=1)

AT command error codes for NIMO modems.

Expand source code
class AtErrorCode(NimoIntEnum):
    """AT command error codes for NIMO modems."""
    # Standard / documented
    OK = 0
    ERROR = 4
    INVALID_CRC = 100
    UNKNOWN_COMMAND = 101
    INVALID_PARAMETER = 102
    MESSAGE_LENGTH_MISMATCH = 103
    RESERVED_104 = 104
    SYSTEM_ERROR = 105
    TX_QUEUE_FULL = 106
    DUPLICATE_NAME = 107
    GNSS_TIMEOUT = 108
    MESSAGE_UNAVAILABLE = 109
    RESERVED_110 = 110
    RESERVED_111 = 111
    READ_ONLY_PARAMETER = 112
    # Extensions for additional situations
    TIMEOUT = 255
    CRC_CONFIG_MISMATCH = 254
    UNABLE_TO_DELETE = 253
    INVALID_RESPONSE_CRC = 252

Ancestors

Class variables

var CRC_CONFIG_MISMATCH
var DUPLICATE_NAME
var ERROR
var GNSS_TIMEOUT
var INVALID_CRC
var INVALID_PARAMETER
var INVALID_RESPONSE_CRC
var MESSAGE_LENGTH_MISMATCH
var MESSAGE_UNAVAILABLE
var OK
var READ_ONLY_PARAMETER
var RESERVED_104
var RESERVED_110
var RESERVED_111
var SYSTEM_ERROR
var TIMEOUT
var TX_QUEUE_FULL
var UNABLE_TO_DELETE
var UNKNOWN_COMMAND

Inherited members

class BeamState (value, names=None, *, module=None, qualname=None, type=None, start=1)

States of the NIMO modem satellite beam internal selection process.

Expand source code
class BeamState(NimoIntEnum):
    """States of the NIMO modem satellite beam internal selection process."""
    IDLE = 0
    SEARCH_ANY_TRAFFIC = 1
    SEARCH_LAST_TRAFFIC = 2
    RESERVED = 3
    SEARCH_NEW_TRAFFIC = 4
    SEARCH_BULLETIN_BOARD = 5
    DELAY_TRAFFIC_SEARCH = 6

Ancestors

Class variables

var IDLE
var RESERVED
var SEARCH_ANY_TRAFFIC
var SEARCH_BULLETIN_BOARD
var SEARCH_LAST_TRAFFIC
var SEARCH_NEW_TRAFFIC

Inherited members

class ControlState (value, names=None, *, module=None, qualname=None, type=None, start=1)

States of the NIMO modem internal network acquisition process.

Expand source code
class ControlState(NimoIntEnum):
    """States of the NIMO modem internal network acquisition process."""
    STOPPED = 0
    GNSS_WAIT = 1
    SEARCH_START = 2
    BEAM_SEARCH = 3
    BEAM_FOUND = 4
    BEAM_ACQUIRED = 5
    BEAM_SWITCH = 6
    REGISTERING = 7
    RECEIVE_ONLY = 8
    BB_DOWNLOAD = 9
    ACTIVE = 10
    BLOCKED = 11
    CONFIRM_PREVIOUS_BEAM = 12
    CONFIRM_REQUESTED_BEAM = 13
    CONNECT_CONFIRMED_BEAM = 14

Ancestors

Class variables

var ACTIVE
var BB_DOWNLOAD
var BEAM_ACQUIRED
var BEAM_FOUND
var BEAM_SWITCH
var BLOCKED
var CONFIRM_PREVIOUS_BEAM
var CONFIRM_REQUESTED_BEAM
var CONNECT_CONFIRMED_BEAM
var GNSS_WAIT
var RECEIVE_ONLY
var REGISTERING
var SEARCH_START
var STOPPED

Inherited members

class DataFormat (value, names=None, *, module=None, qualname=None, type=None, start=1)

Data formats used for submitting or extracting message data/payload.

Expand source code
class DataFormat(NimoIntEnum):
    """Data formats used for submitting or extracting message data/payload."""
    TEXT = 1
    HEX = 2
    BASE64 = 3

Ancestors

Class variables

var BASE64
var HEX
var TEXT

Inherited members

class EventNotification (value, names=None, *, module=None, qualname=None, type=None, start=1)

Bitmask enumerated values for NIMO modem events.

Expand source code
class EventNotification(IntFlag):
    """Bitmask enumerated values for NIMO modem events."""
    GNSS_FIX_NEW =              0b000000000001
    MESSAGE_MT_RECEIVED =       0b000000000010
    MESSAGE_MO_COMPLETE =       0b000000000100
    NETWORK_REGISTERED =        0b000000001000
    MODEM_RESET_COMPLETE =      0b000000010000
    JAMMING_ANTENNA_CHANGE =    0b000000100000
    MODEM_RESET_PENDING =       0b000001000000
    WAKEUP_PERIOD_CHANGE =      0b000010000000
    UTC_TIME_SYNC =             0b000100000000
    GNSS_FIX_TIMEOUT =          0b001000000000
    EVENT_TRACE_CACHED =        0b010000000000
    NETWORK_PING_ACKNOWLEDGED = 0b100000000000
    
    @classmethod
    def get_events(cls, event_mask: int) -> 'list[EventNotification]':
        """Parses a bitmask to return a list of events."""
        return [item for item in cls if item.value & event_mask]

Ancestors

  • enum.IntFlag
  • builtins.int
  • enum.Flag
  • enum.Enum

Class variables

var EVENT_TRACE_CACHED
var GNSS_FIX_NEW
var GNSS_FIX_TIMEOUT
var JAMMING_ANTENNA_CHANGE
var MESSAGE_MO_COMPLETE
var MESSAGE_MT_RECEIVED
var MODEM_RESET_COMPLETE
var MODEM_RESET_PENDING
var NETWORK_PING_ACKNOWLEDGED
var NETWORK_REGISTERED
var UTC_TIME_SYNC
var WAKEUP_PERIOD_CHANGE

Static methods

def get_events(event_mask: int) ‑> list[EventNotification]

Parses a bitmask to return a list of events.

Expand source code
@classmethod
def get_events(cls, event_mask: int) -> 'list[EventNotification]':
    """Parses a bitmask to return a list of events."""
    return [item for item in cls if item.value & event_mask]
class GeoBeam (value, names=None, *, module=None, qualname=None, type=None, start=1)

Geographic Beam identifiers mapped to readable names.

Expand source code
class GeoBeam(NimoIntEnum):
    """Geographic Beam identifiers mapped to readable names."""
    GLOBAL_BB = 0
    AMER_RB1 = 1
    AMER_RB2 = 2
    AMER_RB3 = 3
    AMER_RB4 = 4
    AMER_RB5 = 5
    AMER_RB6 = 6
    AMER_RB7 = 7
    AMER_RB8 = 8
    AMER_RB9 = 9
    AMER_RB10 = 10
    AMER_RB11 = 11
    AMER_RB12 = 12
    AMER_RB13 = 13
    AMER_RB14 = 14
    AMER_RB15 = 15
    AMER_RB16 = 16
    AMER_RB17 = 17
    AMER_RB18 = 18
    AMER_RB19 = 19
    AORW_SC = 61
    EMEA_RB1 = 21
    EMEA_RB2 = 22
    EMEA_RB3 = 23
    EMEA_RB4 = 24
    EMEA_RB5 = 25
    EMEA_RB6 = 26
    EMEA_RB7 = 27
    EMEA_RB8 = 28
    EMEA_RB9 = 29
    EMEA_RB10 = 30
    EMEA_RB11 = 31
    EMEA_RB12 = 32
    EMEA_RB13 = 33
    EMEA_RB14 = 34
    EMEA_RB15 = 35
    EMEA_RB16 = 36
    EMEA_RB17 = 37
    EMEA_RB18 = 38
    EMEA_RB19 = 39
    APAC_RB1 = 41
    APAC_RB2 = 42
    APAC_RB3 = 43
    APAC_RB4 = 44
    APAC_RB5 = 45
    APAC_RB6 = 46
    APAC_RB7 = 47
    APAC_RB8 = 48
    APAC_RB9 = 49
    APAC_RB10 = 50
    APAC_RB11 = 51
    APAC_RB12 = 52
    APAC_RB13 = 53
    APAC_RB14 = 54
    APAC_RB15 = 55
    APAC_RB16 = 56
    APAC_RB17 = 57
    APAC_RB18 = 58
    APAC_RB19 = 59
    # MEAS_RB10 = 90   # replaced by IOE
    # MEAS_RB11 = 91   # replaced by IOE
    # MEAS_RB12 = 92   # replaced by IOE
    # MEAS_RB15 = 93   # replaced by IOE
    IOE_RB1 = 101
    IOE_RB2 = 102
    IOE_RB3 = 103
    IOE_RB4 = 104
    IOE_RB5 = 105
    IOE_RB6 = 106
    IOE_RB7 = 107
    IOE_RB8 = 108
    IOE_RB9 = 109
    IOE_RB10 = 110
    IOE_RB11 = 111
    IOE_RB12 = 112
    IOE_RB13 = 113
    IOE_RB14 = 114
    IOE_RB15 = 115
    IOE_RB16 = 116
    IOE_RB17 = 117
    IOE_RB18 = 118
    IOE_RB19 = 119

    @property
    def satellite(self):
        return self.name.split('_')[0]

    @property
    def beam(self):
        return self.name.split('_')[1]
    
    @property
    def id(self):
        return self.value

Ancestors

Class variables

var AMER_RB1
var AMER_RB10
var AMER_RB11
var AMER_RB12
var AMER_RB13
var AMER_RB14
var AMER_RB15
var AMER_RB16
var AMER_RB17
var AMER_RB18
var AMER_RB19
var AMER_RB2
var AMER_RB3
var AMER_RB4
var AMER_RB5
var AMER_RB6
var AMER_RB7
var AMER_RB8
var AMER_RB9
var AORW_SC
var APAC_RB1
var APAC_RB10
var APAC_RB11
var APAC_RB12
var APAC_RB13
var APAC_RB14
var APAC_RB15
var APAC_RB16
var APAC_RB17
var APAC_RB18
var APAC_RB19
var APAC_RB2
var APAC_RB3
var APAC_RB4
var APAC_RB5
var APAC_RB6
var APAC_RB7
var APAC_RB8
var APAC_RB9
var EMEA_RB1
var EMEA_RB10
var EMEA_RB11
var EMEA_RB12
var EMEA_RB13
var EMEA_RB14
var EMEA_RB15
var EMEA_RB16
var EMEA_RB17
var EMEA_RB18
var EMEA_RB19
var EMEA_RB2
var EMEA_RB3
var EMEA_RB4
var EMEA_RB5
var EMEA_RB6
var EMEA_RB7
var EMEA_RB8
var EMEA_RB9
var GLOBAL_BB
var IOE_RB1
var IOE_RB10
var IOE_RB11
var IOE_RB12
var IOE_RB13
var IOE_RB14
var IOE_RB15
var IOE_RB16
var IOE_RB17
var IOE_RB18
var IOE_RB19
var IOE_RB2
var IOE_RB3
var IOE_RB4
var IOE_RB5
var IOE_RB6
var IOE_RB7
var IOE_RB8
var IOE_RB9

Instance variables

var beam
Expand source code
@property
def beam(self):
    return self.name.split('_')[1]
var id
Expand source code
@property
def id(self):
    return self.value
var satellite
Expand source code
@property
def satellite(self):
    return self.name.split('_')[0]

Inherited members

class GeoSatellite (value, names=None, *, module=None, qualname=None, type=None, start=1)

Maps the Viasat/Inmarsat geostationary longitude supporting NIMO.

Expand source code
class GeoSatellite(NimoFloatEnum):
    """Maps the Viasat/Inmarsat geostationary longitude supporting NIMO."""
    AMER = -98.0   # Inmarsat 4F3
    AORWSC = -54.0   # Inmarsat 3F5
    EMEA = 24.9   # Inmarsat 4AF4 aka Alphasat XL
    IOE = 63.5   # Inmarsat 6F1 previously IOR 3F1, MEAS 4F2
    APAC = 143.5   # Inmarsat 4F2 previously 4F1

Ancestors

Class variables

var AMER
var AORWSC
var APAC
var EMEA
var IOE

Inherited members

class GnssMode (value, names=None, *, module=None, qualname=None, type=None, start=1)

Base class for manufacturer-specific variants.

Expand source code
class GnssMode(NimoIntEnum):
    """Base class for manufacturer-specific variants."""

Ancestors

Subclasses

Inherited members

class GnssModeOrbcomm (value, names=None, *, module=None, qualname=None, type=None, start=1)

The operating mode setting for the built-in GNSS in a NIMO modem.

Expand source code
class GnssModeOrbcomm(GnssMode):
    """The operating mode setting for the built-in GNSS in a NIMO modem."""
    GPS = 0
    GLONASS = 1
    BEIDOU = 2
    GALILEO = 3
    GPS_GLONASS = 10
    GPS_BEIDOU = 11
    GLONASS_BEIDOU = 12
    GPS_GALILEO = 13
    GLONASS_GALILEO = 14
    BEIDOU_GALILEO = 15

Ancestors

Class variables

var BEIDOU
var BEIDOU_GALILEO
var GALILEO
var GLONASS
var GLONASS_BEIDOU
var GLONASS_GALILEO
var GPS
var GPS_BEIDOU
var GPS_GALILEO
var GPS_GLONASS

Inherited members

class GnssModeQuectel (value, names=None, *, module=None, qualname=None, type=None, start=1)

An enumeration.

Expand source code
class GnssModeQuectel(GnssMode):
    GPS = 0
    GPS_BDS = 1
    GPS_GLONASS = 2
    GPS_GALILEO = 3
    GPS_GLONASS_GALILEO_BDS = 4

Ancestors

Class variables

var GPS
var GPS_BDS
var GPS_GALILEO
var GPS_GLONASS
var GPS_GLONASS_GALILEO_BDS

Inherited members

class Manufacturer (value, names=None, *, module=None, qualname=None, type=None, start=1)

Supported NIMO modem implementations.

Expand source code
class Manufacturer(IntEnum):
    """Supported NIMO modem implementations."""
    NONE = 0
    ORBCOMM = 1
    QUECTEL = 2

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.Enum

Class variables

var NONE
var ORBCOMM
var QUECTEL
class MessagePriority (value, names=None, *, module=None, qualname=None, type=None, start=1)

Message priorities for NIMO modem messages.

Expand source code
class MessagePriority(NimoIntEnum):
    """Message priorities for NIMO modem messages."""
    NONE = 0
    HIGH = 1
    MEDH = 2
    MEDL = 3
    LOW = 4

Ancestors

Class variables

var HIGH
var LOW
var MEDH
var MEDL
var NONE

Inherited members

class MessageState (value, names=None, *, module=None, qualname=None, type=None, start=1)

Message states of NIMO modem messages.

Expand source code
class MessageState(NimoIntEnum):
    """Message states of NIMO modem messages."""
    UNAVAILABLE = 0
    RX_PENDING = 1
    RX_COMPLETE = 2
    RX_RETRIEVED = 3
    TX_READY = 4
    TX_SENDING = 5
    TX_COMPLETE = 6
    TX_FAILED = 7
    TX_CANCELLED = 8

Ancestors

Class variables

var RX_COMPLETE
var RX_PENDING
var RX_RETRIEVED
var TX_CANCELLED
var TX_COMPLETE
var TX_FAILED
var TX_READY
var TX_SENDING
var UNAVAILABLE

Inherited members

class MoMessage (name: str = '', priority: MessagePriority = MessagePriority.NONE, state: MessageState = MessageState.UNAVAILABLE, length: int = 0, bytes_delivered: int = 0, payload: bytes = b'')

A Mobile-Originated Message.

Expand source code
class MoMessage(NimoMessage):
    """A Mobile-Originated Message."""
    @property
    def name(self) -> str:
        return self._message_name
    
    @name.setter
    def name(self, message_name: str):
        msg_mo_name_max_len = max(MSG_MO_NAME_MAX_LEN, MSG_MO_NAME_QMAX_LEN)
        if (not isinstance(message_name, str) or
            not 0 < len(message_name) <= msg_mo_name_max_len):
            raise ValueError('Invalid message name')
        self._message_name = message_name

Ancestors

Instance variables

var name : str
Expand source code
@property
def name(self) -> str:
    return self._message_name
class ModemAtError (*args, **kwargs)

AT command related errors with error_code property.

Expand source code
class ModemAtError(ModemError):
    """AT command related errors with error_code property."""
    @property
    def error_code(self) -> AtErrorCode:
        return AtErrorCode[self.args[0]]

Ancestors

  • ModemError
  • builtins.Exception
  • builtins.BaseException

Instance variables

var error_codeAtErrorCode
Expand source code
@property
def error_code(self) -> AtErrorCode:
    return AtErrorCode[self.args[0]]
class ModemCrc (*args, **kwargs)

Error calculating response CRC.

Expand source code
class ModemCrc(ModemError):
    """Error calculating response CRC."""

Ancestors

  • ModemError
  • builtins.Exception
  • builtins.BaseException
class ModemCrcConfig (*args, **kwargs)

Request/response mismatch of CRC presence.

Expand source code
class ModemCrcConfig(ModemError):
    """Request/response mismatch of CRC presence."""

Ancestors

  • ModemError
  • builtins.Exception
  • builtins.BaseException
class ModemError (*args, **kwargs)

Base class for NIMO modem errors.

Expand source code
class ModemError(Exception):
    """Base class for NIMO modem errors."""

Ancestors

  • builtins.Exception
  • builtins.BaseException

Subclasses

class ModemLocation (**kwargs)

A set of location-based information derived from the modem's NMEA data.

Uses 90.0/180.0 if latitude/longitude are unknown

Attributes

latitude : float
decimal degrees
longitude : float
decimal degrees
altitude : float
in metres
speed : float
in knots
heading : float
in degrees
timestamp : int
in seconds since 1970-01-01T00:00:00Z
satellites : int
in view at time of fix
fix_type : GnssFixType
1=None, 2=2D or 3=3D
fix_quality : GnssFixQuality
Enumerated lookup value
pdop : float
Probability Dilution of Precision
hdop : float
Horizontal Dilution of Precision
vdop : float
Vertical Dilution of Precision
time_iso : str
ISO 8601 formatted timestamp

Initializes a Location with default latitude/longitude 90/180.

Expand source code
class ModemLocation:
    """A set of location-based information derived from the modem's NMEA data.
    
    Uses 90.0/180.0 if latitude/longitude are unknown

    Attributes:
        latitude (float): decimal degrees
        longitude (float): decimal degrees
        altitude (float): in metres
        speed (float): in knots
        heading (float): in degrees
        timestamp (int): in seconds since 1970-01-01T00:00:00Z
        satellites (int): in view at time of fix
        fix_type (GnssFixType): 1=None, 2=2D or 3=3D
        fix_quality (GnssFixQuality): Enumerated lookup value
        pdop (float): Probability Dilution of Precision
        hdop (float): Horizontal Dilution of Precision
        vdop (float): Vertical Dilution of Precision
        time_iso (str): ISO 8601 formatted timestamp

    """
    def __init__(self, **kwargs):
        """Initializes a Location with default latitude/longitude 90/180."""
        self.latitude = float(kwargs.get('latitude', 90.0))
        self.longitude = float(kwargs.get('longitude', 180.0))
        self.altitude = float(kwargs.get('altitude', 0.0))   # metres
        self.speed = float(kwargs.get('speed', 0.0))  # knots
        self.heading = float(kwargs.get('heading', 0.0))   # degrees
        self.timestamp = int(kwargs.get('timestamp', 0))   # seconds (unix)
        self.satellites = int(kwargs.get('satellites', 0))
        self.fix_type = GnssFixType(int(kwargs.get('fix_type', 1)))
        self.fix_quality = GnssFixQuality(int(kwargs.get('fix_quality', 0)))
        self.pdop = float(kwargs.get('pdop', 99))
        self.hdop = float(kwargs.get('hdop', 99))
        self.vdop = float(kwargs.get('vdop', 99))
        # self.satellites_info: 'list[GnssSatelliteInfo]' = kwargs.get(
        #     'satellites_info', []
        # )

    @property
    def time_iso(self) -> str:
        return f'{ts_to_iso(self.timestamp)}'

    # def _update_satellites_info(self,
    #                             satellites_info: 'list[GnssSatelliteInfo]'):
    #     """Populates satellite information based on NMEA GSV data."""
    #     for satellite_info in satellites_info:
    #         if isinstance(satellite_info, GnssSatelliteInfo):
    #             new = True
    #             for i, info in enumerate(self.satellites_info):
    #                 if info.prn == satellite_info.prn:
    #                     new = False
    #                     self.satellites_info[i] = satellite_info
    #                     break
    #             if new:
    #                 self.satellites_info.append(satellite_info)

    def __repr__(self) -> str:
        obj = deepcopy(self.__dict__)
        for k, v in obj.items():
            if k in ['latitude', 'longitude']:
                obj[k] = round(v, 5)
            elif isinstance(v, float):
                obj[k] = round(v, 1)
        return json.dumps(obj, skipkeys=True)

Instance variables

var time_iso : str
Expand source code
@property
def time_iso(self) -> str:
    return f'{ts_to_iso(self.timestamp)}'
class ModemTimeout (*args, **kwargs)

Serial response timeout.

Expand source code
class ModemTimeout(ModemError):
    """Serial response timeout."""

Ancestors

  • ModemError
  • builtins.Exception
  • builtins.BaseException
class MtMessage (name: str = '', priority: MessagePriority = MessagePriority.NONE, state: MessageState = MessageState.UNAVAILABLE, length: int = 0, bytes_delivered: int = 0, payload: bytes = b'')

A Mobile-Terminated message.

Expand source code
class MtMessage(NimoMessage):
    """A Mobile-Terminated message."""
    @property
    def bytes_delivered(self) -> int:
        if self._bytes_delivered < self.length:
            # bytes delivered not updated during parsing - update
            self._bytes_delivered = self.length
        return self._bytes_delivered

    @bytes_delivered.setter
    def bytes_delivered(self, value: int):
        if not isinstance(value, int) or value < 0:
            raise ValueError('Invalid bytes delivered')
        if value > self.length:
            _log.error('Bytes delivered mismatch with message length')
            return
        self._bytes_delivered = value

Ancestors

Instance variables

var bytes_delivered : int
Expand source code
@property
def bytes_delivered(self) -> int:
    if self._bytes_delivered < self.length:
        # bytes delivered not updated during parsing - update
        self._bytes_delivered = self.length
    return self._bytes_delivered
class NetworkStatus (value, names=None, *, module=None, qualname=None, type=None, start=1)

Simplified state of network acquisition and tracking.

Expand source code
class NetworkStatus(NimoIntEnum):
    """Simplified state of network acquisition and tracking."""
    UNKNOWN = 0
    RX_STOPPED = 1
    RX_SEARCHING = 2
    RX_ACQUIRING = 3
    RX_ONLY_NOT_REGISTERED = 4
    OK = 5
    SUSPENDED = 6
    MUTED = 7
    BLOCKED = 8

Ancestors

Class variables

var BLOCKED
var MUTED
var OK
var RX_ACQUIRING
var RX_ONLY_NOT_REGISTERED
var RX_SEARCHING
var RX_STOPPED
var SUSPENDED
var UNKNOWN

Inherited members

class NimoModem (serial_port: str, **kwargs)

A class for NIMO satellite IoT modem interaction.

Instantiate the NimoModem object.

For additional kwargs see PySerial API: https://pyserial.readthedocs.io/en/latest/pyserial_api.html

Args

serial_port : str
The path to the modem's serial port.

Keyword Args: baudrate (int): The baud rate of the modem (default 9600)

Raises

ConnectionError if unable to connect to the serial port.

Expand source code
class NimoModem:
    """A class for NIMO satellite IoT modem interaction."""
    # __slots__ = ('_modem', '_mobile_id',
    #              '_mfr_code', '_modem_booted',
    #              )
    
    def __init__(self, serial_port: str, **kwargs) -> None:
        """Instantiate the NimoModem object.
        
        For additional kwargs see PySerial API:
        https://pyserial.readthedocs.io/en/latest/pyserial_api.html
        
        Args:
            serial_port (str): The path to the modem's serial port.
        
        Keyword Args:
            baudrate (int): The baud rate of the modem (default 9600)
        
        Raises:
            `ConnectionError` if unable to connect to the serial port.
        
        """
        try:
            self._serial = Serial(serial_port, **kwargs)
        except Exception as exc:
            raise ConnectionError('Unable to connect to serial') from exc
        self._modem: AtCommandBuffer = AtCommandBuffer(self._serial)
        self._baudrate: int = self._serial.baudrate
        self._is_connected: bool = False
        self._modem_booted: bool = False
        self._mobile_id: str = ''
        self._manufacturer: Manufacturer = Manufacturer.NONE
    
    @property
    def is_ready(self) -> bool:
        return not self._modem._lock.locked()
    
    @property
    def crc_enabled(self) -> bool:
        return self._modem.crc
    
    @property
    def modem_booted(self) -> bool:
        return self._modem_booted
    
    @property
    def _mfr(self) -> Manufacturer:
        """Used internally to support different manufacturer commands."""
        if not self._manufacturer:
            self.get_manufacturer()
            _log.debug('Using %s manufacturer commands', self._manufacturer)
        return self._manufacturer
    
    @property
    def _mo_msg_name_len_max(self) -> int:
        """Used internally to restrict the length of the MO message name."""
        maxlen = MSG_MO_NAME_MAX_LEN
        if self._mfr == Manufacturer.QUECTEL:
            maxlen = MSG_MO_NAME_QMAX_LEN
        return maxlen
    
    def _at_command_response(self,
                             command: str,
                             prefix: str = '',
                             timeout: int = DEFAULT_AT_TIMEOUT) -> str:
        """Send a command and return the response.
        
        Blocks until response has been received.
        
        Args:
            command (str): The AT command to send.
            prefix (str): Optional prefix to remove from response.
            timeout (int): Maximum time in seconds to wait for response.
        
        Raises:
            `ModemTimeout` if no response is received.
            `ModemCrcConfig` if CRC is not used on both request/response.
            `ModemAtError` for other cases of errored response.
        
        """
        self._modem.send_at_command(command)
        err = self._modem.read_at_response(prefix, timeout)
        if err == AtErrorCode.OK:
            return self._modem.get_response()
        elif err == AtErrorCode.TIMEOUT:
            raise ModemTimeout(f'AT response timed out after {timeout} seconds')
        elif err == AtErrorCode.CRC_CONFIG_MISMATCH:
            raise ModemCrcConfig('Reponse checksum {}expected'.format(
                                 '' if self.crc_enabled else 'un'))
        elif err == AtErrorCode.INVALID_RESPONSE_CRC:
            raise ModemCrc(err.name)
        else:
            err = self.get_last_error_code()
            if err == AtErrorCode.INVALID_CRC:
                raise ModemCrc(err.name)
            raise ModemAtError(err.name)
    
    def connect(self) -> None:
        """Attach to the modem via serial communications.
        
        Provided for backward compatibility. Connection is usually automatic
        when instantiating `NimoModem`.
        
        """
        if not self._serial.is_open:
            self._serial.open()
    
    def disconnect(self) -> None:
        """Detach from the modem serial communications.
        
        Provided for backward compatibility. Not typically required.
        
        """
        self._is_connected = False
        self._modem_booted = False
        if self._serial.is_open:
            self._serial.close()
    
    def is_connected(self) -> bool:
        """Indicates if the modem is responding to a basic AT query."""
        try:
            self._at_command_response('AT')
            self._is_connected = True
            self._modem_booted = True
            return True
        except ModemError:
            self._is_connected = False
            self._modem_booted = False
            return False
    
    @property
    def baudrate(self) -> int:
        """The baudrate of the serial connection."""
        return self._modem.serial.baudrate
    
    @baudrate.setter
    def baudrate(self, baudrate: int):
        """Set the baud rate of the modem and adjust the serial rate."""
        if baudrate not in [9600, 115200]:
            raise ValueError('Invalid baudrate')
        self._at_command_response(f'AT+IPR={baudrate}')
        self._modem.serial.baudrate = baudrate
    
    def retry_baudrate(self) -> bool:
        """"""
        for baud in BAUDRATES:
            self._modem.serial.baudrate = baud
            if self.is_connected():
                return True
        return False
    
    def await_boot(self, boot_timeout: int = 10) -> bool:
        """Indicates if a boot string is received within a timeout window.
        
        Use `is_connected` before waiting for boot.
        
        Args:
            boot_timeout (int): The maximum time to wait in seconds.
        
        Returns:
            True if a valid boot string was received inside the timeout.
        
        """
        boot_strings = ['ST Version', 'RDY']
        _log.debug('Awaiting modem boot string for %d seconds...', boot_timeout)
        rx_data = ''
        started = time.time()
        while time.time() - started < boot_timeout and not self._modem_booted:
            while self._modem.is_data_waiting():
                rx_data += self._modem.read_rx_buffer()
            if rx_data and any(b in rx_data for b in boot_strings):
                self._modem_booted = True
                _log.debug('Found boot string - clearing Rx buffer')
                while self._modem.is_data_waiting():
                    rx_data += self._modem.read_rx_buffer()
                break
        return self._modem_booted
    
    def get_last_error_code(self) -> AtErrorCode:
        """Get the last error code from the modem."""
        return AtErrorCode(int(self._at_command_response('ATS80?')))
    
    def initialize(self,
                   echo: bool = True,
                   verbose: bool = True,
                   ) -> bool:
        """Initialize the modem AT configuration for Echo and Verbose."""
        at_command = (f'ATZ;E{int(echo)};V{int(verbose)}')
        try:
            self._at_command_response(at_command)
            return True
        except ModemCrcConfig:
            _log.info('Attempting re-initialize with CRC enabled')
            self._at_command_response(at_command)
            return True
    
    def set_crc(self, enable: bool = False) -> bool:
        """Enable or disable CRC error checking on the modem serial port."""
        try:
            self._at_command_response(f'AT%CRC={int(enable)}')
            return True
        except ModemCrcConfig:
            if ((self._modem.crc and enable) or
                (not self._modem.crc and not enable)):
                return True
            return False
    
    def reset_factory_config(self) -> None:
        """Reset the modem's factory default configuration."""
        self._at_command_response('AT&F')
    
    def save_config(self) -> None:
        """Store the current configuration to modem non-volatile memory."""
        self._at_command_response('AT&W')
    
    def get_mobile_id(self) -> str:
        """Get the modem's globally unique identifier."""
        if not self._mobile_id:
            try:
                self._mobile_id = self._at_command_response('AT+GSN', '+GSN:')
                if vlog(VLOG_TAG):
                    _log.debug('Cached Mobile ID %s', self._mobile_id)
            except ModemError:
                self._mobile_id = ''
                raise
        return self._mobile_id
    
    @property
    def _is_simulator(self) -> bool:
        return self.get_mobile_id().startswith('00000000')
    
    def get_manufacturer(self) -> str:
        """Get the manufacturer name."""
        if not self._manufacturer:
            try:
                mfr = self._at_command_response('ATI')
                if 'quectel' in mfr.lower():
                    self._manufacturer = Manufacturer.QUECTEL
                else:
                    if not any(m in mfr.lower()
                               for m in ['orbcomm', 'skywave']):
                        _log.warning('Unsupported manufacturer %s', mfr)
                    self._manufacturer = Manufacturer.ORBCOMM
                if vlog(VLOG_TAG):
                    _log.debug('Caching manufacturer: %s',
                               self._manufacturer.name)
            except ModemError:
                self._manufacturer = Manufacturer.NONE
                raise
        return self._manufacturer.name
    
    def get_model(self) -> str:
        """Get the manufacturer model name."""
        cmd = 'ATI4'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'ATI'
        try:
            response = self._at_command_response(cmd)
            if response:
                if self._mfr == Manufacturer.QUECTEL:
                    response = response.split('\n')[1]
            return response
        except ModemError:
            return ''
    
    def get_firmware_version(self) -> str:
        """Get the modem's firmware version."""
        # TODO: Firmware structure with hardware, firmware, software?
        return self._at_command_response('AT+GMR', '+GMR:')
    
    def get_system_time(self) -> int:
        """Get the system/GNSS time from the modem."""
        try:
            nimo_time = self._at_command_response('AT%UTC', '%UTC:')
            iso_time = nimo_time.replace(' ', 'T') + 'Z'
            return iso_to_ts(iso_time)
        except ModemError:
            return 0
    
    def get_temperature(self) -> 'int|None':
        """Get the processor temperature in Celsius."""
        try:
            return int(int(self._at_command_response('ATS85?')) / 10)
        except ValueError:
            return None
    
    def is_transmit_allowed(self) -> bool:
        """Indicates if the modem is able to transmit data."""
        return self.get_network_status() == 5
    
    def is_blocked(self) -> bool:
        """Indicates if line-of-sight to the satellite is blocked."""
        return self.get_network_status() == 8
    
    def is_muted(self) -> bool:
        """Indicates if the modem has been muted (disallowed to transmit data).
        """
        return self.get_network_status() == 7
    
    def is_updating_network(self) -> bool:
        """Indicates if the modem is updating network information.
        
        The modem should not be powered down during a network update.
        
        """
        return self.get_network_status() == 4
    
    def get_network_status(self) -> NetworkStatus:
        """Get the current satellite acquisition status."""
        cmd = 'ATS54?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QREG?'
            prefix = '+QREG:'
        return NetworkStatus(int(self._at_command_response(cmd, prefix)))
    
    def get_rssi(self) -> float:
        """Get the current Received Signal Strength Indicator.
        
        Also referred to as SNR or C/N0 (dB-Hz)
        
        """
        cmd = 'ATS90=3 S91=1 S92=1 S116?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QSCN'
            prefix = '+QSCN:'
        try:
            return int(self._at_command_response(cmd, prefix)) / 100
        except ValueError:
            return 0
    
    def get_signal_quality(self) -> SignalQuality:
        """Get a qualitative indicator from 0..5 of the satellite signal."""
        snr = self.get_rssi()
        if snr >= SignalLevelRegional.INVALID.value:
            return SignalQuality.WARNING
        if snr >= SignalLevelRegional.BARS_5.value:
            return SignalQuality.STRONG
        if snr >= SignalLevelRegional.BARS_4.value:
            return SignalQuality.GOOD
        if snr >= SignalLevelRegional.BARS_3.value:
            return SignalQuality.MID
        if snr >= SignalLevelRegional.BARS_2.value:
            return SignalQuality.LOW
        if snr >= SignalLevelRegional.BARS_1.value:
            return SignalQuality.WEAK
        return SignalQuality.NONE
    
    def get_acquisition_detail(self) -> AcquisitionInfo:
        """Get the detailed satellite acquisition status.
        
        Includes `acquisition_state`, `beamsearch_state`, `vcid` and `snr`
        indicators.
        
        """
        cmd = 'ATS90=3 S91=1 S92=1 S122? S123? S116? S101?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QEVNT=3,1'
            prefix = '+QEVNT:'
        result_str = self._at_command_response(cmd, prefix, timeout=10)
        if self._mfr == Manufacturer.ORBCOMM:
            results = [int(x) for x in result_str.split('\n')]
            ctrl_state = ControlState(results[0])
            beam_state = BeamState(results[1])
            rssi = float(results[2]) / 100
            vcid = results[3]
        elif self._mfr == Manufacturer.QUECTEL:
            # Workaround Quectel 20230731 documentation error says +QEVNT:
            result_str = result_str.replace('+QEVENT:', '').strip()
            results = [int(x) for x in result_str.split(',')]
            # <dataCount>,<signedBitmask>,<MTID>,<timestamp>,
            #   <class>,<subclass>,<priority>,<data0>,...
            data0 = 7   # list index where trace data starts
            ctrl_state = ControlState(results[data0+22])
            beam_state = BeamState(results[data0+23])
            rssi = float(results[data0+16]) / 100
            vcid = results[data0+1]
        return AcquisitionInfo(ctrl_state, beam_state, rssi, vcid)
    
    def send_data(self, data: bytes, **kwargs) -> 'str|MoMessage':
        """Submits data to send as a mobile-originated message.
        
        If a `message_name` is not supplied one will be generated using the
        least significant 8 digits of unix timestamp.
        
        Args:
            data (bytes): The data to send.
        
        Keyword Args:
            message_name (str): Optional handle for message in Tx queue. Max 8
                characters for Orbcomm modem or 12 for Quectel.
            priority (int): Optional priority 1 (highest) .. 4 (low, default).
                May use `MessagePriority`.
            codec_sin (int): Optional first byte of payload to add as a codec
                service identifier, must be in range 16..255.
            codec_min (int): Optional second byte of payload to add as a codec
                message identifier, must be in range 0..255.
            return_message (bool): If set, returns a `MoMessage` instead of the
                message handle.
        
        Returns:
            Message handle (str) or `MoMessage` if `return_message` kwarg is set.
        
        Raises:
            `ValueError` for various parameter limit violations.
        
        """
        data_size = len(data)
        msg_payload_sin_min = b''
        message_name = kwargs.get('message_name', '')
        priority = MessagePriority(kwargs.get('priority',
                                              MessagePriority.LOW.value))
        codec_sin: int = kwargs.get('codec_sin', -1)
        codec_min: int = kwargs.get('codec_min', -1)
        if codec_sin > -1:
            data_size += 1
            msg_payload_sin_min += codec_sin.to_bytes(1, 'big')
        if codec_min > -1:
            data_size += 1
            msg_payload_sin_min += codec_min.to_bytes(1, 'big')
        if not 2 <= data_size <= MSG_MO_MAX_SIZE:
            raise ValueError('Invalid mobile-originated message size')
        if message_name and len(message_name) > self._mo_msg_name_len_max:
            raise ValueError('Message name too long')
        data_index = 0
        if codec_sin <= -1:
            codec_sin = data[0]
            data_index += 1
            data_size -= 1
        if codec_sin not in range(16, 256):
            raise ValueError('Illegal first payload byte SIN must be 16..255')
        if codec_min <= -1:
            codec_min = data[1]
            data_index += 1
            data_size -= 1
        if codec_min > 255:
            raise ValueError('Invalid second payload byte MIN must be 0..255')
        max_name_len = self._mo_msg_name_len_max
        if message_name and len(message_name) > max_name_len:
            raise ValueError(f'Invalid message name longer than {max_name_len}')
        if len(message_name) == 0:
            message_name = f'{int(time.time())}'[-max_name_len:]
        # Convert to base64 string for serial efficiency
        #   no effect on OTA size, modem always decodes and sends raw bytes OTA
        data_format = DataFormat.BASE64
        formatted_data = base64.b64encode(data[data_index:]).decode('utf-8')
        cmd = 'AT%MGRT='
        codec_sep = '.'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QSMGT='
            codec_sep = ','
        cmd = (f'{cmd}"{message_name}",{priority},{codec_sin}{codec_sep}'
               f'{codec_min},{data_format},{formatted_data}')
        self._at_command_response(cmd)
        if kwargs.get('return_message', False) is True:
            return MoMessage(message_name, priority, MessageState.TX_READY,
                                payload=(msg_payload_sin_min + data))
        return message_name
    
    def send_text(self, text: str, **kwargs) -> 'str|MoMessage':
        """Submits a text string to send as data.
        
        If `codec_sin` kwarg is not provided 128 is prepended as the first byte.
        If `codec_min` kwarg is not provided 1 is prepended as the second byte.
        Other kwargs as per `send_data`.
        
        Args:
            text (str): The text message to send.
        
        Returns:
            (str) The message name assigned or MoMessage if kwarg
                `return_message` is set.
        
        """
        data = b''
        codec_sin = int(kwargs.get('codec_sin', 128))
        data += codec_sin.to_bytes(1, 'big')
        codec_min = int(kwargs.get('codec_min', 1))
        data += codec_min.to_bytes(1, 'big')
        data += text.encode()
        flowthru = ['message_name', 'priority', 'return_message']
        next_kwargs = { k:v for k, v in kwargs if k in flowthru }
        return self.send_data(data, **next_kwargs)
    
    def cancel_mo_message(self, message_name: str) -> bool:
        """Attempts to cancel a previously submitted mobile-originated message.
        
        Args:
            message_name (str): The mobile-originated message handle to delete.
        
        """
        _log.debug('Attempting to cancel MO message %s', message_name)
        cmd = 'AT%MGRC'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QSMGC'
        cmd += f'="{message_name}"'
        self._at_command_response(cmd)
        message_states = self.get_mo_message_states(message_name)
        if len(message_states) > 0:
            state = message_states[0].state
            if state == MessageState.TX_CANCELLED:
                return True
        elif self._is_simulator:
            return True
        _log.warn('Failed to cancel message %s', message_name)
        return False
    
    def get_mo_message_states(self, message_name: str = '') -> 'list[MoMessage]':
        """Get a list of mobile-originated message states in the modem Tx queue.
        
        Args:
            message_name (str): Optional filter on message name.
        
        Returns:
            A list of `MoMessage` objects including state and metadata.
        
        """
        cmd = 'AT%MGRS'
        prefix = '%MGRS:'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QSMGS'
            prefix = '+QSMGS:'
        if message_name and not self._is_simulator:
            # Orbcomm Modem Simulator returns ERROR for %MGRS= command
            cmd += f'="{message_name}"'
        response_str = self._at_command_response(cmd, prefix)
        return self._parse_message_states(response_str, is_mo=True)
    
    def _parse_message_states(self,
                              response_str: str,
                              is_mo: bool,
                              ) -> 'list[NimoMessage]':
        """Parses textual metadata to build a SatelliteMessageState."""
        mo_states = []
        if not response_str:
            return mo_states
        if vlog(VLOG_TAG):
            _log.debug('Parsing %s message states from %s',
                       'MO' if is_mo else 'MT', response_str)
        states_meta = [m for m in response_str.split('\n') if m != '']
        for meta in states_meta:
            message = MoMessage() if is_mo else MtMessage()
            for field_idx, field_data in enumerate(meta.split(',')):
                self._update_message_state(message, field_idx,
                                           field_data, is_mo)
            mo_states.append(message)
        return mo_states
    
    def _update_message_state(self,
                              message_state: NimoMessage,
                              field_idx: int,
                              field_data: str,
                              is_mo: bool) -> None:
        """Parse textual metadata to update a message's state."""
        if vlog(VLOG_TAG):
            _log.debug('Parsing %s message state index %d: %s',
                       'MO' if is_mo else 'MT', field_idx, field_data)
        mfr = self._mfr
        if field_idx == 0:
            message_state.name = field_data.replace('"', '')
            if vlog(VLOG_TAG):
                _log.debug('Message name: %s', message_state.name)
        elif field_idx == 1 and mfr == Manufacturer.ORBCOMM:
            if vlog(VLOG_TAG):
                _log.debug('Ignoring msgNum %s', field_data)
        elif ((field_idx == 2 and mfr == Manufacturer.ORBCOMM) or
              (field_idx == 1 and mfr == Manufacturer.QUECTEL)):
            message_state.priority = MessagePriority(int(field_data))
            if vlog(VLOG_TAG):
                _log.debug('Message priority %s', message_state.priority.name)
        elif ((field_idx == 3 and mfr == Manufacturer.ORBCOMM) or
              (field_idx == 2 and mfr == Manufacturer.QUECTEL)):
            if vlog(VLOG_TAG):
                _log.debug('Ignoring codec SIN %s', field_data)
        elif ((field_idx == 4 and mfr == Manufacturer.ORBCOMM) or
              (field_idx == 3 and mfr == Manufacturer.QUECTEL)):
            message_state.state = MessageState(int(field_data))
            if vlog(VLOG_TAG):
                _log.debug('Message state: %s', message_state.state.name)
        elif ((field_idx == 5 and mfr == Manufacturer.ORBCOMM) or
              (field_idx == 4 and mfr == Manufacturer.QUECTEL)):
            message_state.length = int(field_data)
            if vlog(VLOG_TAG):
                _log.debug('Message size: %d bytes', message_state.length)
        elif ((field_idx == 6 and mfr == Manufacturer.ORBCOMM) or
              (field_idx == 5 and mfr == Manufacturer.QUECTEL)):
            message_state.bytes_delivered = int(field_data)
            if vlog(VLOG_TAG):
                _log.debug('Bytes delivered: %d', message_state.bytes_delivered)
        else:
            _log.warning('Unhandled field index %d (%s) for manufacturer %s',
                         field_idx, 'MO' if is_mo else 'MT', mfr.name)
    
    def get_mt_message_states(self, message_name: str = '') -> 'list[MtMessage]':
        """Get a list of mobile-terminated message states in the modem Tx queue.
        
        Args:
            message_name (str): Optional filter on message name.
        
        Returns:
            A list of `MtMessage` objects including state and metadata.
        
        """
        cmd = 'AT%MGFN' if not message_name else 'AT%MGFS'
        prefix = '%MGFN:' if not message_name else 'AT%MGFS:'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QRMGN' if not message_name else 'AT+QRMGS'
            prefix = '+QRMGN:' if not message_name else '+QRMGS:'
        if message_name and not self._is_simulator:
            cmd += f'="{message_name}"'
        response_str = self._at_command_response(cmd, prefix)
        return self._parse_message_states(response_str, is_mo=False)
    
    def get_mt_message(self, message_name: str) -> 'MtMessage|None':
        """Get a mobile-terminated message from the modem's Rx queue by name."""
        cmd = 'AT%MGFG'
        prefix = '%MGFG:'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+GRMGR'
            prefix = '+GRMGR:'
        data_format = DataFormat.BASE64
        cmd += f'="{message_name}",{data_format}'
        response = self._at_command_response(cmd, prefix)
        if response:
            return self._parse_mt_message(response)
        return None
    
    def _parse_mt_message(self, meta: str) -> MtMessage:
        """Parse textual metadata to build a MtMessage."""
        if vlog(VLOG_TAG):
            _log.debug('Parsing MT message from meta: %s', meta)
        data_includes_sin = False
        mfr = self._mfr
        message = MtMessage()
        for field_idx, field_data in enumerate(meta.split(',')):
            if field_idx == 0:
                message.name = field_data.replace('"', '')
                if vlog(VLOG_TAG):
                    _log.debug('Message name: %s', message.name)
            elif (field_idx == 1 and mfr == Manufacturer.ORBCOMM):
                if vlog(VLOG_TAG):
                    _log.debug('Ignoring msgNum %s', field_data)
            elif (field_idx == 2 and mfr == Manufacturer.ORBCOMM):
                message.priority = MessagePriority(int(field_data))
                if vlog(VLOG_TAG):
                    _log.debug('Message priority %s', message.priority.name)
            elif ((field_idx == 3 and mfr == Manufacturer.ORBCOMM) or
                  (field_idx == 1 and mfr == Manufacturer.QUECTEL)):
                codec_sin = int(field_data)
                if not data_includes_sin:
                    message.payload += codec_sin.to_bytes(1, 'big')
                if vlog(VLOG_TAG):
                    _log.debug('Added SIN as first payload byte: %d', codec_sin)
            elif (field_idx == 4 and mfr == Manufacturer.ORBCOMM):
                message.state = MessageState(int(field_data))
                if vlog(VLOG_TAG):
                    _log.debug('Message state %s', message.state.name)
            elif ((field_idx == 5 and mfr == Manufacturer.ORBCOMM) or
                  (field_idx == 2 and mfr == Manufacturer.QUECTEL)):
                message.length = int(field_data)
                if vlog(VLOG_TAG):
                    _log.debug('Message size: %d bytes', message.length)
            elif ((field_idx == 6 and mfr == Manufacturer.ORBCOMM) or
                  (field_idx == 3 and mfr == Manufacturer.QUECTEL)):
                data_format = DataFormat(int(field_data))
                if vlog(VLOG_TAG):
                    _log.debug('Data format %s', data_format.name)
            elif ((field_idx == 7 and mfr == Manufacturer.ORBCOMM) or
                  (field_idx == 4 and mfr == Manufacturer.QUECTEL)):
                if vlog(VLOG_TAG):
                    _log.debug('Decoding payload from: %s', field_data)
                if message.length > 0:
                    if data_format == DataFormat.BASE64:
                        message.payload += base64.b64decode(field_data)
                    elif data_format == DataFormat.HEX:
                        message.payload += bytes.fromhex(field_data)
                    else:   # DataFormat.TEXT
                        message.payload += field_data.encode()
                    if message.length != len(message.payload):
                        _log.warn('Message length mismatch')
        return message
    
    def delete_mt_message(self, message_name: str) -> bool:
        """Remove a mobile-terminated message from the modem's Rx queue."""
        cmd = 'AT%MGFM'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QRMGM'
        cmd += f'="{message_name}"'
        self._at_command_response(cmd)
        check = self.get_mt_message_states(message_name)
        if check and check[0].state == MessageState.RX_RETRIEVED:
            return True
        return False
    
    def receive_data(self, message_name: str) -> 'bytes|None':
        """Get the raw data from a mobile-terminated message."""
        message = self.get_mt_message(message_name)
        if message:
            return message.payload
        return None
    
    def get_gnss_mode(self) -> GnssMode:
        """Get the modem's GNSS receiver mode."""
        cmd = 'ATS39?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QGNSSMOD?'
            prefix = '+QGNSSMOD:'
        response = self._at_command_response(cmd, prefix)
        if self._mfr == Manufacturer.QUECTEL:
            return GnssModeQuectel(int(response))
        return GnssModeOrbcomm(int(response))
    
    def set_gnss_mode(self, gnss_mode: GnssMode) -> None:
        """Get the modem's GNSS receiver mode."""
        cmd = f'ATS39={gnss_mode}'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            if not GnssModeQuectel.is_valid(gnss_mode):
                raise ValueError('Invalid GNSS mode')
            cmd = f'AT+QGNSSMOD={gnss_mode}'
            prefix = '+QGNSSMOD:'
        else:
            if not GnssModeOrbcomm.is_valid(gnss_mode):
                raise ValueError('Invalid GNSS mode')
        self._at_command_response(cmd, prefix)
    
    def get_gnss_continuous(self) -> int:
        """Get the modem's GNSS continuous refresh interval in seconds."""
        cmd = 'ATS55?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QGNSSCW?'
            prefix = '+QGNSSCW:'
        try:
            return int(self._at_command_response(cmd, prefix))
        except ValueError:
            return 0
    
    def set_gnss_continuous(self, interval: int) -> None:
        """Set the modem's GNSS continuous refresh interval in seconds.
        
        Args:
            interval (int): Automatic update interval 0..30 seconds.
        
        Returns:
            `True` if successful.
        
        Raises:
            `ValueError` if invalid interval is specified.
        
        """
        if interval not in range (0, 31):
            raise ValueError('Invalid GNSS refresh interval')
        cmd = f'ATS55={interval}'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = f'AT+QGNSSCW={interval}'
        self._at_command_response(cmd)
    
    def get_nmea_data(self,
                      stale_secs: int = 1,
                      wait_secs: int = 35,
                      rmc: bool = True,
                      gga: bool = True,
                      gsa: bool = True,
                      gsv: bool = False,
                      ) -> str:
        """Get a set of NMEA data detailing the modem's location.
        
        Args:
            stale_secs (int): Maximum cached fix age to use in seconds.
            wait_secs (int): Maximum duration to wait for a fix in seconds.
            rmc (bool): Include Recommended Minimum data.
            gga (bool): Include altitude and fix quality data.
            gsa (bool): Include Dilution of Precision data.
            gsv (bool): Include verbose GNSS satellite details.
        
        """
        cmd = 'AT%GPS'
        prefix = '%GPS:'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QGNSS'
            prefix = '+QGNSS:'
        cmd += f'={stale_secs},{wait_secs}'
        if rmc:
            cmd += ',"RMC"'
        if gga:
            cmd += ',"GGA"'
        if gsa:
            cmd += ',"GSA"'
        if gsv:
            cmd += ',"GSV"'
        try:
            response = self._at_command_response(cmd, prefix, wait_secs + 5)
            return response
        except ModemAtError as exc:
            if exc.error_code != AtErrorCode.GNSS_TIMEOUT:
                raise
        return ''
    
    def get_location(self,
                     stale_secs: int = 1,
                     wait_secs: int = 35) -> 'ModemLocation|None':
        """Get the modem's location.
        
        Args:
            stale_secs (int): Maximum cached fix age to use in seconds.
            wait_secs (int): Maximum duration to wait for a fix in seconds.
        
        Returns:
            ModemLocation object if GNSS does not time out waiting for fix.
        
        """
        nmea_data = self.get_nmea_data(stale_secs, wait_secs)
        if nmea_data:
            return get_location_from_nmea_data(nmea_data)
        return None
    
    def get_satellite_info(self) -> 'SatelliteLocation|None':
        """Get the satellite's information including azimuth and elevation.
        
        Derives which satellite/GeoBeam is used from trace class 3 subclass 5.
        
        Returns:
            `SatelliteLocation` object (azimuth, elevation) if determinable.
        
        """
        geobeam = None
        modem_location = self.get_location()
        if (modem_location is not None and
            self.get_network_status() > NetworkStatus.RX_SEARCHING):
            # satellite has been found
            cmd = 'ATS90=3 S91=5 S92=1 S102?'
            prefix = ''
            if self._mfr == Manufacturer.QUECTEL:
                cmd = 'AT+QEVNT=3,5'
                prefix = '+QEVNT:'
            response = self._at_command_response(cmd, prefix)
            if self._mfr == Manufacturer.QUECTEL:
                # workaround documentation error
                response = response.replace('+QEVENT:', '').strip()
                response = response.split(',')[9]
            geobeam = GeoBeam(int(response))
            return get_satellite_location(modem_location, geobeam)
        return None
    
    def get_event_mask(self) -> int:
        """Get the set of monitored events that trigger event notification."""
        if self._mfr != Manufacturer.ORBCOMM:
            raise ModemError('Operation not supported by this modem')
        cmd = 'ATS88?'
        try:
            return int(self._at_command_response(cmd))
        except ValueError:
            return 0
    
    def set_event_mask(self, event_mask: int) -> None:
        """Set monitored events that trigger event notification."""
        if self._mfr != Manufacturer.ORBCOMM:
            raise ModemError('Operation not supported by this modem')
        max_bits = 12
        if not isinstance(event_mask, int) or event_mask > 2**max_bits-1:
            raise ValueError('Invalid event bitmask')
        cmd = f'ATS88={event_mask}'
        self._at_command_response(cmd)
    
    def get_events_asserted_mask(self) -> int:
        """Get the set of events that are active following a notification."""
        if self._mfr != Manufacturer.ORBCOMM:
            raise ModemError('Operation not supported by this modem')
        cmd = 'ATS89?'
        try:
            return int(self._at_command_response(cmd))
        except ValueError:
            return 0
    
    def get_trace_event_monitor(self,
                                asserted_only: bool = False,
                                ) -> 'list[tuple[int, int]]':
        """Get the list of monitored Trace Events.
        
        Args:
            asserted_only (bool): If True returns only asserted monitored events
        
        Returns:
            A list of tuples (trace_class, trace_subclass)
        
        Raises:
            `ModemError` if unsupported by the modem type.
        
        """
        if self._mfr != Manufacturer.ORBCOMM:
            raise ModemError('Operation not supported by this modem')
        cmd = 'AT%EVMON'
        prefix = '%EVMON:'
        trace_events = []
        events = self._at_command_response(cmd, prefix).split(',')
        for event in events:
            trace_class = int(event.split('.')[0])
            trace_subclass = int(event.split('.')[1].replace('*', ''))
            if not asserted_only or event.endswith('*'):
                trace_events.append((trace_class, trace_subclass))
        return trace_events
    
    def set_trace_event_monitor(self, events: 'list[tuple[int, int]]') -> None:
        """Set the list of monitored trace events."""
        cmd = 'AT%EVMON='
        for event in events:
            if not cmd.endswith('='):
                cmd += ','
            cmd += f'{event[0]}.{event[1]}'
        self._at_command_response(cmd)
    
    def get_trace_events_cached(self) -> 'list[tuple[int, int]]':
        """Get a list of trace events cached."""
        return self.get_trace_event_monitor(True)
    
    def get_trace_event_data(self,
                             event: 'tuple[int, int]',
                             decode: bool = False,
                             ) -> 'list[int]|dict[str, int]':
        """Get the trace event data.
        
        Args:
            event (tuple): The trace (class, subclass)
            decode (bool): Decodes raw data to dictionary (not implemented)
        
        """
        cmd = f'AT%EVNT={event[0]},{event[1]}'
        prefix = '%EVNT:'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = cmd.replace('%EVNT', '+QEVNT')
            prefix = '+QEVENT:'   # documented as +QEVNT
        trace = self._at_command_response(cmd, prefix)
        if decode:
            raise NotImplementedError
        return [int(i) for i in trace.split(',')]
            
    def get_urc_ctl(self) -> int:
        """Get the event list that trigger Unsolicited Report Codes."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ValueError('Modem does not support this feature')
        cmd = 'AT+QURCCTL?'
        prefix = '+QURCCTL:'
        try:
            return int(self._at_command_response(cmd, prefix), 16)
        except ValueError:
            return 0
    
    def set_urc_ctl(self, qurc_mask: int) -> None:
        """Set the event list that trigger Unsolicited Report Codes."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ValueError('Modem does not support this feature')
        cmd = f'AT+QURCCTL=0x{qurc_mask:04X}'
        self._at_command_response(cmd)
    
    def get_urc(self) -> 'UrcCode|None':
        """Get the pending Unsolicited Result Code if one is present."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ValueError('Modem does not support this feature')
        eol = '\r\n' if self._modem.verbose else '\r'
        result = self._modem.read_rx_buffer(read_until=eol)
        if result:
            result = result.replace('+QURC:', '').strip()
            try:
                return UrcCode(int(result))
            except ValueError:
                return UrcCode[result]
        return None
    
    def get_power_mode(self) -> PowerMode:
        """Get the modem's power mode configuration."""
        cmd = 'ATS50?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QPMD?'
            prefix = '+QPMD:'
        return PowerMode(int(self._at_command_response(cmd, prefix)))
    
    def set_power_mode(self, power_mode: PowerMode) -> None:
        """Set the modem's power mode configuration."""
        if not PowerMode.is_valid(power_mode):
            raise ValueError('Invalid Power Mode')
        cmd = f'ATS50={power_mode}'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = f'AT+QPMD={power_mode}'
        self._at_command_response(cmd)
    
    def get_wakeup_period(self) -> WakeupPeriod:
        """Get the modem's wakeup period configuration."""
        cmd = 'ATS51?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QWKUPCFG?'
            prefix = '+QWKUPCFG:'
        if self._mfr == Manufacturer.QUECTEL:
            return WakeupPeriod(int(
                self._at_command_response(cmd, prefix).split(',')[0]))
        return WakeupPeriod(int(self._at_command_response(cmd, prefix)))
    
    def set_wakeup_period(self,
                          wakeup_period: WakeupPeriod,
                          wakeup_way: 'WakeupWay|None' = None,
                          ) -> None:
        """Set the modem's wakeup period configuration.
        
        The configuration does not update until confimed by the network.
        
        """
        if not WakeupPeriod.is_valid(wakeup_period):
            raise ValueError('Invalid wakeup period')
        cmd = f'ATS51={wakeup_period}'
        if self._mfr == Manufacturer.QUECTEL:
            if wakeup_way is None:
                query = self._at_command_response('AT+QWKUPCFG?', '+QWKUPCFG:')
                wakeup_way = WakeupWay(int(query.split(',')[1]))
            cmd = f'AT+QWKUPCFG={wakeup_period},{wakeup_way}'
        self._at_command_response(cmd)
    
    def get_wakeup_way(self) -> WakeupWay:
        """Get the modem wakeup method."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ModemError('Operation not supported by this modem')
        cmd = 'AT+QWKUPCFG?'
        prefix = '+QWKUPCFG:'
        wakeup_way = self._at_command_response(cmd, prefix).split(',')[1]
        return WakeupWay(int(wakeup_way))
    
    def power_down(self) -> None:
        """Prepare the modem for power-down."""
        cmd = 'AT%OFF'
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QPOWD=2'
        self._at_command_response(cmd)
    
    def get_workmode(self) -> WorkMode:
        """Get the modem working mode."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ModemError('Operation not supported by this modem')
        cmd = 'AT+QMOD?'
        prefix = '+QMOD:'
        return WorkMode(int(self._at_command_response(cmd, prefix)))
    
    def set_workmode(self, workmode: WorkMode) -> None:
        """Set the modem working mode."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ModemError('Operation not supported by this modem')
        if not WorkMode.is_valid(workmode):
            raise ValueError('Invalid workmode')
        cmd = f'AT+QMOD={workmode}'
        self._at_command_response(cmd)
    
    def get_deepsleep_enable(self) -> bool:
        """Get the deepsleep configuration flag."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ModemError('Operation not supported by this modem')
        cmd = 'AT+QSCLK?'
        prefix = '+QSCLK:'
        return bool(int(self._at_command_response(cmd, prefix)))
    
    def set_deepsleep_enable(self, enable: bool) -> None:
        """Set the deepsleep configuration flag."""
        if self._mfr != Manufacturer.QUECTEL:
            raise ModemError('Operation not supported by this modem')
        self._at_command_response(f'AT+QSCLK={int(enable)}')
    
    def get_register(self, s_register_number: int) -> 'int|None':
        """Get a modem register value."""
        cmd = f'ATS{s_register_number}?'
        try:
            return int(self._at_command_response(cmd))
        except ValueError:
            return None
    
    def set_register(self, s_register_number: int, value: int) -> None:
        """Set a modem register value."""
        cmd = f'ATS{s_register_number}={value}'
        self._at_command_response(cmd)
    
    def get_all_registers(self) -> dict:
        """Get a dictionary of modem register values."""
        raise NotImplementedError

Instance variables

var baudrate : int

The baudrate of the serial connection.

Expand source code
@property
def baudrate(self) -> int:
    """The baudrate of the serial connection."""
    return self._modem.serial.baudrate
var crc_enabled : bool
Expand source code
@property
def crc_enabled(self) -> bool:
    return self._modem.crc
var is_ready : bool
Expand source code
@property
def is_ready(self) -> bool:
    return not self._modem._lock.locked()
var modem_booted : bool
Expand source code
@property
def modem_booted(self) -> bool:
    return self._modem_booted

Methods

def await_boot(self, boot_timeout: int = 10) ‑> bool

Indicates if a boot string is received within a timeout window.

Use is_connected before waiting for boot.

Args

boot_timeout : int
The maximum time to wait in seconds.

Returns

True if a valid boot string was received inside the timeout.

Expand source code
def await_boot(self, boot_timeout: int = 10) -> bool:
    """Indicates if a boot string is received within a timeout window.
    
    Use `is_connected` before waiting for boot.
    
    Args:
        boot_timeout (int): The maximum time to wait in seconds.
    
    Returns:
        True if a valid boot string was received inside the timeout.
    
    """
    boot_strings = ['ST Version', 'RDY']
    _log.debug('Awaiting modem boot string for %d seconds...', boot_timeout)
    rx_data = ''
    started = time.time()
    while time.time() - started < boot_timeout and not self._modem_booted:
        while self._modem.is_data_waiting():
            rx_data += self._modem.read_rx_buffer()
        if rx_data and any(b in rx_data for b in boot_strings):
            self._modem_booted = True
            _log.debug('Found boot string - clearing Rx buffer')
            while self._modem.is_data_waiting():
                rx_data += self._modem.read_rx_buffer()
            break
    return self._modem_booted
def cancel_mo_message(self, message_name: str) ‑> bool

Attempts to cancel a previously submitted mobile-originated message.

Args

message_name : str
The mobile-originated message handle to delete.
Expand source code
def cancel_mo_message(self, message_name: str) -> bool:
    """Attempts to cancel a previously submitted mobile-originated message.
    
    Args:
        message_name (str): The mobile-originated message handle to delete.
    
    """
    _log.debug('Attempting to cancel MO message %s', message_name)
    cmd = 'AT%MGRC'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QSMGC'
    cmd += f'="{message_name}"'
    self._at_command_response(cmd)
    message_states = self.get_mo_message_states(message_name)
    if len(message_states) > 0:
        state = message_states[0].state
        if state == MessageState.TX_CANCELLED:
            return True
    elif self._is_simulator:
        return True
    _log.warn('Failed to cancel message %s', message_name)
    return False
def connect(self) ‑> None

Attach to the modem via serial communications.

Provided for backward compatibility. Connection is usually automatic when instantiating NimoModem.

Expand source code
def connect(self) -> None:
    """Attach to the modem via serial communications.
    
    Provided for backward compatibility. Connection is usually automatic
    when instantiating `NimoModem`.
    
    """
    if not self._serial.is_open:
        self._serial.open()
def delete_mt_message(self, message_name: str) ‑> bool

Remove a mobile-terminated message from the modem's Rx queue.

Expand source code
def delete_mt_message(self, message_name: str) -> bool:
    """Remove a mobile-terminated message from the modem's Rx queue."""
    cmd = 'AT%MGFM'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QRMGM'
    cmd += f'="{message_name}"'
    self._at_command_response(cmd)
    check = self.get_mt_message_states(message_name)
    if check and check[0].state == MessageState.RX_RETRIEVED:
        return True
    return False
def disconnect(self) ‑> None

Detach from the modem serial communications.

Provided for backward compatibility. Not typically required.

Expand source code
def disconnect(self) -> None:
    """Detach from the modem serial communications.
    
    Provided for backward compatibility. Not typically required.
    
    """
    self._is_connected = False
    self._modem_booted = False
    if self._serial.is_open:
        self._serial.close()
def get_acquisition_detail(self) ‑> AcquisitionInfo

Get the detailed satellite acquisition status.

Includes acquisition_state, beamsearch_state, vcid and snr indicators.

Expand source code
def get_acquisition_detail(self) -> AcquisitionInfo:
    """Get the detailed satellite acquisition status.
    
    Includes `acquisition_state`, `beamsearch_state`, `vcid` and `snr`
    indicators.
    
    """
    cmd = 'ATS90=3 S91=1 S92=1 S122? S123? S116? S101?'
    prefix = ''
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QEVNT=3,1'
        prefix = '+QEVNT:'
    result_str = self._at_command_response(cmd, prefix, timeout=10)
    if self._mfr == Manufacturer.ORBCOMM:
        results = [int(x) for x in result_str.split('\n')]
        ctrl_state = ControlState(results[0])
        beam_state = BeamState(results[1])
        rssi = float(results[2]) / 100
        vcid = results[3]
    elif self._mfr == Manufacturer.QUECTEL:
        # Workaround Quectel 20230731 documentation error says +QEVNT:
        result_str = result_str.replace('+QEVENT:', '').strip()
        results = [int(x) for x in result_str.split(',')]
        # <dataCount>,<signedBitmask>,<MTID>,<timestamp>,
        #   <class>,<subclass>,<priority>,<data0>,...
        data0 = 7   # list index where trace data starts
        ctrl_state = ControlState(results[data0+22])
        beam_state = BeamState(results[data0+23])
        rssi = float(results[data0+16]) / 100
        vcid = results[data0+1]
    return AcquisitionInfo(ctrl_state, beam_state, rssi, vcid)
def get_all_registers(self) ‑> dict

Get a dictionary of modem register values.

Expand source code
def get_all_registers(self) -> dict:
    """Get a dictionary of modem register values."""
    raise NotImplementedError
def get_deepsleep_enable(self) ‑> bool

Get the deepsleep configuration flag.

Expand source code
def get_deepsleep_enable(self) -> bool:
    """Get the deepsleep configuration flag."""
    if self._mfr != Manufacturer.QUECTEL:
        raise ModemError('Operation not supported by this modem')
    cmd = 'AT+QSCLK?'
    prefix = '+QSCLK:'
    return bool(int(self._at_command_response(cmd, prefix)))
def get_event_mask(self) ‑> int

Get the set of monitored events that trigger event notification.

Expand source code
def get_event_mask(self) -> int:
    """Get the set of monitored events that trigger event notification."""
    if self._mfr != Manufacturer.ORBCOMM:
        raise ModemError('Operation not supported by this modem')
    cmd = 'ATS88?'
    try:
        return int(self._at_command_response(cmd))
    except ValueError:
        return 0
def get_events_asserted_mask(self) ‑> int

Get the set of events that are active following a notification.

Expand source code
def get_events_asserted_mask(self) -> int:
    """Get the set of events that are active following a notification."""
    if self._mfr != Manufacturer.ORBCOMM:
        raise ModemError('Operation not supported by this modem')
    cmd = 'ATS89?'
    try:
        return int(self._at_command_response(cmd))
    except ValueError:
        return 0
def get_firmware_version(self) ‑> str

Get the modem's firmware version.

Expand source code
def get_firmware_version(self) -> str:
    """Get the modem's firmware version."""
    # TODO: Firmware structure with hardware, firmware, software?
    return self._at_command_response('AT+GMR', '+GMR:')
def get_gnss_continuous(self) ‑> int

Get the modem's GNSS continuous refresh interval in seconds.

Expand source code
def get_gnss_continuous(self) -> int:
    """Get the modem's GNSS continuous refresh interval in seconds."""
    cmd = 'ATS55?'
    prefix = ''
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QGNSSCW?'
        prefix = '+QGNSSCW:'
    try:
        return int(self._at_command_response(cmd, prefix))
    except ValueError:
        return 0
def get_gnss_mode(self) ‑> GnssMode

Get the modem's GNSS receiver mode.

Expand source code
def get_gnss_mode(self) -> GnssMode:
    """Get the modem's GNSS receiver mode."""
    cmd = 'ATS39?'
    prefix = ''
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QGNSSMOD?'
        prefix = '+QGNSSMOD:'
    response = self._at_command_response(cmd, prefix)
    if self._mfr == Manufacturer.QUECTEL:
        return GnssModeQuectel(int(response))
    return GnssModeOrbcomm(int(response))
def get_last_error_code(self) ‑> AtErrorCode

Get the last error code from the modem.

Expand source code
def get_last_error_code(self) -> AtErrorCode:
    """Get the last error code from the modem."""
    return AtErrorCode(int(self._at_command_response('ATS80?')))
def get_location(self, stale_secs: int = 1, wait_secs: int = 35) ‑> ModemLocation|None

Get the modem's location.

Args

stale_secs : int
Maximum cached fix age to use in seconds.
wait_secs : int
Maximum duration to wait for a fix in seconds.

Returns

ModemLocation object if GNSS does not time out waiting for fix.

Expand source code
def get_location(self,
                 stale_secs: int = 1,
                 wait_secs: int = 35) -> 'ModemLocation|None':
    """Get the modem's location.
    
    Args:
        stale_secs (int): Maximum cached fix age to use in seconds.
        wait_secs (int): Maximum duration to wait for a fix in seconds.
    
    Returns:
        ModemLocation object if GNSS does not time out waiting for fix.
    
    """
    nmea_data = self.get_nmea_data(stale_secs, wait_secs)
    if nmea_data:
        return get_location_from_nmea_data(nmea_data)
    return None
def get_manufacturer(self) ‑> str

Get the manufacturer name.

Expand source code
def get_manufacturer(self) -> str:
    """Get the manufacturer name."""
    if not self._manufacturer:
        try:
            mfr = self._at_command_response('ATI')
            if 'quectel' in mfr.lower():
                self._manufacturer = Manufacturer.QUECTEL
            else:
                if not any(m in mfr.lower()
                           for m in ['orbcomm', 'skywave']):
                    _log.warning('Unsupported manufacturer %s', mfr)
                self._manufacturer = Manufacturer.ORBCOMM
            if vlog(VLOG_TAG):
                _log.debug('Caching manufacturer: %s',
                           self._manufacturer.name)
        except ModemError:
            self._manufacturer = Manufacturer.NONE
            raise
    return self._manufacturer.name
def get_mo_message_states(self, message_name: str = '') ‑> list[MoMessage]

Get a list of mobile-originated message states in the modem Tx queue.

Args

message_name : str
Optional filter on message name.

Returns

A list of MoMessage objects including state and metadata.

Expand source code
def get_mo_message_states(self, message_name: str = '') -> 'list[MoMessage]':
    """Get a list of mobile-originated message states in the modem Tx queue.
    
    Args:
        message_name (str): Optional filter on message name.
    
    Returns:
        A list of `MoMessage` objects including state and metadata.
    
    """
    cmd = 'AT%MGRS'
    prefix = '%MGRS:'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QSMGS'
        prefix = '+QSMGS:'
    if message_name and not self._is_simulator:
        # Orbcomm Modem Simulator returns ERROR for %MGRS= command
        cmd += f'="{message_name}"'
    response_str = self._at_command_response(cmd, prefix)
    return self._parse_message_states(response_str, is_mo=True)
def get_mobile_id(self) ‑> str

Get the modem's globally unique identifier.

Expand source code
def get_mobile_id(self) -> str:
    """Get the modem's globally unique identifier."""
    if not self._mobile_id:
        try:
            self._mobile_id = self._at_command_response('AT+GSN', '+GSN:')
            if vlog(VLOG_TAG):
                _log.debug('Cached Mobile ID %s', self._mobile_id)
        except ModemError:
            self._mobile_id = ''
            raise
    return self._mobile_id
def get_model(self) ‑> str

Get the manufacturer model name.

Expand source code
def get_model(self) -> str:
    """Get the manufacturer model name."""
    cmd = 'ATI4'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'ATI'
    try:
        response = self._at_command_response(cmd)
        if response:
            if self._mfr == Manufacturer.QUECTEL:
                response = response.split('\n')[1]
        return response
    except ModemError:
        return ''
def get_mt_message(self, message_name: str) ‑> MtMessage|None

Get a mobile-terminated message from the modem's Rx queue by name.

Expand source code
def get_mt_message(self, message_name: str) -> 'MtMessage|None':
    """Get a mobile-terminated message from the modem's Rx queue by name."""
    cmd = 'AT%MGFG'
    prefix = '%MGFG:'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+GRMGR'
        prefix = '+GRMGR:'
    data_format = DataFormat.BASE64
    cmd += f'="{message_name}",{data_format}'
    response = self._at_command_response(cmd, prefix)
    if response:
        return self._parse_mt_message(response)
    return None
def get_mt_message_states(self, message_name: str = '') ‑> list[MtMessage]

Get a list of mobile-terminated message states in the modem Tx queue.

Args

message_name : str
Optional filter on message name.

Returns

A list of MtMessage objects including state and metadata.

Expand source code
def get_mt_message_states(self, message_name: str = '') -> 'list[MtMessage]':
    """Get a list of mobile-terminated message states in the modem Tx queue.
    
    Args:
        message_name (str): Optional filter on message name.
    
    Returns:
        A list of `MtMessage` objects including state and metadata.
    
    """
    cmd = 'AT%MGFN' if not message_name else 'AT%MGFS'
    prefix = '%MGFN:' if not message_name else 'AT%MGFS:'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QRMGN' if not message_name else 'AT+QRMGS'
        prefix = '+QRMGN:' if not message_name else '+QRMGS:'
    if message_name and not self._is_simulator:
        cmd += f'="{message_name}"'
    response_str = self._at_command_response(cmd, prefix)
    return self._parse_message_states(response_str, is_mo=False)
def get_network_status(self) ‑> NetworkStatus

Get the current satellite acquisition status.

Expand source code
def get_network_status(self) -> NetworkStatus:
    """Get the current satellite acquisition status."""
    cmd = 'ATS54?'
    prefix = ''
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QREG?'
        prefix = '+QREG:'
    return NetworkStatus(int(self._at_command_response(cmd, prefix)))
def get_nmea_data(self, stale_secs: int = 1, wait_secs: int = 35, rmc: bool = True, gga: bool = True, gsa: bool = True, gsv: bool = False) ‑> str

Get a set of NMEA data detailing the modem's location.

Args

stale_secs : int
Maximum cached fix age to use in seconds.
wait_secs : int
Maximum duration to wait for a fix in seconds.
rmc : bool
Include Recommended Minimum data.
gga : bool
Include altitude and fix quality data.
gsa : bool
Include Dilution of Precision data.
gsv : bool
Include verbose GNSS satellite details.
Expand source code
def get_nmea_data(self,
                  stale_secs: int = 1,
                  wait_secs: int = 35,
                  rmc: bool = True,
                  gga: bool = True,
                  gsa: bool = True,
                  gsv: bool = False,
                  ) -> str:
    """Get a set of NMEA data detailing the modem's location.
    
    Args:
        stale_secs (int): Maximum cached fix age to use in seconds.
        wait_secs (int): Maximum duration to wait for a fix in seconds.
        rmc (bool): Include Recommended Minimum data.
        gga (bool): Include altitude and fix quality data.
        gsa (bool): Include Dilution of Precision data.
        gsv (bool): Include verbose GNSS satellite details.
    
    """
    cmd = 'AT%GPS'
    prefix = '%GPS:'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QGNSS'
        prefix = '+QGNSS:'
    cmd += f'={stale_secs},{wait_secs}'
    if rmc:
        cmd += ',"RMC"'
    if gga:
        cmd += ',"GGA"'
    if gsa:
        cmd += ',"GSA"'
    if gsv:
        cmd += ',"GSV"'
    try:
        response = self._at_command_response(cmd, prefix, wait_secs + 5)
        return response
    except ModemAtError as exc:
        if exc.error_code != AtErrorCode.GNSS_TIMEOUT:
            raise
    return ''
def get_power_mode(self) ‑> PowerMode

Get the modem's power mode configuration.

Expand source code
def get_power_mode(self) -> PowerMode:
    """Get the modem's power mode configuration."""
    cmd = 'ATS50?'
    prefix = ''
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QPMD?'
        prefix = '+QPMD:'
    return PowerMode(int(self._at_command_response(cmd, prefix)))
def get_register(self, s_register_number: int) ‑> int|None

Get a modem register value.

Expand source code
def get_register(self, s_register_number: int) -> 'int|None':
    """Get a modem register value."""
    cmd = f'ATS{s_register_number}?'
    try:
        return int(self._at_command_response(cmd))
    except ValueError:
        return None
def get_rssi(self) ‑> float

Get the current Received Signal Strength Indicator.

Also referred to as SNR or C/N0 (dB-Hz)

Expand source code
def get_rssi(self) -> float:
    """Get the current Received Signal Strength Indicator.
    
    Also referred to as SNR or C/N0 (dB-Hz)
    
    """
    cmd = 'ATS90=3 S91=1 S92=1 S116?'
    prefix = ''
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QSCN'
        prefix = '+QSCN:'
    try:
        return int(self._at_command_response(cmd, prefix)) / 100
    except ValueError:
        return 0
def get_satellite_info(self) ‑> SatelliteLocation|None

Get the satellite's information including azimuth and elevation.

Derives which satellite/GeoBeam is used from trace class 3 subclass 5.

Returns

SatelliteLocation object (azimuth, elevation) if determinable.

Expand source code
def get_satellite_info(self) -> 'SatelliteLocation|None':
    """Get the satellite's information including azimuth and elevation.
    
    Derives which satellite/GeoBeam is used from trace class 3 subclass 5.
    
    Returns:
        `SatelliteLocation` object (azimuth, elevation) if determinable.
    
    """
    geobeam = None
    modem_location = self.get_location()
    if (modem_location is not None and
        self.get_network_status() > NetworkStatus.RX_SEARCHING):
        # satellite has been found
        cmd = 'ATS90=3 S91=5 S92=1 S102?'
        prefix = ''
        if self._mfr == Manufacturer.QUECTEL:
            cmd = 'AT+QEVNT=3,5'
            prefix = '+QEVNT:'
        response = self._at_command_response(cmd, prefix)
        if self._mfr == Manufacturer.QUECTEL:
            # workaround documentation error
            response = response.replace('+QEVENT:', '').strip()
            response = response.split(',')[9]
        geobeam = GeoBeam(int(response))
        return get_satellite_location(modem_location, geobeam)
    return None
def get_signal_quality(self) ‑> SignalQuality

Get a qualitative indicator from 0..5 of the satellite signal.

Expand source code
def get_signal_quality(self) -> SignalQuality:
    """Get a qualitative indicator from 0..5 of the satellite signal."""
    snr = self.get_rssi()
    if snr >= SignalLevelRegional.INVALID.value:
        return SignalQuality.WARNING
    if snr >= SignalLevelRegional.BARS_5.value:
        return SignalQuality.STRONG
    if snr >= SignalLevelRegional.BARS_4.value:
        return SignalQuality.GOOD
    if snr >= SignalLevelRegional.BARS_3.value:
        return SignalQuality.MID
    if snr >= SignalLevelRegional.BARS_2.value:
        return SignalQuality.LOW
    if snr >= SignalLevelRegional.BARS_1.value:
        return SignalQuality.WEAK
    return SignalQuality.NONE
def get_system_time(self) ‑> int

Get the system/GNSS time from the modem.

Expand source code
def get_system_time(self) -> int:
    """Get the system/GNSS time from the modem."""
    try:
        nimo_time = self._at_command_response('AT%UTC', '%UTC:')
        iso_time = nimo_time.replace(' ', 'T') + 'Z'
        return iso_to_ts(iso_time)
    except ModemError:
        return 0
def get_temperature(self) ‑> int|None

Get the processor temperature in Celsius.

Expand source code
def get_temperature(self) -> 'int|None':
    """Get the processor temperature in Celsius."""
    try:
        return int(int(self._at_command_response('ATS85?')) / 10)
    except ValueError:
        return None
def get_trace_event_data(self, event: tuple[int, int], decode: bool = False) ‑> list[int]|dict[str, int]

Get the trace event data.

Args

event : tuple
The trace (class, subclass)
decode : bool
Decodes raw data to dictionary (not implemented)
Expand source code
def get_trace_event_data(self,
                         event: 'tuple[int, int]',
                         decode: bool = False,
                         ) -> 'list[int]|dict[str, int]':
    """Get the trace event data.
    
    Args:
        event (tuple): The trace (class, subclass)
        decode (bool): Decodes raw data to dictionary (not implemented)
    
    """
    cmd = f'AT%EVNT={event[0]},{event[1]}'
    prefix = '%EVNT:'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = cmd.replace('%EVNT', '+QEVNT')
        prefix = '+QEVENT:'   # documented as +QEVNT
    trace = self._at_command_response(cmd, prefix)
    if decode:
        raise NotImplementedError
    return [int(i) for i in trace.split(',')]
def get_trace_event_monitor(self, asserted_only: bool = False) ‑> list[tuple[int, int]]

Get the list of monitored Trace Events.

Args

asserted_only : bool
If True returns only asserted monitored events

Returns

A list of tuples (trace_class, trace_subclass)

Raises

ModemError if unsupported by the modem type.

Expand source code
def get_trace_event_monitor(self,
                            asserted_only: bool = False,
                            ) -> 'list[tuple[int, int]]':
    """Get the list of monitored Trace Events.
    
    Args:
        asserted_only (bool): If True returns only asserted monitored events
    
    Returns:
        A list of tuples (trace_class, trace_subclass)
    
    Raises:
        `ModemError` if unsupported by the modem type.
    
    """
    if self._mfr != Manufacturer.ORBCOMM:
        raise ModemError('Operation not supported by this modem')
    cmd = 'AT%EVMON'
    prefix = '%EVMON:'
    trace_events = []
    events = self._at_command_response(cmd, prefix).split(',')
    for event in events:
        trace_class = int(event.split('.')[0])
        trace_subclass = int(event.split('.')[1].replace('*', ''))
        if not asserted_only or event.endswith('*'):
            trace_events.append((trace_class, trace_subclass))
    return trace_events
def get_trace_events_cached(self) ‑> list[tuple[int, int]]

Get a list of trace events cached.

Expand source code
def get_trace_events_cached(self) -> 'list[tuple[int, int]]':
    """Get a list of trace events cached."""
    return self.get_trace_event_monitor(True)
def get_urc(self) ‑> UrcCode|None

Get the pending Unsolicited Result Code if one is present.

Expand source code
def get_urc(self) -> 'UrcCode|None':
    """Get the pending Unsolicited Result Code if one is present."""
    if self._mfr != Manufacturer.QUECTEL:
        raise ValueError('Modem does not support this feature')
    eol = '\r\n' if self._modem.verbose else '\r'
    result = self._modem.read_rx_buffer(read_until=eol)
    if result:
        result = result.replace('+QURC:', '').strip()
        try:
            return UrcCode(int(result))
        except ValueError:
            return UrcCode[result]
    return None
def get_urc_ctl(self) ‑> int

Get the event list that trigger Unsolicited Report Codes.

Expand source code
def get_urc_ctl(self) -> int:
    """Get the event list that trigger Unsolicited Report Codes."""
    if self._mfr != Manufacturer.QUECTEL:
        raise ValueError('Modem does not support this feature')
    cmd = 'AT+QURCCTL?'
    prefix = '+QURCCTL:'
    try:
        return int(self._at_command_response(cmd, prefix), 16)
    except ValueError:
        return 0
def get_wakeup_period(self) ‑> WakeupPeriod

Get the modem's wakeup period configuration.

Expand source code
def get_wakeup_period(self) -> WakeupPeriod:
    """Get the modem's wakeup period configuration."""
    cmd = 'ATS51?'
    prefix = ''
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QWKUPCFG?'
        prefix = '+QWKUPCFG:'
    if self._mfr == Manufacturer.QUECTEL:
        return WakeupPeriod(int(
            self._at_command_response(cmd, prefix).split(',')[0]))
    return WakeupPeriod(int(self._at_command_response(cmd, prefix)))
def get_wakeup_way(self) ‑> WakeupWay

Get the modem wakeup method.

Expand source code
def get_wakeup_way(self) -> WakeupWay:
    """Get the modem wakeup method."""
    if self._mfr != Manufacturer.QUECTEL:
        raise ModemError('Operation not supported by this modem')
    cmd = 'AT+QWKUPCFG?'
    prefix = '+QWKUPCFG:'
    wakeup_way = self._at_command_response(cmd, prefix).split(',')[1]
    return WakeupWay(int(wakeup_way))
def get_workmode(self) ‑> WorkMode

Get the modem working mode.

Expand source code
def get_workmode(self) -> WorkMode:
    """Get the modem working mode."""
    if self._mfr != Manufacturer.QUECTEL:
        raise ModemError('Operation not supported by this modem')
    cmd = 'AT+QMOD?'
    prefix = '+QMOD:'
    return WorkMode(int(self._at_command_response(cmd, prefix)))
def initialize(self, echo: bool = True, verbose: bool = True) ‑> bool

Initialize the modem AT configuration for Echo and Verbose.

Expand source code
def initialize(self,
               echo: bool = True,
               verbose: bool = True,
               ) -> bool:
    """Initialize the modem AT configuration for Echo and Verbose."""
    at_command = (f'ATZ;E{int(echo)};V{int(verbose)}')
    try:
        self._at_command_response(at_command)
        return True
    except ModemCrcConfig:
        _log.info('Attempting re-initialize with CRC enabled')
        self._at_command_response(at_command)
        return True
def is_blocked(self) ‑> bool

Indicates if line-of-sight to the satellite is blocked.

Expand source code
def is_blocked(self) -> bool:
    """Indicates if line-of-sight to the satellite is blocked."""
    return self.get_network_status() == 8
def is_connected(self) ‑> bool

Indicates if the modem is responding to a basic AT query.

Expand source code
def is_connected(self) -> bool:
    """Indicates if the modem is responding to a basic AT query."""
    try:
        self._at_command_response('AT')
        self._is_connected = True
        self._modem_booted = True
        return True
    except ModemError:
        self._is_connected = False
        self._modem_booted = False
        return False
def is_muted(self) ‑> bool

Indicates if the modem has been muted (disallowed to transmit data).

Expand source code
def is_muted(self) -> bool:
    """Indicates if the modem has been muted (disallowed to transmit data).
    """
    return self.get_network_status() == 7
def is_transmit_allowed(self) ‑> bool

Indicates if the modem is able to transmit data.

Expand source code
def is_transmit_allowed(self) -> bool:
    """Indicates if the modem is able to transmit data."""
    return self.get_network_status() == 5
def is_updating_network(self) ‑> bool

Indicates if the modem is updating network information.

The modem should not be powered down during a network update.

Expand source code
def is_updating_network(self) -> bool:
    """Indicates if the modem is updating network information.
    
    The modem should not be powered down during a network update.
    
    """
    return self.get_network_status() == 4
def power_down(self) ‑> None

Prepare the modem for power-down.

Expand source code
def power_down(self) -> None:
    """Prepare the modem for power-down."""
    cmd = 'AT%OFF'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QPOWD=2'
    self._at_command_response(cmd)
def receive_data(self, message_name: str) ‑> bytes|None

Get the raw data from a mobile-terminated message.

Expand source code
def receive_data(self, message_name: str) -> 'bytes|None':
    """Get the raw data from a mobile-terminated message."""
    message = self.get_mt_message(message_name)
    if message:
        return message.payload
    return None
def reset_factory_config(self) ‑> None

Reset the modem's factory default configuration.

Expand source code
def reset_factory_config(self) -> None:
    """Reset the modem's factory default configuration."""
    self._at_command_response('AT&F')
def retry_baudrate(self) ‑> bool
Expand source code
def retry_baudrate(self) -> bool:
    """"""
    for baud in BAUDRATES:
        self._modem.serial.baudrate = baud
        if self.is_connected():
            return True
    return False
def save_config(self) ‑> None

Store the current configuration to modem non-volatile memory.

Expand source code
def save_config(self) -> None:
    """Store the current configuration to modem non-volatile memory."""
    self._at_command_response('AT&W')
def send_data(self, data: bytes, **kwargs) ‑> str|MoMessage

Submits data to send as a mobile-originated message.

If a message_name is not supplied one will be generated using the least significant 8 digits of unix timestamp.

Args

data : bytes
The data to send.

Keyword Args: message_name (str): Optional handle for message in Tx queue. Max 8 characters for Orbcomm modem or 12 for Quectel. priority (int): Optional priority 1 (highest) .. 4 (low, default). May use MessagePriority. codec_sin (int): Optional first byte of payload to add as a codec service identifier, must be in range 16..255. codec_min (int): Optional second byte of payload to add as a codec message identifier, must be in range 0..255. return_message (bool): If set, returns a MoMessage instead of the message handle.

Returns

Message handle (str) or MoMessage if return_message kwarg is set.

Raises

ValueError for various parameter limit violations.

Expand source code
def send_data(self, data: bytes, **kwargs) -> 'str|MoMessage':
    """Submits data to send as a mobile-originated message.
    
    If a `message_name` is not supplied one will be generated using the
    least significant 8 digits of unix timestamp.
    
    Args:
        data (bytes): The data to send.
    
    Keyword Args:
        message_name (str): Optional handle for message in Tx queue. Max 8
            characters for Orbcomm modem or 12 for Quectel.
        priority (int): Optional priority 1 (highest) .. 4 (low, default).
            May use `MessagePriority`.
        codec_sin (int): Optional first byte of payload to add as a codec
            service identifier, must be in range 16..255.
        codec_min (int): Optional second byte of payload to add as a codec
            message identifier, must be in range 0..255.
        return_message (bool): If set, returns a `MoMessage` instead of the
            message handle.
    
    Returns:
        Message handle (str) or `MoMessage` if `return_message` kwarg is set.
    
    Raises:
        `ValueError` for various parameter limit violations.
    
    """
    data_size = len(data)
    msg_payload_sin_min = b''
    message_name = kwargs.get('message_name', '')
    priority = MessagePriority(kwargs.get('priority',
                                          MessagePriority.LOW.value))
    codec_sin: int = kwargs.get('codec_sin', -1)
    codec_min: int = kwargs.get('codec_min', -1)
    if codec_sin > -1:
        data_size += 1
        msg_payload_sin_min += codec_sin.to_bytes(1, 'big')
    if codec_min > -1:
        data_size += 1
        msg_payload_sin_min += codec_min.to_bytes(1, 'big')
    if not 2 <= data_size <= MSG_MO_MAX_SIZE:
        raise ValueError('Invalid mobile-originated message size')
    if message_name and len(message_name) > self._mo_msg_name_len_max:
        raise ValueError('Message name too long')
    data_index = 0
    if codec_sin <= -1:
        codec_sin = data[0]
        data_index += 1
        data_size -= 1
    if codec_sin not in range(16, 256):
        raise ValueError('Illegal first payload byte SIN must be 16..255')
    if codec_min <= -1:
        codec_min = data[1]
        data_index += 1
        data_size -= 1
    if codec_min > 255:
        raise ValueError('Invalid second payload byte MIN must be 0..255')
    max_name_len = self._mo_msg_name_len_max
    if message_name and len(message_name) > max_name_len:
        raise ValueError(f'Invalid message name longer than {max_name_len}')
    if len(message_name) == 0:
        message_name = f'{int(time.time())}'[-max_name_len:]
    # Convert to base64 string for serial efficiency
    #   no effect on OTA size, modem always decodes and sends raw bytes OTA
    data_format = DataFormat.BASE64
    formatted_data = base64.b64encode(data[data_index:]).decode('utf-8')
    cmd = 'AT%MGRT='
    codec_sep = '.'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = 'AT+QSMGT='
        codec_sep = ','
    cmd = (f'{cmd}"{message_name}",{priority},{codec_sin}{codec_sep}'
           f'{codec_min},{data_format},{formatted_data}')
    self._at_command_response(cmd)
    if kwargs.get('return_message', False) is True:
        return MoMessage(message_name, priority, MessageState.TX_READY,
                            payload=(msg_payload_sin_min + data))
    return message_name
def send_text(self, text: str, **kwargs) ‑> str|MoMessage

Submits a text string to send as data.

If codec_sin kwarg is not provided 128 is prepended as the first byte. If codec_min kwarg is not provided 1 is prepended as the second byte. Other kwargs as per send_data.

Args

text : str
The text message to send.

Returns

(str) The message name assigned or MoMessage if kwarg return_message is set.

Expand source code
def send_text(self, text: str, **kwargs) -> 'str|MoMessage':
    """Submits a text string to send as data.
    
    If `codec_sin` kwarg is not provided 128 is prepended as the first byte.
    If `codec_min` kwarg is not provided 1 is prepended as the second byte.
    Other kwargs as per `send_data`.
    
    Args:
        text (str): The text message to send.
    
    Returns:
        (str) The message name assigned or MoMessage if kwarg
            `return_message` is set.
    
    """
    data = b''
    codec_sin = int(kwargs.get('codec_sin', 128))
    data += codec_sin.to_bytes(1, 'big')
    codec_min = int(kwargs.get('codec_min', 1))
    data += codec_min.to_bytes(1, 'big')
    data += text.encode()
    flowthru = ['message_name', 'priority', 'return_message']
    next_kwargs = { k:v for k, v in kwargs if k in flowthru }
    return self.send_data(data, **next_kwargs)
def set_crc(self, enable: bool = False) ‑> bool

Enable or disable CRC error checking on the modem serial port.

Expand source code
def set_crc(self, enable: bool = False) -> bool:
    """Enable or disable CRC error checking on the modem serial port."""
    try:
        self._at_command_response(f'AT%CRC={int(enable)}')
        return True
    except ModemCrcConfig:
        if ((self._modem.crc and enable) or
            (not self._modem.crc and not enable)):
            return True
        return False
def set_deepsleep_enable(self, enable: bool) ‑> None

Set the deepsleep configuration flag.

Expand source code
def set_deepsleep_enable(self, enable: bool) -> None:
    """Set the deepsleep configuration flag."""
    if self._mfr != Manufacturer.QUECTEL:
        raise ModemError('Operation not supported by this modem')
    self._at_command_response(f'AT+QSCLK={int(enable)}')
def set_event_mask(self, event_mask: int) ‑> None

Set monitored events that trigger event notification.

Expand source code
def set_event_mask(self, event_mask: int) -> None:
    """Set monitored events that trigger event notification."""
    if self._mfr != Manufacturer.ORBCOMM:
        raise ModemError('Operation not supported by this modem')
    max_bits = 12
    if not isinstance(event_mask, int) or event_mask > 2**max_bits-1:
        raise ValueError('Invalid event bitmask')
    cmd = f'ATS88={event_mask}'
    self._at_command_response(cmd)
def set_gnss_continuous(self, interval: int) ‑> None

Set the modem's GNSS continuous refresh interval in seconds.

Args

interval : int
Automatic update interval 0..30 seconds.

Returns

True if successful.

Raises

ValueError if invalid interval is specified.

Expand source code
def set_gnss_continuous(self, interval: int) -> None:
    """Set the modem's GNSS continuous refresh interval in seconds.
    
    Args:
        interval (int): Automatic update interval 0..30 seconds.
    
    Returns:
        `True` if successful.
    
    Raises:
        `ValueError` if invalid interval is specified.
    
    """
    if interval not in range (0, 31):
        raise ValueError('Invalid GNSS refresh interval')
    cmd = f'ATS55={interval}'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = f'AT+QGNSSCW={interval}'
    self._at_command_response(cmd)
def set_gnss_mode(self, gnss_mode: GnssMode) ‑> None

Get the modem's GNSS receiver mode.

Expand source code
def set_gnss_mode(self, gnss_mode: GnssMode) -> None:
    """Get the modem's GNSS receiver mode."""
    cmd = f'ATS39={gnss_mode}'
    prefix = ''
    if self._mfr == Manufacturer.QUECTEL:
        if not GnssModeQuectel.is_valid(gnss_mode):
            raise ValueError('Invalid GNSS mode')
        cmd = f'AT+QGNSSMOD={gnss_mode}'
        prefix = '+QGNSSMOD:'
    else:
        if not GnssModeOrbcomm.is_valid(gnss_mode):
            raise ValueError('Invalid GNSS mode')
    self._at_command_response(cmd, prefix)
def set_power_mode(self, power_mode: PowerMode) ‑> None

Set the modem's power mode configuration.

Expand source code
def set_power_mode(self, power_mode: PowerMode) -> None:
    """Set the modem's power mode configuration."""
    if not PowerMode.is_valid(power_mode):
        raise ValueError('Invalid Power Mode')
    cmd = f'ATS50={power_mode}'
    if self._mfr == Manufacturer.QUECTEL:
        cmd = f'AT+QPMD={power_mode}'
    self._at_command_response(cmd)
def set_register(self, s_register_number: int, value: int) ‑> None

Set a modem register value.

Expand source code
def set_register(self, s_register_number: int, value: int) -> None:
    """Set a modem register value."""
    cmd = f'ATS{s_register_number}={value}'
    self._at_command_response(cmd)
def set_trace_event_monitor(self, events: list[tuple[int, int]]) ‑> None

Set the list of monitored trace events.

Expand source code
def set_trace_event_monitor(self, events: 'list[tuple[int, int]]') -> None:
    """Set the list of monitored trace events."""
    cmd = 'AT%EVMON='
    for event in events:
        if not cmd.endswith('='):
            cmd += ','
        cmd += f'{event[0]}.{event[1]}'
    self._at_command_response(cmd)
def set_urc_ctl(self, qurc_mask: int) ‑> None

Set the event list that trigger Unsolicited Report Codes.

Expand source code
def set_urc_ctl(self, qurc_mask: int) -> None:
    """Set the event list that trigger Unsolicited Report Codes."""
    if self._mfr != Manufacturer.QUECTEL:
        raise ValueError('Modem does not support this feature')
    cmd = f'AT+QURCCTL=0x{qurc_mask:04X}'
    self._at_command_response(cmd)
def set_wakeup_period(self, wakeup_period: WakeupPeriod, wakeup_way: WakeupWay|None = None)

Set the modem's wakeup period configuration.

The configuration does not update until confimed by the network.

Expand source code
def set_wakeup_period(self,
                      wakeup_period: WakeupPeriod,
                      wakeup_way: 'WakeupWay|None' = None,
                      ) -> None:
    """Set the modem's wakeup period configuration.
    
    The configuration does not update until confimed by the network.
    
    """
    if not WakeupPeriod.is_valid(wakeup_period):
        raise ValueError('Invalid wakeup period')
    cmd = f'ATS51={wakeup_period}'
    if self._mfr == Manufacturer.QUECTEL:
        if wakeup_way is None:
            query = self._at_command_response('AT+QWKUPCFG?', '+QWKUPCFG:')
            wakeup_way = WakeupWay(int(query.split(',')[1]))
        cmd = f'AT+QWKUPCFG={wakeup_period},{wakeup_way}'
    self._at_command_response(cmd)
def set_workmode(self, workmode: WorkMode) ‑> None

Set the modem working mode.

Expand source code
def set_workmode(self, workmode: WorkMode) -> None:
    """Set the modem working mode."""
    if self._mfr != Manufacturer.QUECTEL:
        raise ModemError('Operation not supported by this modem')
    if not WorkMode.is_valid(workmode):
        raise ValueError('Invalid workmode')
    cmd = f'AT+QMOD={workmode}'
    self._at_command_response(cmd)
class PowerMode (value, names=None, *, module=None, qualname=None, type=None, start=1)

The Power Mode setting of the NIMO modem.

Implies various internal state machine settings for balancing power consumption against speed of recovery from line of sight blockages.

Expand source code
class PowerMode(NimoIntEnum):
    """The Power Mode setting of the NIMO modem.
    
    Implies various internal state machine settings for balancing power
    consumption against speed of recovery from line of sight blockages.
    
    """
    MOBILE_POWERED = 0
    FIXED_POWERED = 1
    MOBILE_BATTERY = 2
    FIXED_BATTERY = 3
    MOBILE_MINIMAL = 4
    MOBILE_PARKED = 5

    def gnss_refresh_hours(self):
        """The minimum GNSS refresh interval [hours]."""
        if self.value == 0:
            return 3
        if self.value == 1:
            return 24
        if self.value == 2:
            return 6
        if self.value == 3:
            return 14 * 24
        if self.value == 4:
            return 12
        if self.value == 5:
            return 24
    
    def transmit_lifetime_seconds(self):
        """The maximum duration of a message in the transmit queue [seconds]."""
        if self.value in [0, 1]:
            return 3 * 3600
        return  3 * 60
    
    def beam_search_interval_seconds(self):
        """The minimum time between background beam searches [seconds]."""
        if self.value in [0, 2, 4]:
            return 20 * 60
        return 60 * 60

    def short_term_blockage_seconds(self):
        """The time in blockage before initiating a beam search [seconds]."""
        if self.value in [0, 1]:
            return 5 * 60
        if self.value in [2, 3, 4]:
            return 20 * 60
        return 15 * 60
    
    def beam_search_maximum_seconds(self):
        """The maximum backoff interval between searches when blocked [seconds].
        """
        if self.value in [0, 1]:
            return 0
        return 1600 * 60

Ancestors

Class variables

var FIXED_BATTERY
var FIXED_POWERED
var MOBILE_BATTERY
var MOBILE_MINIMAL
var MOBILE_PARKED
var MOBILE_POWERED

Methods

def beam_search_interval_seconds(self)

The minimum time between background beam searches [seconds].

Expand source code
def beam_search_interval_seconds(self):
    """The minimum time between background beam searches [seconds]."""
    if self.value in [0, 2, 4]:
        return 20 * 60
    return 60 * 60
def beam_search_maximum_seconds(self)

The maximum backoff interval between searches when blocked [seconds].

Expand source code
def beam_search_maximum_seconds(self):
    """The maximum backoff interval between searches when blocked [seconds].
    """
    if self.value in [0, 1]:
        return 0
    return 1600 * 60
def gnss_refresh_hours(self)

The minimum GNSS refresh interval [hours].

Expand source code
def gnss_refresh_hours(self):
    """The minimum GNSS refresh interval [hours]."""
    if self.value == 0:
        return 3
    if self.value == 1:
        return 24
    if self.value == 2:
        return 6
    if self.value == 3:
        return 14 * 24
    if self.value == 4:
        return 12
    if self.value == 5:
        return 24
def short_term_blockage_seconds(self)

The time in blockage before initiating a beam search [seconds].

Expand source code
def short_term_blockage_seconds(self):
    """The time in blockage before initiating a beam search [seconds]."""
    if self.value in [0, 1]:
        return 5 * 60
    if self.value in [2, 3, 4]:
        return 20 * 60
    return 15 * 60
def transmit_lifetime_seconds(self)

The maximum duration of a message in the transmit queue [seconds].

Expand source code
def transmit_lifetime_seconds(self):
    """The maximum duration of a message in the transmit queue [seconds]."""
    if self.value in [0, 1]:
        return 3 * 3600
    return  3 * 60

Inherited members

class SatelliteLocation (name: str = '', latitude: float = 0.0, longitude: float = 180.0, altitude: float = 35786000, azimuth: float = 0.0, elevation: float = 0.0, geobeam: GeoBeam|None = None)

Represents a geostationary satellite location relative to a modem.

Expand source code
@dataclass
class SatelliteLocation:
    """Represents a geostationary satellite location relative to a modem."""
    name: str = ''
    latitude: float = 0.0
    longitude: float = 180.0
    altitude: float = GEOSTATIONARY_DISTANCE_M
    azimuth: float = 0.0
    elevation: float = 0.0
    geobeam: 'GeoBeam|None' = None

Class variables

var altitude : float
var azimuth : float
var elevation : float
var geobeamGeoBeam|None
var latitude : float
var longitude : float
var name : str
class SignalQuality (value, names=None, *, module=None, qualname=None, type=None, start=1)

Qualitative descriptor corresponding to a SignalLevel

Expand source code
class SignalQuality(NimoIntEnum):
    """Qualitative descriptor corresponding to a SignalLevel"""
    NONE = 0
    WEAK = 1
    LOW = 2
    MID = 3
    GOOD = 4
    STRONG = 5
    WARNING = 6

Ancestors

Class variables

var GOOD
var LOW
var MID
var NONE
var STRONG
var WARNING
var WEAK

Inherited members

class UrcCode (value, names=None, *, module=None, qualname=None, type=None, start=1)

Quectel URC code map.

Expand source code
class UrcCode(NimoIntEnum):
    """Quectel URC code map."""
    GPS_FIX = 0
    RX_END = 1
    TX_END = 2
    REGED = 3
    ITV_CHG = 4
    TIME_UPD = 5
    GPS_TMO = 6
    PLG_RESP = 7

Ancestors

Class variables

var GPS_FIX
var GPS_TMO
var ITV_CHG
var PLG_RESP
var REGED
var RX_END
var TIME_UPD
var TX_END

Inherited members

class UrcControl (value, names=None, *, module=None, qualname=None, type=None, start=1)

Control bits for Quectel Unsolicited Response Codes.

Expand source code
class UrcControl(IntFlag):
    """Control bits for Quectel Unsolicited Response Codes."""
    GNSS_FIX_NEW =              0b00000001
    MESSAGE_MT_RECEIVED =       0b00000010
    MESSAGE_MO_COMPLETE =       0b00000100
    NETWORK_REGISTERED =        0b00001000
    WAKEUP_PERIOD_CHANGE =      0b00010000
    UTC_TIME_SYNC =             0b00100000
    GNSS_FIX_TIMEOUT =          0b01000000
    NETWORK_PING_ACKNOWLEDGED = 0b10000000
    
    @classmethod
    def get_events(cls, event_mask: int) -> 'list[EventNotification]':
        """Parses a bitmask to return a list of events."""
        return [item for item in cls if item.value & event_mask]

Ancestors

  • enum.IntFlag
  • builtins.int
  • enum.Flag
  • enum.Enum

Class variables

var GNSS_FIX_NEW
var GNSS_FIX_TIMEOUT
var MESSAGE_MO_COMPLETE
var MESSAGE_MT_RECEIVED
var NETWORK_PING_ACKNOWLEDGED
var NETWORK_REGISTERED
var UTC_TIME_SYNC
var WAKEUP_PERIOD_CHANGE

Static methods

def get_events(event_mask: int) ‑> list[EventNotification]

Parses a bitmask to return a list of events.

Expand source code
@classmethod
def get_events(cls, event_mask: int) -> 'list[EventNotification]':
    """Parses a bitmask to return a list of events."""
    return [item for item in cls if item.value & event_mask]
class WakeupPeriod (value, names=None, *, module=None, qualname=None, type=None, start=1)

The Wakeup Period setting of a NIMO modem.

Determines how often the modem wakes up to listen briefly for potential mobile-terminated messages to be delivered by the network.

Expand source code
class WakeupPeriod(NimoIntEnum):
    """The Wakeup Period setting of a NIMO modem.
    
    Determines how often the modem wakes up to listen briefly for potential
    mobile-terminated messages to be delivered by the network.
    
    """
    NONE = 0   # 5 seconds
    SECONDS_30 = 1
    MINUTES_1 = 2
    MINUTES_3 = 3
    MINUTES_10 = 4
    MINUTES_30 = 5
    MINUTES_60 = 6
    MINUTES_2 = 7
    MINUTES_5 = 8
    MINUTES_15 = 9
    MINUTES_20 = 10

    def seconds(self):
        if self.name == 'NONE':
            return 5
        value = int(self.name.split('_'))[1]
        if self.name.startswith('MINUTES'):
            return value * 60
        return value

Ancestors

Class variables

var MINUTES_1
var MINUTES_10
var MINUTES_15
var MINUTES_2
var MINUTES_20
var MINUTES_3
var MINUTES_30
var MINUTES_5
var MINUTES_60
var NONE
var SECONDS_30

Methods

def seconds(self)
Expand source code
def seconds(self):
    if self.name == 'NONE':
        return 5
    value = int(self.name.split('_'))[1]
    if self.name.startswith('MINUTES'):
        return value * 60
    return value

Inherited members

class WakeupWay (value, names=None, *, module=None, qualname=None, type=None, start=1)

Quectel CC200A-LB wakeup methods.

Expand source code
class WakeupWay(NimoIntEnum):
    """Quectel CC200A-LB wakeup methods."""
    WAKEUP_PIN = 0
    UART = 1

Ancestors

Class variables

var UART
var WAKEUP_PIN

Inherited members

class WorkMode (value, names=None, *, module=None, qualname=None, type=None, start=1)

Quectel CC200A-LB working modes.

Expand source code
class WorkMode(NimoIntEnum):
    """Quectel CC200A-LB working modes."""
    WORKING = 1
    GNSS = 2
    PERIODIC_SLEEP = 3

Ancestors

Class variables

var GNSS
var PERIODIC_SLEEP
var WORKING

Inherited members