How-to — Buffered Serializers


Introduction

incremental_deserialize() is useful for waiting until all the data parts of a packet have been received, but it has one drawback: memory consumption.

The Actual Behavior

What happens each time the generator reaches a yield statement:

  • A large enough byte buffer is allocated (one call to malloc(3)).

  • recv(2) will write as much as possible to this buffer.

  • The byte buffer is shrunk to fit the bytes actually written (one call to realloc(3)).

  • The resulting bytes object is sent to the generator.

What you are likely to do with this byte buffer:

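  • Append the content to an existing byte buffer, either by concatenating bytes objects (a new allocation and a copy of both operands) or by extending a bytearray (a possible reallocation and a copy of the new data).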

  • Drop the reference to the given byte buffer, since you no longer need it (one call to free(3)).
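This flow can be reproduced with the standard socket module. In the sketch below, read_until_separator() is a hypothetical helper and socket.socketpair() stands in for a real connection. Each recv() call returns a freshly allocated bytes object, which is then concatenated to the data received so far.

```python
import socket


def read_until_separator(sock: socket.socket, separator: bytes = b"\r\n") -> bytes:
    """Accumulate chunks until `separator` is seen, like incremental_deserialize() does."""
    data = b""
    while separator not in data:
        # Each recv() call allocates a new bytes object for the chunk...
        chunk = sock.recv(4096)
        if not chunk:
            raise EOFError("connection closed before the separator was received")
        # ...and each concatenation allocates again and copies both operands.
        data += chunk
    return data


left, right = socket.socketpair()
with left, right:
    left.sendall(b'{"hello": ')
    left.sendall(b'"world"}\r\n')
    print(read_until_separator(right))
```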

Now Imagine That…

…you want recv(2) to write directly to your byte buffer (and perhaps at a given position).

It might look like this:

  • recv(2) will write as much as possible to the byte buffer you provided.

  • The number of bytes written is sent to the generator.

This approach has two consequences:

  • The byte buffer can be reused (there is no longer a buffer allocation for each recv(2) call).

  • It is no longer necessary to copy the content to another location.

This is what the BufferedIncrementalPacketSerializer is designed for.
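At the socket level, this model corresponds to socket.recv_into(). The sketch below keeps a single bytearray for the whole connection and asks recv_into() to write right after the bytes already received, through a memoryview slice. read_into_buffer() is a hypothetical helper and socket.socketpair() stands in for a real connection.

```python
import socket


def read_into_buffer(sock: socket.socket, buffer: bytearray, separator: bytes = b"\r\n") -> int:
    """Fill `buffer` in place until it contains `separator`; return the number of bytes written."""
    view = memoryview(buffer)
    nb_written = 0
    # Only search the area that has actually been written.
    while buffer.find(separator, 0, nb_written) < 0:
        if nb_written == len(buffer):
            raise ValueError("line too long for the buffer")
        # recv_into() writes straight into the free part of the buffer: no new bytes object.
        nbytes = sock.recv_into(view[nb_written:])
        if nbytes == 0:
            raise EOFError("connection closed before the separator was received")
        nb_written += nbytes
    return nb_written


buffer = bytearray(1024)  # allocated once, reused for every read
left, right = socket.socketpair()
with left, right:
    left.sendall(b"first line\r\n")
    n = read_into_buffer(right, buffer)
    print(bytes(buffer[:n]))
```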

The benefits?

This model exists solely for performance reasons. However, it requires careful thought about the actual needs of your application. It is not a quick fix for application performance problems; in fact, it can be less convenient to use.

The scenario described above is a real problem for servers with high workloads and/or low bandwidth that exchange large amounts of data with their clients. In such a situation, the slightest mistake can bring the whole thing crashing down. Under “normal” conditions, the basic implementation is more than enough to keep a server running with acceptable performance.

Important

I want to point out that it’s not about speed. It may slow the server down a bit, but it will be more robust.


Usage

For the serializers that support this API, you must use BufferedStreamProtocol instead of StreamProtocol:

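from easynetwork.clients import TCPNetworkClient
from easynetwork.protocol import BufferedStreamProtocol
from easynetwork.serializers import StringLineSerializer


def main() -> None:
    serializer = StringLineSerializer()
    protocol = BufferedStreamProtocol(serializer)

    with TCPNetworkClient(("remote_address", 12345), protocol) as endpoint:
        endpoint.send_packet("Hello, world!")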

Writing A Buffered Serializer

To write a buffered serializer, you must create a subclass of BufferedIncrementalPacketSerializer and override its create_deserializer_buffer() and buffered_incremental_deserialize() methods.

Note

You still need to implement the AbstractIncrementalPacketSerializer methods.

Let’s see how we can use it for MyJSONSerializer (from How-to — Serializers):

from __future__ import annotations

import json
from collections.abc import Buffer, Generator
from typing import Any

from easynetwork.exceptions import DeserializeError, IncrementalDeserializeError
from easynetwork.serializers.abc import BufferedIncrementalPacketSerializer

type SentPacket = Any
type ReceivedPacket = Any


class MyJSONSerializer(BufferedIncrementalPacketSerializer[SentPacket, ReceivedPacket, bytearray]):
    def __init__(self, *, ensure_ascii: bool = True) -> None:
        self._ensure_ascii: bool = ensure_ascii

        self._encoding: str
        if self._ensure_ascii:
            self._encoding = "ascii"
        else:
            self._encoding = "utf-8"

    def _dump(self, packet: SentPacket) -> bytes:
        document = json.dumps(packet, ensure_ascii=self._ensure_ascii)
        return document.encode(self._encoding)

    def _load(self, data: bytes | bytearray) -> ReceivedPacket:
        document = data.decode(self._encoding)
        return json.loads(document)

    def serialize(self, packet: SentPacket) -> bytes:
        return self._dump(packet)

    def deserialize(self, data: bytes) -> ReceivedPacket:
        try:
            return self._load(data)
        except (UnicodeError, json.JSONDecodeError) as exc:
            raise DeserializeError("JSON decode error") from exc

    def incremental_serialize(self, packet: SentPacket) -> Generator[bytes]:
        yield self._dump(packet) + b"\r\n"

    def incremental_deserialize(self) -> Generator[None, bytes, tuple[ReceivedPacket, bytes]]:
        data = yield
        newline = b"\r\n"
        while (index := data.find(newline)) < 0:
            data += yield

        remainder = data[index + len(newline) :]
        data = data[:index]

        try:
            document = self._load(data)
        except (UnicodeError, json.JSONDecodeError) as exc:
            raise IncrementalDeserializeError("JSON decode error", remainder) from exc

        return document, remainder

    def create_deserializer_buffer(self, sizehint: int) -> bytearray:
        buffer_size: int = max(sizehint, 65536)
        return bytearray(buffer_size)

    def buffered_incremental_deserialize(
        self,
        buffer: bytearray,
    ) -> Generator[int | None, int, tuple[ReceivedPacket, Buffer]]:
        buffer_size = len(buffer)
        newline = b"\r\n"
        separator_length = len(newline)

        nb_written_bytes: int = (yield None)

        while (index := buffer.find(newline, 0, nb_written_bytes)) < 0:
            start_idx: int = nb_written_bytes
            if start_idx > buffer_size - separator_length:
                raise IncrementalDeserializeError("Too long line", remaining_data=b"")
            nb_written_bytes += yield start_idx

        remainder: bytearray = buffer[index + separator_length : nb_written_bytes]
        data: bytearray = buffer[:index]

        try:
            document = self._load(data)
        except (UnicodeError, json.JSONDecodeError) as exc:
            raise IncrementalDeserializeError("JSON decode error", remainder) from exc

        return document, remainder

Choose the buffer type

When subclassing BufferedIncrementalPacketSerializer, you must pass the buffer type as its third type argument. This should be the type of the instance returned by create_deserializer_buffer().

A buffer object is an object that implements the buffer protocol.

Tip

As of Python 3.12, a buffer object can also be an object that implements the collections.abc.Buffer interface.

For the tutorial, we use a bytearray:

from __future__ import annotations

import json
from collections.abc import Buffer, Generator
from typing import Any

from easynetwork.exceptions import DeserializeError, IncrementalDeserializeError
from easynetwork.serializers.abc import BufferedIncrementalPacketSerializer

type SentPacket = Any
type ReceivedPacket = Any


class MyJSONSerializer(BufferedIncrementalPacketSerializer[SentPacket, ReceivedPacket, bytearray]):
    ...  # methods omitted here, see the full code above

Buffer Instantiation

def create_deserializer_buffer(self, sizehint: int) -> bytearray:
    buffer_size: int = max(sizehint, 65536)
    return bytearray(buffer_size)

Note

sizehint is the recommended buffer size, but the buffer can be of any size.

In the example, we assume that a JSON line can be up to 64KiB. Therefore, the minimum buffer size must be 64KiB.

Important

This function is called only once per connection. This means that buffered_incremental_deserialize() receives the same buffer every time it is called for a given stream.

The Purpose Of buffered_incremental_deserialize()

buffered_incremental_deserialize() must be a generator function (or at least return a generator iterator) that yields until all the data parts of the packet have been retrieved and parsed.

The yielded value is the position in the buffer at which writing should start. It can be:

  • None: Just use the whole buffer. Therefore, yield and yield None are equivalent to yield 0.

  • A non-negative integer n: Skips the first n bytes.

  • A negative integer -n: Writes only into the last n bytes. For example, yield -10 means that writing starts at the 10th byte from the end of the buffer.

This generator must return a pair (packet, remainder), where packet is the deserialized packet and remainder contains any trailing bytes that do not belong to it. The remainder can be a memoryview pointing to buffer or an external bytes-like object.

At each yield checkpoint, the endpoint implementation sends to the generator the number of bytes written to the buffer from the given start position.

def buffered_incremental_deserialize(
    self,
    buffer: bytearray,
) -> Generator[int | None, int, tuple[ReceivedPacket, Buffer]]:
    buffer_size = len(buffer)
    newline = b"\r\n"
    separator_length = len(newline)

    nb_written_bytes: int = (yield None)

    while (index := buffer.find(newline, 0, nb_written_bytes)) < 0:
        start_idx: int = nb_written_bytes
        if start_idx > buffer_size - separator_length:
            raise IncrementalDeserializeError("Too long line", remaining_data=b"")
        nb_written_bytes += yield start_idx

    remainder: bytearray = buffer[index + separator_length : nb_written_bytes]
    data: bytearray = buffer[:index]

    try:
        document = self._load(data)
    except (UnicodeError, json.JSONDecodeError) as exc:
        raise IncrementalDeserializeError("JSON decode error", remainder) from exc

    return document, remainder

Warning

Buffer consistency

Because of buffer reuse, it is up to you to reset the buffer state upon entry and exit of the generator. If you are relying only on the area that has already been written, you can skip this step (as in the example above).
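The following sketch, built around a hypothetical find_separator() helper, shows why the written area matters: bytes left over from a previous packet are still in the reused buffer, and a search that is not bounded by the number of bytes written can pick them up.

```python
def find_separator(buffer: bytearray, nb_written: int, *, whole_buffer: bool) -> int:
    """Hypothetical helper: search the whole buffer, or only the bytes written so far."""
    end = len(buffer) if whole_buffer else nb_written
    return buffer.find(b"\r\n", 0, end)


buffer = bytearray(16)
buffer[:5] = b"old\r\n"  # left over from the previous packet
buffer[:2] = b"ne"       # the new packet: only 2 bytes written so far

print(find_separator(buffer, 2, whole_buffer=True))   # 3: stale separator, wrong
print(find_separator(buffer, 2, whole_buffer=False))  # -1: keep waiting, correct
```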

Growing buffers

Increasing or decreasing the buffer size during deserialization is not supported.

Tip

Buffer initialization

buffered_incremental_deserialize() is called before the first read, so you can reinitialize the buffer (for example, by filling it with zeros) before yielding for the first time:

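def buffered_incremental_deserialize(self, buffer: bytearray) -> Generator[int | None, int, tuple[ReceivedPacket, Buffer]]:
    # Reset the whole buffer to zero before anything is written into it.
    for i in range(len(buffer)):
        buffer[i] = 0

    nbytes = yield  # the first read happens here

    ...
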
Start writing anytime, anywhere

It is not mandatory to yield None or 0 for the first yield.

Avoid copying data as much as possible

memoryview is your best friend: use it.
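The difference is easy to observe: slicing a bytearray copies the selected bytes, while slicing a memoryview creates a view onto the same memory.

```python
buffer = bytearray(b"hello, world")

copy = buffer[7:]              # slicing a bytearray allocates a new object and copies the bytes
view = memoryview(buffer)[7:]  # slicing a memoryview is only a window onto the same memory

view[:] = b"WORLD"             # writing through the view modifies `buffer` itself

print(buffer)       # bytearray(b'hello, WORLD')
print(bytes(copy))  # b'world'
view.release()      # release the export so the bytearray may be resized again
```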

Tips & Tricks

Common implementations between incremental_deserialize() and buffered_incremental_deserialize()

If the API you are using accepts both bytes and memoryview objects, you can write a single function that handles any byte buffer:

from __future__ import annotations

import io
from collections.abc import Generator
from typing import Any

from easynetwork.serializers.abc import BufferedIncrementalPacketSerializer

type SentPacket = Any
type ReceivedPacket = Any


class MySerializer(BufferedIncrementalPacketSerializer[SentPacket, ReceivedPacket, memoryview]):
    ...

    # It can receive either 'bytes' from the endpoint or 'memoryview' objects from buffered_incremental_deserialize()
    def incremental_deserialize(self) -> Generator[None, bytes | memoryview, tuple[ReceivedPacket, bytes]]:
        initial_bytes = yield
        with io.BytesIO(initial_bytes) as buffer:
            while True:
                try:
                    packet = self._load_from_file(buffer)
                except EOFError:
                    pass
                else:
                    break
                # The failed attempt may have left the position anywhere: append the new data at the end.
                buffer.seek(0, io.SEEK_END)
                buffer.write((yield))
                # Retry parsing from the beginning.
                buffer.seek(0)

            remainder = buffer.read()
            return packet, remainder

    def _load_from_file(self, file: io.IOBase) -> ReceivedPacket: ...

    def create_deserializer_buffer(self, sizehint: int) -> memoryview:
        # Don't care about buffer size
        buffer = bytearray(sizehint)
        return memoryview(buffer)

    def buffered_incremental_deserialize(self, buffer: memoryview) -> Generator[None, int, tuple[ReceivedPacket, bytes]]:
        incremental_deserialize = self.incremental_deserialize()
        # Start the generator
        next(incremental_deserialize)

        while True:
            nb_bytes_written: int = yield
            try:
                incremental_deserialize.send(buffer[:nb_bytes_written])
            except StopIteration as exc:
                # incremental_deserialize() returned
                return exc.value