How-to — Serializers
The Basics
Where Are They Used?
A serializer is used by a protocol object:
- DatagramProtocol: accepts any serializer object.
- StreamProtocol: accepts incremental serializer objects only.
Which Ones Are Already Available?
Several serializers are provided in the easynetwork.serializers
module. Do not hesitate to use them.
If nothing fits your needs, you can implement your own serializer.
Writing A One-Shot Serializer
One-shot serializers are the easiest piece of code to write. They can be used directly on DatagramProtocol instances, and a serializer wrapper lets you use them with the StreamProtocol class.
See also
easynetwork.serializers.wrapper module
The full list of available wrappers for serializers.
To write a one-shot serializer, you must create a subclass of AbstractPacketSerializer
and override
its serialize()
and deserialize()
methods.
A naive implementation of JSONSerializer
should look something like this:
from __future__ import annotations

import json
from typing import Any, TypeAlias

from easynetwork.serializers.abc import AbstractPacketSerializer

SentPacket: TypeAlias = Any
ReceivedPacket: TypeAlias = Any


class MyJSONSerializer(AbstractPacketSerializer[SentPacket, ReceivedPacket]):
    def serialize(self, packet: SentPacket) -> bytes:
        document = json.dumps(packet)
        return document.encode()

    def deserialize(self, data: bytes) -> ReceivedPacket:
        document = data.decode()
        return json.loads(document)
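A quick round trip is a cheap way to check that the two methods are inverses of each other. The sketch below is stdlib-only: `serialize` and `deserialize` are plain functions that mirror the method bodies above, so it runs without easynetwork installed.

```python
import json
from typing import Any


def serialize(packet: Any) -> bytes:
    # Same body as MyJSONSerializer.serialize()
    return json.dumps(packet).encode()


def deserialize(data: bytes) -> Any:
    # Same body as MyJSONSerializer.deserialize()
    return json.loads(data.decode())


packet = {"data": 42, "tags": ["a", "b"]}
wire = serialize(packet)

print(wire)               # b'{"data": 42, "tags": ["a", "b"]}'
print(deserialize(wire))  # {'data': 42, 'tags': ['a', 'b']}
```

Keep in mind that a JSON round trip is not the identity for every Python object: a tuple, for instance, comes back as a list.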
Parsing Error
The deserialize() method must raise a DeserializeError to signal an “expected” parsing error, so that the received data is treated as invalid.
from __future__ import annotations

import json
from typing import Any, TypeAlias

from easynetwork.exceptions import DeserializeError
from easynetwork.serializers.abc import AbstractPacketSerializer

SentPacket: TypeAlias = Any
ReceivedPacket: TypeAlias = Any


class MyJSONSerializer(AbstractPacketSerializer[SentPacket, ReceivedPacket]):
    def serialize(self, packet: SentPacket) -> bytes:
        document = json.dumps(packet)
        return document.encode()

    def deserialize(self, data: bytes) -> ReceivedPacket:
        try:
            document = data.decode()
            return json.loads(document)
        except (UnicodeError, json.JSONDecodeError) as exc:
            raise DeserializeError("JSON decode error") from exc
Warning
Any other exception is considered a serializer crash.
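To see why both exception types are caught, the following sketch feeds the same deserialization logic with valid data, invalid UTF-8, and truncated JSON. It is stdlib-only: the `DeserializeError` class defined here is a local stand-in for the easynetwork exception, and `classify` is a hypothetical helper used only for the demonstration.

```python
import json
from typing import Any


class DeserializeError(Exception):
    """Local stand-in for easynetwork.exceptions.DeserializeError (demo only)."""


def deserialize(data: bytes) -> Any:
    try:
        return json.loads(data.decode())
    except (UnicodeError, json.JSONDecodeError) as exc:
        raise DeserializeError("JSON decode error") from exc


def classify(data: bytes) -> str:
    """Return the name of the exception that caused the failure, or "ok"."""
    try:
        deserialize(data)
    except DeserializeError as exc:
        return type(exc.__cause__).__name__
    return "ok"


print(classify(b'{"data": 42}'))  # ok
print(classify(b"\xff\xfe"))      # UnicodeDecodeError
print(classify(b'{"data": '))     # JSONDecodeError
```

`UnicodeDecodeError` is a subclass of `UnicodeError`, which is why catching the latter is enough for the decoding step.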
The Use Of self
A serializer is intended to be shared by multiple protocols (and protocols are intended to be shared by multiple endpoints).
Therefore, the object should only store additional configuration used for serialization/deserialization.
For example:
from __future__ import annotations

import json
from typing import Any, TypeAlias

from easynetwork.exceptions import DeserializeError
from easynetwork.serializers.abc import AbstractPacketSerializer

SentPacket: TypeAlias = Any
ReceivedPacket: TypeAlias = Any


class MyJSONSerializer(AbstractPacketSerializer[SentPacket, ReceivedPacket]):
    def __init__(self, *, ensure_ascii: bool = True) -> None:
        self._ensure_ascii: bool = ensure_ascii

        self._encoding: str
        if self._ensure_ascii:
            self._encoding = "ascii"
        else:
            self._encoding = "utf-8"

    def serialize(self, packet: SentPacket) -> bytes:
        document = json.dumps(packet, ensure_ascii=self._ensure_ascii)
        return document.encode(self._encoding)

    def deserialize(self, data: bytes) -> ReceivedPacket:
        try:
            document = data.decode(self._encoding)
            return json.loads(document)
        except (UnicodeError, json.JSONDecodeError) as exc:
            raise DeserializeError("JSON decode error") from exc
Warning
Do not store per-serialization data on the instance. Because the object is shared, that state leaks between unrelated calls and the results can look like magic.
Danger
Seriously, don’t do that.
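The sketch below shows one way such state can go wrong. It is a deliberately broken, stdlib-only example: `LeakySerializer` is a hypothetical class (not part of easynetwork) that keeps its receive buffer on `self`, and two simulated connections share the same instance.

```python
from collections.abc import Generator


class LeakySerializer:
    """Anti-pattern: keeps the receive buffer on the instance."""

    def __init__(self) -> None:
        self._buffer = b""  # per-deserialization data stored on self: the bug

    def incremental_deserialize(self) -> Generator[None, bytes, tuple[bytes, bytes]]:
        while b"\r\n" not in self._buffer:
            chunk = yield
            self._buffer += chunk
        line, _, remainder = self._buffer.partition(b"\r\n")
        self._buffer = b""
        return line, remainder


shared = LeakySerializer()

# Two independent connections use the same serializer object.
conn_a = shared.incremental_deserialize()
conn_b = shared.incremental_deserialize()
next(conn_a)
next(conn_b)

conn_a.send(b'{"user": "alice"')  # connection A receives half a packet
try:
    conn_b.send(b'{"user": "bob"}\r\n')  # connection B receives a full packet
except StopIteration as stop:
    packet_for_b, _ = stop.value

print(packet_for_b)  # b'{"user": "alice"{"user": "bob"}'
```

Connection B ends up with bytes that belong to connection A. Keeping the buffer in a local variable of the generator, as the examples in this guide do, avoids the problem.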
Using A One-Shot Serializer
You must pass the serializer to the appropriate protocol object that is expected by the endpoint class:
# With a datagram protocol (UDP)
def main() -> None:
    serializer = MyJSONSerializer()
    protocol = DatagramProtocol(serializer)

    with UDPNetworkClient(("remote_address", 12345), protocol) as endpoint:
        endpoint.send_packet({"data": 42})

        ...

# With a stream protocol (TCP)
def main() -> None:
    # Use of Base64EncoderSerializer as an incremental serializer wrapper
    serializer = Base64EncoderSerializer(MyJSONSerializer())
    protocol = StreamProtocol(serializer)

    with TCPNetworkClient(("remote_address", 12345), protocol) as endpoint:
        endpoint.send_packet({"data": 42})

        ...
Note
Using a serializer wrapper means that the transferred data can be completely different from the original output.
If this is important to you, don’t choose one of them lightly.
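As a rough illustration of how much a wrapper can change the transferred data, the stdlib-only sketch below applies plain Base64 to the JSON output. It is only an approximation made for this example: the exact bytes produced by Base64EncoderSerializer (alphabet, separator, and so on) are not specified here and may differ.

```python
import base64
import json

packet = {"data": 42}

inner = json.dumps(packet).encode()  # what MyJSONSerializer.serialize() returns
wrapped = base64.b64encode(inner)    # roughly what a Base64 wrapper would transfer

print(inner)    # b'{"data": 42}'
print(wrapped)  # b'eyJkYXRhIjogNDJ9'

# A peer that expects raw JSON cannot parse the wrapped form.
try:
    json.loads(wrapped)
    peer_understands = True
except json.JSONDecodeError:
    peer_understands = False

print(peer_understands)  # False
```

Both sides of the connection therefore have to agree on the same wrapper.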
Writing An Incremental Serializer
Incremental serializers are a bit trickier to implement. They can be used directly on both
StreamProtocol
and DatagramProtocol
instances.
To write an incremental serializer, you must create a subclass of AbstractIncrementalPacketSerializer
and override
its incremental_serialize()
and incremental_deserialize()
methods. The serialize()
and deserialize()
methods
have a default implementation that uses the incremental serialization methods.
Option 1: Using Common Patterns
Chances are that the communication protocol uses a simple principle to determine the end of a packet. The most common cases are:
- Every packet frame ends with a specific byte sequence (most likely a newline).
- Each packet has a fixed size.
In these cases you can use the base classes in easynetwork.serializers.base_stream.
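For intuition about the second pattern, here is a stdlib-only sketch of what waiting for a fixed-size packet looks like when written by hand as a generator. The names `POINT` and `fixed_size_deserialize` are hypothetical; in practice the base classes mentioned above are meant to spare you this code.

```python
import struct
from collections.abc import Generator

# Two unsigned 16-bit integers in network byte order: always 4 bytes.
POINT = struct.Struct("!HH")


def fixed_size_deserialize() -> Generator[None, bytes, tuple[tuple[int, int], bytes]]:
    data = yield
    # Keep asking for more bytes until a whole packet is available.
    while len(data) < POINT.size:
        data += yield
    packet = POINT.unpack(data[: POINT.size])
    return packet, data[POINT.size :]


gen = fixed_size_deserialize()
next(gen)                  # advance to the first yield
gen.send(b"\x00\x01\x00")  # only 3 bytes so far: the generator keeps waiting
try:
    gen.send(b"\x02\xff")  # 5 bytes in total: one packet plus 1 extra byte
except StopIteration as stop:
    packet, remainder = stop.value

print(packet, remainder)  # (1, 2) b'\xff'
```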
Let’s say that for the incremental part, we consider each received line to be a JSON object, with lines separated by \r\n:
from __future__ import annotations

import json
from typing import Any, TypeAlias

from easynetwork.exceptions import DeserializeError
from easynetwork.serializers.base_stream import AutoSeparatedPacketSerializer

SentPacket: TypeAlias = Any
ReceivedPacket: TypeAlias = Any


class MyJSONSerializer(AutoSeparatedPacketSerializer[SentPacket, ReceivedPacket]):
    def __init__(self, *, ensure_ascii: bool = True) -> None:
        super().__init__(separator=b"\r\n")

        self._ensure_ascii: bool = ensure_ascii

        self._encoding: str
        if self._ensure_ascii:
            self._encoding = "ascii"
        else:
            self._encoding = "utf-8"

    def serialize(self, packet: SentPacket) -> bytes:
        document = json.dumps(packet, ensure_ascii=self._ensure_ascii)
        return document.encode(self._encoding)

    def deserialize(self, data: bytes) -> ReceivedPacket:
        try:
            document = data.decode(self._encoding)
            return json.loads(document)
        except (UnicodeError, json.JSONDecodeError) as exc:
            raise DeserializeError("JSON decode error") from exc
AutoSeparatedPacketSerializer adds the following behaviors:
- incremental_serialize() will append \r\n to the end of the serialize() output.
- incremental_deserialize() waits until \r\n is found in the input, removes the separator, and calls deserialize() on it.
Tip
Take a look at other available base classes in easynetwork.serializers
before rewriting something that already exists.
Option 2: From Scratch
Let’s see how we can get by without using AutoSeparatedPacketSerializer:
from __future__ import annotations

import json
from collections.abc import Generator
from typing import Any, TypeAlias

from easynetwork.exceptions import DeserializeError, IncrementalDeserializeError
from easynetwork.serializers.abc import AbstractIncrementalPacketSerializer

SentPacket: TypeAlias = Any
ReceivedPacket: TypeAlias = Any


class MyJSONSerializer(AbstractIncrementalPacketSerializer[SentPacket, ReceivedPacket]):
    def __init__(self, *, ensure_ascii: bool = True) -> None:
        self._ensure_ascii: bool = ensure_ascii

        self._encoding: str
        if self._ensure_ascii:
            self._encoding = "ascii"
        else:
            self._encoding = "utf-8"

    def _dump(self, packet: SentPacket) -> bytes:
        document = json.dumps(packet, ensure_ascii=self._ensure_ascii)
        return document.encode(self._encoding)

    def _load(self, data: bytes) -> ReceivedPacket:
        document = data.decode(self._encoding)
        return json.loads(document)

    def serialize(self, packet: SentPacket) -> bytes:
        return self._dump(packet)

    def deserialize(self, data: bytes) -> ReceivedPacket:
        try:
            return self._load(data)
        except (UnicodeError, json.JSONDecodeError) as exc:
            raise DeserializeError("JSON decode error") from exc

    def incremental_serialize(self, packet: SentPacket) -> Generator[bytes, None, None]:
        yield self._dump(packet) + b"\r\n"

    def incremental_deserialize(self) -> Generator[None, bytes, tuple[ReceivedPacket, bytes]]:
        data = yield
        newline = b"\r\n"
        while (index := data.find(newline)) < 0:
            data += yield

        remainder = data[index + len(newline) :]
        data = data[:index]

        try:
            document = self._load(data)
        except (UnicodeError, json.JSONDecodeError) as exc:
            raise IncrementalDeserializeError("JSON decode error", remainder) from exc

        return document, remainder
This adds a lot of code! Let’s take a closer look at the implementation.
Code Mutualization
To avoid duplicating code between the one-shot part and the incremental part, the serialization and deserialization logic goes into private methods.
def _dump(self, packet: SentPacket) -> bytes:
    document = json.dumps(packet, ensure_ascii=self._ensure_ascii)
    return document.encode(self._encoding)

def _load(self, data: bytes) -> ReceivedPacket:
    document = data.decode(self._encoding)
    return json.loads(document)
And now serialize()
and deserialize()
use them instead:
def serialize(self, packet: SentPacket) -> bytes:
    return self._dump(packet)

def deserialize(self, data: bytes) -> ReceivedPacket:
    try:
        return self._load(data)
    except (UnicodeError, json.JSONDecodeError) as exc:
        raise DeserializeError("JSON decode error") from exc
The Purpose Of incremental_serialize()
incremental_serialize()
must be a generator function
(or at least return a generator iterator) that yields all the parts of the serialized packet.
It must also add any useful metadata to help incremental_deserialize()
find the end of the packet.
def incremental_serialize(self, packet: SentPacket) -> Generator[bytes, None, None]:
    yield self._dump(packet) + b"\r\n"
Most of the time, you will have a single yield. The goal is for each yield to send as many bytes as possible without copying or concatenating.
Tip
There may be exceptions, as in this example: your RAM will not cry because you added 2 bytes to a byte sequence of about 100 bytes. The question is worth asking, however, if the byte sequence could grow to 4 GB.
It is up to you to find the balance between RAM explosion and performance degradation.
Note
The endpoint implementation can decide to concatenate all the pieces and do one big send. However, it may be more attractive to do something else
with the returned bytes. incremental_serialize()
is here to give endpoints this freedom.
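The following stdlib-only sketch illustrates that freedom. The generator yields the body and the separator as two separate chunks (so nothing is copied on the serializer side), and two hypothetical endpoint strategies, `send_all_at_once` and `send_piecewise`, consume the same generator differently. Neither function is part of easynetwork.

```python
import json
from collections.abc import Generator, Iterable
from typing import Any


def incremental_serialize(packet: Any) -> Generator[bytes, None, None]:
    yield json.dumps(packet).encode()  # the (possibly large) body, untouched
    yield b"\r\n"                      # the separator, as its own chunk


def send_all_at_once(chunks: Iterable[bytes]) -> bytes:
    """One possible strategy: join everything, then do a single send."""
    return b"".join(chunks)


def send_piecewise(chunks: Iterable[bytes]) -> list[bytes]:
    """Another strategy: hand over each chunk as-is (e.g. to a vectored write)."""
    return list(chunks)


print(send_all_at_once(incremental_serialize({"data": 42})))  # b'{"data": 42}\r\n'
print(send_piecewise(incremental_serialize({"data": 42})))    # [b'{"data": 42}', b'\r\n']
```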
The Purpose Of incremental_deserialize()
incremental_deserialize()
must be a generator function
(or at least return a generator iterator) that yields None
until all the data parts of the packet have been retrieved and parsed.
This generator must return a pair of (packet, remainder), where packet is the deserialized packet and remainder is any trailing bytes that were not needed to build it.
At each yield
checkpoint, the endpoint implementation sends to the generator the data received from the remote endpoint.
def incremental_deserialize(self) -> Generator[None, bytes, tuple[ReceivedPacket, bytes]]:
    data = yield
    newline = b"\r\n"
    while (index := data.find(newline)) < 0:
        data += yield

    remainder = data[index + len(newline) :]
    data = data[:index]

    try:
        document = self._load(data)
    except (UnicodeError, json.JSONDecodeError) as exc:
        raise IncrementalDeserializeError("JSON decode error", remainder) from exc

    return document, remainder
Note
Even if we could create 5 more JSON packets from remainder
, incremental_deserialize()
must always deserialize the first one available
and return the rest as is.
This allows the endpoint implementation to deserialize only the packet it needs. The rest is reused when the application asks for another packet.
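To make the mechanics concrete, here is a stdlib-only sketch of how a caller might drive such a generator. `read_packet` is a hypothetical driver written for this example, not the actual endpoint code: it primes the generator, sends the leftover bytes and then the chunks, and reads the returned pair from `StopIteration.value`.

```python
import json
from collections.abc import Generator
from typing import Any


def incremental_deserialize() -> Generator[None, bytes, tuple[Any, bytes]]:
    data = yield
    while (index := data.find(b"\r\n")) < 0:
        data += yield
    return json.loads(data[:index]), data[index + 2 :]


def read_packet(chunks: list[bytes], leftover: bytes = b"") -> tuple[Any, bytes]:
    """Feed bytes until one packet is complete (assumes enough chunks are available)."""
    gen = incremental_deserialize()
    next(gen)  # advance to the first yield
    try:
        if leftover:
            gen.send(leftover)  # bytes kept from the previous call go in first
        while True:
            gen.send(chunks.pop(0))
    except StopIteration as stop:
        return stop.value


chunks = [b'{"a"', b': 1}\r\n{"b": 2}\r\n{"c"']
first, rest = read_packet(chunks)
second, rest = read_packet(chunks, rest)  # served entirely from the leftover bytes

print(first, second, rest)  # {'a': 1} {'b': 2} b'{"c"'
```

The second call does not consume any new chunk: the remainder returned by the first call already contains a complete packet.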
See also
- Serializer implementation tools
Groups helpers for (incremental) serializer implementations.
- PEP 255: Simple Generators
The proposal for adding generators and the yield statement to Python.
- PEP 342: Coroutines via Enhanced Generators
The proposal to enhance the API and syntax of generators, making them usable as simple coroutines.
- PEP 380: Syntax for Delegating to a Subgenerator
The proposal to introduce the yield from syntax, making delegation to subgenerators easy.
Parsing Error
The incremental_deserialize() generator must raise an IncrementalDeserializeError to signal an “expected” parsing error, so that the received data is treated as invalid.
def incremental_deserialize(self) -> Generator[None, bytes, tuple[ReceivedPacket, bytes]]:
    data = yield
    newline = b"\r\n"
    while (index := data.find(newline)) < 0:
        data += yield

    remainder = data[index + len(newline) :]
    data = data[:index]

    try:
        document = self._load(data)
    except (UnicodeError, json.JSONDecodeError) as exc:
        raise IncrementalDeserializeError("JSON decode error", remainder) from exc

    return document, remainder
Warning
Any other exception is considered a serializer crash.
If DeserializeError is raised instead, it is converted to a RuntimeError.
Note
IncrementalDeserializeError must be given the remainder that may still be valid, that is, the bytes that did not cause the error.
In the example, even if data is not a valid JSON document, the bytes after the \r\n token (in remainder) are not lost.
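The sketch below shows why the remainder matters: a caller can drop the invalid packet and carry on with the bytes that follow it. It is stdlib-only. The `IncrementalDeserializeError` class is a local stand-in, its `remaining_data` attribute name is an assumption made for this example, and `read_all` is a hypothetical driver.

```python
import json
from collections.abc import Generator
from typing import Any


class IncrementalDeserializeError(Exception):
    """Local stand-in; `remaining_data` is an assumed attribute name."""

    def __init__(self, message: str, remaining_data: bytes) -> None:
        super().__init__(message)
        self.remaining_data = remaining_data


def incremental_deserialize() -> Generator[None, bytes, tuple[Any, bytes]]:
    data = yield
    while (index := data.find(b"\r\n")) < 0:
        data += yield
    remainder = data[index + 2 :]
    try:
        return json.loads(data[:index].decode()), remainder
    except (UnicodeError, json.JSONDecodeError) as exc:
        raise IncrementalDeserializeError("JSON decode error", remainder) from exc


def read_all(buffer: bytes) -> list[Any]:
    """Decode every complete line in `buffer`, marking the invalid ones."""
    results: list[Any] = []
    while b"\r\n" in buffer:
        gen = incremental_deserialize()
        next(gen)
        try:
            gen.send(buffer)
        except StopIteration as stop:
            packet, buffer = stop.value
            results.append(packet)
        except IncrementalDeserializeError as exc:
            buffer = exc.remaining_data  # bad packet dropped, the rest is kept
            results.append("<invalid>")
    return results


print(read_all(b'{"a": 1}\r\nnot json\r\n{"b": 2}\r\n'))
# [{'a': 1}, '<invalid>', {'b': 2}]
```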