Module pynimomodem.location
Classes and methods for location, elevation and azimuth for NIMO modems.
This module:
- Parses NMEA-0183 data into a
ModemLocation
object. - Calculates azimuth and elevation to a geostationary
SatelliteLocation
.
Thanks for Azimuth/Elevation derived from code at: https://github.com/sq3tle/altazrange/tree/master
Expand source code
"""Classes and methods for location, elevation and azimuth for NIMO modems.
This module:
* Parses NMEA-0183 data into a `ModemLocation` object.
* Calculates azimuth and elevation to a geostationary `SatelliteLocation`.
Thanks for Azimuth/Elevation derived from code at:
https://github.com/sq3tle/altazrange/tree/master
"""
import json
import logging
import math
from copy import deepcopy
from dataclasses import dataclass
from .constants import (
GEOSTATIONARY_DISTANCE_M,
GeoBeam,
GeoSatellite,
NimoIntEnum,
)
from .nimoutils import iso_to_ts, ts_to_iso, vlog
VLOG_TAG = 'nmealocation'
TRACE_TAG = VLOG_TAG + 'trace'
_log = logging.getLogger(__name__)
class GnssFixType(NimoIntEnum):
"""Enumerated fix type from NMEA-0183 standard."""
NONE = 1
D2 = 2
D3 = 3
class GnssFixQuality(NimoIntEnum):
"""Enumerated fix quality from NMEA-0183 standard."""
INVALID = 0
GPS_SPS = 1
DGPS = 2
PPS = 3
RTK = 4
FLOAT_RTK = 5
EST_DEAD_RECKONING = 6
MANUAL = 7
SIMULATION = 8
@dataclass
class GnssSatelliteInfo(object):
"""Information specific to a GNSS satellite.
Attributes:
prn: The PRN code (Pseudo-Random Number sequence)
elevation: The satellite elevation
azimuth: The satellite azimuth
snr: The satellite Signal-to-Noise Ratio
"""
prn: int
elevation: int
azimuth: int
snr: int
class ModemLocation:
"""A set of location-based information derived from the modem's NMEA data.
Uses 90.0/180.0 if latitude/longitude are unknown
Attributes:
latitude (float): decimal degrees
longitude (float): decimal degrees
altitude (float): in metres
speed (float): in knots
heading (float): in degrees
timestamp (int): in seconds since 1970-01-01T00:00:00Z
satellites (int): in view at time of fix
fix_type (GnssFixType): 1=None, 2=2D or 3=3D
fix_quality (GnssFixQuality): Enumerated lookup value
pdop (float): Probability Dilution of Precision
hdop (float): Horizontal Dilution of Precision
vdop (float): Vertical Dilution of Precision
time_iso (str): ISO 8601 formatted timestamp
"""
def __init__(self, **kwargs):
"""Initializes a Location with default latitude/longitude 90/180."""
self.latitude = float(kwargs.get('latitude', 90.0))
self.longitude = float(kwargs.get('longitude', 180.0))
self.altitude = float(kwargs.get('altitude', 0.0)) # metres
self.speed = float(kwargs.get('speed', 0.0)) # knots
self.heading = float(kwargs.get('heading', 0.0)) # degrees
self.timestamp = int(kwargs.get('timestamp', 0)) # seconds (unix)
self.satellites = int(kwargs.get('satellites', 0))
self.fix_type = GnssFixType(int(kwargs.get('fix_type', 1)))
self.fix_quality = GnssFixQuality(int(kwargs.get('fix_quality', 0)))
self.pdop = float(kwargs.get('pdop', 99))
self.hdop = float(kwargs.get('hdop', 99))
self.vdop = float(kwargs.get('vdop', 99))
# self.satellites_info: 'list[GnssSatelliteInfo]' = kwargs.get(
# 'satellites_info', []
# )
@property
def time_iso(self) -> str:
return f'{ts_to_iso(self.timestamp)}'
# def _update_satellites_info(self,
# satellites_info: 'list[GnssSatelliteInfo]'):
# """Populates satellite information based on NMEA GSV data."""
# for satellite_info in satellites_info:
# if isinstance(satellite_info, GnssSatelliteInfo):
# new = True
# for i, info in enumerate(self.satellites_info):
# if info.prn == satellite_info.prn:
# new = False
# self.satellites_info[i] = satellite_info
# break
# if new:
# self.satellites_info.append(satellite_info)
def __repr__(self) -> str:
obj = deepcopy(self.__dict__)
for k, v in obj.items():
if k in ['latitude', 'longitude']:
obj[k] = round(v, 5)
elif isinstance(v, float):
obj[k] = round(v, 1)
return json.dumps(obj, skipkeys=True)
@dataclass
class SatelliteLocation:
"""Represents a geostationary satellite location relative to a modem."""
name: str = ''
latitude: float = 0.0
longitude: float = 180.0
altitude: float = GEOSTATIONARY_DISTANCE_M
azimuth: float = 0.0
elevation: float = 0.0
geobeam: 'GeoBeam|None' = None
@dataclass
class _PointLocation:
"""Location used for spherical coordinates (for azimuth/elevation)."""
x: float = 0.0
y: float = 0.0
z: float = 0.0
radius: float = 0.0
nx: float = 0.0
ny: float = 0.0
nz: float = 0.0
def validate_nmea(nmea_sentence: str) -> bool:
"""Validates a given NMEA-0183 sentence with CRC.
Args:
nmea_sentence (str): NMEA-0183 sentence ending in checksum.
"""
if '*' not in nmea_sentence:
return False
data, cs_hex = nmea_sentence.split('*')
candidate = int(cs_hex, 16)
crc = 0 # initial
for i in range(1, len(data)): # ignore initial $
crc ^= ord(data[i])
return candidate == crc
def parse_nmea_to_location(location: ModemLocation, nmea_sentence: str) -> None:
"""Parses a NMEA-0183 sentence to update a ModemLocation."""
if vlog(VLOG_TAG):
_log.debug('Parsing NMEA: %s', nmea_sentence)
if not validate_nmea(nmea_sentence):
raise ValueError('Invalid NMEA-0183 sentence')
data = nmea_sentence.split('*')[0]
nmea_type = ''
cache = {}
for i, field_data in enumerate(data.split(',')):
if i == 0:
nmea_type = field_data[-3:]
if nmea_type == 'GSV':
_log.warn('No processing required for GSV sentence')
return
if nmea_type == 'GSA' and location.vdop != 99:
if vlog(TRACE_TAG):
_log.debug('Skipping redundant GSA data')
return
if vlog(TRACE_TAG):
_log.debug('Processing NMEA type: %s', nmea_type)
elif i == 1:
if nmea_type == 'RMC':
cache['fix_hour'] = field_data[0:2]
cache['fix_min'] = field_data[2:4]
cache['fix_sec'] = field_data[4:6]
if vlog(TRACE_TAG):
_log.debug('Fix time %s:%s:%s', cache['fix_hour'],
cache['fix_min'], cache['fix_sec'])
elif i == 2:
if nmea_type == 'RMC':
if (field_data == 'V'):
_log.warn('Fix Void')
elif nmea_type == 'GSA':
location.fix_type = GnssFixType(int(field_data))
if vlog(TRACE_TAG):
_log.debug('Fix type: %s', location.fix_type.name)
elif i == 3:
if nmea_type == 'RMC':
location.latitude = (float(field_data[0:2]) +
float(field_data[2]) / 60.0)
elif i == 4:
if nmea_type == 'RMC':
if field_data == 'S':
location.latitude *= -1
if vlog(TRACE_TAG):
_log.debug('Latitude: %.5f', location.latitude)
elif i == 5:
if nmea_type == 'RMC':
location.longitude = (float(field_data[0:3]) +
float(field_data[3]) / 60.0)
elif i == 6:
if nmea_type == 'RMC':
if field_data == 'W':
location.longitude *= -1
if vlog(TRACE_TAG):
_log.debug('Longitude: %.5f', location.longitude)
elif nmea_type == 'GGA':
location.fix_quality = GnssFixQuality(int(field_data))
if vlog(TRACE_TAG):
_log.debug('Fix quality: %s', location.fix_quality.name)
elif i == 7:
if nmea_type == 'RMC':
location.speed = float(field_data)
if vlog(TRACE_TAG):
_log.debug('Speed: %.1f', location.speed)
elif nmea_type == 'GGA':
location.satellites = int(field_data)
if vlog(TRACE_TAG):
_log.debug('GNSS satellites used: %d', location.satellites)
elif i == 8:
if nmea_type == 'RMC':
location.heading = float(field_data)
if vlog(TRACE_TAG):
_log.debug('Heading: %.1f', location.heading)
elif nmea_type == 'GGA':
location.hdop = round(float(field_data), 1)
if vlog(TRACE_TAG):
_log.debug('HDOP: %.1f', location.heading)
elif i == 9:
if nmea_type == 'RMC':
fix_day = field_data[0:2]
fix_month = field_data[2:4]
fix_yy = int(field_data[4:])
fix_yy += 1900 if fix_yy >= 73 else 2000
if vlog(TRACE_TAG):
_log.debug('Fix date %d-%s-%s', fix_yy, fix_month, fix_day)
iso_time = (f'{fix_yy}-{fix_month}-{fix_day}T'
f'{cache["fix_hour"]}:{cache["fix_min"]}'
f':{cache["fix_sec"]}Z')
unix_timestamp = iso_to_ts(iso_time)
if vlog(TRACE_TAG):
_log.debug('Fix time ISO 8601: %s | Unix: %d',
iso_time, unix_timestamp)
location.timestamp = unix_timestamp
elif nmea_type == 'GGA':
location.altitude = float(field_data)
if vlog(TRACE_TAG):
_log.debug('Altitude: %.1f', location.altitude)
elif i == 10:
# RMC magnetic variation - ignore
if nmea_type == 'GGA' and field_data != 'M':
_log.warning('Unexpected altitude units: %s', field_data)
# elif i == 11: # RMC magnetic variation direction, GGA height of geoid - ignore
# elif i == 12: # GGA units height of geoid - ignore
# elif i == 13: # GGA seconds since last DGPS update - ignore
# elif i == 14: # GGA DGPS station ID - ignore
elif i == 15: # GSA PDOP - ignore (unused)
if nmea_type == 'GSA':
location.pdop = round(float(field_data), 1)
if vlog(TRACE_TAG):
_log.debug('PDOP: %d', location.pdop)
# elif i == 16: # GSA HDOP - ignore (use GGA)
elif i == 17:
if nmea_type == 'GSA':
location.vdop = round(float(field_data), 1)
if vlog(TRACE_TAG):
_log.debug('VDOP: %d', location.vdop)
def get_location_from_nmea_data(nmea_data: 'str|list[str]') -> ModemLocation:
"""Derives a ModemLocation from a set of NMEA-0183 sentences.
Args:
nmea_data (str): A set of NMEA-0183 sentences separated by `\n` or
a `list` of NMEA sentences.
Returns:
`Location` object.
"""
location = ModemLocation()
if isinstance(nmea_data, list):
if not all(isinstance(x, str) for x in nmea_data):
raise ValueError('Invalid NMEA sentence list')
elif isinstance(nmea_data, str):
nmea_data = nmea_data.split('\n')
for nmea_sentence in nmea_data:
parse_nmea_to_location(location, nmea_sentence)
if vlog(VLOG_TAG):
_log.debug('Location: %s', repr(location))
return location
def get_closest_satellite(latitude: float, longitude: float) -> GeoSatellite:
"""Get the closest geostationary satellite to a given location."""
satellites = list(map(lambda x: x.value,GeoSatellite._member_map_.values()))
closest = GeoSatellite(min(satellites, key=lambda x:abs(x-longitude)))
if closest == GeoSatellite.AORWSC: #: single regional beam only
if latitude >= 15.0 or latitude <= -45.0:
if longitude >= -27.0:
return GeoSatellite.EMEA
return GeoSatellite.AMER
# elif closest == GeoSatellite.MEAS: #: not all beams lit
# if latitude <= -4.5:
# if longitude >= 63.5:
# return GeoSatellite.APAC
# return GeoSatellite.EMEA
# elif latitude >= 40.9:
# if longitude <= 45.0:
# return GeoSatellite.EMEA
# if longitude >= 82.5:
# return GeoSatellite.APAC
# elif longitude >= 63.5:
# if latitude <= -4.0 or latitude >= 30.0:
# return GeoSatellite.APAC
return closest
def get_satellite_location(modem_location: ModemLocation,
geobeam: 'GeoBeam|None' = None,
) -> SatelliteLocation:
"""Derives the azimuth and elevation of the nearest satellite.
If not provided the current GeoBeam, derives the closest satellite from
the location provided.
Args:
modem_location (ModemLocation): The modem's Location object.
geobeam (GeoBeam): The GeoBeam, if known. If `None` it will be
assumed from fixed locations of satellites.
Returns:
`SatelliteLocation` including azimuth, elevation from the modem.
Raises:
`ValueError` if modem_location is not valid
"""
if not isinstance(modem_location, ModemLocation):
raise ValueError('Invalid modem location')
# internal helper functions
def location_to_point(latitude: float,
longitude: float,
altitude: float,
) -> _PointLocation:
"""Converts lat/lon/alt to point location."""
lat_rad = latitude * math.pi / 180.0
lon_rad = longitude * math.pi / 180.0
radius = get_geographic_radius(lat_rad)
clat = get_geocentric_latitude(lat_rad)
cos_lon = math.cos(lon_rad)
sin_lon = math.sin(lon_rad)
cos_lat = math.cos(clat)
sin_lat = math.sin(clat)
x = radius * cos_lon * cos_lat
y = radius * sin_lon * cos_lat
z = radius * sin_lat
cos_glat = math.cos(lat_rad)
sin_glat = math.sin(lat_rad)
nx = cos_glat * cos_lon
ny = cos_glat * sin_lon
nz = sin_glat
x += altitude * nx
y += altitude * ny
z += altitude * nz
return _PointLocation(x, y, z, radius, nx, ny, nz)
def get_geographic_radius(lat_rad: float) -> float:
"""Adjust radius for earth shape."""
equatorial = 6378137.0
polar = 6356752.3
cos = math.cos(lat_rad)
sin = math.sin(lat_rad)
t1 = equatorial**2 * cos
t2 = polar**2 * sin
t3 = equatorial * cos
t4 = polar * sin
return math.sqrt((t1 * t1 + t2 * t2) / (t3 * t3 + t4 * t4))
def get_geocentric_latitude(lat_rad: float) -> float:
"""Derives the geocentric latitude."""
e2 = 0.00669437999014 # first eccentricity squared, constant
clat = math.atan((1.0 - e2) * math.tan(lat_rad))
return clat
def rotate_globe(b: SatelliteLocation, a: ModemLocation, b_radius: float):
"""Rotate the globe for vector alignment."""
brp = location_to_point(b.latitude, b.longitude - a.longitude, b.altitude)
alat = get_geocentric_latitude(-a.latitude * math.pi / 180.0)
acos = math.cos(alat)
asin = math.sin(alat)
bx = (brp.x * acos) - (brp.z * asin)
by = brp.y
bz = (brp.x * asin) + (brp.z * acos)
return _PointLocation(bx, by, bz, b_radius, 0.0, 0.0, 0.0)
def normalize_vector(b: _PointLocation, a: _PointLocation):
"""Normalize the difference between vectors."""
dx = b.x - a.x
dy = b.y - a.y
dz = b.z - a.z
dist2 = dx**2 + dy**2 +dz**2
if dist2 == 0:
return None
dist = math.sqrt(dist2)
return _PointLocation(dx/dist, dy/dist, dz/dist, 1.0, 0.0, 0.0, 0.0)
azimuth = None
elevation = None
if isinstance(geobeam, GeoBeam) and geobeam > 0:
satellite_name = geobeam.name.split('_')[0]
satellite = GeoSatellite[satellite_name]
else:
satellite = get_closest_satellite(modem_location.latitude,
modem_location.longitude)
# modem and satellite Cartesian location
mc = modem_location
sc = SatelliteLocation(name=satellite.name,
longitude=satellite.value,
altitude=GEOSTATIONARY_DISTANCE_M,
geobeam=geobeam)
# modem and satellite Point location
mp = location_to_point(mc.latitude, mc.longitude, mc.altitude)
sp = location_to_point(sc.latitude, sc.longitude, sc.altitude)
ref_point = rotate_globe(sc, mc, sp.radius)
if ref_point.z**2 + ref_point.y**2 > 1.0e-6:
theta = math.atan2(ref_point.z, ref_point.y) * 180.0 / math.pi
azimuth = 90.0 - theta
if azimuth < 0.0:
azimuth += 360.0
if azimuth > 360.0:
azimuth -= 360.0
bma = normalize_vector(sp, mp) # Bayesian model averaging
if isinstance(bma, _PointLocation):
elevation = (90.0 - (180.0 / math.pi) * math.acos(
bma.x * mp.nx + bma.y * mp.ny + bma.z * mp.nz))
sc.azimuth = round(azimuth, 1)
sc.elevation = round(elevation, 1)
return sc
# Below is parked for potential future use
# def _parse_gsv_to_location(location: Location, gsv_sentence: str) -> None:
# """Returns a Location object based on an NMEA sentences data set.
#
# Placeholder - overcomplicates Location object
#
# Args:
# location: The Location object to update
# gsv_sentence: The Satellites in View sentence to parse
#
# """
# update_satellites = getattr(location, '_update_satellites_info', None)
# if not callable(update_satellites):
# raise ValueError('Location object does not support GSV parsing')
# gsv = gsv_sentence.split(',') # $GPGSV,2,1,08,01,40,083,46,02,17,308,41,12,07,344,39,14,22,228,45*75
# '''
# gsv_sentences = gsv[1] # Number of sentences for full data
# gsv_sentence = gsv[2] # Sentence number (up to 4 satellites per sentence)
# '''
# gsv_satellites = gsv[3] # Number of satellites in view
# # following supports up to 4 satellites per sentence
# satellites_info = []
# if (len(gsv) - 4) % 4 > 0:
# # TODO: warn/log this case of extra GSV data in sentence
# pass
# num_satellites_in_sentence = int((len(gsv)-4)/4)
# for i in range(1, num_satellites_in_sentence+1):
# prn = int(gsv[i*4]) if gsv[i*4] != '' else 0 # satellite PRN number
# elevation = int(gsv[i*4+1]) if gsv[i*4+1] != '' else 0 # Elevation in degrees
# azimuth = int(gsv[i*4+2]) if gsv[i*4+2] != '' else 0 # Azimuth in degrees
# snr = int(gsv[i*4+3]) if gsv[i*4+3] != '' else 0 # Signal to Noise Ratio
# satellites_info.append(GnssSatelliteInfo(prn,
# elevation,
# azimuth,
# snr))
# location._update_satellites_info(satellites_info)
# satellites = int(gsv_satellites) if gsv_satellites != '' else 0
# if location.satellites < satellites:
# location.satellites = satellites
# else:
# # TODO: log this case; should be limited to GPS simulation in Modem Simulator (3 satellites)
# pass
Functions
def get_closest_satellite(latitude: float, longitude: float) ‑> GeoSatellite
-
Get the closest geostationary satellite to a given location.
Expand source code
def get_closest_satellite(latitude: float, longitude: float) -> GeoSatellite: """Get the closest geostationary satellite to a given location.""" satellites = list(map(lambda x: x.value,GeoSatellite._member_map_.values())) closest = GeoSatellite(min(satellites, key=lambda x:abs(x-longitude))) if closest == GeoSatellite.AORWSC: #: single regional beam only if latitude >= 15.0 or latitude <= -45.0: if longitude >= -27.0: return GeoSatellite.EMEA return GeoSatellite.AMER # elif closest == GeoSatellite.MEAS: #: not all beams lit # if latitude <= -4.5: # if longitude >= 63.5: # return GeoSatellite.APAC # return GeoSatellite.EMEA # elif latitude >= 40.9: # if longitude <= 45.0: # return GeoSatellite.EMEA # if longitude >= 82.5: # return GeoSatellite.APAC # elif longitude >= 63.5: # if latitude <= -4.0 or latitude >= 30.0: # return GeoSatellite.APAC return closest
def get_location_from_nmea_data(nmea_data: str|list[str]) ‑> ModemLocation
-
Derives a ModemLocation from a set of NMEA-0183 sentences.
Args: nmea_data (str): A set of NMEA-0183 sentences separated by `
` or a
list
of NMEA sentences.Returns: <code>Location</code> object.
Expand source code
def get_location_from_nmea_data(nmea_data: 'str|list[str]') -> ModemLocation: """Derives a ModemLocation from a set of NMEA-0183 sentences. Args: nmea_data (str): A set of NMEA-0183 sentences separated by `\n` or a `list` of NMEA sentences. Returns: `Location` object. """ location = ModemLocation() if isinstance(nmea_data, list): if not all(isinstance(x, str) for x in nmea_data): raise ValueError('Invalid NMEA sentence list') elif isinstance(nmea_data, str): nmea_data = nmea_data.split('\n') for nmea_sentence in nmea_data: parse_nmea_to_location(location, nmea_sentence) if vlog(VLOG_TAG): _log.debug('Location: %s', repr(location)) return location
def get_satellite_location(modem_location: ModemLocation, geobeam: GeoBeam|None = None) ‑> SatelliteLocation
-
Derives the azimuth and elevation of the nearest satellite.
If not provided the current GeoBeam, derives the closest satellite from the location provided.
Args
modem_location
:ModemLocation
- The modem's Location object.
geobeam
:GeoBeam
- The GeoBeam, if known. If
None
it will be assumed from fixed locations of satellites.
Returns
SatelliteLocation
including azimuth, elevation from the modem.Raises
ValueError
if modem_location is not validExpand source code
def get_satellite_location(modem_location: ModemLocation, geobeam: 'GeoBeam|None' = None, ) -> SatelliteLocation: """Derives the azimuth and elevation of the nearest satellite. If not provided the current GeoBeam, derives the closest satellite from the location provided. Args: modem_location (ModemLocation): The modem's Location object. geobeam (GeoBeam): The GeoBeam, if known. If `None` it will be assumed from fixed locations of satellites. Returns: `SatelliteLocation` including azimuth, elevation from the modem. Raises: `ValueError` if modem_location is not valid """ if not isinstance(modem_location, ModemLocation): raise ValueError('Invalid modem location') # internal helper functions def location_to_point(latitude: float, longitude: float, altitude: float, ) -> _PointLocation: """Converts lat/lon/alt to point location.""" lat_rad = latitude * math.pi / 180.0 lon_rad = longitude * math.pi / 180.0 radius = get_geographic_radius(lat_rad) clat = get_geocentric_latitude(lat_rad) cos_lon = math.cos(lon_rad) sin_lon = math.sin(lon_rad) cos_lat = math.cos(clat) sin_lat = math.sin(clat) x = radius * cos_lon * cos_lat y = radius * sin_lon * cos_lat z = radius * sin_lat cos_glat = math.cos(lat_rad) sin_glat = math.sin(lat_rad) nx = cos_glat * cos_lon ny = cos_glat * sin_lon nz = sin_glat x += altitude * nx y += altitude * ny z += altitude * nz return _PointLocation(x, y, z, radius, nx, ny, nz) def get_geographic_radius(lat_rad: float) -> float: """Adjust radius for earth shape.""" equatorial = 6378137.0 polar = 6356752.3 cos = math.cos(lat_rad) sin = math.sin(lat_rad) t1 = equatorial**2 * cos t2 = polar**2 * sin t3 = equatorial * cos t4 = polar * sin return math.sqrt((t1 * t1 + t2 * t2) / (t3 * t3 + t4 * t4)) def get_geocentric_latitude(lat_rad: float) -> float: """Derives the geocentric latitude.""" e2 = 0.00669437999014 # first eccentricity squared, constant clat = math.atan((1.0 - e2) * math.tan(lat_rad)) return clat def rotate_globe(b: SatelliteLocation, a: ModemLocation, b_radius: float): """Rotate the globe for vector alignment.""" brp = location_to_point(b.latitude, b.longitude - a.longitude, b.altitude) alat = get_geocentric_latitude(-a.latitude * math.pi / 180.0) acos = math.cos(alat) asin = math.sin(alat) bx = (brp.x * acos) - (brp.z * asin) by = brp.y bz = (brp.x * asin) + (brp.z * acos) return _PointLocation(bx, by, bz, b_radius, 0.0, 0.0, 0.0) def normalize_vector(b: _PointLocation, a: _PointLocation): """Normalize the difference between vectors.""" dx = b.x - a.x dy = b.y - a.y dz = b.z - a.z dist2 = dx**2 + dy**2 +dz**2 if dist2 == 0: return None dist = math.sqrt(dist2) return _PointLocation(dx/dist, dy/dist, dz/dist, 1.0, 0.0, 0.0, 0.0) azimuth = None elevation = None if isinstance(geobeam, GeoBeam) and geobeam > 0: satellite_name = geobeam.name.split('_')[0] satellite = GeoSatellite[satellite_name] else: satellite = get_closest_satellite(modem_location.latitude, modem_location.longitude) # modem and satellite Cartesian location mc = modem_location sc = SatelliteLocation(name=satellite.name, longitude=satellite.value, altitude=GEOSTATIONARY_DISTANCE_M, geobeam=geobeam) # modem and satellite Point location mp = location_to_point(mc.latitude, mc.longitude, mc.altitude) sp = location_to_point(sc.latitude, sc.longitude, sc.altitude) ref_point = rotate_globe(sc, mc, sp.radius) if ref_point.z**2 + ref_point.y**2 > 1.0e-6: theta = math.atan2(ref_point.z, ref_point.y) * 180.0 / math.pi azimuth = 90.0 - theta if azimuth < 0.0: azimuth += 360.0 if azimuth > 360.0: azimuth -= 360.0 bma = normalize_vector(sp, mp) # Bayesian model averaging if isinstance(bma, _PointLocation): elevation = (90.0 - (180.0 / math.pi) * math.acos( bma.x * mp.nx + bma.y * mp.ny + bma.z * mp.nz)) sc.azimuth = round(azimuth, 1) sc.elevation = round(elevation, 1) return sc
def parse_nmea_to_location(location: ModemLocation, nmea_sentence: str) ‑> None
-
Parses a NMEA-0183 sentence to update a ModemLocation.
Expand source code
def parse_nmea_to_location(location: ModemLocation, nmea_sentence: str) -> None: """Parses a NMEA-0183 sentence to update a ModemLocation.""" if vlog(VLOG_TAG): _log.debug('Parsing NMEA: %s', nmea_sentence) if not validate_nmea(nmea_sentence): raise ValueError('Invalid NMEA-0183 sentence') data = nmea_sentence.split('*')[0] nmea_type = '' cache = {} for i, field_data in enumerate(data.split(',')): if i == 0: nmea_type = field_data[-3:] if nmea_type == 'GSV': _log.warn('No processing required for GSV sentence') return if nmea_type == 'GSA' and location.vdop != 99: if vlog(TRACE_TAG): _log.debug('Skipping redundant GSA data') return if vlog(TRACE_TAG): _log.debug('Processing NMEA type: %s', nmea_type) elif i == 1: if nmea_type == 'RMC': cache['fix_hour'] = field_data[0:2] cache['fix_min'] = field_data[2:4] cache['fix_sec'] = field_data[4:6] if vlog(TRACE_TAG): _log.debug('Fix time %s:%s:%s', cache['fix_hour'], cache['fix_min'], cache['fix_sec']) elif i == 2: if nmea_type == 'RMC': if (field_data == 'V'): _log.warn('Fix Void') elif nmea_type == 'GSA': location.fix_type = GnssFixType(int(field_data)) if vlog(TRACE_TAG): _log.debug('Fix type: %s', location.fix_type.name) elif i == 3: if nmea_type == 'RMC': location.latitude = (float(field_data[0:2]) + float(field_data[2]) / 60.0) elif i == 4: if nmea_type == 'RMC': if field_data == 'S': location.latitude *= -1 if vlog(TRACE_TAG): _log.debug('Latitude: %.5f', location.latitude) elif i == 5: if nmea_type == 'RMC': location.longitude = (float(field_data[0:3]) + float(field_data[3]) / 60.0) elif i == 6: if nmea_type == 'RMC': if field_data == 'W': location.longitude *= -1 if vlog(TRACE_TAG): _log.debug('Longitude: %.5f', location.longitude) elif nmea_type == 'GGA': location.fix_quality = GnssFixQuality(int(field_data)) if vlog(TRACE_TAG): _log.debug('Fix quality: %s', location.fix_quality.name) elif i == 7: if nmea_type == 'RMC': location.speed = float(field_data) if vlog(TRACE_TAG): _log.debug('Speed: %.1f', location.speed) elif nmea_type == 'GGA': location.satellites = int(field_data) if vlog(TRACE_TAG): _log.debug('GNSS satellites used: %d', location.satellites) elif i == 8: if nmea_type == 'RMC': location.heading = float(field_data) if vlog(TRACE_TAG): _log.debug('Heading: %.1f', location.heading) elif nmea_type == 'GGA': location.hdop = round(float(field_data), 1) if vlog(TRACE_TAG): _log.debug('HDOP: %.1f', location.heading) elif i == 9: if nmea_type == 'RMC': fix_day = field_data[0:2] fix_month = field_data[2:4] fix_yy = int(field_data[4:]) fix_yy += 1900 if fix_yy >= 73 else 2000 if vlog(TRACE_TAG): _log.debug('Fix date %d-%s-%s', fix_yy, fix_month, fix_day) iso_time = (f'{fix_yy}-{fix_month}-{fix_day}T' f'{cache["fix_hour"]}:{cache["fix_min"]}' f':{cache["fix_sec"]}Z') unix_timestamp = iso_to_ts(iso_time) if vlog(TRACE_TAG): _log.debug('Fix time ISO 8601: %s | Unix: %d', iso_time, unix_timestamp) location.timestamp = unix_timestamp elif nmea_type == 'GGA': location.altitude = float(field_data) if vlog(TRACE_TAG): _log.debug('Altitude: %.1f', location.altitude) elif i == 10: # RMC magnetic variation - ignore if nmea_type == 'GGA' and field_data != 'M': _log.warning('Unexpected altitude units: %s', field_data) # elif i == 11: # RMC magnetic variation direction, GGA height of geoid - ignore # elif i == 12: # GGA units height of geoid - ignore # elif i == 13: # GGA seconds since last DGPS update - ignore # elif i == 14: # GGA DGPS station ID - ignore elif i == 15: # GSA PDOP - ignore (unused) if nmea_type == 'GSA': location.pdop = round(float(field_data), 1) if vlog(TRACE_TAG): _log.debug('PDOP: %d', location.pdop) # elif i == 16: # GSA HDOP - ignore (use GGA) elif i == 17: if nmea_type == 'GSA': location.vdop = round(float(field_data), 1) if vlog(TRACE_TAG): _log.debug('VDOP: %d', location.vdop)
def validate_nmea(nmea_sentence: str) ‑> bool
-
Validates a given NMEA-0183 sentence with CRC.
Args
nmea_sentence
:str
- NMEA-0183 sentence ending in checksum.
Expand source code
def validate_nmea(nmea_sentence: str) -> bool: """Validates a given NMEA-0183 sentence with CRC. Args: nmea_sentence (str): NMEA-0183 sentence ending in checksum. """ if '*' not in nmea_sentence: return False data, cs_hex = nmea_sentence.split('*') candidate = int(cs_hex, 16) crc = 0 # initial for i in range(1, len(data)): # ignore initial $ crc ^= ord(data[i]) return candidate == crc
Classes
class GnssFixQuality (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Enumerated fix quality from NMEA-0183 standard.
Expand source code
class GnssFixQuality(NimoIntEnum): """Enumerated fix quality from NMEA-0183 standard.""" INVALID = 0 GPS_SPS = 1 DGPS = 2 PPS = 3 RTK = 4 FLOAT_RTK = 5 EST_DEAD_RECKONING = 6 MANUAL = 7 SIMULATION = 8
Ancestors
- NimoIntEnum
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var DGPS
var EST_DEAD_RECKONING
var FLOAT_RTK
var GPS_SPS
var INVALID
var MANUAL
var PPS
var RTK
var SIMULATION
Inherited members
class GnssFixType (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Enumerated fix type from NMEA-0183 standard.
Expand source code
class GnssFixType(NimoIntEnum): """Enumerated fix type from NMEA-0183 standard.""" NONE = 1 D2 = 2 D3 = 3
Ancestors
- NimoIntEnum
- enum.IntEnum
- builtins.int
- enum.Enum
Class variables
var D2
var D3
var NONE
Inherited members
class GnssSatelliteInfo (prn: int, elevation: int, azimuth: int, snr: int)
-
Information specific to a GNSS satellite.
Attributes
prn
- The PRN code (Pseudo-Random Number sequence)
elevation
- The satellite elevation
azimuth
- The satellite azimuth
snr
- The satellite Signal-to-Noise Ratio
Expand source code
@dataclass class GnssSatelliteInfo(object): """Information specific to a GNSS satellite. Attributes: prn: The PRN code (Pseudo-Random Number sequence) elevation: The satellite elevation azimuth: The satellite azimuth snr: The satellite Signal-to-Noise Ratio """ prn: int elevation: int azimuth: int snr: int
Class variables
var azimuth : int
var elevation : int
var prn : int
var snr : int
class ModemLocation (**kwargs)
-
A set of location-based information derived from the modem's NMEA data.
Uses 90.0/180.0 if latitude/longitude are unknown
Attributes
latitude
:float
- decimal degrees
longitude
:float
- decimal degrees
altitude
:float
- in metres
speed
:float
- in knots
heading
:float
- in degrees
timestamp
:int
- in seconds since 1970-01-01T00:00:00Z
satellites
:int
- in view at time of fix
fix_type
:GnssFixType
- 1=None, 2=2D or 3=3D
fix_quality
:GnssFixQuality
- Enumerated lookup value
pdop
:float
- Probability Dilution of Precision
hdop
:float
- Horizontal Dilution of Precision
vdop
:float
- Vertical Dilution of Precision
time_iso
:str
- ISO 8601 formatted timestamp
Initializes a Location with default latitude/longitude 90/180.
Expand source code
class ModemLocation: """A set of location-based information derived from the modem's NMEA data. Uses 90.0/180.0 if latitude/longitude are unknown Attributes: latitude (float): decimal degrees longitude (float): decimal degrees altitude (float): in metres speed (float): in knots heading (float): in degrees timestamp (int): in seconds since 1970-01-01T00:00:00Z satellites (int): in view at time of fix fix_type (GnssFixType): 1=None, 2=2D or 3=3D fix_quality (GnssFixQuality): Enumerated lookup value pdop (float): Probability Dilution of Precision hdop (float): Horizontal Dilution of Precision vdop (float): Vertical Dilution of Precision time_iso (str): ISO 8601 formatted timestamp """ def __init__(self, **kwargs): """Initializes a Location with default latitude/longitude 90/180.""" self.latitude = float(kwargs.get('latitude', 90.0)) self.longitude = float(kwargs.get('longitude', 180.0)) self.altitude = float(kwargs.get('altitude', 0.0)) # metres self.speed = float(kwargs.get('speed', 0.0)) # knots self.heading = float(kwargs.get('heading', 0.0)) # degrees self.timestamp = int(kwargs.get('timestamp', 0)) # seconds (unix) self.satellites = int(kwargs.get('satellites', 0)) self.fix_type = GnssFixType(int(kwargs.get('fix_type', 1))) self.fix_quality = GnssFixQuality(int(kwargs.get('fix_quality', 0))) self.pdop = float(kwargs.get('pdop', 99)) self.hdop = float(kwargs.get('hdop', 99)) self.vdop = float(kwargs.get('vdop', 99)) # self.satellites_info: 'list[GnssSatelliteInfo]' = kwargs.get( # 'satellites_info', [] # ) @property def time_iso(self) -> str: return f'{ts_to_iso(self.timestamp)}' # def _update_satellites_info(self, # satellites_info: 'list[GnssSatelliteInfo]'): # """Populates satellite information based on NMEA GSV data.""" # for satellite_info in satellites_info: # if isinstance(satellite_info, GnssSatelliteInfo): # new = True # for i, info in enumerate(self.satellites_info): # if info.prn == satellite_info.prn: # new = False # self.satellites_info[i] = satellite_info # break # if new: # self.satellites_info.append(satellite_info) def __repr__(self) -> str: obj = deepcopy(self.__dict__) for k, v in obj.items(): if k in ['latitude', 'longitude']: obj[k] = round(v, 5) elif isinstance(v, float): obj[k] = round(v, 1) return json.dumps(obj, skipkeys=True)
Instance variables
var time_iso : str
-
Expand source code
@property def time_iso(self) -> str: return f'{ts_to_iso(self.timestamp)}'
class SatelliteLocation (name: str = '', latitude: float = 0.0, longitude: float = 180.0, altitude: float = 35786000, azimuth: float = 0.0, elevation: float = 0.0, geobeam: GeoBeam|None = None)
-
Represents a geostationary satellite location relative to a modem.
Expand source code
@dataclass class SatelliteLocation: """Represents a geostationary satellite location relative to a modem.""" name: str = '' latitude: float = 0.0 longitude: float = 180.0 altitude: float = GEOSTATIONARY_DISTANCE_M azimuth: float = 0.0 elevation: float = 0.0 geobeam: 'GeoBeam|None' = None
Class variables
var altitude : float
var azimuth : float
var elevation : float
var geobeam : GeoBeam|None
var latitude : float
var longitude : float
var name : str