# Copyright (c) 2020. Ruud de Jong
# This file is part of the SlipLib project which is released under the MIT license.
# See https://github.com/rhjdjong/SlipLib for details.
"""
Module :mod:`~sliplib.slipstream`
==================================
The :mod:`~sliplib.slipstream` module contains the class :class:`SlipStream`.
The :class:`SlipStream` class can also be imported directly from the :mod:`sliplib` package.
.. autoprotocol:: IOStream
:show-inheritance:
.. autoclass:: SlipStream(stream, [chunk_size])
:show-inheritance:
A :class:`SlipStream` instance has the following attributes and read-only properties
in addition to the attributes offered by its base class :class:`~sliplib.slipwrapper.SlipWrapper`:
.. autoattribute:: chunk_size
.. autoproperty:: readable
.. autoproperty:: writable
"""
from __future__ import annotations
import io
import warnings
from typing import Any, Protocol
from sliplib.slipwrapper import SlipWrapper
[docs]
class IOStream(Protocol):
"""
Protocol class for wrappable byte streams.
Any object that produces and consumes a byte stream and contains the two required methods can be used.
Typically, an IOStream is a subclass of :class:`io.RawIOBase`, :class:`io.BufferedIOBase`,
:class:`io.FileIO`, or similar classes, but this is not required.
"""
[docs]
def read(self, chunksize: int) -> bytes:
"""Read `chunksize` bytes from the stream.
Args:
chunksize: The number of bytes to read from the :protocol:`IOStream`.
Returns:
The bytes read from the :protocol:`IOStream`.
The number of bytes read may be less than the number specified by `chunksize`.
"""
[docs]
def write(self, data: bytes) -> int:
"""Write data to the stream.
Args:
data: The bytes to write on to :protocol:`IOStream`.
Returns:
The number of bytes actually written. This may be less than the
number of bytes contained in `data`.
"""
[docs]
class SlipStream(SlipWrapper[IOStream]):
"""Class that wraps an IO stream with a :class:`~sliplib.slip.Driver`.
:class:`SlipStream` combines a :class:`~sliplib.slip.Driver` instance with a concrete byte stream.
The byte stream must support the methods :meth:`read` and :meth:`write`.
To avoid conflicts and ambiguities caused by different `newline` conventions,
streams that have an :attr:`encoding` attribute
(such as :class:`io.StringIO` objects, or text files that are not opened in binary mode)
are not accepted as a byte stream.
The :class:`SlipStream` class has all the methods and attributes
from its base class :class:`~sliplib.slipwrapper.SlipWrapper`.
In addition, it directly exposes all methods and attributes of
the contained :obj:`~sliplib.slipwrapper.SlipWrapper.stream`, except for the following:
* :meth:`read*` and :meth:`write*`. These methods are not
supported, because byte-oriented read and write operations
would invalidate the internal state maintained by :class:`SlipStream`.
* Similarly, :meth:`seek`, :meth:`tell`, and :meth:`truncate` are not supported,
because repositioning or truncating the stream would invalidate the internal state.
* :meth:`raw`, :meth:`detach` and other methods that provide access to or manipulate
the stream's internal data.
Instead of the :meth:`read*` and :meth:`write*` methods
a :class:`SlipStream` object provides the method :meth:`~sliplib.slipwrapper.SlipWrapper.recv_msg`
and :meth:`~sliplib.slipwrapper.SlipWrapper.send_msg`
to read and write SLIP-encoded messages.
.. deprecated:: 0.6
Direct access to the methods and attributes of the contained :obj:`~sliplib.slipwrapper.SlipWrapper.stream`
will be removed in version 1.0.
"""
def __init__(self, stream: IOStream, chunk_size: int = io.DEFAULT_BUFFER_SIZE):
"""
To instantiate a :class:`SlipStream` object, the user must provide
a pre-constructed open byte stream that is ready for reading and/or writing.
Args:
stream: The byte stream that will be wrapped.
chunk_size: The number of bytes to read per read operation.
The default value for `chunck_size` is :external:obj:`io.DEFAULT_BUFFER_SIZE`.
Setting `chunk_size` is useful when the stream has a low bandwidth
and/or bursty data (e.g. a serial port interface).
In such cases it is useful to have a `chunk_size` value of 1, to avoid that the application
hangs or becomes unresponsive.
.. versionadded:: 0.6
The `chunk_size` parameter.
A :class:`SlipStream` instance can e.g. be useful to read slip-encoded messages
from a file:
.. code::
with open('/path/to/a/slip/encoded/file', mode='rb') as f:
slip_file = SlipStream(f)
for msg in slip_file:
# Do something with the message
"""
for method in ("read", "write"):
if not hasattr(stream, method) or not callable(getattr(stream, method)):
error_msg = f"{stream.__class__.__name__} object has no method {method}"
raise TypeError(error_msg)
if hasattr(stream, "encoding"):
error_msg = f"{stream.__class__.__name__} object is not a byte stream"
raise TypeError(error_msg)
#: The number of bytes to read during each read operation.
self.chunk_size = chunk_size if chunk_size > 0 else io.DEFAULT_BUFFER_SIZE
super().__init__(stream)
def send_bytes(self, packet: bytes) -> None:
"""See base class."""
while packet:
number_of_bytes_written = self.stream.write(packet)
packet = packet[number_of_bytes_written:]
def recv_bytes(self) -> bytes:
"""See base class."""
return b"" if self._stream_is_closed else self.stream.read(self.chunk_size)
@property
def readable(self) -> bool:
"""Indicates if the wrapped stream is readable.
The value is :external:obj:`True` if the readability of the wrapped stream
cannot be determined.
"""
return getattr(self.stream, "readable", True)
@property
def writable(self) -> bool:
"""Indicates if the wrapped stream is writable.
The value is :external:obj:`True` if the writabilty of the wrapped stream
cannot be determined.
"""
return getattr(self.stream, "writable", True)
@property
def _stream_is_closed(self) -> bool:
"""Indicates if the wrapped stream is closed.
The value is :external:obj:`False` if it cannot be determined if the wrapped stream is closed.
"""
return getattr(self.stream, "closed", False)
def __getattr__(self, attribute: str) -> Any:
if attribute.startswith(("read", "write")) or attribute in (
"detach",
"flushInput",
"flushOutput",
"getbuffer",
"getvalue",
"peek",
"raw",
"reset_input_buffer",
"reset_output_buffer",
"seek",
"seekable",
"tell",
"truncate",
):
error_msg = f"'{self.__class__.__name__}' object has no attribute '{attribute}'"
raise AttributeError(error_msg)
# Deprecation warning
warning_msg = "Direct access to the enclosed stream attributes and methods will be removed in version 1.0"
warnings.warn(warning_msg, DeprecationWarning, stacklevel=2)
return getattr(self.stream, attribute)