How-to — Buffered Serializers

Note

This section assumes that you have read the How-to — Serializers page.


Introduction

incremental_deserialize() is useful for waiting until all the data parts of a packet have been received, but it has one drawback: memory consumption.

The Actual Behavior

What happens each time the generator reaches a yield statement:

  • A large enough byte buffer is allocated (one call to malloc(3)).

  • recv(2) will write as much as possible to this buffer.

  • The byte buffer is shrunk to fit the bytes actually written (one call to realloc(3)).

  • The resulting bytes object is sent to the generator.

What you are likely to do with this byte buffer:

  • Append the content to an existing byte buffer, which means copying the data (and possibly reallocating the destination buffer).

  • Drop the reference to the given byte buffer, since you no longer need it (one call to free(3)).
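The conventional pattern above can be sketched with the standard socket module. recv_line() is a hypothetical helper, not part of easynetwork, and it assumes lines terminated by b"\r\n": every recv() call returns a freshly allocated bytes object, which is then copied into an accumulator and dropped.

```python
from __future__ import annotations

import socket


def recv_line(sock: socket.socket, bufsize: int = 4096) -> tuple[bytes, bytes]:
    """Hypothetical helper: read one CRLF-terminated line with recv()."""
    data = bytearray()
    while (index := data.find(b"\r\n")) < 0:
        chunk = sock.recv(bufsize)  # allocates a fresh bytes object on every call
        if not chunk:
            raise EOFError("connection closed before the end of the line")
        data += chunk  # copies the chunk into the accumulator; the chunk is then dropped
    # Split into the line (without separator) and any extra bytes already received
    return bytes(data[:index]), bytes(data[index + 2 :])


left, right = socket.socketpair()
left.sendall(b'{"x": 1}\r\n')
line, extra = recv_line(right, bufsize=3)  # a small bufsize forces several recv() calls
left.close()
right.close()
```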

Now Imagine That…

…you want recv(2) to write directly to your byte buffer (and perhaps at a given position).

It might look like this:

  • recv(2) will write as much as possible to the byte buffer you provided.

  • The number of bytes written is sent to the generator.
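The same loop can be rewritten with socket.recv_into(), which writes into a preallocated buffer and only returns the number of bytes written. This is again a hypothetical helper, not the library's implementation; the memoryview slice selects the position at which recv_into() starts writing.

```python
from __future__ import annotations

import socket


def recv_line_into(sock: socket.socket, buffer: bytearray) -> tuple[bytes, bytes]:
    """Hypothetical helper: read one CRLF-terminated line into a preallocated buffer."""
    nb_written = 0
    with memoryview(buffer) as view:
        # Only search the bytes written for this line: the rest of the buffer may be stale
        while (index := buffer.find(b"\r\n", 0, nb_written)) < 0:
            if nb_written == len(buffer):
                raise ValueError("line too long for the buffer")
            # recv_into() writes right after the bytes already received and returns a count
            n = sock.recv_into(view[nb_written:])
            if n == 0:
                raise EOFError("connection closed before the end of the line")
            nb_written += n
    return bytes(buffer[:index]), bytes(buffer[index + 2 : nb_written])


left, right = socket.socketpair()
buffer = bytearray(16)
left.sendall(b"hello\r\n")
first = recv_line_into(right, buffer)
left.sendall(b"hi\r\n")
second = recv_line_into(right, buffer)  # the same buffer is reused for the next line
left.close()
right.close()
```

Because the separator search is bounded by the number of bytes written for the current line, leftovers from the previous line do not interfere.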

This approach has two consequences:

  • The byte buffer can be reused (so there is no more buffer allocation per recv(2) call).

  • It is no longer necessary to copy the content to another location.

This is what the BufferedIncrementalPacketSerializer is designed for.

The benefits?

This model exists for performance purposes only, and it requires careful consideration of your actual needs. It is not a quick fix for application performance problems; in fact, it can be less convenient to use than the basic implementation.

The scenario described above is a real problem for servers with high workloads and/or low bandwidth that exchange large amounts of data with their clients. In such a situation, the slightest inefficiency can bring the whole thing down. Under “normal” conditions, the basic implementation is more than enough to keep a server running with acceptable performance.


Writing A Buffered Serializer

To write a buffered serializer, you must create a subclass of BufferedIncrementalPacketSerializer and override its create_deserializer_buffer() and buffered_incremental_deserialize() methods.

Note

You still need to implement the AbstractIncrementalPacketSerializer methods, because not all transport layer implementations support writing input directly into an external object that implements the buffer protocol.

Let’s see how we can use it for MyJSONSerializer (from How-to — Serializers):

from __future__ import annotations

import json
from collections.abc import Generator
from typing import TYPE_CHECKING, Any, TypeAlias

from easynetwork.exceptions import DeserializeError, IncrementalDeserializeError
from easynetwork.serializers.abc import BufferedIncrementalPacketSerializer

if TYPE_CHECKING:
    from _typeshed import ReadableBuffer

SentPacket: TypeAlias = Any
ReceivedPacket: TypeAlias = Any


class MyJSONSerializer(BufferedIncrementalPacketSerializer[SentPacket, ReceivedPacket, bytearray]):
    def __init__(self, *, ensure_ascii: bool = True) -> None:
        self._ensure_ascii: bool = ensure_ascii

        self._encoding: str
        if self._ensure_ascii:
            self._encoding = "ascii"
        else:
            self._encoding = "utf-8"

    def _dump(self, packet: SentPacket) -> bytes:
        document = json.dumps(packet, ensure_ascii=self._ensure_ascii)
        return document.encode(self._encoding)

    def _load(self, data: bytes | bytearray) -> ReceivedPacket:
        document = data.decode(self._encoding)
        return json.loads(document)

    def serialize(self, packet: SentPacket) -> bytes:
        return self._dump(packet)

    def deserialize(self, data: bytes) -> ReceivedPacket:
        try:
            return self._load(data)
        except (UnicodeError, json.JSONDecodeError) as exc:
            raise DeserializeError("JSON decode error") from exc

    def incremental_serialize(self, packet: SentPacket) -> Generator[bytes, None, None]:
        yield self._dump(packet) + b"\r\n"

    def incremental_deserialize(self) -> Generator[None, bytes, tuple[ReceivedPacket, bytes]]:
        data = yield
        newline = b"\r\n"
        while (index := data.find(newline)) < 0:
            data += yield

        remainder = data[index + len(newline) :]
        data = data[:index]

        try:
            document = self._load(data)
        except (UnicodeError, json.JSONDecodeError) as exc:
            raise IncrementalDeserializeError("JSON decode error", remainder) from exc

        return document, remainder

    def create_deserializer_buffer(self, sizehint: int) -> bytearray:
        buffer_size: int = max(sizehint, 65536)
        return bytearray(buffer_size)

    def buffered_incremental_deserialize(
        self,
        buffer: bytearray,
    ) -> Generator[int | None, int, tuple[ReceivedPacket, ReadableBuffer]]:
        buffer_size = len(buffer)
        newline = b"\r\n"
        separator_length = len(newline)

        nb_written_bytes: int = (yield None)

        while (index := buffer.find(newline, 0, nb_written_bytes)) < 0:
            start_idx: int = nb_written_bytes
            if start_idx > buffer_size - separator_length:
                raise IncrementalDeserializeError("Too long line", remaining_data=b"")
            nb_written_bytes += yield start_idx

        remainder: bytearray = buffer[index + separator_length : nb_written_bytes]
        data: bytearray = buffer[:index]

        try:
            document = self._load(data)
        except (UnicodeError, json.JSONDecodeError) as exc:
            raise IncrementalDeserializeError("JSON decode error", remainder) from exc

        return document, remainder

Choose the buffer type

When subclassing BufferedIncrementalPacketSerializer, you must pass the buffer type as the third type argument (after the sent and received packet types). It must be the type of the object returned by create_deserializer_buffer().

A buffer object is an object that implements the buffer protocol.

Tip

As of Python 3.12, a buffer object can also be an object that implements the collections.abc.Buffer interface.
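A quick way to check whether an object is a buffer object is to try wrapping it in a memoryview. The helpers below are illustrative only; since the endpoint writes into the buffer, the sketch also checks writability, assuming that a read-only buffer such as bytes would not be suitable as a deserializer buffer.

```python
from __future__ import annotations

import array
import sys


def is_buffer(obj: object) -> bool:
    """Illustrative helper: True if obj implements the buffer protocol."""
    try:
        memoryview(obj)
    except TypeError:
        return False
    return True


def is_writable_buffer(obj: object) -> bool:
    """Illustrative helper: True if obj is a buffer object that can be written into."""
    try:
        return not memoryview(obj).readonly
    except TypeError:
        return False


if sys.version_info >= (3, 12):
    from collections.abc import Buffer

    # Since Python 3.12, buffer objects can also be recognized through this ABC (PEP 688)
    print(isinstance(bytearray(8), Buffer))  # True
```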

For the tutorial, we use a bytearray:

from __future__ import annotations

import json
from collections.abc import Generator
from typing import TYPE_CHECKING, Any, TypeAlias

from easynetwork.exceptions import DeserializeError, IncrementalDeserializeError
from easynetwork.serializers.abc import BufferedIncrementalPacketSerializer

if TYPE_CHECKING:
    from _typeshed import ReadableBuffer

SentPacket: TypeAlias = Any
ReceivedPacket: TypeAlias = Any


class MyJSONSerializer(BufferedIncrementalPacketSerializer[SentPacket, ReceivedPacket, bytearray]):

Buffer Instantiation

def create_deserializer_buffer(self, sizehint: int) -> bytearray:
    buffer_size: int = max(sizehint, 65536)
    return bytearray(buffer_size)

Note

sizehint is the recommended buffer size, but the buffer can be of any size.

In the example, we assume that a JSON line can be up to 64KiB. Therefore, the minimum buffer size must be 64KiB.

Important

This function is called only once per connection. This means that buffered_incremental_deserialize() will reuse the same buffer for a given stream.

The Purpose Of buffered_incremental_deserialize()

buffered_incremental_deserialize() must be a generator function (or at least return a generator iterator) that yields until all the data parts of the packet have been retrieved and parsed.

The yielded value is the position in the buffer at which to start writing. It can be:

  • None: use the whole buffer. yield and yield None are therefore equivalent to yield 0.

  • A non-negative integer n: skip the first n bytes.

  • A negative integer -n: write only into the last n bytes. For example, yield -10 means that writing starts 10 bytes before the end of the buffer.
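These rules follow ordinary Python slicing. Here is a minimal sketch of how a yielded position could map to the writable region, assuming the endpoint writes through a memoryview slice; writable_window() is a hypothetical helper, not part of easynetwork.

```python
from __future__ import annotations


def writable_window(buffer: bytearray, position: int | None) -> memoryview:
    """Hypothetical helper: the region that would be written for a yielded position."""
    start = 0 if position is None else position
    # Same semantics as slicing: n skips the first n bytes, -n keeps only the last n bytes
    return memoryview(buffer)[start:]


buffer = bytearray(100)
print(len(writable_window(buffer, None)))  # 100
print(len(writable_window(buffer, 30)))    # 70
print(len(writable_window(buffer, -10)))   # 10

# Writing through the window lands at the matching offset in the buffer
window = writable_window(buffer, -10)
window[:3] = b"abc"
print(buffer[90:93])  # bytearray(b'abc')
```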

This generator must return a (packet, remainder) pair, where packet is the deserialized packet and remainder contains any trailing bytes that are not part of it. The remainder can be a memoryview pointing to buffer or an external bytes-like object.

At each yield, the endpoint implementation sends the generator the number of bytes written to the buffer from the given start position.
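To make this protocol concrete, here is a toy driver that plays the endpoint's role with a list of already-received chunks instead of a socket. Both drive() and read_line() are hypothetical illustrations, not easynetwork APIs; read_line() is a stripped-down version of the buffered_incremental_deserialize() method shown on this page, without the line length check or JSON decoding.

```python
from __future__ import annotations

from collections.abc import Generator
from typing import Any


def drive(gen: Generator[int | None, int, Any], buffer: bytearray, chunks: list[bytes]) -> Any:
    """Hypothetical endpoint stand-in: write each chunk where the generator asks,
    then send it the number of bytes written."""
    view = memoryview(buffer)
    position = next(gen)  # run the generator up to its first yield
    for chunk in chunks:
        window = view[0 if position is None else position :]
        if len(chunk) > len(window):
            raise ValueError("chunk does not fit in the writable window")
        window[: len(chunk)] = chunk
        try:
            position = gen.send(len(chunk))
        except StopIteration as exc:
            return exc.value  # the (packet, remainder) pair
    raise EOFError("not enough data to complete a packet")


def read_line(buffer: bytearray) -> Generator[int | None, int, tuple[bytes, bytes]]:
    nb_written = yield None  # first read: write from offset 0
    while (index := buffer.find(b"\r\n", 0, nb_written)) < 0:
        # Ask for the next bytes to be written right after those already received
        nb_written += yield nb_written
    return bytes(buffer[:index]), bytes(buffer[index + 2 : nb_written])


buffer = bytearray(64)
print(drive(read_line(buffer), buffer, [b'{"a":', b' 1}\r\n{"b"']))
# (b'{"a": 1}', b'{"b"')
```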

def buffered_incremental_deserialize(
    self,
    buffer: bytearray,
) -> Generator[int | None, int, tuple[ReceivedPacket, ReadableBuffer]]:
    buffer_size = len(buffer)
    newline = b"\r\n"
    separator_length = len(newline)

    nb_written_bytes: int = (yield None)

    while (index := buffer.find(newline, 0, nb_written_bytes)) < 0:
        start_idx: int = nb_written_bytes
        if start_idx > buffer_size - separator_length:
            raise IncrementalDeserializeError("Too long line", remaining_data=b"")
        nb_written_bytes += yield start_idx

    remainder: bytearray = buffer[index + separator_length : nb_written_bytes]
    data: bytearray = buffer[:index]

    try:
        document = self._load(data)
    except (UnicodeError, json.JSONDecodeError) as exc:
        raise IncrementalDeserializeError("JSON decode error", remainder) from exc

    return document, remainder

Warning

Buffer consistency

Because the buffer is reused, it is up to you to reset its state when entering and leaving the generator. If you only rely on the area written during the current call, you can skip this step (as in the example above).
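The following sketch shows why bounding the search matters when the buffer is reused: a shorter read leaves the tail of the previous packet in place. The packet contents are made up for the illustration.

```python
from __future__ import annotations

buffer = bytearray(32)

# First packet: written at the start of the shared buffer
first = b'{"long": "value"}\r\n'
buffer[: len(first)] = first

# Second, shorter read: only its first 4 bytes are fresh, the rest is stale
second = b"[1, "
buffer[: len(second)] = second
nb_written = len(second)

stale_index = buffer.find(b"\r\n")                  # finds the first packet's old separator
bounded_index = buffer.find(b"\r\n", 0, nb_written)  # only looks at the fresh bytes
print(stale_index, bounded_index)  # 17 -1

# Alternative: reset the buffer on entry; an unbounded search is then safe again
buffer[:] = bytes(len(buffer))
buffer[: len(second)] = second
reset_index = buffer.find(b"\r\n")
print(reset_index)  # -1
```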

Growing buffers

Growing or shrinking the buffer during deserialization is not officially supported. If you do it anyway, remember to restore its initial size afterwards for consistency.
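A minimal sketch of growing a bytearray in place and then restoring its initial size, using the hypothetical helper grow_then_restore(). Note that, in CPython, a bytearray cannot be resized while a memoryview on it is alive (BufferError); whether the endpoint keeps such a view is not specified here, which is one more reason to treat resizing with caution.

```python
from __future__ import annotations


def grow_then_restore(buffer: bytearray, extra: int) -> int:
    """Hypothetical helper: temporarily enlarge buffer, then shrink it back."""
    initial_size = len(buffer)
    buffer.extend(bytes(extra))  # grow in place (same bytearray object)
    grown_size = len(buffer)
    # ... work with the larger buffer here ...
    del buffer[initial_size:]    # restore the initial size
    return grown_size


def resize_blocked_by_view(buffer: bytearray) -> bool:
    """Return True if resizing fails because a memoryview on the buffer is alive."""
    view = memoryview(buffer)
    try:
        buffer.extend(b"\x00")
    except BufferError:
        return True
    finally:
        view.release()
    return False


buffer = bytearray(8)
print(grow_then_restore(buffer, 4), len(buffer))  # 12 8
print(resize_blocked_by_view(bytearray(8)))       # True
```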

Tip

Buffer initialization

buffered_incremental_deserialize() is called before the read. You can reinitialize the buffer (for example, by filling it with zeros) before the first read:

def buffered_incremental_deserialize(self, buffer: bytearray) -> Generator:
    # Zero-fill the whole buffer in place (its length does not change)
    buffer[:] = bytes(len(buffer))

    nbytes = yield

    ...

Start writing anytime, anywhere

It is not mandatory to yield None or 0 for the first yield.

Avoid copying data as much as possible

memoryview is your best friend: use it.
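The difference in a nutshell: slicing a bytearray copies the bytes, while slicing a memoryview only creates a new view on the same memory. Note that memoryview has no find() method, which is why the examples on this page search the bytearray itself.

```python
from __future__ import annotations

buffer = bytearray(b"header:payload\r\n")

copied = buffer[7:14]            # bytearray slice: a new copy of the bytes
view = memoryview(buffer)[7:14]  # memoryview slice: no copy, same memory

buffer[7:14] = b"PAYLOAD"        # same-size, in-place update of the buffer

print(copied)       # bytearray(b'payload')
print(bytes(view))  # b'PAYLOAD'
```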

Tips & Tricks

Common implementations between incremental_deserialize() and buffered_incremental_deserialize()

If the API you are using accepts both bytes and memoryview objects, you can write a single function that handles any byte buffer:

from __future__ import annotations

import io
from collections.abc import Generator
from typing import Any, TypeAlias

from easynetwork.serializers.abc import BufferedIncrementalPacketSerializer

SentPacket: TypeAlias = Any
ReceivedPacket: TypeAlias = Any


class MySerializer(BufferedIncrementalPacketSerializer[SentPacket, ReceivedPacket, memoryview]):
    ...

    # It can receive either 'bytes' from the endpoint or 'memoryview' objects from buffered_incremental_deserialize()
    def incremental_deserialize(self) -> Generator[None, bytes | memoryview, tuple[ReceivedPacket, bytes]]:
        initial_bytes = yield
        # BytesIO keeps its own copy of the data, so the caller's buffer can be overwritten afterwards
        with io.BytesIO(initial_bytes) as buffer:
            while True:
                try:
                    packet = self._load_from_file(buffer)
                except EOFError:
                    pass
                else:
                    break
                # Append the next chunk at the end of the stream, then rewind to parse again from the start
                buffer.seek(0, io.SEEK_END)
                buffer.write((yield))
                buffer.seek(0)

            remainder = buffer.read()
            return packet, remainder

    def _load_from_file(self, file: io.IOBase) -> ReceivedPacket:
        ...

    def create_deserializer_buffer(self, sizehint: int) -> memoryview:
        # Don't care about buffer size
        buffer = bytearray(sizehint)
        return memoryview(buffer)

    def buffered_incremental_deserialize(self, buffer: memoryview) -> Generator[None, int, tuple[ReceivedPacket, bytes]]:
        incremental_deserialize = self.incremental_deserialize()
        # Start the generator
        next(incremental_deserialize)

        while True:
            # Yielding None: the endpoint writes from the start of the buffer each time
            nb_bytes_written: int = yield
            try:
                # Slicing a memoryview does not copy: only the freshly written bytes are passed on
                incremental_deserialize.send(buffer[:nb_bytes_written])
            except StopIteration as exc:
                # incremental_deserialize() returned
                return exc.value
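The adapter technique above can be tried without easynetwork. In this self-contained sketch, a line-based reader ending at b"\n" stands in for the hypothetical _load_from_file(), and feed() stands in for the endpoint; none of these names are library APIs.

```python
from __future__ import annotations

import io
from collections.abc import Generator


def incremental_read_line() -> Generator[None, bytes | memoryview, tuple[bytes, bytes]]:
    # Accepts bytes or memoryview chunks: BytesIO.write() takes any bytes-like object
    stream = io.BytesIO()
    while True:
        stream.write((yield))
        data = stream.getvalue()
        if (index := data.find(b"\n")) >= 0:
            return data[:index], data[index + 1 :]


def buffered_read_line(buffer: memoryview) -> Generator[None, int, tuple[bytes, bytes]]:
    inner = incremental_read_line()
    next(inner)  # start the wrapped generator
    while True:
        nb_bytes_written: int = yield  # None: the "endpoint" writes from offset 0
        try:
            inner.send(buffer[:nb_bytes_written])  # zero-copy slice of the fresh bytes
        except StopIteration as exc:
            return exc.value


def feed(chunks: list[bytes], size: int = 16) -> tuple[bytes, bytes]:
    """Hypothetical endpoint stand-in: write each chunk at offset 0, then send its length."""
    buffer = memoryview(bytearray(size))
    gen = buffered_read_line(buffer)
    next(gen)
    for chunk in chunks:
        buffer[: len(chunk)] = chunk
        try:
            gen.send(len(chunk))
        except StopIteration as exc:
            return exc.value
    raise EOFError("incomplete line")


print(feed([b"hel", b"lo\nwor"]))  # (b'hello', b'wor')
```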