How-to — Buffered Serializers
Introduction
incremental_deserialize() is useful for waiting until all the data parts of a packet have arrived, but it has one drawback: memory consumption.
The Actual Behavior
Here is what happens each time the generator hits a yield statement:
1. A large enough byte buffer is allocated (one call to malloc(3)).
2. recv(2) writes as much as possible to this buffer.
3. The byte buffer is shrunk to fit the bytes actually written (one call to realloc(3)).
4. The resulting bytes instance is sent to the generator.
What you are likely to do with this byte buffer:
- Append the content to an existing byte buffer, which can be:
  - a bytearray (involves a call to realloc(3)),
  - a custom stream-like API such as io.BytesIO (involves a call to realloc(3)),
  - or something else that somehow does one of the things above.
- Drop the reference to the given byte buffer, since you no longer need it (one call to free(3)).
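The default model can be made concrete with plain Python sockets. This is a minimal sketch, not easynetwork's implementation: read_line_default() is a hypothetical helper, and socket.socketpair() stands in for a real connection. Every recv() call returns a freshly allocated bytes object, whose content is then copied into an accumulating bytearray before the chunk is dropped.

```python
import socket


def read_line_default(sock: socket.socket) -> bytes:
    """Read one newline-terminated line using the allocate-per-recv model.

    Hypothetical helper; for brevity, bytes after the newline are discarded.
    """
    data = bytearray()
    while (index := data.find(b"\n")) < 0:
        chunk = sock.recv(4096)  # a new bytes object is allocated on every call
        if not chunk:
            raise EOFError("connection closed before end of line")
        data += chunk  # the content is copied into the accumulator (may realloc)
        # the previous `chunk` is released when the name is rebound
    return bytes(data[:index])
```

Usage with a local socket pair: after a.sendall(b"hello\nworld"), read_line_default(b) returns b"hello".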
Now Imagine That…
…you want recv(2) to write directly to your byte buffer (and perhaps at a given position).
It might look like this:
1. recv(2) writes as much as possible to the byte buffer you provided.
2. The number of bytes written is sent to the generator.
This approach has two consequences:
- The byte buffer can be reused, so there is no longer one buffer allocation per recv(2) call.
- The content no longer needs to be copied to another location.
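For comparison, here is the same line reader written against this model, again as a hypothetical sketch using only the standard library. socket.recv_into() writes directly into a caller-provided buffer (here at an offset, through a memoryview slice) and returns the number of bytes written, so one preallocated bytearray can be reused for every read.

```python
import socket


def read_line_buffered(sock: socket.socket, buffer: bytearray) -> bytes:
    """Read one newline-terminated line into a reusable, preallocated buffer.

    Hypothetical helper; for brevity, bytes after the newline are discarded.
    """
    view = memoryview(buffer)
    nb_written = 0
    # Only search the area written during this call: older content is stale.
    while (index := buffer.find(b"\n", 0, nb_written)) < 0:
        if nb_written == len(buffer):
            raise ValueError("line too long for the buffer")
        # Write directly after the data already received: no new bytes object
        nbytes = sock.recv_into(view[nb_written:])
        if nbytes == 0:
            raise EOFError("connection closed before end of line")
        nb_written += nbytes
    # Copy the packet out once, because the buffer will be overwritten by the next read
    return bytes(view[:index])
```

The same bytearray can be passed to every call; only the final packet is copied out.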
This is what the BufferedIncrementalPacketSerializer is designed for.
The benefits?
This model exists for performance purposes only, and adopting it requires a careful look at the needs you actually have to meet. It is not a quick fix for application performance problems; in fact, it can be less convenient to use than the basic API.
The scenario described above is a real problem for servers with high workloads and/or low bandwidth that exchange large amounts of data with their clients. In such a situation, the slightest typo can bring the whole thing crashing down. Under “normal” conditions, the basic implementation is more than enough to keep a server running with acceptable performance.
Important
I want to point out that it’s not about speed. It may slow the server down a bit, but it will be more robust.
Usage
For the serializers that support this API, you must use BufferedStreamProtocol instead of StreamProtocol:
```python
def main() -> None:
    serializer = StringLineSerializer()
    protocol = BufferedStreamProtocol(serializer)

    with TCPNetworkClient(("remote_address", 12345), protocol) as endpoint:
        endpoint.send_packet("Hello, world!")
```
Writing A Buffered Serializer
To write a buffered serializer, you must create a subclass of BufferedIncrementalPacketSerializer and override its create_deserializer_buffer() and buffered_incremental_deserialize() methods.
Note
You still need to implement the AbstractIncrementalPacketSerializer methods.
Let’s see how we can use it for MyJSONSerializer (from How-to — Serializers):
Choose the buffer type
When subclassing BufferedIncrementalPacketSerializer, you must pass the buffer type as its third type argument, after the sent and received packet types. This should be the type of the instance returned by create_deserializer_buffer().
A buffer object is an object that implements the buffer protocol.
Tip
As of Python 3.12, a buffer object can also be an object that implements the collections.abc.Buffer interface.
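As a small sketch of what qualifies (is_writable_buffer() and ZeroedBuffer are hypothetical names, not easynetwork APIs): an object is a buffer object if memoryview() accepts it, and since recv(2) writes into the deserializer buffer, that buffer presumably also needs to be writable. The class at the end implements the buffer protocol in pure Python through the __buffer__ method from PEP 688, so that part only runs on Python 3.12+.

```python
from __future__ import annotations

import sys


def is_writable_buffer(obj: object) -> bool:
    """Return True if obj exports a writable buffer (a valid recv_into() target)."""
    try:
        # memoryview() raises TypeError if obj does not implement the buffer protocol
        with memoryview(obj) as view:
            return not view.readonly
    except TypeError:
        return False


if sys.version_info >= (3, 12):
    from collections.abc import Buffer

    class ZeroedBuffer:
        """Hypothetical pure-Python buffer object (PEP 688, Python 3.12+)."""

        def __init__(self, size: int) -> None:
            self._data = bytearray(size)

        def __buffer__(self, flags: int, /) -> memoryview:
            # Delegate the buffer protocol to the underlying bytearray
            return memoryview(self._data)
```

For example, is_writable_buffer(bytearray(4)) is True, while is_writable_buffer(b"abcd") is False because bytes objects are read-only.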
For the tutorial, we use a bytearray:
```python
from __future__ import annotations

import json
from collections.abc import Generator
from typing import TYPE_CHECKING, Any, TypeAlias

from easynetwork.exceptions import DeserializeError, IncrementalDeserializeError
from easynetwork.serializers.abc import BufferedIncrementalPacketSerializer

if TYPE_CHECKING:
    from _typeshed import ReadableBuffer

SentPacket: TypeAlias = Any
ReceivedPacket: TypeAlias = Any


class MyJSONSerializer(BufferedIncrementalPacketSerializer[SentPacket, ReceivedPacket, bytearray]):
    ...  # method definitions are shown in the following sections
```
Buffer Instantiation
```python
def create_deserializer_buffer(self, sizehint: int) -> bytearray:
    buffer_size: int = max(sizehint, 65536)
    return bytearray(buffer_size)
```
Note
sizehint is the recommended buffer size, but the buffer can be of any size.
In the example, we assume that a JSON line can be up to 64KiB. Therefore, the minimum buffer size must be 64KiB.
Important
This function is called only once per connection. This means that buffered_incremental_deserialize() will reuse the same buffer for a given stream.
The Purpose Of buffered_incremental_deserialize()
buffered_incremental_deserialize() must be a generator function (or at least return a generator iterator) that yields until all the data parts of the packet have been retrieved and parsed.
The yielded value is the position in the buffer where writing should start. It can be:
- None: use the whole buffer. Therefore, yield and yield None are equivalent to yield 0.
- A non-negative integer n: skip the first n bytes.
- A negative integer -n: skip everything except the last n bytes. For example, yield -10 means that writing starts 10 bytes before the end of the buffer.
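These three cases map directly onto Python slice semantics. As a sketch, assuming an endpoint that writes into memoryview(buffer)[start:] (write_at() is a hypothetical helper, not the library's code):

```python
from __future__ import annotations


def write_at(buffer: bytearray, start: int | None, chunk: bytes) -> int:
    """Copy chunk into buffer at the position designated by a yielded value.

    Hypothetical stand-in for what an endpoint does with recv_into();
    returns the number of bytes written, as the endpoint would send it.
    """
    # Slice semantics match the rules above:
    # None -> whole buffer, n >= 0 -> skip the first n bytes, -n -> last n bytes only.
    target = memoryview(buffer)[start:]
    nbytes = min(len(target), len(chunk))
    target[:nbytes] = chunk[:nbytes]
    return nbytes
```

With a 20-byte buffer, write_at(buffer, -10, b"z" * 15) writes only 10 bytes, into positions 10 to 19.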
This generator must return a pair of (packet, remainder), where packet is the deserialized packet and remainder contains any superfluous trailing bytes that are not part of the packet.
The remainder can be a memoryview pointing to buffer or an external bytes-like object.
At each yield checkpoint, the endpoint implementation sends the generator the number of bytes written to the buffer from the given start position.
```python
def buffered_incremental_deserialize(
    self,
    buffer: bytearray,
) -> Generator[int | None, int, tuple[ReceivedPacket, ReadableBuffer]]:
    buffer_size = len(buffer)
    newline = b"\r\n"
    separator_length = len(newline)

    nb_written_bytes: int = (yield None)

    while (index := buffer.find(newline, 0, nb_written_bytes)) < 0:
        start_idx: int = nb_written_bytes
        if start_idx > buffer_size - separator_length:
            raise IncrementalDeserializeError("Too long line", remaining_data=b"")
        nb_written_bytes += yield start_idx

    remainder: bytearray = buffer[index + separator_length : nb_written_bytes]
    data: bytearray = buffer[:index]

    try:
        document = self._load(data)
    except (UnicodeError, json.JSONDecodeError) as exc:
        raise IncrementalDeserializeError("JSON decode error", remainder) from exc

    return document, remainder
```
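To see the yield/send protocol in action without a network connection, here is a self-contained sketch. It assumes no easynetwork installation: json_line_deserializer() repeats the algorithm above as a free function (ValueError replaces IncrementalDeserializeError, json.loads() replaces self._load()), and drive() is a hypothetical stand-in for the endpoint that copies each chunk where the generator asked, then sends back the number of bytes written.

```python
from __future__ import annotations

import json
from collections.abc import Generator, Iterable
from typing import Any


def json_line_deserializer(buffer: bytearray) -> Generator[int | None, int, tuple[Any, bytearray]]:
    # Same algorithm as buffered_incremental_deserialize() above, as a free function
    newline = b"\r\n"
    nb_written_bytes: int = (yield None)  # first read: use the whole buffer
    while (index := buffer.find(newline, 0, nb_written_bytes)) < 0:
        if nb_written_bytes > len(buffer) - len(newline):
            raise ValueError("Too long line")
        nb_written_bytes += yield nb_written_bytes  # continue right after the data
    remainder = buffer[index + len(newline) : nb_written_bytes]  # bytearray slice: a copy
    return json.loads(buffer[:index]), remainder


def drive(gen: Generator[int | None, int, Any], buffer: bytearray, chunks: Iterable[bytes]) -> Any:
    # Hypothetical endpoint: writes each chunk where requested (as recv_into() would)
    # and sends the byte count. For brevity, bytes that do not fit are dropped.
    view = memoryview(buffer)
    start = next(gen)
    for chunk in chunks:
        target = view[start:]
        nbytes = min(len(target), len(chunk))
        target[:nbytes] = chunk[:nbytes]
        try:
            start = gen.send(nbytes)
        except StopIteration as exc:
            return exc.value  # the generator returned (packet, remainder)
    raise EOFError("stream ended before a full packet was received")
```

For example, with buf = bytearray(64), drive(json_line_deserializer(buf), buf, [b'{"a": ', b'1}\r', b'\nnext']) returns ({'a': 1}, bytearray(b'next')), even though the separator is split across two reads. Running a second packet through the same buf works too: only the freshly written area is searched, and the first remainder survives because it was copied out of the buffer.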
Warning
- Buffer consistency
Because of buffer reuse, it is up to you to reset the buffer state upon entry and exit of the generator. If you are relying only on the area that has already been written, you can skip this step (as in the example above).
- Growing buffers
Resizing the buffer (growing or shrinking it) during deserialization is not supported.
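The buffer-consistency pitfall is easy to reproduce with a plain bytearray. In this sketch (find_separator() and reset_buffer() are hypothetical helpers), a reused buffer still holds a separator from the previous packet: an unbounded search finds it, while a search limited to the bytes written for the current packet does not. Zero-filling the buffer on entry is the alternative.

```python
def find_separator(buffer: bytearray, nb_written_bytes: int, separator: bytes = b"\r\n") -> int:
    # Only search the area written for the current packet
    return buffer.find(separator, 0, nb_written_bytes)


def reset_buffer(buffer: bytearray) -> None:
    # Explicit reset: zero-fill in place without changing the buffer size
    for i in range(len(buffer)):
        buffer[i] = 0


# Simulate reuse: the previous packet left "first\r\nxy" behind...
buffer = bytearray(16)
buffer[0:9] = b"first\r\nxy"
# ...and only 3 bytes have been written for the current packet so far.
buffer[0:3] = b"abc"
nb_written_bytes = 3

stale_hit = buffer.find(b"\r\n")  # 5: a separator left over from the previous packet
correct = find_separator(buffer, nb_written_bytes)  # -1: no separator received yet
```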
Tip
- Buffer initialization
buffered_incremental_deserialize() is called before the read. You can reinitialize the buffer (for example, by filling it with zeros) before the first read:
```python
def buffered_incremental_deserialize(self, buffer: bytearray) -> Generator[...]:
    for i in range(len(buffer)):
        buffer[i] = 0
    nbytes = yield ...
```
- Start writing anytime, anywhere
It is not mandatory to yield None or 0 for the first yield.
- Avoid copying data as much as possible
memoryview is your best friend, use it.
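The difference is easy to observe: slicing a bytearray copies the data, whereas slicing a memoryview only creates a new view on the same memory. The flip side, shown by the last line, is that a memoryview pointing into a reused buffer sees whatever the next read writes there. A minimal sketch:

```python
buffer = bytearray(b"hello world")

copied = buffer[:5]            # slicing a bytearray copies the data
view = memoryview(buffer)[:5]  # slicing a memoryview does not: it shares memory

# Overwrite the start of the buffer in place, as recv_into() would on the next read
memoryview(buffer)[:5] = b"HELLO"

print(bytes(copied))  # b'hello': an independent copy, unaffected
print(bytes(view))    # b'HELLO': the view reflects the new content
```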
Tips & Tricks
Common implementations between incremental_deserialize() and buffered_incremental_deserialize()
If the parsing API you are using accepts both bytes and memoryview objects, you can write a single implementation that handles any byte buffer:
```python
from __future__ import annotations

import io
from collections.abc import Generator
from typing import Any, TypeAlias

from easynetwork.serializers.abc import BufferedIncrementalPacketSerializer

SentPacket: TypeAlias = Any
ReceivedPacket: TypeAlias = Any


class MySerializer(BufferedIncrementalPacketSerializer[SentPacket, ReceivedPacket, memoryview]):
    ...

    # It can receive either 'bytes' from the endpoint or 'memoryview' objects from buffered_incremental_deserialize()
    def incremental_deserialize(self) -> Generator[None, bytes | memoryview, tuple[ReceivedPacket, bytes]]:
        initial_bytes = yield
        # BytesIO copies the received data, so reusing the source buffer afterwards is safe
        with io.BytesIO(initial_bytes) as buffer:
            while True:
                try:
                    packet = self._load_from_file(buffer)
                except EOFError:
                    pass
                else:
                    break
                # A failed load may have stopped mid-stream: append the new data at the end,
                # then rewind to parse again from the beginning.
                buffer.seek(0, io.SEEK_END)
                buffer.write((yield))
                buffer.seek(0)

            remainder = buffer.read()
            return packet, remainder

    def _load_from_file(self, file: io.IOBase) -> ReceivedPacket: ...

    def create_deserializer_buffer(self, sizehint: int) -> memoryview:
        # Don't care about buffer size
        buffer = bytearray(sizehint)
        return memoryview(buffer)

    def buffered_incremental_deserialize(self, buffer: memoryview) -> Generator[None, int, tuple[ReceivedPacket, bytes]]:
        incremental_deserialize = self.incremental_deserialize()
        # Start the generator
        next(incremental_deserialize)

        while True:
            # Yielding None: the endpoint writes from the start of the buffer each time
            nb_bytes_written: int = yield
            try:
                incremental_deserialize.send(buffer[:nb_bytes_written])
            except StopIteration as exc:
                # incremental_deserialize() returned
                return exc.value
```
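To check that both entry points produce the same result, here is a self-contained sketch of this pattern without easynetwork. LineSerializer, its newline-terminated packet format, and the feed_plain() and feed_buffered() drivers are all hypothetical stand-ins for the real base class and endpoint; the buffered driver simulates recv_into() by copying each chunk to the start of the buffer.

```python
from __future__ import annotations

import io
from collections.abc import Generator, Iterable
from typing import Any


class LineSerializer:
    # Hypothetical stand-in for MySerializer: packets are newline-terminated lines

    def _load_from_file(self, file: io.BytesIO) -> bytes:
        line = file.readline()
        if not line.endswith(b"\n"):
            raise EOFError  # incomplete line: more data is needed
        return line[:-1]

    def incremental_deserialize(self) -> Generator[None, bytes | memoryview, tuple[bytes, bytes]]:
        initial_bytes = yield
        with io.BytesIO(initial_bytes) as buffer:  # BytesIO copies the data
            while True:
                try:
                    packet = self._load_from_file(buffer)
                except EOFError:
                    pass
                else:
                    break
                buffer.seek(0, io.SEEK_END)  # append new data at the end
                buffer.write((yield))
                buffer.seek(0)  # parse again from the beginning
            remainder = buffer.read()
        return packet, remainder

    def create_deserializer_buffer(self, sizehint: int) -> memoryview:
        return memoryview(bytearray(sizehint))

    def buffered_incremental_deserialize(self, buffer: memoryview) -> Generator[None, int, tuple[bytes, bytes]]:
        incremental_deserialize = self.incremental_deserialize()
        next(incremental_deserialize)
        while True:
            nb_bytes_written: int = yield
            try:
                incremental_deserialize.send(buffer[:nb_bytes_written])
            except StopIteration as exc:
                return exc.value


def feed_plain(gen: Generator[None, bytes, Any], chunks: Iterable[bytes]) -> Any:
    # Non-buffered endpoint: sends each received bytes object as-is
    next(gen)
    for chunk in chunks:
        try:
            gen.send(chunk)
        except StopIteration as exc:
            return exc.value
    raise EOFError("incomplete packet")


def feed_buffered(serializer: LineSerializer, chunks: Iterable[bytes], sizehint: int = 16) -> Any:
    # Buffered endpoint: one buffer for the whole stream; chunks must fit in it
    buffer = serializer.create_deserializer_buffer(sizehint)
    gen = serializer.buffered_incremental_deserialize(buffer)
    next(gen)  # yields None: write from the start of the buffer
    for chunk in chunks:
        buffer[: len(chunk)] = chunk  # stand-in for recv_into(buffer)
        try:
            gen.send(len(chunk))
        except StopIteration as exc:
            return exc.value
    raise EOFError("incomplete packet")
```

Both feed_plain(LineSerializer().incremental_deserialize(), [b"hel", b"lo\nwor"]) and feed_buffered(LineSerializer(), [b"hel", b"lo\nwor"]) return (b"hello", b"wor"). Copying into the BytesIO is what makes the buffered path safe here, since the shared buffer is overwritten by every simulated read.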