"""pypozyx.pozyx_serial - contains the serial interface with Pozyx through PozyxSerial."""
from time import sleep
from pypozyx.core import PozyxConnectionError
from pypozyx.definitions.constants import (POZYX_SUCCESS, POZYX_FAILURE,
MAX_SERIAL_SIZE)
from pypozyx.lib import PozyxLib
from pypozyx.structures.generic import SingleRegister
from serial import Serial, VERSION as PYSERIAL_VERSION, SerialException
from serial.tools.list_ports import comports
from warnings import warn
# \addtogroup auxiliary_serial
# @{
[docs]def list_serial_ports():
"""Prints the open serial ports line per line"""
warn("list_serial_ports now deprecated, use print_all_serial_ports instead", DeprecationWarning)
ports = comports()
for port in ports:
print(port)
[docs]def print_all_serial_ports():
"""Prints the open serial ports line per line"""
ports = comports()
for port in ports:
print(port)
[docs]def get_serial_ports():
"""Returns the open serial ports"""
return comports()
[docs]def is_pozyx_port(port):
"""Returns whether the port is a Pozyx device"""
try:
if "Pozyx Labs" in port.manufacturer:
return True
except TypeError:
pass
try:
if "Pozyx" in port.product:
return True
except TypeError:
pass
try:
if "0483:" in port.hwid:
return True
except TypeError:
pass
return False
[docs]def get_port_object(device):
"""Returns the PySerial port object from a given port path"""
for port in get_serial_ports():
if port.device == device:
return port
[docs]def is_pozyx(device):
"""Returns whether the device is a recognized Pozyx device"""
port = get_port_object(device)
if port is not None and is_pozyx_port(port):
return True
return False
[docs]def get_pozyx_ports():
"""Returns the Pozyx serial ports. Windows only. Needs driver installed"""
pozyx_ports = []
for port in get_serial_ports():
if is_pozyx_port(port):
pozyx_ports.append(port.device)
return pozyx_ports
[docs]def get_first_pozyx_serial_port():
"""Returns the first encountered Pozyx serial port's identifier"""
for port in get_serial_ports():
if is_pozyx_port(port):
return port.device
[docs]def get_pozyx_ports_windows():
"""Returns the Pozyx serial ports. Windows only. Needs driver installed"""
ports = get_serial_ports()
pozyx_ports = []
for port in ports:
if "STMicroelectronics Virtual COM Port" in port.description:
pozyx_ports.append(port.device)
[docs]def is_correct_pyserial_version():
"""Returns whether the pyserial version is supported"""
version_tags = [int(version_tag)
for version_tag in PYSERIAL_VERSION.split('.')]
if version_tags[0] >= 3:
if version_tags[0] == 3 and version_tags[1] <= 3:
warn("PySerial out of date, please update to v3.4 if possible", stacklevel=0)
return True
return False
# @}
[docs]class PozyxSerial(PozyxLib):
"""This class provides the Pozyx Serial interface, and opens and locks the serial
port to use with Pozyx. All functionality from PozyxLib and PozyxCore is included.
Args:
port (str): Name of the serial port. On UNIX this will be '/dev/ttyACMX', on
Windows this will be 'COMX', with X a random number.
baudrate (optional): the baudrate of the serial port. Default value is 115200.
timeout (optional): timeout for the serial port communication in seconds. Default is 0.1s or 100ms.
print_output (optional): boolean for printing the serial exchanges, mainly for debugging purposes
suppress_warnings (optional): boolean for suppressing warnings in the Pozyx use, usage not recommended
debug_trace (optional): boolean for printing the trace on bad serial init (DEPRECATED)
show_trace (optional): boolean for printing the trace on bad serial init (DEPRECATED)
Example usage:
>>> pozyx = PozyxSerial('COMX') # Windows
>>> pozyx = PozyxSerial('/dev/ttyACMX', print_output=True) # Linux and OSX. Also puts debug output on.
Finding the serial port can be easily done with the following code:
>>> import serial.tools.list_ports
>>> print serial.tools.list_ports.comports()[0]
Putting one and two together, automating the correct port selection with one Pozyx attached:
>>> import serial.tools.list_ports
>>> pozyx = PozyxSerial(serial.tools.list_ports.comports()[0])
"""
# \addtogroup core
# @{
def __init__(self, port, baudrate=115200, timeout=0.1, write_timeout=0.1,
print_output=False, debug_trace=False, show_trace=False,
suppress_warnings=False):
"""Initializes the PozyxSerial object. See above for details."""
super(PozyxSerial, self).__init__()
self.print_output = print_output
if debug_trace is True or show_trace is True:
if not suppress_warnings:
warn("debug_trace or show_trace are on their way out, exceptions of the type PozyxException are now raised.",
DeprecationWarning)
self.suppress_warnings = suppress_warnings
self.connectToPozyx(port, baudrate, timeout, write_timeout)
sleep(0.25)
self.validatePozyx()
[docs] def connectToPozyx(self, port, baudrate, timeout, write_timeout):
"""Attempts to connect to the Pozyx via a serial connection"""
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self.write_timeout = write_timeout
try:
if is_correct_pyserial_version():
if not is_pozyx(port) and not self.suppress_warnings:
warn("The passed device is not a recognized Pozyx device, is %s" % get_port_object(port).description, stacklevel=2)
self.ser = Serial(port=port, baudrate=baudrate, timeout=timeout,
write_timeout=write_timeout)
else:
if not self.suppress_warnings:
warn("PySerial version %s not supported, please upgrade to 3.0 or (prefferably) higher" %
PYSERIAL_VERSION, stacklevel=0)
self.ser = Serial(port=port, baudrate=baudrate, timeout=timeout,
writeTimeout=write_timeout)
except SerialException as exc:
raise PozyxConnectionError("Wrong or busy serial port, SerialException: {}".format(str(exc)))
except Exception as exc:
raise PozyxConnectionError("Couldn't connect to Pozyx, {}: {}".format(exc.__class__.__name__, str(exc)))
[docs] def validatePozyx(self):
"""Validates whether the connected device is indeed a Pozyx device"""
whoami = SingleRegister()
if self.getWhoAmI(whoami) != POZYX_SUCCESS:
raise PozyxConnectionError("Connected to device, but couldn't read serial data. Is it a Pozyx?")
if whoami.value != 0x43:
raise PozyxConnectionError("POZYX_WHO_AM_I returned 0x%0.2x, something is wrong with Pozyx." % whoami.value)
# @}
[docs] def regWrite(self, address, data):
"""
Writes data to the Pozyx registers, starting at a register address,
if registers are writable.
Args:
address: Register address to start writing at.
data: Data to write to the Pozyx registers.
Has to be ByteStructure-derived object.
Returns:
POZYX_SUCCESS, POZYX_FAILURE
"""
data.load_hex_string()
index = 0
runs = int(data.byte_size / MAX_SERIAL_SIZE)
for i in range(runs):
s = 'W,%0.2x,%s\r' % (
address + index, data.byte_data[2 * index: 2 * (index + MAX_SERIAL_SIZE)])
index += MAX_SERIAL_SIZE
try:
self.ser.write(s.encode())
except SerialException:
return POZYX_FAILURE
# delay(POZYX_DELAY_LOCAL_WRITE)
s = 'W,%0.2x,%s\r' % (address + index, data.byte_data[2 * index:])
try:
self.ser.write(s.encode())
except SerialException:
return POZYX_FAILURE
return POZYX_SUCCESS
[docs] def serialExchange(self, s):
"""
Auxiliary. Performs a serial write to and read from the Pozyx.
Args:
s: Serial message to send to the Pozyx
Returns:
Serial message the Pozyx returns, stripped from 'D,' at its start
and NL+CR at the end.
"""
self.ser.write(s.encode())
response = self.ser.readline().decode()
if self.print_output:
print('The response to %s is %s.' % (s.strip(), response.strip()))
if len(response) == 0:
raise SerialException
if response[0] == 'D':
return response[2:-2]
raise SerialException
[docs] def regRead(self, address, data):
"""
Reads data from the Pozyx registers, starting at a register address,
if registers are readable.
Args:
address: Register address to start writing at.
data: Data to write to the Pozyx registers.
Has to be ByteStructure-derived object.
Returns:
POZYX_SUCCESS, POZYX_FAILURE
"""
runs = int(data.byte_size / MAX_SERIAL_SIZE)
r = ''
for i in range(runs):
s = 'R,%0.2x,%i\r' % (
address + i * MAX_SERIAL_SIZE, MAX_SERIAL_SIZE)
try:
r += self.serialExchange(s)
except SerialException:
return POZYX_FAILURE
s = 'R,%0.2x,%i\r' % (
address + runs * MAX_SERIAL_SIZE, data.byte_size - runs * MAX_SERIAL_SIZE)
try:
r += self.serialExchange(s)
except SerialException:
return POZYX_FAILURE
data.load_bytes(r)
return POZYX_SUCCESS
[docs] def regFunction(self, address, params, data):
"""
Performs a register function on the Pozyx, if the address is a register
function.
Args:
address: Register function address of function to perform.
params: Parameters for the register function.
Has to be ByteStructure-derived object.
data: Container for the data the register function returns.
Has to be ByteStructure-derived object.
Returns:
POZYX_SUCCESS, POZYX_FAILURE
"""
params.load_hex_string()
s = 'F,%0.2x,%s,%i\r' % (address, params.byte_data, data.byte_size + 1)
try:
r = self.serialExchange(s)
except SerialException:
return POZYX_FAILURE
if len(data) > 0:
data.load_bytes(r[2:])
return int(r[0:2], 16)
[docs] def waitForFlag(self, interrupt_flag, timeout_s, interrupt=None):
"""
Waits for a certain interrupt flag to be triggered, indicating that
that type of interrupt occured.
Args:
interrupt_flag: Flag indicating interrupt type.
timeout_s: time in seconds that POZYX_INT_STATUS will be checked
for the flag before returning POZYX_TIMEOUT.
Kwargs:
interrupt: Container for the POZYX_INT_STATUS data
Returns:
POZYX_SUCCESS, POZYX_FAILURE, POZYX_TIMEOUT
"""
if interrupt is None:
interrupt = SingleRegister()
return self.waitForFlagSafe(interrupt_flag, timeout_s, interrupt)