How-to — Buffered Serializers
Note
This section assumes that you have read the How-to — Serializers page.
Introduction
incremental_deserialize() is useful to wait for all data parts of a packet, but there is a small problem with it: memory consumption.
The Actual Behavior
What happens when you hit a yield statement:
A large enough byte buffer is allocated (one call to malloc(3)).
recv(2) will write as much as possible to this buffer.
The byte buffer is shrunk to fit the bytes actually written (one call to realloc(3)).
This finished bytes instance is sent to the generator.
What you are likely to do with this byte buffer:
Append the content to an existing byte buffer, which can be:
- a bytearray (involves a call to realloc(3)),
- a custom stream-like API such as io.BytesIO (involves a call to realloc(3)),
- or something else, which will end up doing one of the things above.
Drop the reference to the given byte buffer, since you no longer need it (one call to free(3)).
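The steps above can be sketched with a plain socket pair. This only illustrates the allocate-then-copy pattern and is not EasyNetwork's actual receive loop; the receive_all helper is a hypothetical name introduced for the example.

```python
import socket


def receive_all(sock: socket.socket, chunk_size: int = 4096) -> bytes:
    """Read until EOF, copying every chunk into an accumulator (hypothetical helper)."""
    accumulator = bytearray()
    while True:
        chunk = sock.recv(chunk_size)  # a new bytes object is created on every call
        if not chunk:  # empty bytes: the peer closed its side of the connection
            break
        accumulator += chunk  # the chunk is copied; the accumulator may be reallocated
        # 'chunk' is rebound on the next iteration, which releases the previous object
    return bytes(accumulator)


left, right = socket.socketpair()
with left, right:
    left.sendall(b'{"key": "value"}\r\n')
    left.shutdown(socket.SHUT_WR)  # signal EOF to the reader
    received = receive_all(right)
```

Every loop iteration pays for one temporary object and one copy, which is the cost the rest of this page tries to avoid.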
Now Imagine That…
…you want recv(2) to write directly to your byte buffer (and perhaps at a given position).
It might look like this:
recv(2) will write as much as possible to the byte buffer you provided.
The number of bytes written is sent to the generator.
This principle involves several things:
The byte buffer can be reused (so there is no more buffer allocation per recv(2) call).
It is no longer necessary to copy the content to another location.
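As a rough sketch of this principle outside of EasyNetwork, the standard socket.recv_into() method writes into a caller-provided buffer and returns the number of bytes written. The receive_into helper below is a hypothetical name, and the loop is only an assumption about how such a buffer could be filled.

```python
import socket


def receive_into(sock: socket.socket, buffer: bytearray) -> int:
    """Fill 'buffer' until it is full or EOF is reached; return the number of bytes written."""
    view = memoryview(buffer)
    total = 0
    while total < len(buffer):
        # Writes in place, starting at position 'total'; no intermediate bytes object.
        nbytes = sock.recv_into(view[total:])
        if nbytes == 0:  # EOF
            break
        total += nbytes
    return total


left, right = socket.socketpair()
with left, right:
    reusable_buffer = bytearray(64)  # allocated once, could be reused for later reads
    left.sendall(b"hello ")
    left.sendall(b"world")
    left.shutdown(socket.SHUT_WR)
    written = receive_into(right, reusable_buffer)
    payload = bytes(reusable_buffer[:written])
```

Only the final bytes(...) call copies data here, and only because the example wants an immutable result to inspect.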
This is what the BufferedIncrementalPacketSerializer is designed for.
The benefits?
This model exists for performance purposes only. However, it requires careful consideration of the needs to be met. It’s not a quick fix that will solve application performance problems. In fact, it may not be convenient.
The scenario described above is a real problem for servers with high workloads and/or low bandwidth that exchange large amounts of data with their clients. In such a situation, every allocation and copy avoided on the receive path counts. Under “normal” conditions, the basic implementation is more than enough to keep a server running with acceptable performance.
Writing A Buffered Serializer
To write a buffered serializer, you must create a subclass of BufferedIncrementalPacketSerializer and override its create_deserializer_buffer() and buffered_incremental_deserialize() methods.
Note
You still need to implement AbstractIncrementalPacketSerializer methods.
Writing input directly to an external object that implements the buffer protocol is not supported by all transport layer implementations.
Let’s see how we can use it for MyJSONSerializer (from How-to — Serializers):
Choose the buffer type
When subclassing BufferedIncrementalPacketSerializer, you must pass the buffer type. This should be the type of the instance returned by create_deserializer_buffer().
A buffer object is an object that implements the buffer protocol.
Tip
As of Python 3.12, a buffer object can also be an object that implements the collections.abc.Buffer interface.
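One quick way to check whether an object implements the buffer protocol is to try wrapping it in a memoryview. The sketch below is not part of EasyNetwork; supports_buffer_protocol is a hypothetical helper name.

```python
import array


def supports_buffer_protocol(obj: object) -> bool:
    """Return True if memoryview() accepts 'obj' (hypothetical helper)."""
    try:
        with memoryview(obj):  # released immediately, we only care whether it works
            return True
    except TypeError:
        return False


results = {
    "bytearray": supports_buffer_protocol(bytearray(16)),
    "memoryview": supports_buffer_protocol(memoryview(bytearray(16))),
    "array.array": supports_buffer_protocol(array.array("B", bytes(16))),
    "str": supports_buffer_protocol("not a buffer"),
}
```

Note that bytes also implements the buffer protocol, but it is read-only, so it cannot be written to in place.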
For the tutorial, we use a bytearray:
from __future__ import annotations

import json
from collections.abc import Generator
from typing import TYPE_CHECKING, Any, TypeAlias

from easynetwork.exceptions import DeserializeError, IncrementalDeserializeError
from easynetwork.serializers.abc import BufferedIncrementalPacketSerializer

if TYPE_CHECKING:
    from _typeshed import ReadableBuffer

SentPacket: TypeAlias = Any
ReceivedPacket: TypeAlias = Any


class MyJSONSerializer(BufferedIncrementalPacketSerializer[SentPacket, ReceivedPacket, bytearray]):
Buffer Instantiation
def create_deserializer_buffer(self, sizehint: int) -> bytearray:
    buffer_size: int = max(sizehint, 65536)
    return bytearray(buffer_size)
Note
sizehint is the recommended buffer size, but the buffer can be of any size.
In the example, we assume that a JSON line can be up to 64 KiB. Therefore, the buffer must be at least 64 KiB.
Important
This function is called only once per connection. This means that buffered_incremental_deserialize() will reuse the same buffer for a given stream.
The Purpose Of buffered_incremental_deserialize()
buffered_incremental_deserialize() must be a generator function (or at least return a generator iterator) that yields until all the data parts of the packet have been retrieved and parsed.
The value yielded is the position to start writing to the buffer. It can be:
- None: Just use the whole buffer. Therefore, yield and yield None are equivalent to yield 0.
- A non-negative integer (starting at 0): Skips the first n bytes.
- A negative integer: Skips until the last n bytes. For example, yield -10 means to start writing at the 10th byte from the end of the buffer.
This generator must return a pair of (packet, remainder) where packet is the deserialized packet and remainder is any trailing bytes that were not consumed.
The remainder can be a memoryview pointing to buffer or an external bytes-like object.
At each yield checkpoint, the endpoint implementation sends to the generator the number of bytes written to the buffer from the given start position.
def buffered_incremental_deserialize(
    self,
    buffer: bytearray,
) -> Generator[int | None, int, tuple[ReceivedPacket, ReadableBuffer]]:
    buffer_size = len(buffer)
    newline = b"\r\n"
    separator_length = len(newline)

    nb_written_bytes: int = (yield None)

    while (index := buffer.find(newline, 0, nb_written_bytes)) < 0:
        start_idx: int = nb_written_bytes
        if start_idx > buffer_size - separator_length:
            raise IncrementalDeserializeError("Too long line", remaining_data=b"")
        nb_written_bytes += yield start_idx

    remainder: bytearray = buffer[index + separator_length : nb_written_bytes]
    data: bytearray = buffer[:index]

    try:
        document = self._load(data)
    except (UnicodeError, json.JSONDecodeError) as exc:
        raise IncrementalDeserializeError("JSON decode error", remainder) from exc

    return document, remainder
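To make the yield/send exchange concrete, here is a minimal, self-contained sketch of how such a generator could be driven. It is not EasyNetwork's endpoint implementation: read_line is a simplified stand-in for the method above, drive is a hypothetical driver, and in-memory chunks replace the real socket reads.

```python
from __future__ import annotations

from collections.abc import Generator, Iterable


def read_line(buffer: bytearray) -> Generator[int | None, int, tuple[bytes, bytes]]:
    """Simplified stand-in for buffered_incremental_deserialize(): one CRLF-terminated line."""
    newline = b"\r\n"
    nb_written_bytes: int = yield None  # None: write from the start of the buffer
    while (index := buffer.find(newline, 0, nb_written_bytes)) < 0:
        if nb_written_bytes > len(buffer) - len(newline):
            raise ValueError("Too long line")
        # Ask the caller to continue writing right after the bytes already received.
        nb_written_bytes += yield nb_written_bytes
    line = bytes(buffer[:index])
    remainder = bytes(buffer[index + len(newline) : nb_written_bytes])
    return line, remainder


def drive(buffer: bytearray, chunks: Iterable[bytes]) -> tuple[bytes, bytes]:
    """Hypothetical driver: writes each chunk where the generator asked, then sends the count."""
    generator = read_line(buffer)
    start = next(generator)  # run up to the first yield, before any data is written
    for chunk in chunks:
        offset = 0 if start is None else start
        if offset < 0:  # a negative position counts from the end of the buffer
            offset += len(buffer)
        # This sketch silently truncates a chunk that does not fit in the remaining space.
        nbytes = min(len(chunk), len(buffer) - offset)
        buffer[offset : offset + nbytes] = chunk[:nbytes]  # same length, so no resize
        try:
            start = generator.send(nbytes)
        except StopIteration as exc:
            return exc.value  # the (packet, remainder) pair
    raise EOFError("not enough data to complete the packet")


result = drive(bytearray(32), [b'{"a"', b": 1}\r", b"\n{next"])
# result == (b'{"a": 1}', b"{next")
```

The separator arrives split across two chunks here, which is why the generator searches the whole written area again after each write instead of only the newest bytes.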
Warning
- Buffer consistency
Because of buffer reuse, it is up to you to reset the buffer state upon entry and exit of the generator. If you are relying only on the area that has already been written, you can skip this step (as in the example above).
- Growing buffers
Resizing the buffer during deserialization is not officially supported. If you do it anyway, remember to restore its initial size for consistency.
Tip
- Buffer initialization
buffered_incremental_deserialize() is called before the read. You can reinitialize the buffer (for example, by filling it with zeros) before the first read:
    def buffered_incremental_deserialize(self, buffer: bytearray) -> Generator:
        for i in range(len(buffer)):
            buffer[i] = 0
        nbytes = yield
        ...
- Start writing anytime, anywhere
It is not mandatory to yield None or 0 for the first yield.
- Avoid copying data as much as possible
memoryview is your best friend, use it.
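The difference between the two kinds of slicing can be seen with standard Python objects only. This is a small illustration and does not depend on EasyNetwork.

```python
buffer = bytearray(b"header:payload")

copied = buffer[7:]              # slicing a bytearray creates a new, independent bytearray
shared = memoryview(buffer)[7:]  # slicing a memoryview creates a view on the same memory

buffer[7] = ord("P")  # modify the original buffer in place

copied_bytes = bytes(copied)  # still the old content: it was copied before the change
shared_bytes = bytes(shared)  # reflects the change: no copy was made when slicing
shared.release()
```

The flip side is that a memoryview returned as remainder keeps pointing at the reused buffer, so its content is only meaningful until the buffer is written to again.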
Tips & Tricks
Common implementations between incremental_deserialize() and buffered_incremental_deserialize()
If the API you are using does not care about using bytes or memoryview, you can write a single function that handles any byte buffer:
from __future__ import annotations

import io
from collections.abc import Generator
from typing import Any, TypeAlias

from easynetwork.serializers.abc import BufferedIncrementalPacketSerializer

SentPacket: TypeAlias = Any
ReceivedPacket: TypeAlias = Any


class MySerializer(BufferedIncrementalPacketSerializer[SentPacket, ReceivedPacket, memoryview]):
    ...

    # It can receive either 'bytes' from endpoint or 'memoryviews' from buffered_incremental_deserialize()
    def incremental_deserialize(self) -> Generator[None, bytes | memoryview, tuple[ReceivedPacket, bytes]]:
        initial_bytes = yield
        with io.BytesIO(initial_bytes) as buffer:
            while True:
                try:
                    packet = self._load_from_file(buffer)
                except EOFError:
                    pass
                else:
                    break
                buffer.write((yield))
                buffer.seek(0)

            remainder = buffer.read()
            return packet, remainder

    def _load_from_file(self, file: io.IOBase) -> ReceivedPacket:
        ...

    def create_deserializer_buffer(self, sizehint: int) -> memoryview:
        # Don't care about buffer size
        buffer = bytearray(sizehint)
        return memoryview(buffer)

    def buffered_incremental_deserialize(self, buffer: memoryview) -> Generator[None, int, tuple[ReceivedPacket, bytes]]:
        incremental_deserialize = self.incremental_deserialize()
        # Start the generator
        next(incremental_deserialize)

        while True:
            nb_bytes_written: int = yield
            try:
                incremental_deserialize.send(buffer[:nb_bytes_written])
            except StopIteration as exc:
                # incremental_deserialize() returned
                return exc.value