Package pynimomodem
Library to interface with a Viasat-approved NIMO modem for satellite IoT.
This library abstracts various low-level AT command operations useful for interacting with a NIMO modem to send and receive data, check network status and get location-based information.
Most get
methods will raise a ModemAtError
if a valid response is not
received to a command/query.
ModemTimeout
will be raised if no response is received to a command within
the default or specified timeout.
AT command errors will raise ModemAtError
with a property error_code
to
provide further details with the AtErrorCode
.
Expand source code
"""Library to interface with a Viasat-approved NIMO modem for satellite IoT.
This library abstracts various low-level AT command operations useful for
interacting with a NIMO modem to send and receive data, check network status
and get location-based information.
Most `get` methods will raise a `ModemAtError` if a valid response is not
received to a command/query.
`ModemTimeout` will be raised if no response is received to a command within
the default or specified timeout.
AT command errors will raise `ModemAtError` with a property `error_code` to
provide further details with the `AtErrorCode`.
"""
from .constants import (
AtErrorCode,
BeamState,
ControlState,
DataFormat,
EventNotification,
GeoBeam,
GeoSatellite,
GnssMode,
GnssModeOrbcomm,
GnssModeQuectel,
MessagePriority,
MessageState,
NetworkStatus,
PowerMode,
SignalQuality,
UrcCode,
UrcControl,
WakeupPeriod,
WakeupWay,
WorkMode,
)
from .modem import (
Manufacturer,
ModemLocation,
MoMessage,
MtMessage,
NimoModem,
ModemAtError,
ModemCrcConfig,
ModemCrc,
ModemError,
ModemTimeout,
AcquisitionInfo,
SatelliteLocation,
)
__all__ = [
'AtErrorCode',
'BeamState',
'ControlState',
'DataFormat',
'GeoBeam',
'GeoSatellite',
'GnssMode',
'GnssModeOrbcomm',
'GnssModeQuectel',
'Manufacturer',
'MessagePriority',
'MessageState',
'ModemLocation',
'MoMessage',
'MtMessage',
'NetworkStatus',
'NimoModem',
'ModemAtError',
'ModemCrcConfig',
'ModemCrc',
'ModemError',
'ModemTimeout',
'PowerMode',
'AcquisitionInfo',
'SatelliteLocation',
'SignalQuality',
'WakeupPeriod',
'WakeupWay',
'WorkMode',
'UrcCode',
'UrcControl',
'EventNotification',
]
Sub-modules
pynimomodem.atcommandbuffer
-
AT command buffer parsing for a NIMO modem …
pynimomodem.constants
-
NIMO modem constants …
pynimomodem.crcxmodem
-
Implementation of CCIT-16-CRC for use with NIMO modems …
pynimomodem.location
-
Classes and methods for location, elevation and azimuth for NIMO modems …
pynimomodem.message
-
Class and methods for managing messages submitted/retrieved to a NIMO modem …
pynimomodem.modem
-
Class for a Non-IP Modem using Orbcomm network protocols …
pynimomodem.nimoutils
-
Various utilities/helpers for NIMO modem interaction and debugging.
pynimomodem.s_registers
-
IDP modem S-register definitions …
Classes
class AcquisitionInfo (ctrl_state: ControlState = ControlState.STOPPED, beam_state: BeamState = BeamState.IDLE, rssi: float = 0.0, vcid: int = 0)
-
Details about the satellite acquisition state of the modem.
Attributes
ctrl_state
:ControlState
- Primary network acquisition state.
beam_state
:BeamState
- Secondary beam acquistion state.
rssi
:float
- Signal indicator Carrier to Noise ratio (dB-Hz).
vcid
:int
- Virtual carrier identifier for low-level sanity check.
Expand source code
@dataclass class AcquisitionInfo: """Details about the satellite acquisition state of the modem. Attributes: ctrl_state (ControlState): Primary network acquisition state. beam_state (BeamState): Secondary beam acquistion state. rssi (float): Signal indicator Carrier to Noise ratio (dB-Hz). vcid (int): Virtual carrier identifier for low-level sanity check. """ ctrl_state: ControlState = ControlState.STOPPED beam_state: BeamState = BeamState.IDLE rssi: float = 0.0 vcid: int = 0
Class variables
var beam_state : BeamState
var ctrl_state : ControlState
var rssi : float
var vcid : int
class AtErrorCode (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
AT command error codes for NIMO modems.
Expand source code
class AtErrorCode(NimoIntEnum): """AT command error codes for NIMO modems.""" # Standard / documented OK = 0 ERROR = 4 INVALID_CRC = 100 UNKNOWN_COMMAND = 101 INVALID_PARAMETER = 102 MESSAGE_LENGTH_MISMATCH = 103 RESERVED_104 = 104 SYSTEM_ERROR = 105 TX_QUEUE_FULL = 106 DUPLICATE_NAME = 107 GNSS_TIMEOUT = 108 MESSAGE_UNAVAILABLE = 109 RESERVED_110 = 110 RESERVED_111 = 111 READ_ONLY_PARAMETER = 112 # Extensions for additional situations TIMEOUT = 255 CRC_CONFIG_MISMATCH = 254 UNABLE_TO_DELETE = 253 INVALID_RESPONSE_CRC = 252
Ancestors
- NimoIntEnum
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var CRC_CONFIG_MISMATCH
var DUPLICATE_NAME
var ERROR
var GNSS_TIMEOUT
var INVALID_CRC
var INVALID_PARAMETER
var INVALID_RESPONSE_CRC
var MESSAGE_LENGTH_MISMATCH
var MESSAGE_UNAVAILABLE
var OK
var READ_ONLY_PARAMETER
var RESERVED_104
var RESERVED_110
var RESERVED_111
var SYSTEM_ERROR
var TIMEOUT
var TX_QUEUE_FULL
var UNABLE_TO_DELETE
var UNKNOWN_COMMAND
Inherited members
class BeamState (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
States of the NIMO modem satellite beam internal selection process.
Expand source code
class BeamState(NimoIntEnum): """States of the NIMO modem satellite beam internal selection process.""" IDLE = 0 SEARCH_ANY_TRAFFIC = 1 SEARCH_LAST_TRAFFIC = 2 RESERVED = 3 SEARCH_NEW_TRAFFIC = 4 SEARCH_BULLETIN_BOARD = 5 DELAY_TRAFFIC_SEARCH = 6
Ancestors
- NimoIntEnum
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var DELAY_TRAFFIC_SEARCH
var IDLE
var RESERVED
var SEARCH_ANY_TRAFFIC
var SEARCH_BULLETIN_BOARD
var SEARCH_LAST_TRAFFIC
var SEARCH_NEW_TRAFFIC
Inherited members
class ControlState (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
States of the NIMO modem internal network acquisition process.
Expand source code
class ControlState(NimoIntEnum): """States of the NIMO modem internal network acquisition process.""" STOPPED = 0 GNSS_WAIT = 1 SEARCH_START = 2 BEAM_SEARCH = 3 BEAM_FOUND = 4 BEAM_ACQUIRED = 5 BEAM_SWITCH = 6 REGISTERING = 7 RECEIVE_ONLY = 8 BB_DOWNLOAD = 9 ACTIVE = 10 BLOCKED = 11 CONFIRM_PREVIOUS_BEAM = 12 CONFIRM_REQUESTED_BEAM = 13 CONNECT_CONFIRMED_BEAM = 14
Ancestors
- NimoIntEnum
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var ACTIVE
var BB_DOWNLOAD
var BEAM_ACQUIRED
var BEAM_FOUND
var BEAM_SEARCH
var BEAM_SWITCH
var BLOCKED
var CONFIRM_PREVIOUS_BEAM
var CONFIRM_REQUESTED_BEAM
var CONNECT_CONFIRMED_BEAM
var GNSS_WAIT
var RECEIVE_ONLY
var REGISTERING
var SEARCH_START
var STOPPED
Inherited members
class DataFormat (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Data formats used for submitting or extracting message data/payload.
Expand source code
class DataFormat(NimoIntEnum): """Data formats used for submitting or extracting message data/payload.""" TEXT = 1 HEX = 2 BASE64 = 3
Ancestors
- NimoIntEnum
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var BASE64
var HEX
var TEXT
Inherited members
class EventNotification (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Bitmask enumerated values for NIMO modem events.
Expand source code
class EventNotification(IntFlag): """Bitmask enumerated values for NIMO modem events.""" GNSS_FIX_NEW = 0b000000000001 MESSAGE_MT_RECEIVED = 0b000000000010 MESSAGE_MO_COMPLETE = 0b000000000100 NETWORK_REGISTERED = 0b000000001000 MODEM_RESET_COMPLETE = 0b000000010000 JAMMING_ANTENNA_CHANGE = 0b000000100000 MODEM_RESET_PENDING = 0b000001000000 WAKEUP_PERIOD_CHANGE = 0b000010000000 UTC_TIME_SYNC = 0b000100000000 GNSS_FIX_TIMEOUT = 0b001000000000 EVENT_TRACE_CACHED = 0b010000000000 NETWORK_PING_ACKNOWLEDGED = 0b100000000000 @classmethod def get_events(cls, event_mask: int) -> 'list[EventNotification]': """Parses a bitmask to return a list of events.""" return [item for item in cls if item.value & event_mask]
Ancestors
- enum.IntFlag
- builtins.int
- enum.Flag
- enum.Enum
Class variables
var EVENT_TRACE_CACHED
var GNSS_FIX_NEW
var GNSS_FIX_TIMEOUT
var JAMMING_ANTENNA_CHANGE
var MESSAGE_MO_COMPLETE
var MESSAGE_MT_RECEIVED
var MODEM_RESET_COMPLETE
var MODEM_RESET_PENDING
var NETWORK_PING_ACKNOWLEDGED
var NETWORK_REGISTERED
var UTC_TIME_SYNC
var WAKEUP_PERIOD_CHANGE
Static methods
def get_events(event_mask: int) ‑> list[EventNotification]
-
Parses a bitmask to return a list of events.
Expand source code
@classmethod def get_events(cls, event_mask: int) -> 'list[EventNotification]': """Parses a bitmask to return a list of events.""" return [item for item in cls if item.value & event_mask]
class GeoBeam (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Geographic Beam identifiers mapped to readable names.
Expand source code
class GeoBeam(NimoIntEnum): """Geographic Beam identifiers mapped to readable names.""" GLOBAL_BB = 0 AMER_RB1 = 1 AMER_RB2 = 2 AMER_RB3 = 3 AMER_RB4 = 4 AMER_RB5 = 5 AMER_RB6 = 6 AMER_RB7 = 7 AMER_RB8 = 8 AMER_RB9 = 9 AMER_RB10 = 10 AMER_RB11 = 11 AMER_RB12 = 12 AMER_RB13 = 13 AMER_RB14 = 14 AMER_RB15 = 15 AMER_RB16 = 16 AMER_RB17 = 17 AMER_RB18 = 18 AMER_RB19 = 19 AORW_SC = 61 EMEA_RB1 = 21 EMEA_RB2 = 22 EMEA_RB3 = 23 EMEA_RB4 = 24 EMEA_RB5 = 25 EMEA_RB6 = 26 EMEA_RB7 = 27 EMEA_RB8 = 28 EMEA_RB9 = 29 EMEA_RB10 = 30 EMEA_RB11 = 31 EMEA_RB12 = 32 EMEA_RB13 = 33 EMEA_RB14 = 34 EMEA_RB15 = 35 EMEA_RB16 = 36 EMEA_RB17 = 37 EMEA_RB18 = 38 EMEA_RB19 = 39 APAC_RB1 = 41 APAC_RB2 = 42 APAC_RB3 = 43 APAC_RB4 = 44 APAC_RB5 = 45 APAC_RB6 = 46 APAC_RB7 = 47 APAC_RB8 = 48 APAC_RB9 = 49 APAC_RB10 = 50 APAC_RB11 = 51 APAC_RB12 = 52 APAC_RB13 = 53 APAC_RB14 = 54 APAC_RB15 = 55 APAC_RB16 = 56 APAC_RB17 = 57 APAC_RB18 = 58 APAC_RB19 = 59 # MEAS_RB10 = 90 # replaced by IOE # MEAS_RB11 = 91 # replaced by IOE # MEAS_RB12 = 92 # replaced by IOE # MEAS_RB15 = 93 # replaced by IOE IOE_RB1 = 101 IOE_RB2 = 102 IOE_RB3 = 103 IOE_RB4 = 104 IOE_RB5 = 105 IOE_RB6 = 106 IOE_RB7 = 107 IOE_RB8 = 108 IOE_RB9 = 109 IOE_RB10 = 110 IOE_RB11 = 111 IOE_RB12 = 112 IOE_RB13 = 113 IOE_RB14 = 114 IOE_RB15 = 115 IOE_RB16 = 116 IOE_RB17 = 117 IOE_RB18 = 118 IOE_RB19 = 119 @property def satellite(self): return self.name.split('_')[0] @property def beam(self): return self.name.split('_')[1] @property def id(self): return self.value
Ancestors
- NimoIntEnum
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var AMER_RB1
var AMER_RB10
var AMER_RB11
var AMER_RB12
var AMER_RB13
var AMER_RB14
var AMER_RB15
var AMER_RB16
var AMER_RB17
var AMER_RB18
var AMER_RB19
var AMER_RB2
var AMER_RB3
var AMER_RB4
var AMER_RB5
var AMER_RB6
var AMER_RB7
var AMER_RB8
var AMER_RB9
var AORW_SC
var APAC_RB1
var APAC_RB10
var APAC_RB11
var APAC_RB12
var APAC_RB13
var APAC_RB14
var APAC_RB15
var APAC_RB16
var APAC_RB17
var APAC_RB18
var APAC_RB19
var APAC_RB2
var APAC_RB3
var APAC_RB4
var APAC_RB5
var APAC_RB6
var APAC_RB7
var APAC_RB8
var APAC_RB9
var EMEA_RB1
var EMEA_RB10
var EMEA_RB11
var EMEA_RB12
var EMEA_RB13
var EMEA_RB14
var EMEA_RB15
var EMEA_RB16
var EMEA_RB17
var EMEA_RB18
var EMEA_RB19
var EMEA_RB2
var EMEA_RB3
var EMEA_RB4
var EMEA_RB5
var EMEA_RB6
var EMEA_RB7
var EMEA_RB8
var EMEA_RB9
var GLOBAL_BB
var IOE_RB1
var IOE_RB10
var IOE_RB11
var IOE_RB12
var IOE_RB13
var IOE_RB14
var IOE_RB15
var IOE_RB16
var IOE_RB17
var IOE_RB18
var IOE_RB19
var IOE_RB2
var IOE_RB3
var IOE_RB4
var IOE_RB5
var IOE_RB6
var IOE_RB7
var IOE_RB8
var IOE_RB9
Instance variables
var beam
-
Expand source code
@property def beam(self): return self.name.split('_')[1]
var id
-
Expand source code
@property def id(self): return self.value
var satellite
-
Expand source code
@property def satellite(self): return self.name.split('_')[0]
Inherited members
class GeoSatellite (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Maps the Viasat/Inmarsat geostationary longitude supporting NIMO.
Expand source code
class GeoSatellite(NimoFloatEnum): """Maps the Viasat/Inmarsat geostationary longitude supporting NIMO.""" AMER = -98.0 # Inmarsat 4F3 AORWSC = -54.0 # Inmarsat 3F5 EMEA = 24.9 # Inmarsat 4AF4 aka Alphasat XL IOE = 63.5 # Inmarsat 6F1 previously IOR 3F1, MEAS 4F2 APAC = 143.5 # Inmarsat 4F2 previously 4F1
Ancestors
- NimoFloatEnum
- enum.Enum
Class variables
var AMER
var AORWSC
var APAC
var EMEA
var IOE
Inherited members
class GnssMode (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Base class for manufacturer-specific variants.
Expand source code
class GnssMode(NimoIntEnum): """Base class for manufacturer-specific variants."""
Ancestors
- NimoIntEnum
- enum.IntEnum
- builtins.int
- enum.Enum
Subclasses
Inherited members
class GnssModeOrbcomm (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
The operating mode setting for the built-in GNSS in a NIMO modem.
Expand source code
class GnssModeOrbcomm(GnssMode): """The operating mode setting for the built-in GNSS in a NIMO modem.""" GPS = 0 GLONASS = 1 BEIDOU = 2 GALILEO = 3 GPS_GLONASS = 10 GPS_BEIDOU = 11 GLONASS_BEIDOU = 12 GPS_GALILEO = 13 GLONASS_GALILEO = 14 BEIDOU_GALILEO = 15
Ancestors
- GnssMode
- NimoIntEnum
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var BEIDOU
var BEIDOU_GALILEO
var GALILEO
var GLONASS
var GLONASS_BEIDOU
var GLONASS_GALILEO
var GPS
var GPS_BEIDOU
var GPS_GALILEO
var GPS_GLONASS
Inherited members
class GnssModeQuectel (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
An enumeration.
Expand source code
class GnssModeQuectel(GnssMode): GPS = 0 GPS_BDS = 1 GPS_GLONASS = 2 GPS_GALILEO = 3 GPS_GLONASS_GALILEO_BDS = 4
Ancestors
- GnssMode
- NimoIntEnum
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var GPS
var GPS_BDS
var GPS_GALILEO
var GPS_GLONASS
var GPS_GLONASS_GALILEO_BDS
Inherited members
class Manufacturer (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Supported NIMO modem implementations.
Expand source code
class Manufacturer(IntEnum): """Supported NIMO modem implementations.""" NONE = 0 ORBCOMM = 1 QUECTEL = 2
Ancestors
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var NONE
var ORBCOMM
var QUECTEL
class MessagePriority (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Message priorities for NIMO modem messages.
Expand source code
class MessagePriority(NimoIntEnum): """Message priorities for NIMO modem messages.""" NONE = 0 HIGH = 1 MEDH = 2 MEDL = 3 LOW = 4
Ancestors
- NimoIntEnum
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var HIGH
var LOW
var MEDH
var MEDL
var NONE
Inherited members
class MessageState (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Message states of NIMO modem messages.
Expand source code
class MessageState(NimoIntEnum): """Message states of NIMO modem messages.""" UNAVAILABLE = 0 RX_PENDING = 1 RX_COMPLETE = 2 RX_RETRIEVED = 3 TX_READY = 4 TX_SENDING = 5 TX_COMPLETE = 6 TX_FAILED = 7 TX_CANCELLED = 8
Ancestors
- NimoIntEnum
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var RX_COMPLETE
var RX_PENDING
var RX_RETRIEVED
var TX_CANCELLED
var TX_COMPLETE
var TX_FAILED
var TX_READY
var TX_SENDING
var UNAVAILABLE
Inherited members
class MoMessage (name: str = '', priority: MessagePriority = MessagePriority.NONE, state: MessageState = MessageState.UNAVAILABLE, length: int = 0, bytes_delivered: int = 0, payload: bytes = b'')
-
A Mobile-Originated Message.
Expand source code
class MoMessage(NimoMessage): """A Mobile-Originated Message.""" @property def name(self) -> str: return self._message_name @name.setter def name(self, message_name: str): msg_mo_name_max_len = max(MSG_MO_NAME_MAX_LEN, MSG_MO_NAME_QMAX_LEN) if (not isinstance(message_name, str) or not 0 < len(message_name) <= msg_mo_name_max_len): raise ValueError('Invalid message name') self._message_name = message_name
Ancestors
Instance variables
var name : str
-
Expand source code
@property def name(self) -> str: return self._message_name
class ModemAtError (*args, **kwargs)
-
AT command related errors with error_code property.
Expand source code
class ModemAtError(ModemError): """AT command related errors with error_code property.""" @property def error_code(self) -> AtErrorCode: return AtErrorCode[self.args[0]]
Ancestors
- ModemError
- builtins.Exception
- builtins.BaseException
Instance variables
var error_code : AtErrorCode
-
Expand source code
@property def error_code(self) -> AtErrorCode: return AtErrorCode[self.args[0]]
class ModemCrc (*args, **kwargs)
-
Error calculating response CRC.
Expand source code
class ModemCrc(ModemError): """Error calculating response CRC."""
Ancestors
- ModemError
- builtins.Exception
- builtins.BaseException
class ModemCrcConfig (*args, **kwargs)
-
Request/response mismatch of CRC presence.
Expand source code
class ModemCrcConfig(ModemError): """Request/response mismatch of CRC presence."""
Ancestors
- ModemError
- builtins.Exception
- builtins.BaseException
class ModemError (*args, **kwargs)
-
Base class for NIMO modem errors.
Expand source code
class ModemError(Exception): """Base class for NIMO modem errors."""
Ancestors
- builtins.Exception
- builtins.BaseException
Subclasses
class ModemLocation (**kwargs)
-
A set of location-based information derived from the modem's NMEA data.
Uses 90.0/180.0 if latitude/longitude are unknown
Attributes
latitude
:float
- decimal degrees
longitude
:float
- decimal degrees
altitude
:float
- in metres
speed
:float
- in knots
heading
:float
- in degrees
timestamp
:int
- in seconds since 1970-01-01T00:00:00Z
satellites
:int
- in view at time of fix
fix_type
:GnssFixType
- 1=None, 2=2D or 3=3D
fix_quality
:GnssFixQuality
- Enumerated lookup value
pdop
:float
- Probability Dilution of Precision
hdop
:float
- Horizontal Dilution of Precision
vdop
:float
- Vertical Dilution of Precision
time_iso
:str
- ISO 8601 formatted timestamp
Initializes a Location with default latitude/longitude 90/180.
Expand source code
class ModemLocation: """A set of location-based information derived from the modem's NMEA data. Uses 90.0/180.0 if latitude/longitude are unknown Attributes: latitude (float): decimal degrees longitude (float): decimal degrees altitude (float): in metres speed (float): in knots heading (float): in degrees timestamp (int): in seconds since 1970-01-01T00:00:00Z satellites (int): in view at time of fix fix_type (GnssFixType): 1=None, 2=2D or 3=3D fix_quality (GnssFixQuality): Enumerated lookup value pdop (float): Probability Dilution of Precision hdop (float): Horizontal Dilution of Precision vdop (float): Vertical Dilution of Precision time_iso (str): ISO 8601 formatted timestamp """ def __init__(self, **kwargs): """Initializes a Location with default latitude/longitude 90/180.""" self.latitude = float(kwargs.get('latitude', 90.0)) self.longitude = float(kwargs.get('longitude', 180.0)) self.altitude = float(kwargs.get('altitude', 0.0)) # metres self.speed = float(kwargs.get('speed', 0.0)) # knots self.heading = float(kwargs.get('heading', 0.0)) # degrees self.timestamp = int(kwargs.get('timestamp', 0)) # seconds (unix) self.satellites = int(kwargs.get('satellites', 0)) self.fix_type = GnssFixType(int(kwargs.get('fix_type', 1))) self.fix_quality = GnssFixQuality(int(kwargs.get('fix_quality', 0))) self.pdop = float(kwargs.get('pdop', 99)) self.hdop = float(kwargs.get('hdop', 99)) self.vdop = float(kwargs.get('vdop', 99)) # self.satellites_info: 'list[GnssSatelliteInfo]' = kwargs.get( # 'satellites_info', [] # ) @property def time_iso(self) -> str: return f'{ts_to_iso(self.timestamp)}' # def _update_satellites_info(self, # satellites_info: 'list[GnssSatelliteInfo]'): # """Populates satellite information based on NMEA GSV data.""" # for satellite_info in satellites_info: # if isinstance(satellite_info, GnssSatelliteInfo): # new = True # for i, info in enumerate(self.satellites_info): # if info.prn == satellite_info.prn: # new = False # self.satellites_info[i] = satellite_info # break # if new: # self.satellites_info.append(satellite_info) def __repr__(self) -> str: obj = deepcopy(self.__dict__) for k, v in obj.items(): if k in ['latitude', 'longitude']: obj[k] = round(v, 5) elif isinstance(v, float): obj[k] = round(v, 1) return json.dumps(obj, skipkeys=True)
Instance variables
var time_iso : str
-
Expand source code
@property def time_iso(self) -> str: return f'{ts_to_iso(self.timestamp)}'
class ModemTimeout (*args, **kwargs)
-
Serial response timeout.
Expand source code
class ModemTimeout(ModemError): """Serial response timeout."""
Ancestors
- ModemError
- builtins.Exception
- builtins.BaseException
class MtMessage (name: str = '', priority: MessagePriority = MessagePriority.NONE, state: MessageState = MessageState.UNAVAILABLE, length: int = 0, bytes_delivered: int = 0, payload: bytes = b'')
-
A Mobile-Terminated message.
Expand source code
class MtMessage(NimoMessage): """A Mobile-Terminated message.""" @property def bytes_delivered(self) -> int: if self._bytes_delivered < self.length: # bytes delivered not updated during parsing - update self._bytes_delivered = self.length return self._bytes_delivered @bytes_delivered.setter def bytes_delivered(self, value: int): if not isinstance(value, int) or value < 0: raise ValueError('Invalid bytes delivered') if value > self.length: _log.error('Bytes delivered mismatch with message length') return self._bytes_delivered = value
Ancestors
Instance variables
var bytes_delivered : int
-
Expand source code
@property def bytes_delivered(self) -> int: if self._bytes_delivered < self.length: # bytes delivered not updated during parsing - update self._bytes_delivered = self.length return self._bytes_delivered
class NetworkStatus (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Simplified state of network acquisition and tracking.
Expand source code
class NetworkStatus(NimoIntEnum): """Simplified state of network acquisition and tracking.""" UNKNOWN = 0 RX_STOPPED = 1 RX_SEARCHING = 2 RX_ACQUIRING = 3 RX_ONLY_NOT_REGISTERED = 4 OK = 5 SUSPENDED = 6 MUTED = 7 BLOCKED = 8
Ancestors
- NimoIntEnum
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var BLOCKED
var MUTED
var OK
var RX_ACQUIRING
var RX_ONLY_NOT_REGISTERED
var RX_SEARCHING
var RX_STOPPED
var SUSPENDED
var UNKNOWN
Inherited members
class NimoModem (serial_port: str, **kwargs)
-
A class for NIMO satellite IoT modem interaction.
Instantiate the NimoModem object.
For additional kwargs see PySerial API: https://pyserial.readthedocs.io/en/latest/pyserial_api.html
Args
serial_port
:str
- The path to the modem's serial port.
Keyword Args: baudrate (int): The baud rate of the modem (default 9600)
Raises
ConnectionError
if unable to connect to the serial port.Expand source code
class NimoModem: """A class for NIMO satellite IoT modem interaction.""" # __slots__ = ('_modem', '_mobile_id', # '_mfr_code', '_modem_booted', # ) def __init__(self, serial_port: str, **kwargs) -> None: """Instantiate the NimoModem object. For additional kwargs see PySerial API: https://pyserial.readthedocs.io/en/latest/pyserial_api.html Args: serial_port (str): The path to the modem's serial port. Keyword Args: baudrate (int): The baud rate of the modem (default 9600) Raises: `ConnectionError` if unable to connect to the serial port. """ try: self._serial = Serial(serial_port, **kwargs) except Exception as exc: raise ConnectionError('Unable to connect to serial') from exc self._modem: AtCommandBuffer = AtCommandBuffer(self._serial) self._baudrate: int = self._serial.baudrate self._is_connected: bool = False self._modem_booted: bool = False self._mobile_id: str = '' self._manufacturer: Manufacturer = Manufacturer.NONE @property def is_ready(self) -> bool: return not self._modem._lock.locked() @property def crc_enabled(self) -> bool: return self._modem.crc @property def modem_booted(self) -> bool: return self._modem_booted @property def _mfr(self) -> Manufacturer: """Used internally to support different manufacturer commands.""" if not self._manufacturer: self.get_manufacturer() _log.debug('Using %s manufacturer commands', self._manufacturer) return self._manufacturer @property def _mo_msg_name_len_max(self) -> int: """Used internally to restrict the length of the MO message name.""" maxlen = MSG_MO_NAME_MAX_LEN if self._mfr == Manufacturer.QUECTEL: maxlen = MSG_MO_NAME_QMAX_LEN return maxlen def _at_command_response(self, command: str, prefix: str = '', timeout: int = DEFAULT_AT_TIMEOUT) -> str: """Send a command and return the response. Blocks until response has been received. Args: command (str): The AT command to send. prefix (str): Optional prefix to remove from response. timeout (int): Maximum time in seconds to wait for response. Raises: `ModemTimeout` if no response is received. `ModemCrcConfig` if CRC is not used on both request/response. `ModemAtError` for other cases of errored response. """ self._modem.send_at_command(command) err = self._modem.read_at_response(prefix, timeout) if err == AtErrorCode.OK: return self._modem.get_response() elif err == AtErrorCode.TIMEOUT: raise ModemTimeout(f'AT response timed out after {timeout} seconds') elif err == AtErrorCode.CRC_CONFIG_MISMATCH: raise ModemCrcConfig('Reponse checksum {}expected'.format( '' if self.crc_enabled else 'un')) elif err == AtErrorCode.INVALID_RESPONSE_CRC: raise ModemCrc(err.name) else: err = self.get_last_error_code() if err == AtErrorCode.INVALID_CRC: raise ModemCrc(err.name) raise ModemAtError(err.name) def connect(self) -> None: """Attach to the modem via serial communications. Provided for backward compatibility. Connection is usually automatic when instantiating `NimoModem`. """ if not self._serial.is_open: self._serial.open() def disconnect(self) -> None: """Detach from the modem serial communications. Provided for backward compatibility. Not typically required. """ self._is_connected = False self._modem_booted = False if self._serial.is_open: self._serial.close() def is_connected(self) -> bool: """Indicates if the modem is responding to a basic AT query.""" try: self._at_command_response('AT') self._is_connected = True self._modem_booted = True return True except ModemError: self._is_connected = False self._modem_booted = False return False @property def baudrate(self) -> int: """The baudrate of the serial connection.""" return self._modem.serial.baudrate @baudrate.setter def baudrate(self, baudrate: int): """Set the baud rate of the modem and adjust the serial rate.""" if baudrate not in [9600, 115200]: raise ValueError('Invalid baudrate') self._at_command_response(f'AT+IPR={baudrate}') self._modem.serial.baudrate = baudrate def retry_baudrate(self) -> bool: """""" for baud in BAUDRATES: self._modem.serial.baudrate = baud if self.is_connected(): return True return False def await_boot(self, boot_timeout: int = 10) -> bool: """Indicates if a boot string is received within a timeout window. Use `is_connected` before waiting for boot. Args: boot_timeout (int): The maximum time to wait in seconds. Returns: True if a valid boot string was received inside the timeout. """ boot_strings = ['ST Version', 'RDY'] _log.debug('Awaiting modem boot string for %d seconds...', boot_timeout) rx_data = '' started = time.time() while time.time() - started < boot_timeout and not self._modem_booted: while self._modem.is_data_waiting(): rx_data += self._modem.read_rx_buffer() if rx_data and any(b in rx_data for b in boot_strings): self._modem_booted = True _log.debug('Found boot string - clearing Rx buffer') while self._modem.is_data_waiting(): rx_data += self._modem.read_rx_buffer() break return self._modem_booted def get_last_error_code(self) -> AtErrorCode: """Get the last error code from the modem.""" return AtErrorCode(int(self._at_command_response('ATS80?'))) def initialize(self, echo: bool = True, verbose: bool = True, ) -> bool: """Initialize the modem AT configuration for Echo and Verbose.""" at_command = (f'ATZ;E{int(echo)};V{int(verbose)}') try: self._at_command_response(at_command) return True except ModemCrcConfig: _log.info('Attempting re-initialize with CRC enabled') self._at_command_response(at_command) return True def set_crc(self, enable: bool = False) -> bool: """Enable or disable CRC error checking on the modem serial port.""" try: self._at_command_response(f'AT%CRC={int(enable)}') return True except ModemCrcConfig: if ((self._modem.crc and enable) or (not self._modem.crc and not enable)): return True return False def reset_factory_config(self) -> None: """Reset the modem's factory default configuration.""" self._at_command_response('AT&F') def save_config(self) -> None: """Store the current configuration to modem non-volatile memory.""" self._at_command_response('AT&W') def get_mobile_id(self) -> str: """Get the modem's globally unique identifier.""" if not self._mobile_id: try: self._mobile_id = self._at_command_response('AT+GSN', '+GSN:') if vlog(VLOG_TAG): _log.debug('Cached Mobile ID %s', self._mobile_id) except ModemError: self._mobile_id = '' raise return self._mobile_id @property def _is_simulator(self) -> bool: return self.get_mobile_id().startswith('00000000') def get_manufacturer(self) -> str: """Get the manufacturer name.""" if not self._manufacturer: try: mfr = self._at_command_response('ATI') if 'quectel' in mfr.lower(): self._manufacturer = Manufacturer.QUECTEL else: if not any(m in mfr.lower() for m in ['orbcomm', 'skywave']): _log.warning('Unsupported manufacturer %s', mfr) self._manufacturer = Manufacturer.ORBCOMM if vlog(VLOG_TAG): _log.debug('Caching manufacturer: %s', self._manufacturer.name) except ModemError: self._manufacturer = Manufacturer.NONE raise return self._manufacturer.name def get_model(self) -> str: """Get the manufacturer model name.""" cmd = 'ATI4' if self._mfr == Manufacturer.QUECTEL: cmd = 'ATI' try: response = self._at_command_response(cmd) if response: if self._mfr == Manufacturer.QUECTEL: response = response.split('\n')[1] return response except ModemError: return '' def get_firmware_version(self) -> str: """Get the modem's firmware version.""" # TODO: Firmware structure with hardware, firmware, software? return self._at_command_response('AT+GMR', '+GMR:') def get_system_time(self) -> int: """Get the system/GNSS time from the modem.""" try: nimo_time = self._at_command_response('AT%UTC', '%UTC:') iso_time = nimo_time.replace(' ', 'T') + 'Z' return iso_to_ts(iso_time) except ModemError: return 0 def get_temperature(self) -> 'int|None': """Get the processor temperature in Celsius.""" try: return int(int(self._at_command_response('ATS85?')) / 10) except ValueError: return None def is_transmit_allowed(self) -> bool: """Indicates if the modem is able to transmit data.""" return self.get_network_status() == 5 def is_blocked(self) -> bool: """Indicates if line-of-sight to the satellite is blocked.""" return self.get_network_status() == 8 def is_muted(self) -> bool: """Indicates if the modem has been muted (disallowed to transmit data). """ return self.get_network_status() == 7 def is_updating_network(self) -> bool: """Indicates if the modem is updating network information. The modem should not be powered down during a network update. """ return self.get_network_status() == 4 def get_network_status(self) -> NetworkStatus: """Get the current satellite acquisition status.""" cmd = 'ATS54?' prefix = '' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QREG?' prefix = '+QREG:' return NetworkStatus(int(self._at_command_response(cmd, prefix))) def get_rssi(self) -> float: """Get the current Received Signal Strength Indicator. Also referred to as SNR or C/N0 (dB-Hz) """ cmd = 'ATS90=3 S91=1 S92=1 S116?' prefix = '' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QSCN' prefix = '+QSCN:' try: return int(self._at_command_response(cmd, prefix)) / 100 except ValueError: return 0 def get_signal_quality(self) -> SignalQuality: """Get a qualitative indicator from 0..5 of the satellite signal.""" snr = self.get_rssi() if snr >= SignalLevelRegional.INVALID.value: return SignalQuality.WARNING if snr >= SignalLevelRegional.BARS_5.value: return SignalQuality.STRONG if snr >= SignalLevelRegional.BARS_4.value: return SignalQuality.GOOD if snr >= SignalLevelRegional.BARS_3.value: return SignalQuality.MID if snr >= SignalLevelRegional.BARS_2.value: return SignalQuality.LOW if snr >= SignalLevelRegional.BARS_1.value: return SignalQuality.WEAK return SignalQuality.NONE def get_acquisition_detail(self) -> AcquisitionInfo: """Get the detailed satellite acquisition status. Includes `acquisition_state`, `beamsearch_state`, `vcid` and `snr` indicators. """ cmd = 'ATS90=3 S91=1 S92=1 S122? S123? S116? S101?' prefix = '' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QEVNT=3,1' prefix = '+QEVNT:' result_str = self._at_command_response(cmd, prefix, timeout=10) if self._mfr == Manufacturer.ORBCOMM: results = [int(x) for x in result_str.split('\n')] ctrl_state = ControlState(results[0]) beam_state = BeamState(results[1]) rssi = float(results[2]) / 100 vcid = results[3] elif self._mfr == Manufacturer.QUECTEL: # Workaround Quectel 20230731 documentation error says +QEVNT: result_str = result_str.replace('+QEVENT:', '').strip() results = [int(x) for x in result_str.split(',')] # <dataCount>,<signedBitmask>,<MTID>,<timestamp>, # <class>,<subclass>,<priority>,<data0>,... data0 = 7 # list index where trace data starts ctrl_state = ControlState(results[data0+22]) beam_state = BeamState(results[data0+23]) rssi = float(results[data0+16]) / 100 vcid = results[data0+1] return AcquisitionInfo(ctrl_state, beam_state, rssi, vcid) def send_data(self, data: bytes, **kwargs) -> 'str|MoMessage': """Submits data to send as a mobile-originated message. If a `message_name` is not supplied one will be generated using the least significant 8 digits of unix timestamp. Args: data (bytes): The data to send. Keyword Args: message_name (str): Optional handle for message in Tx queue. Max 8 characters for Orbcomm modem or 12 for Quectel. priority (int): Optional priority 1 (highest) .. 4 (low, default). May use `MessagePriority`. codec_sin (int): Optional first byte of payload to add as a codec service identifier, must be in range 16..255. codec_min (int): Optional second byte of payload to add as a codec message identifier, must be in range 0..255. return_message (bool): If set, returns a `MoMessage` instead of the message handle. Returns: Message handle (str) or `MoMessage` if `return_message` kwarg is set. Raises: `ValueError` for various parameter limit violations. """ data_size = len(data) msg_payload_sin_min = b'' message_name = kwargs.get('message_name', '') priority = MessagePriority(kwargs.get('priority', MessagePriority.LOW.value)) codec_sin: int = kwargs.get('codec_sin', -1) codec_min: int = kwargs.get('codec_min', -1) if codec_sin > -1: data_size += 1 msg_payload_sin_min += codec_sin.to_bytes(1, 'big') if codec_min > -1: data_size += 1 msg_payload_sin_min += codec_min.to_bytes(1, 'big') if not 2 <= data_size <= MSG_MO_MAX_SIZE: raise ValueError('Invalid mobile-originated message size') if message_name and len(message_name) > self._mo_msg_name_len_max: raise ValueError('Message name too long') data_index = 0 if codec_sin <= -1: codec_sin = data[0] data_index += 1 data_size -= 1 if codec_sin not in range(16, 256): raise ValueError('Illegal first payload byte SIN must be 16..255') if codec_min <= -1: codec_min = data[1] data_index += 1 data_size -= 1 if codec_min > 255: raise ValueError('Invalid second payload byte MIN must be 0..255') max_name_len = self._mo_msg_name_len_max if message_name and len(message_name) > max_name_len: raise ValueError(f'Invalid message name longer than {max_name_len}') if len(message_name) == 0: message_name = f'{int(time.time())}'[-max_name_len:] # Convert to base64 string for serial efficiency # no effect on OTA size, modem always decodes and sends raw bytes OTA data_format = DataFormat.BASE64 formatted_data = base64.b64encode(data[data_index:]).decode('utf-8') cmd = 'AT%MGRT=' codec_sep = '.' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QSMGT=' codec_sep = ',' cmd = (f'{cmd}"{message_name}",{priority},{codec_sin}{codec_sep}' f'{codec_min},{data_format},{formatted_data}') self._at_command_response(cmd) if kwargs.get('return_message', False) is True: return MoMessage(message_name, priority, MessageState.TX_READY, payload=(msg_payload_sin_min + data)) return message_name def send_text(self, text: str, **kwargs) -> 'str|MoMessage': """Submits a text string to send as data. If `codec_sin` kwarg is not provided 128 is prepended as the first byte. If `codec_min` kwarg is not provided 1 is prepended as the second byte. Other kwargs as per `send_data`. Args: text (str): The text message to send. Returns: (str) The message name assigned or MoMessage if kwarg `return_message` is set. """ data = b'' codec_sin = int(kwargs.get('codec_sin', 128)) data += codec_sin.to_bytes(1, 'big') codec_min = int(kwargs.get('codec_min', 1)) data += codec_min.to_bytes(1, 'big') data += text.encode() flowthru = ['message_name', 'priority', 'return_message'] next_kwargs = { k:v for k, v in kwargs if k in flowthru } return self.send_data(data, **next_kwargs) def cancel_mo_message(self, message_name: str) -> bool: """Attempts to cancel a previously submitted mobile-originated message. Args: message_name (str): The mobile-originated message handle to delete. """ _log.debug('Attempting to cancel MO message %s', message_name) cmd = 'AT%MGRC' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QSMGC' cmd += f'="{message_name}"' self._at_command_response(cmd) message_states = self.get_mo_message_states(message_name) if len(message_states) > 0: state = message_states[0].state if state == MessageState.TX_CANCELLED: return True elif self._is_simulator: return True _log.warn('Failed to cancel message %s', message_name) return False def get_mo_message_states(self, message_name: str = '') -> 'list[MoMessage]': """Get a list of mobile-originated message states in the modem Tx queue. Args: message_name (str): Optional filter on message name. Returns: A list of `MoMessage` objects including state and metadata. """ cmd = 'AT%MGRS' prefix = '%MGRS:' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QSMGS' prefix = '+QSMGS:' if message_name and not self._is_simulator: # Orbcomm Modem Simulator returns ERROR for %MGRS= command cmd += f'="{message_name}"' response_str = self._at_command_response(cmd, prefix) return self._parse_message_states(response_str, is_mo=True) def _parse_message_states(self, response_str: str, is_mo: bool, ) -> 'list[NimoMessage]': """Parses textual metadata to build a SatelliteMessageState.""" mo_states = [] if not response_str: return mo_states if vlog(VLOG_TAG): _log.debug('Parsing %s message states from %s', 'MO' if is_mo else 'MT', response_str) states_meta = [m for m in response_str.split('\n') if m != ''] for meta in states_meta: message = MoMessage() if is_mo else MtMessage() for field_idx, field_data in enumerate(meta.split(',')): self._update_message_state(message, field_idx, field_data, is_mo) mo_states.append(message) return mo_states def _update_message_state(self, message_state: NimoMessage, field_idx: int, field_data: str, is_mo: bool) -> None: """Parse textual metadata to update a message's state.""" if vlog(VLOG_TAG): _log.debug('Parsing %s message state index %d: %s', 'MO' if is_mo else 'MT', field_idx, field_data) mfr = self._mfr if field_idx == 0: message_state.name = field_data.replace('"', '') if vlog(VLOG_TAG): _log.debug('Message name: %s', message_state.name) elif field_idx == 1 and mfr == Manufacturer.ORBCOMM: if vlog(VLOG_TAG): _log.debug('Ignoring msgNum %s', field_data) elif ((field_idx == 2 and mfr == Manufacturer.ORBCOMM) or (field_idx == 1 and mfr == Manufacturer.QUECTEL)): message_state.priority = MessagePriority(int(field_data)) if vlog(VLOG_TAG): _log.debug('Message priority %s', message_state.priority.name) elif ((field_idx == 3 and mfr == Manufacturer.ORBCOMM) or (field_idx == 2 and mfr == Manufacturer.QUECTEL)): if vlog(VLOG_TAG): _log.debug('Ignoring codec SIN %s', field_data) elif ((field_idx == 4 and mfr == Manufacturer.ORBCOMM) or (field_idx == 3 and mfr == Manufacturer.QUECTEL)): message_state.state = MessageState(int(field_data)) if vlog(VLOG_TAG): _log.debug('Message state: %s', message_state.state.name) elif ((field_idx == 5 and mfr == Manufacturer.ORBCOMM) or (field_idx == 4 and mfr == Manufacturer.QUECTEL)): message_state.length = int(field_data) if vlog(VLOG_TAG): _log.debug('Message size: %d bytes', message_state.length) elif ((field_idx == 6 and mfr == Manufacturer.ORBCOMM) or (field_idx == 5 and mfr == Manufacturer.QUECTEL)): message_state.bytes_delivered = int(field_data) if vlog(VLOG_TAG): _log.debug('Bytes delivered: %d', message_state.bytes_delivered) else: _log.warning('Unhandled field index %d (%s) for manufacturer %s', field_idx, 'MO' if is_mo else 'MT', mfr.name) def get_mt_message_states(self, message_name: str = '') -> 'list[MtMessage]': """Get a list of mobile-terminated message states in the modem Tx queue. Args: message_name (str): Optional filter on message name. Returns: A list of `MtMessage` objects including state and metadata. """ cmd = 'AT%MGFN' if not message_name else 'AT%MGFS' prefix = '%MGFN:' if not message_name else 'AT%MGFS:' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QRMGN' if not message_name else 'AT+QRMGS' prefix = '+QRMGN:' if not message_name else '+QRMGS:' if message_name and not self._is_simulator: cmd += f'="{message_name}"' response_str = self._at_command_response(cmd, prefix) return self._parse_message_states(response_str, is_mo=False) def get_mt_message(self, message_name: str) -> 'MtMessage|None': """Get a mobile-terminated message from the modem's Rx queue by name.""" cmd = 'AT%MGFG' prefix = '%MGFG:' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+GRMGR' prefix = '+GRMGR:' data_format = DataFormat.BASE64 cmd += f'="{message_name}",{data_format}' response = self._at_command_response(cmd, prefix) if response: return self._parse_mt_message(response) return None def _parse_mt_message(self, meta: str) -> MtMessage: """Parse textual metadata to build a MtMessage.""" if vlog(VLOG_TAG): _log.debug('Parsing MT message from meta: %s', meta) data_includes_sin = False mfr = self._mfr message = MtMessage() for field_idx, field_data in enumerate(meta.split(',')): if field_idx == 0: message.name = field_data.replace('"', '') if vlog(VLOG_TAG): _log.debug('Message name: %s', message.name) elif (field_idx == 1 and mfr == Manufacturer.ORBCOMM): if vlog(VLOG_TAG): _log.debug('Ignoring msgNum %s', field_data) elif (field_idx == 2 and mfr == Manufacturer.ORBCOMM): message.priority = MessagePriority(int(field_data)) if vlog(VLOG_TAG): _log.debug('Message priority %s', message.priority.name) elif ((field_idx == 3 and mfr == Manufacturer.ORBCOMM) or (field_idx == 1 and mfr == Manufacturer.QUECTEL)): codec_sin = int(field_data) if not data_includes_sin: message.payload += codec_sin.to_bytes(1, 'big') if vlog(VLOG_TAG): _log.debug('Added SIN as first payload byte: %d', codec_sin) elif (field_idx == 4 and mfr == Manufacturer.ORBCOMM): message.state = MessageState(int(field_data)) if vlog(VLOG_TAG): _log.debug('Message state %s', message.state.name) elif ((field_idx == 5 and mfr == Manufacturer.ORBCOMM) or (field_idx == 2 and mfr == Manufacturer.QUECTEL)): message.length = int(field_data) if vlog(VLOG_TAG): _log.debug('Message size: %d bytes', message.length) elif ((field_idx == 6 and mfr == Manufacturer.ORBCOMM) or (field_idx == 3 and mfr == Manufacturer.QUECTEL)): data_format = DataFormat(int(field_data)) if vlog(VLOG_TAG): _log.debug('Data format %s', data_format.name) elif ((field_idx == 7 and mfr == Manufacturer.ORBCOMM) or (field_idx == 4 and mfr == Manufacturer.QUECTEL)): if vlog(VLOG_TAG): _log.debug('Decoding payload from: %s', field_data) if message.length > 0: if data_format == DataFormat.BASE64: message.payload += base64.b64decode(field_data) elif data_format == DataFormat.HEX: message.payload += bytes.fromhex(field_data) else: # DataFormat.TEXT message.payload += field_data.encode() if message.length != len(message.payload): _log.warn('Message length mismatch') return message def delete_mt_message(self, message_name: str) -> bool: """Remove a mobile-terminated message from the modem's Rx queue.""" cmd = 'AT%MGFM' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QRMGM' cmd += f'="{message_name}"' self._at_command_response(cmd) check = self.get_mt_message_states(message_name) if check and check[0].state == MessageState.RX_RETRIEVED: return True return False def receive_data(self, message_name: str) -> 'bytes|None': """Get the raw data from a mobile-terminated message.""" message = self.get_mt_message(message_name) if message: return message.payload return None def get_gnss_mode(self) -> GnssMode: """Get the modem's GNSS receiver mode.""" cmd = 'ATS39?' prefix = '' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QGNSSMOD?' prefix = '+QGNSSMOD:' response = self._at_command_response(cmd, prefix) if self._mfr == Manufacturer.QUECTEL: return GnssModeQuectel(int(response)) return GnssModeOrbcomm(int(response)) def set_gnss_mode(self, gnss_mode: GnssMode) -> None: """Get the modem's GNSS receiver mode.""" cmd = f'ATS39={gnss_mode}' prefix = '' if self._mfr == Manufacturer.QUECTEL: if not GnssModeQuectel.is_valid(gnss_mode): raise ValueError('Invalid GNSS mode') cmd = f'AT+QGNSSMOD={gnss_mode}' prefix = '+QGNSSMOD:' else: if not GnssModeOrbcomm.is_valid(gnss_mode): raise ValueError('Invalid GNSS mode') self._at_command_response(cmd, prefix) def get_gnss_continuous(self) -> int: """Get the modem's GNSS continuous refresh interval in seconds.""" cmd = 'ATS55?' prefix = '' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QGNSSCW?' prefix = '+QGNSSCW:' try: return int(self._at_command_response(cmd, prefix)) except ValueError: return 0 def set_gnss_continuous(self, interval: int) -> None: """Set the modem's GNSS continuous refresh interval in seconds. Args: interval (int): Automatic update interval 0..30 seconds. Returns: `True` if successful. Raises: `ValueError` if invalid interval is specified. """ if interval not in range (0, 31): raise ValueError('Invalid GNSS refresh interval') cmd = f'ATS55={interval}' if self._mfr == Manufacturer.QUECTEL: cmd = f'AT+QGNSSCW={interval}' self._at_command_response(cmd) def get_nmea_data(self, stale_secs: int = 1, wait_secs: int = 35, rmc: bool = True, gga: bool = True, gsa: bool = True, gsv: bool = False, ) -> str: """Get a set of NMEA data detailing the modem's location. Args: stale_secs (int): Maximum cached fix age to use in seconds. wait_secs (int): Maximum duration to wait for a fix in seconds. rmc (bool): Include Recommended Minimum data. gga (bool): Include altitude and fix quality data. gsa (bool): Include Dilution of Precision data. gsv (bool): Include verbose GNSS satellite details. """ cmd = 'AT%GPS' prefix = '%GPS:' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QGNSS' prefix = '+QGNSS:' cmd += f'={stale_secs},{wait_secs}' if rmc: cmd += ',"RMC"' if gga: cmd += ',"GGA"' if gsa: cmd += ',"GSA"' if gsv: cmd += ',"GSV"' try: response = self._at_command_response(cmd, prefix, wait_secs + 5) return response except ModemAtError as exc: if exc.error_code != AtErrorCode.GNSS_TIMEOUT: raise return '' def get_location(self, stale_secs: int = 1, wait_secs: int = 35) -> 'ModemLocation|None': """Get the modem's location. Args: stale_secs (int): Maximum cached fix age to use in seconds. wait_secs (int): Maximum duration to wait for a fix in seconds. Returns: ModemLocation object if GNSS does not time out waiting for fix. """ nmea_data = self.get_nmea_data(stale_secs, wait_secs) if nmea_data: return get_location_from_nmea_data(nmea_data) return None def get_satellite_info(self) -> 'SatelliteLocation|None': """Get the satellite's information including azimuth and elevation. Derives which satellite/GeoBeam is used from trace class 3 subclass 5. Returns: `SatelliteLocation` object (azimuth, elevation) if determinable. """ geobeam = None modem_location = self.get_location() if (modem_location is not None and self.get_network_status() > NetworkStatus.RX_SEARCHING): # satellite has been found cmd = 'ATS90=3 S91=5 S92=1 S102?' prefix = '' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QEVNT=3,5' prefix = '+QEVNT:' response = self._at_command_response(cmd, prefix) if self._mfr == Manufacturer.QUECTEL: # workaround documentation error response = response.replace('+QEVENT:', '').strip() response = response.split(',')[9] geobeam = GeoBeam(int(response)) return get_satellite_location(modem_location, geobeam) return None def get_event_mask(self) -> int: """Get the set of monitored events that trigger event notification.""" if self._mfr != Manufacturer.ORBCOMM: raise ModemError('Operation not supported by this modem') cmd = 'ATS88?' try: return int(self._at_command_response(cmd)) except ValueError: return 0 def set_event_mask(self, event_mask: int) -> None: """Set monitored events that trigger event notification.""" if self._mfr != Manufacturer.ORBCOMM: raise ModemError('Operation not supported by this modem') max_bits = 12 if not isinstance(event_mask, int) or event_mask > 2**max_bits-1: raise ValueError('Invalid event bitmask') cmd = f'ATS88={event_mask}' self._at_command_response(cmd) def get_events_asserted_mask(self) -> int: """Get the set of events that are active following a notification.""" if self._mfr != Manufacturer.ORBCOMM: raise ModemError('Operation not supported by this modem') cmd = 'ATS89?' try: return int(self._at_command_response(cmd)) except ValueError: return 0 def get_trace_event_monitor(self, asserted_only: bool = False, ) -> 'list[tuple[int, int]]': """Get the list of monitored Trace Events. Args: asserted_only (bool): If True returns only asserted monitored events Returns: A list of tuples (trace_class, trace_subclass) Raises: `ModemError` if unsupported by the modem type. """ if self._mfr != Manufacturer.ORBCOMM: raise ModemError('Operation not supported by this modem') cmd = 'AT%EVMON' prefix = '%EVMON:' trace_events = [] events = self._at_command_response(cmd, prefix).split(',') for event in events: trace_class = int(event.split('.')[0]) trace_subclass = int(event.split('.')[1].replace('*', '')) if not asserted_only or event.endswith('*'): trace_events.append((trace_class, trace_subclass)) return trace_events def set_trace_event_monitor(self, events: 'list[tuple[int, int]]') -> None: """Set the list of monitored trace events.""" cmd = 'AT%EVMON=' for event in events: if not cmd.endswith('='): cmd += ',' cmd += f'{event[0]}.{event[1]}' self._at_command_response(cmd) def get_trace_events_cached(self) -> 'list[tuple[int, int]]': """Get a list of trace events cached.""" return self.get_trace_event_monitor(True) def get_trace_event_data(self, event: 'tuple[int, int]', decode: bool = False, ) -> 'list[int]|dict[str, int]': """Get the trace event data. Args: event (tuple): The trace (class, subclass) decode (bool): Decodes raw data to dictionary (not implemented) """ cmd = f'AT%EVNT={event[0]},{event[1]}' prefix = '%EVNT:' if self._mfr == Manufacturer.QUECTEL: cmd = cmd.replace('%EVNT', '+QEVNT') prefix = '+QEVENT:' # documented as +QEVNT trace = self._at_command_response(cmd, prefix) if decode: raise NotImplementedError return [int(i) for i in trace.split(',')] def get_urc_ctl(self) -> int: """Get the event list that trigger Unsolicited Report Codes.""" if self._mfr != Manufacturer.QUECTEL: raise ValueError('Modem does not support this feature') cmd = 'AT+QURCCTL?' prefix = '+QURCCTL:' try: return int(self._at_command_response(cmd, prefix), 16) except ValueError: return 0 def set_urc_ctl(self, qurc_mask: int) -> None: """Set the event list that trigger Unsolicited Report Codes.""" if self._mfr != Manufacturer.QUECTEL: raise ValueError('Modem does not support this feature') cmd = f'AT+QURCCTL=0x{qurc_mask:04X}' self._at_command_response(cmd) def get_urc(self) -> 'UrcCode|None': """Get the pending Unsolicited Result Code if one is present.""" if self._mfr != Manufacturer.QUECTEL: raise ValueError('Modem does not support this feature') eol = '\r\n' if self._modem.verbose else '\r' result = self._modem.read_rx_buffer(read_until=eol) if result: result = result.replace('+QURC:', '').strip() try: return UrcCode(int(result)) except ValueError: return UrcCode[result] return None def get_power_mode(self) -> PowerMode: """Get the modem's power mode configuration.""" cmd = 'ATS50?' prefix = '' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QPMD?' prefix = '+QPMD:' return PowerMode(int(self._at_command_response(cmd, prefix))) def set_power_mode(self, power_mode: PowerMode) -> None: """Set the modem's power mode configuration.""" if not PowerMode.is_valid(power_mode): raise ValueError('Invalid Power Mode') cmd = f'ATS50={power_mode}' if self._mfr == Manufacturer.QUECTEL: cmd = f'AT+QPMD={power_mode}' self._at_command_response(cmd) def get_wakeup_period(self) -> WakeupPeriod: """Get the modem's wakeup period configuration.""" cmd = 'ATS51?' prefix = '' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QWKUPCFG?' prefix = '+QWKUPCFG:' if self._mfr == Manufacturer.QUECTEL: return WakeupPeriod(int( self._at_command_response(cmd, prefix).split(',')[0])) return WakeupPeriod(int(self._at_command_response(cmd, prefix))) def set_wakeup_period(self, wakeup_period: WakeupPeriod, wakeup_way: 'WakeupWay|None' = None, ) -> None: """Set the modem's wakeup period configuration. The configuration does not update until confimed by the network. """ if not WakeupPeriod.is_valid(wakeup_period): raise ValueError('Invalid wakeup period') cmd = f'ATS51={wakeup_period}' if self._mfr == Manufacturer.QUECTEL: if wakeup_way is None: query = self._at_command_response('AT+QWKUPCFG?', '+QWKUPCFG:') wakeup_way = WakeupWay(int(query.split(',')[1])) cmd = f'AT+QWKUPCFG={wakeup_period},{wakeup_way}' self._at_command_response(cmd) def get_wakeup_way(self) -> WakeupWay: """Get the modem wakeup method.""" if self._mfr != Manufacturer.QUECTEL: raise ModemError('Operation not supported by this modem') cmd = 'AT+QWKUPCFG?' prefix = '+QWKUPCFG:' wakeup_way = self._at_command_response(cmd, prefix).split(',')[1] return WakeupWay(int(wakeup_way)) def power_down(self) -> None: """Prepare the modem for power-down.""" cmd = 'AT%OFF' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QPOWD=2' self._at_command_response(cmd) def get_workmode(self) -> WorkMode: """Get the modem working mode.""" if self._mfr != Manufacturer.QUECTEL: raise ModemError('Operation not supported by this modem') cmd = 'AT+QMOD?' prefix = '+QMOD:' return WorkMode(int(self._at_command_response(cmd, prefix))) def set_workmode(self, workmode: WorkMode) -> None: """Set the modem working mode.""" if self._mfr != Manufacturer.QUECTEL: raise ModemError('Operation not supported by this modem') if not WorkMode.is_valid(workmode): raise ValueError('Invalid workmode') cmd = f'AT+QMOD={workmode}' self._at_command_response(cmd) def get_deepsleep_enable(self) -> bool: """Get the deepsleep configuration flag.""" if self._mfr != Manufacturer.QUECTEL: raise ModemError('Operation not supported by this modem') cmd = 'AT+QSCLK?' prefix = '+QSCLK:' return bool(int(self._at_command_response(cmd, prefix))) def set_deepsleep_enable(self, enable: bool) -> None: """Set the deepsleep configuration flag.""" if self._mfr != Manufacturer.QUECTEL: raise ModemError('Operation not supported by this modem') self._at_command_response(f'AT+QSCLK={int(enable)}') def get_register(self, s_register_number: int) -> 'int|None': """Get a modem register value.""" cmd = f'ATS{s_register_number}?' try: return int(self._at_command_response(cmd)) except ValueError: return None def set_register(self, s_register_number: int, value: int) -> None: """Set a modem register value.""" cmd = f'ATS{s_register_number}={value}' self._at_command_response(cmd) def get_all_registers(self) -> dict: """Get a dictionary of modem register values.""" raise NotImplementedError
Instance variables
var baudrate : int
-
The baudrate of the serial connection.
Expand source code
@property def baudrate(self) -> int: """The baudrate of the serial connection.""" return self._modem.serial.baudrate
var crc_enabled : bool
-
Expand source code
@property def crc_enabled(self) -> bool: return self._modem.crc
var is_ready : bool
-
Expand source code
@property def is_ready(self) -> bool: return not self._modem._lock.locked()
var modem_booted : bool
-
Expand source code
@property def modem_booted(self) -> bool: return self._modem_booted
Methods
def await_boot(self, boot_timeout: int = 10) ‑> bool
-
Indicates if a boot string is received within a timeout window.
Use
is_connected
before waiting for boot.Args
boot_timeout
:int
- The maximum time to wait in seconds.
Returns
True if a valid boot string was received inside the timeout.
Expand source code
def await_boot(self, boot_timeout: int = 10) -> bool: """Indicates if a boot string is received within a timeout window. Use `is_connected` before waiting for boot. Args: boot_timeout (int): The maximum time to wait in seconds. Returns: True if a valid boot string was received inside the timeout. """ boot_strings = ['ST Version', 'RDY'] _log.debug('Awaiting modem boot string for %d seconds...', boot_timeout) rx_data = '' started = time.time() while time.time() - started < boot_timeout and not self._modem_booted: while self._modem.is_data_waiting(): rx_data += self._modem.read_rx_buffer() if rx_data and any(b in rx_data for b in boot_strings): self._modem_booted = True _log.debug('Found boot string - clearing Rx buffer') while self._modem.is_data_waiting(): rx_data += self._modem.read_rx_buffer() break return self._modem_booted
def cancel_mo_message(self, message_name: str) ‑> bool
-
Attempts to cancel a previously submitted mobile-originated message.
Args
message_name
:str
- The mobile-originated message handle to delete.
Expand source code
def cancel_mo_message(self, message_name: str) -> bool: """Attempts to cancel a previously submitted mobile-originated message. Args: message_name (str): The mobile-originated message handle to delete. """ _log.debug('Attempting to cancel MO message %s', message_name) cmd = 'AT%MGRC' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QSMGC' cmd += f'="{message_name}"' self._at_command_response(cmd) message_states = self.get_mo_message_states(message_name) if len(message_states) > 0: state = message_states[0].state if state == MessageState.TX_CANCELLED: return True elif self._is_simulator: return True _log.warn('Failed to cancel message %s', message_name) return False
def connect(self) ‑> None
-
Attach to the modem via serial communications.
Provided for backward compatibility. Connection is usually automatic when instantiating
NimoModem
.Expand source code
def connect(self) -> None: """Attach to the modem via serial communications. Provided for backward compatibility. Connection is usually automatic when instantiating `NimoModem`. """ if not self._serial.is_open: self._serial.open()
def delete_mt_message(self, message_name: str) ‑> bool
-
Remove a mobile-terminated message from the modem's Rx queue.
Expand source code
def delete_mt_message(self, message_name: str) -> bool: """Remove a mobile-terminated message from the modem's Rx queue.""" cmd = 'AT%MGFM' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QRMGM' cmd += f'="{message_name}"' self._at_command_response(cmd) check = self.get_mt_message_states(message_name) if check and check[0].state == MessageState.RX_RETRIEVED: return True return False
def disconnect(self) ‑> None
-
Detach from the modem serial communications.
Provided for backward compatibility. Not typically required.
Expand source code
def disconnect(self) -> None: """Detach from the modem serial communications. Provided for backward compatibility. Not typically required. """ self._is_connected = False self._modem_booted = False if self._serial.is_open: self._serial.close()
def get_acquisition_detail(self) ‑> AcquisitionInfo
-
Get the detailed satellite acquisition status.
Includes
acquisition_state
,beamsearch_state
,vcid
andsnr
indicators.Expand source code
def get_acquisition_detail(self) -> AcquisitionInfo: """Get the detailed satellite acquisition status. Includes `acquisition_state`, `beamsearch_state`, `vcid` and `snr` indicators. """ cmd = 'ATS90=3 S91=1 S92=1 S122? S123? S116? S101?' prefix = '' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QEVNT=3,1' prefix = '+QEVNT:' result_str = self._at_command_response(cmd, prefix, timeout=10) if self._mfr == Manufacturer.ORBCOMM: results = [int(x) for x in result_str.split('\n')] ctrl_state = ControlState(results[0]) beam_state = BeamState(results[1]) rssi = float(results[2]) / 100 vcid = results[3] elif self._mfr == Manufacturer.QUECTEL: # Workaround Quectel 20230731 documentation error says +QEVNT: result_str = result_str.replace('+QEVENT:', '').strip() results = [int(x) for x in result_str.split(',')] # <dataCount>,<signedBitmask>,<MTID>,<timestamp>, # <class>,<subclass>,<priority>,<data0>,... data0 = 7 # list index where trace data starts ctrl_state = ControlState(results[data0+22]) beam_state = BeamState(results[data0+23]) rssi = float(results[data0+16]) / 100 vcid = results[data0+1] return AcquisitionInfo(ctrl_state, beam_state, rssi, vcid)
def get_all_registers(self) ‑> dict
-
Get a dictionary of modem register values.
Expand source code
def get_all_registers(self) -> dict: """Get a dictionary of modem register values.""" raise NotImplementedError
def get_deepsleep_enable(self) ‑> bool
-
Get the deepsleep configuration flag.
Expand source code
def get_deepsleep_enable(self) -> bool: """Get the deepsleep configuration flag.""" if self._mfr != Manufacturer.QUECTEL: raise ModemError('Operation not supported by this modem') cmd = 'AT+QSCLK?' prefix = '+QSCLK:' return bool(int(self._at_command_response(cmd, prefix)))
def get_event_mask(self) ‑> int
-
Get the set of monitored events that trigger event notification.
Expand source code
def get_event_mask(self) -> int: """Get the set of monitored events that trigger event notification.""" if self._mfr != Manufacturer.ORBCOMM: raise ModemError('Operation not supported by this modem') cmd = 'ATS88?' try: return int(self._at_command_response(cmd)) except ValueError: return 0
def get_events_asserted_mask(self) ‑> int
-
Get the set of events that are active following a notification.
Expand source code
def get_events_asserted_mask(self) -> int: """Get the set of events that are active following a notification.""" if self._mfr != Manufacturer.ORBCOMM: raise ModemError('Operation not supported by this modem') cmd = 'ATS89?' try: return int(self._at_command_response(cmd)) except ValueError: return 0
def get_firmware_version(self) ‑> str
-
Get the modem's firmware version.
Expand source code
def get_firmware_version(self) -> str: """Get the modem's firmware version.""" # TODO: Firmware structure with hardware, firmware, software? return self._at_command_response('AT+GMR', '+GMR:')
def get_gnss_continuous(self) ‑> int
-
Get the modem's GNSS continuous refresh interval in seconds.
Expand source code
def get_gnss_continuous(self) -> int: """Get the modem's GNSS continuous refresh interval in seconds.""" cmd = 'ATS55?' prefix = '' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QGNSSCW?' prefix = '+QGNSSCW:' try: return int(self._at_command_response(cmd, prefix)) except ValueError: return 0
def get_gnss_mode(self) ‑> GnssMode
-
Get the modem's GNSS receiver mode.
Expand source code
def get_gnss_mode(self) -> GnssMode: """Get the modem's GNSS receiver mode.""" cmd = 'ATS39?' prefix = '' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QGNSSMOD?' prefix = '+QGNSSMOD:' response = self._at_command_response(cmd, prefix) if self._mfr == Manufacturer.QUECTEL: return GnssModeQuectel(int(response)) return GnssModeOrbcomm(int(response))
def get_last_error_code(self) ‑> AtErrorCode
-
Get the last error code from the modem.
Expand source code
def get_last_error_code(self) -> AtErrorCode: """Get the last error code from the modem.""" return AtErrorCode(int(self._at_command_response('ATS80?')))
def get_location(self, stale_secs: int = 1, wait_secs: int = 35) ‑> ModemLocation|None
-
Get the modem's location.
Args
stale_secs
:int
- Maximum cached fix age to use in seconds.
wait_secs
:int
- Maximum duration to wait for a fix in seconds.
Returns
ModemLocation object if GNSS does not time out waiting for fix.
Expand source code
def get_location(self, stale_secs: int = 1, wait_secs: int = 35) -> 'ModemLocation|None': """Get the modem's location. Args: stale_secs (int): Maximum cached fix age to use in seconds. wait_secs (int): Maximum duration to wait for a fix in seconds. Returns: ModemLocation object if GNSS does not time out waiting for fix. """ nmea_data = self.get_nmea_data(stale_secs, wait_secs) if nmea_data: return get_location_from_nmea_data(nmea_data) return None
def get_manufacturer(self) ‑> str
-
Get the manufacturer name.
Expand source code
def get_manufacturer(self) -> str: """Get the manufacturer name.""" if not self._manufacturer: try: mfr = self._at_command_response('ATI') if 'quectel' in mfr.lower(): self._manufacturer = Manufacturer.QUECTEL else: if not any(m in mfr.lower() for m in ['orbcomm', 'skywave']): _log.warning('Unsupported manufacturer %s', mfr) self._manufacturer = Manufacturer.ORBCOMM if vlog(VLOG_TAG): _log.debug('Caching manufacturer: %s', self._manufacturer.name) except ModemError: self._manufacturer = Manufacturer.NONE raise return self._manufacturer.name
def get_mo_message_states(self, message_name: str = '') ‑> list[MoMessage]
-
Get a list of mobile-originated message states in the modem Tx queue.
Args
message_name
:str
- Optional filter on message name.
Returns
A list of
MoMessage
objects including state and metadata.Expand source code
def get_mo_message_states(self, message_name: str = '') -> 'list[MoMessage]': """Get a list of mobile-originated message states in the modem Tx queue. Args: message_name (str): Optional filter on message name. Returns: A list of `MoMessage` objects including state and metadata. """ cmd = 'AT%MGRS' prefix = '%MGRS:' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QSMGS' prefix = '+QSMGS:' if message_name and not self._is_simulator: # Orbcomm Modem Simulator returns ERROR for %MGRS= command cmd += f'="{message_name}"' response_str = self._at_command_response(cmd, prefix) return self._parse_message_states(response_str, is_mo=True)
def get_mobile_id(self) ‑> str
-
Get the modem's globally unique identifier.
Expand source code
def get_mobile_id(self) -> str: """Get the modem's globally unique identifier.""" if not self._mobile_id: try: self._mobile_id = self._at_command_response('AT+GSN', '+GSN:') if vlog(VLOG_TAG): _log.debug('Cached Mobile ID %s', self._mobile_id) except ModemError: self._mobile_id = '' raise return self._mobile_id
def get_model(self) ‑> str
-
Get the manufacturer model name.
Expand source code
def get_model(self) -> str: """Get the manufacturer model name.""" cmd = 'ATI4' if self._mfr == Manufacturer.QUECTEL: cmd = 'ATI' try: response = self._at_command_response(cmd) if response: if self._mfr == Manufacturer.QUECTEL: response = response.split('\n')[1] return response except ModemError: return ''
def get_mt_message(self, message_name: str) ‑> MtMessage|None
-
Get a mobile-terminated message from the modem's Rx queue by name.
Expand source code
def get_mt_message(self, message_name: str) -> 'MtMessage|None': """Get a mobile-terminated message from the modem's Rx queue by name.""" cmd = 'AT%MGFG' prefix = '%MGFG:' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+GRMGR' prefix = '+GRMGR:' data_format = DataFormat.BASE64 cmd += f'="{message_name}",{data_format}' response = self._at_command_response(cmd, prefix) if response: return self._parse_mt_message(response) return None
def get_mt_message_states(self, message_name: str = '') ‑> list[MtMessage]
-
Get a list of mobile-terminated message states in the modem Tx queue.
Args
message_name
:str
- Optional filter on message name.
Returns
A list of
MtMessage
objects including state and metadata.Expand source code
def get_mt_message_states(self, message_name: str = '') -> 'list[MtMessage]': """Get a list of mobile-terminated message states in the modem Tx queue. Args: message_name (str): Optional filter on message name. Returns: A list of `MtMessage` objects including state and metadata. """ cmd = 'AT%MGFN' if not message_name else 'AT%MGFS' prefix = '%MGFN:' if not message_name else 'AT%MGFS:' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QRMGN' if not message_name else 'AT+QRMGS' prefix = '+QRMGN:' if not message_name else '+QRMGS:' if message_name and not self._is_simulator: cmd += f'="{message_name}"' response_str = self._at_command_response(cmd, prefix) return self._parse_message_states(response_str, is_mo=False)
def get_network_status(self) ‑> NetworkStatus
-
Get the current satellite acquisition status.
Expand source code
def get_network_status(self) -> NetworkStatus: """Get the current satellite acquisition status.""" cmd = 'ATS54?' prefix = '' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QREG?' prefix = '+QREG:' return NetworkStatus(int(self._at_command_response(cmd, prefix)))
def get_nmea_data(self, stale_secs: int = 1, wait_secs: int = 35, rmc: bool = True, gga: bool = True, gsa: bool = True, gsv: bool = False) ‑> str
-
Get a set of NMEA data detailing the modem's location.
Args
stale_secs
:int
- Maximum cached fix age to use in seconds.
wait_secs
:int
- Maximum duration to wait for a fix in seconds.
rmc
:bool
- Include Recommended Minimum data.
gga
:bool
- Include altitude and fix quality data.
gsa
:bool
- Include Dilution of Precision data.
gsv
:bool
- Include verbose GNSS satellite details.
Expand source code
def get_nmea_data(self, stale_secs: int = 1, wait_secs: int = 35, rmc: bool = True, gga: bool = True, gsa: bool = True, gsv: bool = False, ) -> str: """Get a set of NMEA data detailing the modem's location. Args: stale_secs (int): Maximum cached fix age to use in seconds. wait_secs (int): Maximum duration to wait for a fix in seconds. rmc (bool): Include Recommended Minimum data. gga (bool): Include altitude and fix quality data. gsa (bool): Include Dilution of Precision data. gsv (bool): Include verbose GNSS satellite details. """ cmd = 'AT%GPS' prefix = '%GPS:' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QGNSS' prefix = '+QGNSS:' cmd += f'={stale_secs},{wait_secs}' if rmc: cmd += ',"RMC"' if gga: cmd += ',"GGA"' if gsa: cmd += ',"GSA"' if gsv: cmd += ',"GSV"' try: response = self._at_command_response(cmd, prefix, wait_secs + 5) return response except ModemAtError as exc: if exc.error_code != AtErrorCode.GNSS_TIMEOUT: raise return ''
def get_power_mode(self) ‑> PowerMode
-
Get the modem's power mode configuration.
Expand source code
def get_power_mode(self) -> PowerMode: """Get the modem's power mode configuration.""" cmd = 'ATS50?' prefix = '' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QPMD?' prefix = '+QPMD:' return PowerMode(int(self._at_command_response(cmd, prefix)))
def get_register(self, s_register_number: int) ‑> int|None
-
Get a modem register value.
Expand source code
def get_register(self, s_register_number: int) -> 'int|None': """Get a modem register value.""" cmd = f'ATS{s_register_number}?' try: return int(self._at_command_response(cmd)) except ValueError: return None
def get_rssi(self) ‑> float
-
Get the current Received Signal Strength Indicator.
Also referred to as SNR or C/N0 (dB-Hz)
Expand source code
def get_rssi(self) -> float: """Get the current Received Signal Strength Indicator. Also referred to as SNR or C/N0 (dB-Hz) """ cmd = 'ATS90=3 S91=1 S92=1 S116?' prefix = '' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QSCN' prefix = '+QSCN:' try: return int(self._at_command_response(cmd, prefix)) / 100 except ValueError: return 0
def get_satellite_info(self) ‑> SatelliteLocation|None
-
Get the satellite's information including azimuth and elevation.
Derives which satellite/GeoBeam is used from trace class 3 subclass 5.
Returns
SatelliteLocation
object (azimuth, elevation) if determinable.Expand source code
def get_satellite_info(self) -> 'SatelliteLocation|None': """Get the satellite's information including azimuth and elevation. Derives which satellite/GeoBeam is used from trace class 3 subclass 5. Returns: `SatelliteLocation` object (azimuth, elevation) if determinable. """ geobeam = None modem_location = self.get_location() if (modem_location is not None and self.get_network_status() > NetworkStatus.RX_SEARCHING): # satellite has been found cmd = 'ATS90=3 S91=5 S92=1 S102?' prefix = '' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QEVNT=3,5' prefix = '+QEVNT:' response = self._at_command_response(cmd, prefix) if self._mfr == Manufacturer.QUECTEL: # workaround documentation error response = response.replace('+QEVENT:', '').strip() response = response.split(',')[9] geobeam = GeoBeam(int(response)) return get_satellite_location(modem_location, geobeam) return None
def get_signal_quality(self) ‑> SignalQuality
-
Get a qualitative indicator from 0..5 of the satellite signal.
Expand source code
def get_signal_quality(self) -> SignalQuality: """Get a qualitative indicator from 0..5 of the satellite signal.""" snr = self.get_rssi() if snr >= SignalLevelRegional.INVALID.value: return SignalQuality.WARNING if snr >= SignalLevelRegional.BARS_5.value: return SignalQuality.STRONG if snr >= SignalLevelRegional.BARS_4.value: return SignalQuality.GOOD if snr >= SignalLevelRegional.BARS_3.value: return SignalQuality.MID if snr >= SignalLevelRegional.BARS_2.value: return SignalQuality.LOW if snr >= SignalLevelRegional.BARS_1.value: return SignalQuality.WEAK return SignalQuality.NONE
def get_system_time(self) ‑> int
-
Get the system/GNSS time from the modem.
Expand source code
def get_system_time(self) -> int: """Get the system/GNSS time from the modem.""" try: nimo_time = self._at_command_response('AT%UTC', '%UTC:') iso_time = nimo_time.replace(' ', 'T') + 'Z' return iso_to_ts(iso_time) except ModemError: return 0
def get_temperature(self) ‑> int|None
-
Get the processor temperature in Celsius.
Expand source code
def get_temperature(self) -> 'int|None': """Get the processor temperature in Celsius.""" try: return int(int(self._at_command_response('ATS85?')) / 10) except ValueError: return None
def get_trace_event_data(self, event: tuple[int, int], decode: bool = False) ‑> list[int]|dict[str, int]
-
Get the trace event data.
Args
event
:tuple
- The trace (class, subclass)
decode
:bool
- Decodes raw data to dictionary (not implemented)
Expand source code
def get_trace_event_data(self, event: 'tuple[int, int]', decode: bool = False, ) -> 'list[int]|dict[str, int]': """Get the trace event data. Args: event (tuple): The trace (class, subclass) decode (bool): Decodes raw data to dictionary (not implemented) """ cmd = f'AT%EVNT={event[0]},{event[1]}' prefix = '%EVNT:' if self._mfr == Manufacturer.QUECTEL: cmd = cmd.replace('%EVNT', '+QEVNT') prefix = '+QEVENT:' # documented as +QEVNT trace = self._at_command_response(cmd, prefix) if decode: raise NotImplementedError return [int(i) for i in trace.split(',')]
def get_trace_event_monitor(self, asserted_only: bool = False) ‑> list[tuple[int, int]]
-
Get the list of monitored Trace Events.
Args
asserted_only
:bool
- If True returns only asserted monitored events
Returns
A list of tuples (trace_class, trace_subclass)
Raises
ModemError
if unsupported by the modem type.Expand source code
def get_trace_event_monitor(self, asserted_only: bool = False, ) -> 'list[tuple[int, int]]': """Get the list of monitored Trace Events. Args: asserted_only (bool): If True returns only asserted monitored events Returns: A list of tuples (trace_class, trace_subclass) Raises: `ModemError` if unsupported by the modem type. """ if self._mfr != Manufacturer.ORBCOMM: raise ModemError('Operation not supported by this modem') cmd = 'AT%EVMON' prefix = '%EVMON:' trace_events = [] events = self._at_command_response(cmd, prefix).split(',') for event in events: trace_class = int(event.split('.')[0]) trace_subclass = int(event.split('.')[1].replace('*', '')) if not asserted_only or event.endswith('*'): trace_events.append((trace_class, trace_subclass)) return trace_events
def get_trace_events_cached(self) ‑> list[tuple[int, int]]
-
Get a list of trace events cached.
Expand source code
def get_trace_events_cached(self) -> 'list[tuple[int, int]]': """Get a list of trace events cached.""" return self.get_trace_event_monitor(True)
def get_urc(self) ‑> UrcCode|None
-
Get the pending Unsolicited Result Code if one is present.
Expand source code
def get_urc(self) -> 'UrcCode|None': """Get the pending Unsolicited Result Code if one is present.""" if self._mfr != Manufacturer.QUECTEL: raise ValueError('Modem does not support this feature') eol = '\r\n' if self._modem.verbose else '\r' result = self._modem.read_rx_buffer(read_until=eol) if result: result = result.replace('+QURC:', '').strip() try: return UrcCode(int(result)) except ValueError: return UrcCode[result] return None
def get_urc_ctl(self) ‑> int
-
Get the event list that trigger Unsolicited Report Codes.
Expand source code
def get_urc_ctl(self) -> int: """Get the event list that trigger Unsolicited Report Codes.""" if self._mfr != Manufacturer.QUECTEL: raise ValueError('Modem does not support this feature') cmd = 'AT+QURCCTL?' prefix = '+QURCCTL:' try: return int(self._at_command_response(cmd, prefix), 16) except ValueError: return 0
def get_wakeup_period(self) ‑> WakeupPeriod
-
Get the modem's wakeup period configuration.
Expand source code
def get_wakeup_period(self) -> WakeupPeriod: """Get the modem's wakeup period configuration.""" cmd = 'ATS51?' prefix = '' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QWKUPCFG?' prefix = '+QWKUPCFG:' if self._mfr == Manufacturer.QUECTEL: return WakeupPeriod(int( self._at_command_response(cmd, prefix).split(',')[0])) return WakeupPeriod(int(self._at_command_response(cmd, prefix)))
def get_wakeup_way(self) ‑> WakeupWay
-
Get the modem wakeup method.
Expand source code
def get_wakeup_way(self) -> WakeupWay: """Get the modem wakeup method.""" if self._mfr != Manufacturer.QUECTEL: raise ModemError('Operation not supported by this modem') cmd = 'AT+QWKUPCFG?' prefix = '+QWKUPCFG:' wakeup_way = self._at_command_response(cmd, prefix).split(',')[1] return WakeupWay(int(wakeup_way))
def get_workmode(self) ‑> WorkMode
-
Get the modem working mode.
Expand source code
def get_workmode(self) -> WorkMode: """Get the modem working mode.""" if self._mfr != Manufacturer.QUECTEL: raise ModemError('Operation not supported by this modem') cmd = 'AT+QMOD?' prefix = '+QMOD:' return WorkMode(int(self._at_command_response(cmd, prefix)))
def initialize(self, echo: bool = True, verbose: bool = True) ‑> bool
-
Initialize the modem AT configuration for Echo and Verbose.
Expand source code
def initialize(self, echo: bool = True, verbose: bool = True, ) -> bool: """Initialize the modem AT configuration for Echo and Verbose.""" at_command = (f'ATZ;E{int(echo)};V{int(verbose)}') try: self._at_command_response(at_command) return True except ModemCrcConfig: _log.info('Attempting re-initialize with CRC enabled') self._at_command_response(at_command) return True
def is_blocked(self) ‑> bool
-
Indicates if line-of-sight to the satellite is blocked.
Expand source code
def is_blocked(self) -> bool: """Indicates if line-of-sight to the satellite is blocked.""" return self.get_network_status() == 8
def is_connected(self) ‑> bool
-
Indicates if the modem is responding to a basic AT query.
Expand source code
def is_connected(self) -> bool: """Indicates if the modem is responding to a basic AT query.""" try: self._at_command_response('AT') self._is_connected = True self._modem_booted = True return True except ModemError: self._is_connected = False self._modem_booted = False return False
def is_muted(self) ‑> bool
-
Indicates if the modem has been muted (disallowed to transmit data).
Expand source code
def is_muted(self) -> bool: """Indicates if the modem has been muted (disallowed to transmit data). """ return self.get_network_status() == 7
def is_transmit_allowed(self) ‑> bool
-
Indicates if the modem is able to transmit data.
Expand source code
def is_transmit_allowed(self) -> bool: """Indicates if the modem is able to transmit data.""" return self.get_network_status() == 5
def is_updating_network(self) ‑> bool
-
Indicates if the modem is updating network information.
The modem should not be powered down during a network update.
Expand source code
def is_updating_network(self) -> bool: """Indicates if the modem is updating network information. The modem should not be powered down during a network update. """ return self.get_network_status() == 4
def power_down(self) ‑> None
-
Prepare the modem for power-down.
Expand source code
def power_down(self) -> None: """Prepare the modem for power-down.""" cmd = 'AT%OFF' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QPOWD=2' self._at_command_response(cmd)
def receive_data(self, message_name: str) ‑> bytes|None
-
Get the raw data from a mobile-terminated message.
Expand source code
def receive_data(self, message_name: str) -> 'bytes|None': """Get the raw data from a mobile-terminated message.""" message = self.get_mt_message(message_name) if message: return message.payload return None
def reset_factory_config(self) ‑> None
-
Reset the modem's factory default configuration.
Expand source code
def reset_factory_config(self) -> None: """Reset the modem's factory default configuration.""" self._at_command_response('AT&F')
def retry_baudrate(self) ‑> bool
-
Expand source code
def retry_baudrate(self) -> bool: """""" for baud in BAUDRATES: self._modem.serial.baudrate = baud if self.is_connected(): return True return False
def save_config(self) ‑> None
-
Store the current configuration to modem non-volatile memory.
Expand source code
def save_config(self) -> None: """Store the current configuration to modem non-volatile memory.""" self._at_command_response('AT&W')
def send_data(self, data: bytes, **kwargs) ‑> str|MoMessage
-
Submits data to send as a mobile-originated message.
If a
message_name
is not supplied one will be generated using the least significant 8 digits of unix timestamp.Args
data
:bytes
- The data to send.
Keyword Args: message_name (str): Optional handle for message in Tx queue. Max 8 characters for Orbcomm modem or 12 for Quectel. priority (int): Optional priority 1 (highest) .. 4 (low, default). May use
MessagePriority
. codec_sin (int): Optional first byte of payload to add as a codec service identifier, must be in range 16..255. codec_min (int): Optional second byte of payload to add as a codec message identifier, must be in range 0..255. return_message (bool): If set, returns aMoMessage
instead of the message handle.Returns
Message handle (str) or
MoMessage
ifreturn_message
kwarg is set.Raises
ValueError
for various parameter limit violations.Expand source code
def send_data(self, data: bytes, **kwargs) -> 'str|MoMessage': """Submits data to send as a mobile-originated message. If a `message_name` is not supplied one will be generated using the least significant 8 digits of unix timestamp. Args: data (bytes): The data to send. Keyword Args: message_name (str): Optional handle for message in Tx queue. Max 8 characters for Orbcomm modem or 12 for Quectel. priority (int): Optional priority 1 (highest) .. 4 (low, default). May use `MessagePriority`. codec_sin (int): Optional first byte of payload to add as a codec service identifier, must be in range 16..255. codec_min (int): Optional second byte of payload to add as a codec message identifier, must be in range 0..255. return_message (bool): If set, returns a `MoMessage` instead of the message handle. Returns: Message handle (str) or `MoMessage` if `return_message` kwarg is set. Raises: `ValueError` for various parameter limit violations. """ data_size = len(data) msg_payload_sin_min = b'' message_name = kwargs.get('message_name', '') priority = MessagePriority(kwargs.get('priority', MessagePriority.LOW.value)) codec_sin: int = kwargs.get('codec_sin', -1) codec_min: int = kwargs.get('codec_min', -1) if codec_sin > -1: data_size += 1 msg_payload_sin_min += codec_sin.to_bytes(1, 'big') if codec_min > -1: data_size += 1 msg_payload_sin_min += codec_min.to_bytes(1, 'big') if not 2 <= data_size <= MSG_MO_MAX_SIZE: raise ValueError('Invalid mobile-originated message size') if message_name and len(message_name) > self._mo_msg_name_len_max: raise ValueError('Message name too long') data_index = 0 if codec_sin <= -1: codec_sin = data[0] data_index += 1 data_size -= 1 if codec_sin not in range(16, 256): raise ValueError('Illegal first payload byte SIN must be 16..255') if codec_min <= -1: codec_min = data[1] data_index += 1 data_size -= 1 if codec_min > 255: raise ValueError('Invalid second payload byte MIN must be 0..255') max_name_len = self._mo_msg_name_len_max if message_name and len(message_name) > max_name_len: raise ValueError(f'Invalid message name longer than {max_name_len}') if len(message_name) == 0: message_name = f'{int(time.time())}'[-max_name_len:] # Convert to base64 string for serial efficiency # no effect on OTA size, modem always decodes and sends raw bytes OTA data_format = DataFormat.BASE64 formatted_data = base64.b64encode(data[data_index:]).decode('utf-8') cmd = 'AT%MGRT=' codec_sep = '.' if self._mfr == Manufacturer.QUECTEL: cmd = 'AT+QSMGT=' codec_sep = ',' cmd = (f'{cmd}"{message_name}",{priority},{codec_sin}{codec_sep}' f'{codec_min},{data_format},{formatted_data}') self._at_command_response(cmd) if kwargs.get('return_message', False) is True: return MoMessage(message_name, priority, MessageState.TX_READY, payload=(msg_payload_sin_min + data)) return message_name
def send_text(self, text: str, **kwargs) ‑> str|MoMessage
-
Submits a text string to send as data.
If
codec_sin
kwarg is not provided 128 is prepended as the first byte. Ifcodec_min
kwarg is not provided 1 is prepended as the second byte. Other kwargs as persend_data
.Args
text
:str
- The text message to send.
Returns
(str) The message name assigned or MoMessage if kwarg
return_message
is set.Expand source code
def send_text(self, text: str, **kwargs) -> 'str|MoMessage': """Submits a text string to send as data. If `codec_sin` kwarg is not provided 128 is prepended as the first byte. If `codec_min` kwarg is not provided 1 is prepended as the second byte. Other kwargs as per `send_data`. Args: text (str): The text message to send. Returns: (str) The message name assigned or MoMessage if kwarg `return_message` is set. """ data = b'' codec_sin = int(kwargs.get('codec_sin', 128)) data += codec_sin.to_bytes(1, 'big') codec_min = int(kwargs.get('codec_min', 1)) data += codec_min.to_bytes(1, 'big') data += text.encode() flowthru = ['message_name', 'priority', 'return_message'] next_kwargs = { k:v for k, v in kwargs if k in flowthru } return self.send_data(data, **next_kwargs)
def set_crc(self, enable: bool = False) ‑> bool
-
Enable or disable CRC error checking on the modem serial port.
Expand source code
def set_crc(self, enable: bool = False) -> bool: """Enable or disable CRC error checking on the modem serial port.""" try: self._at_command_response(f'AT%CRC={int(enable)}') return True except ModemCrcConfig: if ((self._modem.crc and enable) or (not self._modem.crc and not enable)): return True return False
def set_deepsleep_enable(self, enable: bool) ‑> None
-
Set the deepsleep configuration flag.
Expand source code
def set_deepsleep_enable(self, enable: bool) -> None: """Set the deepsleep configuration flag.""" if self._mfr != Manufacturer.QUECTEL: raise ModemError('Operation not supported by this modem') self._at_command_response(f'AT+QSCLK={int(enable)}')
def set_event_mask(self, event_mask: int) ‑> None
-
Set monitored events that trigger event notification.
Expand source code
def set_event_mask(self, event_mask: int) -> None: """Set monitored events that trigger event notification.""" if self._mfr != Manufacturer.ORBCOMM: raise ModemError('Operation not supported by this modem') max_bits = 12 if not isinstance(event_mask, int) or event_mask > 2**max_bits-1: raise ValueError('Invalid event bitmask') cmd = f'ATS88={event_mask}' self._at_command_response(cmd)
def set_gnss_continuous(self, interval: int) ‑> None
-
Set the modem's GNSS continuous refresh interval in seconds.
Args
interval
:int
- Automatic update interval 0..30 seconds.
Returns
True
if successful.Raises
ValueError
if invalid interval is specified.Expand source code
def set_gnss_continuous(self, interval: int) -> None: """Set the modem's GNSS continuous refresh interval in seconds. Args: interval (int): Automatic update interval 0..30 seconds. Returns: `True` if successful. Raises: `ValueError` if invalid interval is specified. """ if interval not in range (0, 31): raise ValueError('Invalid GNSS refresh interval') cmd = f'ATS55={interval}' if self._mfr == Manufacturer.QUECTEL: cmd = f'AT+QGNSSCW={interval}' self._at_command_response(cmd)
def set_gnss_mode(self, gnss_mode: GnssMode) ‑> None
-
Get the modem's GNSS receiver mode.
Expand source code
def set_gnss_mode(self, gnss_mode: GnssMode) -> None: """Get the modem's GNSS receiver mode.""" cmd = f'ATS39={gnss_mode}' prefix = '' if self._mfr == Manufacturer.QUECTEL: if not GnssModeQuectel.is_valid(gnss_mode): raise ValueError('Invalid GNSS mode') cmd = f'AT+QGNSSMOD={gnss_mode}' prefix = '+QGNSSMOD:' else: if not GnssModeOrbcomm.is_valid(gnss_mode): raise ValueError('Invalid GNSS mode') self._at_command_response(cmd, prefix)
def set_power_mode(self, power_mode: PowerMode) ‑> None
-
Set the modem's power mode configuration.
Expand source code
def set_power_mode(self, power_mode: PowerMode) -> None: """Set the modem's power mode configuration.""" if not PowerMode.is_valid(power_mode): raise ValueError('Invalid Power Mode') cmd = f'ATS50={power_mode}' if self._mfr == Manufacturer.QUECTEL: cmd = f'AT+QPMD={power_mode}' self._at_command_response(cmd)
def set_register(self, s_register_number: int, value: int) ‑> None
-
Set a modem register value.
Expand source code
def set_register(self, s_register_number: int, value: int) -> None: """Set a modem register value.""" cmd = f'ATS{s_register_number}={value}' self._at_command_response(cmd)
def set_trace_event_monitor(self, events: list[tuple[int, int]]) ‑> None
-
Set the list of monitored trace events.
Expand source code
def set_trace_event_monitor(self, events: 'list[tuple[int, int]]') -> None: """Set the list of monitored trace events.""" cmd = 'AT%EVMON=' for event in events: if not cmd.endswith('='): cmd += ',' cmd += f'{event[0]}.{event[1]}' self._at_command_response(cmd)
def set_urc_ctl(self, qurc_mask: int) ‑> None
-
Set the event list that trigger Unsolicited Report Codes.
Expand source code
def set_urc_ctl(self, qurc_mask: int) -> None: """Set the event list that trigger Unsolicited Report Codes.""" if self._mfr != Manufacturer.QUECTEL: raise ValueError('Modem does not support this feature') cmd = f'AT+QURCCTL=0x{qurc_mask:04X}' self._at_command_response(cmd)
def set_wakeup_period(self, wakeup_period: WakeupPeriod, wakeup_way: WakeupWay|None = None)
-
Set the modem's wakeup period configuration.
The configuration does not update until confimed by the network.
Expand source code
def set_wakeup_period(self, wakeup_period: WakeupPeriod, wakeup_way: 'WakeupWay|None' = None, ) -> None: """Set the modem's wakeup period configuration. The configuration does not update until confimed by the network. """ if not WakeupPeriod.is_valid(wakeup_period): raise ValueError('Invalid wakeup period') cmd = f'ATS51={wakeup_period}' if self._mfr == Manufacturer.QUECTEL: if wakeup_way is None: query = self._at_command_response('AT+QWKUPCFG?', '+QWKUPCFG:') wakeup_way = WakeupWay(int(query.split(',')[1])) cmd = f'AT+QWKUPCFG={wakeup_period},{wakeup_way}' self._at_command_response(cmd)
def set_workmode(self, workmode: WorkMode) ‑> None
-
Set the modem working mode.
Expand source code
def set_workmode(self, workmode: WorkMode) -> None: """Set the modem working mode.""" if self._mfr != Manufacturer.QUECTEL: raise ModemError('Operation not supported by this modem') if not WorkMode.is_valid(workmode): raise ValueError('Invalid workmode') cmd = f'AT+QMOD={workmode}' self._at_command_response(cmd)
class PowerMode (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
The Power Mode setting of the NIMO modem.
Implies various internal state machine settings for balancing power consumption against speed of recovery from line of sight blockages.
Expand source code
class PowerMode(NimoIntEnum): """The Power Mode setting of the NIMO modem. Implies various internal state machine settings for balancing power consumption against speed of recovery from line of sight blockages. """ MOBILE_POWERED = 0 FIXED_POWERED = 1 MOBILE_BATTERY = 2 FIXED_BATTERY = 3 MOBILE_MINIMAL = 4 MOBILE_PARKED = 5 def gnss_refresh_hours(self): """The minimum GNSS refresh interval [hours].""" if self.value == 0: return 3 if self.value == 1: return 24 if self.value == 2: return 6 if self.value == 3: return 14 * 24 if self.value == 4: return 12 if self.value == 5: return 24 def transmit_lifetime_seconds(self): """The maximum duration of a message in the transmit queue [seconds].""" if self.value in [0, 1]: return 3 * 3600 return 3 * 60 def beam_search_interval_seconds(self): """The minimum time between background beam searches [seconds].""" if self.value in [0, 2, 4]: return 20 * 60 return 60 * 60 def short_term_blockage_seconds(self): """The time in blockage before initiating a beam search [seconds].""" if self.value in [0, 1]: return 5 * 60 if self.value in [2, 3, 4]: return 20 * 60 return 15 * 60 def beam_search_maximum_seconds(self): """The maximum backoff interval between searches when blocked [seconds]. """ if self.value in [0, 1]: return 0 return 1600 * 60
Ancestors
- NimoIntEnum
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var FIXED_BATTERY
var FIXED_POWERED
var MOBILE_BATTERY
var MOBILE_MINIMAL
var MOBILE_PARKED
var MOBILE_POWERED
Methods
def beam_search_interval_seconds(self)
-
The minimum time between background beam searches [seconds].
Expand source code
def beam_search_interval_seconds(self): """The minimum time between background beam searches [seconds].""" if self.value in [0, 2, 4]: return 20 * 60 return 60 * 60
def beam_search_maximum_seconds(self)
-
The maximum backoff interval between searches when blocked [seconds].
Expand source code
def beam_search_maximum_seconds(self): """The maximum backoff interval between searches when blocked [seconds]. """ if self.value in [0, 1]: return 0 return 1600 * 60
def gnss_refresh_hours(self)
-
The minimum GNSS refresh interval [hours].
Expand source code
def gnss_refresh_hours(self): """The minimum GNSS refresh interval [hours].""" if self.value == 0: return 3 if self.value == 1: return 24 if self.value == 2: return 6 if self.value == 3: return 14 * 24 if self.value == 4: return 12 if self.value == 5: return 24
def short_term_blockage_seconds(self)
-
The time in blockage before initiating a beam search [seconds].
Expand source code
def short_term_blockage_seconds(self): """The time in blockage before initiating a beam search [seconds].""" if self.value in [0, 1]: return 5 * 60 if self.value in [2, 3, 4]: return 20 * 60 return 15 * 60
def transmit_lifetime_seconds(self)
-
The maximum duration of a message in the transmit queue [seconds].
Expand source code
def transmit_lifetime_seconds(self): """The maximum duration of a message in the transmit queue [seconds].""" if self.value in [0, 1]: return 3 * 3600 return 3 * 60
Inherited members
class SatelliteLocation (name: str = '', latitude: float = 0.0, longitude: float = 180.0, altitude: float = 35786000, azimuth: float = 0.0, elevation: float = 0.0, geobeam: GeoBeam|None = None)
-
Represents a geostationary satellite location relative to a modem.
Expand source code
@dataclass class SatelliteLocation: """Represents a geostationary satellite location relative to a modem.""" name: str = '' latitude: float = 0.0 longitude: float = 180.0 altitude: float = GEOSTATIONARY_DISTANCE_M azimuth: float = 0.0 elevation: float = 0.0 geobeam: 'GeoBeam|None' = None
Class variables
var altitude : float
var azimuth : float
var elevation : float
var geobeam : GeoBeam|None
var latitude : float
var longitude : float
var name : str
class SignalQuality (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Qualitative descriptor corresponding to a SignalLevel
Expand source code
class SignalQuality(NimoIntEnum): """Qualitative descriptor corresponding to a SignalLevel""" NONE = 0 WEAK = 1 LOW = 2 MID = 3 GOOD = 4 STRONG = 5 WARNING = 6
Ancestors
- NimoIntEnum
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var GOOD
var LOW
var MID
var NONE
var STRONG
var WARNING
var WEAK
Inherited members
class UrcCode (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Quectel URC code map.
Expand source code
class UrcCode(NimoIntEnum): """Quectel URC code map.""" GPS_FIX = 0 RX_END = 1 TX_END = 2 REGED = 3 ITV_CHG = 4 TIME_UPD = 5 GPS_TMO = 6 PLG_RESP = 7
Ancestors
- NimoIntEnum
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var GPS_FIX
var GPS_TMO
var ITV_CHG
var PLG_RESP
var REGED
var RX_END
var TIME_UPD
var TX_END
Inherited members
class UrcControl (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Control bits for Quectel Unsolicited Response Codes.
Expand source code
class UrcControl(IntFlag): """Control bits for Quectel Unsolicited Response Codes.""" GNSS_FIX_NEW = 0b00000001 MESSAGE_MT_RECEIVED = 0b00000010 MESSAGE_MO_COMPLETE = 0b00000100 NETWORK_REGISTERED = 0b00001000 WAKEUP_PERIOD_CHANGE = 0b00010000 UTC_TIME_SYNC = 0b00100000 GNSS_FIX_TIMEOUT = 0b01000000 NETWORK_PING_ACKNOWLEDGED = 0b10000000 @classmethod def get_events(cls, event_mask: int) -> 'list[EventNotification]': """Parses a bitmask to return a list of events.""" return [item for item in cls if item.value & event_mask]
Ancestors
- enum.IntFlag
- builtins.int
- enum.Flag
- enum.Enum
Class variables
var GNSS_FIX_NEW
var GNSS_FIX_TIMEOUT
var MESSAGE_MO_COMPLETE
var MESSAGE_MT_RECEIVED
var NETWORK_PING_ACKNOWLEDGED
var NETWORK_REGISTERED
var UTC_TIME_SYNC
var WAKEUP_PERIOD_CHANGE
Static methods
def get_events(event_mask: int) ‑> list[EventNotification]
-
Parses a bitmask to return a list of events.
Expand source code
@classmethod def get_events(cls, event_mask: int) -> 'list[EventNotification]': """Parses a bitmask to return a list of events.""" return [item for item in cls if item.value & event_mask]
class WakeupPeriod (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
The Wakeup Period setting of a NIMO modem.
Determines how often the modem wakes up to listen briefly for potential mobile-terminated messages to be delivered by the network.
Expand source code
class WakeupPeriod(NimoIntEnum): """The Wakeup Period setting of a NIMO modem. Determines how often the modem wakes up to listen briefly for potential mobile-terminated messages to be delivered by the network. """ NONE = 0 # 5 seconds SECONDS_30 = 1 MINUTES_1 = 2 MINUTES_3 = 3 MINUTES_10 = 4 MINUTES_30 = 5 MINUTES_60 = 6 MINUTES_2 = 7 MINUTES_5 = 8 MINUTES_15 = 9 MINUTES_20 = 10 def seconds(self): if self.name == 'NONE': return 5 value = int(self.name.split('_'))[1] if self.name.startswith('MINUTES'): return value * 60 return value
Ancestors
- NimoIntEnum
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var MINUTES_1
var MINUTES_10
var MINUTES_15
var MINUTES_2
var MINUTES_20
var MINUTES_3
var MINUTES_30
var MINUTES_5
var MINUTES_60
var NONE
var SECONDS_30
Methods
def seconds(self)
-
Expand source code
def seconds(self): if self.name == 'NONE': return 5 value = int(self.name.split('_'))[1] if self.name.startswith('MINUTES'): return value * 60 return value
Inherited members
class WakeupWay (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Quectel CC200A-LB wakeup methods.
Expand source code
class WakeupWay(NimoIntEnum): """Quectel CC200A-LB wakeup methods.""" WAKEUP_PIN = 0 UART = 1
Ancestors
- NimoIntEnum
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var UART
var WAKEUP_PIN
Inherited members
class WorkMode (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Quectel CC200A-LB working modes.
Expand source code
class WorkMode(NimoIntEnum): """Quectel CC200A-LB working modes.""" WORKING = 1 GNSS = 2 PERIODIC_SLEEP = 3
Ancestors
- NimoIntEnum
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var GNSS
var PERIODIC_SLEEP
var WORKING
Inherited members