# Copyright (c) 2020. Ruud de Jong
# This file is part of the SlipLib project which is released under the MIT license.
# See https://github.com/rhjdjong/SlipLib for details.
"""
Module :mod:`~sliplib.slip`
===========================
The :mod:`~sliplib.slip` module contains configuration settings and
lower-level functions and classes that are mainly useful for extending the :mod:`sliplib` package.
The configuration settings, functions, classes, and exception in this module can also be imported directly from the
:mod:`sliplib` package.
Configuration
-------------
.. attribute:: config
The configuration settings are stored in the :attr:`config` configuration object.
The :attr:`config` object provides constants, configuration settings, and a context manager
for :mod:`sliplib`.
Constants
+++++++++
.. autoattribute:: config.END
The SLIP :const:`~config.END` byte. Used to delimit SLIP packages.
.. autoattribute:: config.ESC
The SLIP :const:`~config.ESC` byte.
Used to escape :const:`~config.ESC` and :const:`~config.END` bytes in the message.
.. autoattribute:: config.ESC_END
The escaped value of an :const:`~config.END` byte.
.. autoattribute:: config.ESC_ESC
The escaped value of an :const:`~config.ESC` byte.
For backwards compatibility reasons these constants can also be imported directly
from the top-level :mod:`sliplib` module.
Settings
++++++++
.. autoattribute:: config.USE_LEADING_END_BYTE
Indicates if a leading :const:`~config.END` byte must be sent.
Context Manager
+++++++++++++++
.. automethod:: config.use_leading_end_byte(value)
The :obj:`config` configuration object and its constants
:const:`~config.END`, :const:`~config.ESC`, :const:`~config.ESC_END`, and :const:`~config.ESC_ESC`,
as well as the :func:`~config.use_leading_end_byte` context manager
can also be imported directly from the :mod:`sliplib` module.
Functions
---------
The following are lower-level functions that should normally not be used directly.
.. autofunction:: encode
.. autofunction:: decode
.. autofunction:: is_valid
Classes
-------
.. autoclass:: Driver
Class :class:`Driver` offers the following methods:
.. automethod:: send
.. automethod:: receive
.. automethod:: get
.. autoproperty:: sends_leading_end_byte
Exceptions
----------
.. autoexception:: ProtocolError
"""
from __future__ import annotations
import re
from collections.abc import Iterator # noqa: TC003
from contextlib import contextmanager
from queue import Empty, Queue
from threading import RLock
from typing import Final
class _Configuration:
"""Configuration object for sliplib."""
_END: Final = b"\xc0"
_ESC: Final = b"\xdb"
_ESC_END: Final = b"\xdc"
_ESC_ESC: Final = b"\xdd"
#: Indicates if a leading :const:`~config.END` byte must be sent.
_USE_LEADING_END_BYTE = False
_use_leading_end_byte_lock: RLock = RLock()
#: The SLIP :const:`~config.END` byte. Used to delimit SLIP packages.
@property
def END(self) -> bytes: # noqa: N802
"""The SLIP :const:`~config.END` byte. Used to delimit SLIP packages."""
return self._END
@property
def ESC(self) -> bytes: # noqa: N802
"""The SLIP :const:`~config.ESC` byte.
Used to escape :const:`~config.ESC` and :const:`~config.END` bytes in the message."""
return self._ESC
@property
#: The escaped value of an :const:`~config.END` byte.
def ESC_END(self) -> bytes: # noqa: N802
"""#: The escaped value of an :const:`~config.END` byte."""
return self._ESC_END
@property
def ESC_ESC(self) -> bytes: # noqa: N802
"""The escaped value of an :const:`~config.ESC` byte."""
return self._ESC_ESC
@property
def USE_LEADING_END_BYTE(self) -> bool: # noqa: N802
"""Indicates if a leading :const:`~config.END` byte must be sent."""
return self._USE_LEADING_END_BYTE
@USE_LEADING_END_BYTE.setter
def USE_LEADING_END_BYTE(self, value: bool) -> None: # noqa: N802
with self._use_leading_end_byte_lock:
self._USE_LEADING_END_BYTE = value
@contextmanager
def use_leading_end_byte(self, value: bool) -> Iterator[None]: # noqa: FBT001
"""Temporarily modify the value of :data:`USE_LEADING_END_BYTE`.
This context manager ensures that any :class:`~sliplib.slip.Driver`
and :class:`~sliplib.slipwrapper.SlipWrapper` instances that are
defined in its body use a specific value for :data:`USE_LEADING_END_BYTE`.
This is particularly useful when the application interacts with different endpoints,
some that require a leading :const:`~config.END` byte,
and others that cannot handle multiple subsequent :const:`~config.END` bytes.
By using this context manager, the order of the creation of the instances can be modified
without having to worry about the current value of :data:`USE_LEADING_END_BYTE`.
Example:
.. code-block:: python
with use_leading_end_byte(True):
slip_socket = SlipSocket(sock) # Where sock is a previously created socket.
# Calling `slip_socket.send_msg(message)` will send the encoded message
# with both a leading and trailing END byte.
.. versionadded:: 0.7.0
.. note::
The temporary value of :data:`USE_LEADING_END_BYTE` does not propagate to processes
that are created or started in the body of the context manager.
.. warning::
:data:`USE_LEADING_END_BYTE` is a global setting.
For that reason, the temporary value of :data:`USE_LEADING_END_BYTE` is
protected from modification by other threads.
This means that inside the body of this context manager, you should e.g. not wait for notifications from
another thread when that other thread also uses this context manager.
As an example, the following will deadlock because the spawned thread will never
enter the body of the context manager.
.. code-block:: python
def make_server(address, handler_class, event):
with config.use_leading_end_byte(False):
slip_server = SlipServer(address, handler_class)
slip_server.handle_request()
event.set()
with config.use_leading_end_byte(True):
event = threading.Event()
server_thread = threading.Thread(
target=self.server,
args=(address, SlipRequestHandler, event),
)
server_thread.start()
event.wait()
client = SlipSocket.make_client(address)
As a general rule, the body of the context manager should only contain statements
that directly create :class:`~sliplib.slip.Driver`
and/or :class:`~sliplib.slipwrapper.SlipWrapper` instances.
The above example should be rewritten to something like:
.. code-block:: python
def make_server(address, handler_class, event):
with config.use_leading_end_byte(False):
slip_server = SlipServer(address, handler_class)
slip_server.handle_request()
event.set()
event = threading.Event()
server_thread = threading.Thread(
target=self.server,
args=(address, SlipRequestHandler, event),
)
server_thread.start()
event.wait()
with config.use_leading_end_byte(True):
client = SlipSocket.make_client(address)
Args:
value: The temporary value of :data:`USE_LEADING_END_BYTE`.
:rtype: :external:obj:`~typing.ContextManager` [:external:obj:`None`]
"""
with self._use_leading_end_byte_lock:
current_value = self.USE_LEADING_END_BYTE
try:
self.USE_LEADING_END_BYTE = value
yield
finally:
self.USE_LEADING_END_BYTE = current_value
config = _Configuration()
END = config.END
ESC = config.ESC
ESC_END = config.ESC_END
ESC_ESC = config.ESC_ESC
use_leading_end_byte = config.use_leading_end_byte
[docs]
class ProtocolError(ValueError):
"""Exception to indicate that a SLIP protocol error has occurred.
This exception is raised when an attempt is made to decode
a packet with invalid bytes or byte sequences.
Invalid bytes are :const:`~config.END` bytes or a trailing :const:`~config.ESC` byte.
An invalid byte sequence is an :const:`~config.ESC` byte followed
by any byte that is not an :const:`~config.ESC_ESC` or :const:`~config.ESC_END` byte.
The :exc:`ProtocolError` carries the invalid packet
as the first (and only) element in its :attr:`args` tuple.
"""
[docs]
def encode(msg: bytes) -> bytes:
"""Encode a message (a byte sequence) into a SLIP-encoded packet.
This function replaces any :const:`~config.END` or :const:`~config.ESC` byte
in the message with their SLIP-escaped value.
Args:
msg: The message that must be encoded
Returns:
The SLIP-encoded message
.. versionchanged:: 0.7.0
Leading and/or trailing :const:`~config.END` bytes are no longer included in the return value.
As of version 0.7.1, the original version of the :func:`~sliplib.legacy.encode()` function is available in the
:mod:`~sliplib.legacy` module.
"""
msg = bytes(msg)
return msg.replace(ESC, ESC + ESC_ESC).replace(END, ESC + ESC_END)
[docs]
def decode(packet: bytes) -> bytes:
"""Retrieve the message from the SLIP-encoded packet.
This function replaces any escaped SLIP bytes with their original values
Args:
packet: The SLIP-encoded message.
Note that this must be exactly one complete packet,
without any leading and/or trailing :const:`~config.END` bytes.
The :func:`decode` function does not provide any buffering
for incomplete packages, nor does it provide support
for decoding data with multiple packets.
Returns:
The decoded message
Raises:
ProtocolError: if the packet contains an invalid byte sequence.
.. versionchanged:: 0.7.0
Leading and/or trailing :const:`~config.END` bytes are no longer allowed.
As of version 0.7.1, the original version of the :func:`~sliplib.legacy.decode()` function is available in the
:mod:`~sliplib.legacy` module.
"""
if not is_valid(packet):
raise ProtocolError(packet)
return packet.replace(ESC + ESC_END, END).replace(ESC + ESC_ESC, ESC)
[docs]
def is_valid(packet: bytes) -> bool:
"""Indicate if the packet's contents conform to the SLIP specification.
A packet is valid if:
* It contains no :const:`~config.END` bytes, and
* Each :const:`~config.ESC` byte is followed by either
an :const:`~config.ESC_END` or an :const:`~config.ESC_ESC` byte.
Args:
packet: The packet to inspect.
Returns:
:external:obj:`True` if the packet is valid, :external:obj:`False` otherwise
.. versionchanged:: 0.7.0
Leading and/or trailing :const:`~config.END` bytes are no longer allowed.
"""
return not (END in packet or packet.endswith(ESC) or re.search(ESC + b"[^" + ESC_END + ESC_ESC + b"]", packet))
[docs]
class Driver:
"""Handle the SLIP-encoding and decoding of messages."""
def __init__(self) -> None:
self._prefix = END if config.USE_LEADING_END_BYTE else b""
self._finished = False
self._recv_buffer = b""
self._packets: Queue[bytes] = Queue()
[docs]
def send(self, message: bytes) -> bytes:
"""Encode a message into a SLIP-encoded packet.
The message can be any arbitrary byte sequence.
Args:
message: The message that must be encoded.
Returns:
A packet with the SLIP-encoded message, including any required
leading and trailing :const:`~config.END` bytes.
"""
return self._prefix + encode(message) + END
[docs]
def receive(self, data: bytes | int) -> None:
"""Extract and buffer SLIP packets.
Processes `data`, which must be a bytes-like object,
and extracts and buffers the SLIP packets contained therein.
A non-terminated SLIP packet in `data`
is also buffered, and extended with the next call to :meth:`receive`.
Args:
data: A bytes-like object to be processed.
An empty `data` parameter indicates that no more data will follow.
To accommodate iteration over byte sequences, an
integer in the range(0, 256) is also accepted.
Returns:
:external:obj:`None`:
.. versionchanged:: 0.7
`receive()` no longer returns a list of decoded messages.
"""
# When a single byte is fed into this function
# it is received as an integer, not as a bytes object.
# It must first be converted into a bytes object.
if isinstance(data, int):
data = bytes((data,))
# Empty data indicates that the data reception is complete.
# To force a buffer flush, an END byte is added, so that the
# current contents of _recv_buffer will form a complete message.
if not data:
self._finished = True
data = END
self._recv_buffer += data
# The following situations can occur:
#
# 1) _recv_buffer is empty or contains only END bytes --> no packets available
# 2) _recv_buffer contains non-END bytes --> one or more (partial) packets are available
#
# Strip leading END bytes from _recv_buffer to avoid handling empty packets.
self._recv_buffer = self._recv_buffer.lstrip(END)
# The _recv_buffer is now split on sequences of one or more END bytes.
# The trailing element from the split operation is a possibly incomplete
# packet; this element is therefore used as the new _recv_buffer.
# If _recv_buffer contains one or more trailing END bytes,
# (meaning that there are no incomplete packets), then the last element,
# and therefore the new _recv_buffer, is an empty bytes object.
*new_packets, self._recv_buffer = re.split(END + b"+", self._recv_buffer)
# Add the packets to the buffer
for packet in new_packets:
self._packets.put(packet)
[docs]
def get(self, *, block: bool = True, timeout: float | None = None) -> bytes | None:
"""Get the next decoded message.
Remove and decode a SLIP packet from the internal buffer, and return the resulting message.
If `block` is :external:obj:`True` and `timeout` is :external:obj:`None` (the default),
then this method blocks until a message is available.
If `timeout` is a positive number, the blocking will last for at most `timeout` seconds,
and the method will return :external:obj:`None` if no message became available in that time.
If `block` is :external:obj:`False`,
the method returns immediately with either a message or :external:obj:`None`.
Note:
`block` and `timeout` are keyword-only parameters.
Args:
block: If :external:obj:`True`, then block for at most timeout seconds. Otherwise, return immediately.
timeout: The number of seconds to wait for a message to become available.
Returns:
- :external:obj:`None` if no message is available,
- a decoded SLIP message, or
- an empty bytestring :obj:`b""` if no further messages will come available.
Raises:
ProtocolError: When the packet that contained the message had an invalid byte sequence.
.. versionadded:: 0.7
"""
try:
packet = self._packets.get(block, timeout)
except Empty:
return b"" if self._finished else None
return decode(packet)
@property
def sends_leading_end_byte(self) -> bool:
"""Indicates whether this :class:`Driver` instance sends a leading :const:`~config.END` byte.
.. versionadded:: 0.7.0
"""
return self._prefix == END