How-to — Serializers


The Basics

Where Are They Used?

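A serializer is used by a protocol object. DatagramProtocol accepts any serializer, while StreamProtocol requires an incremental serializer (or a one-shot serializer adapted with a serializer wrapper).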

Which Ones Are Already Available?

Several serializers are provided in the easynetwork.serializers module. Do not hesitate to use them.

If nothing fits your needs, you can implement your own serializer.

Writing A One-Shot Serializer

One-shot serializers are the easiest kind to write. They can be used directly on DatagramProtocol instances, and a serializer wrapper lets you use them with the StreamProtocol class.

See also

easynetwork.serializers.wrapper module

The full list of available wrappers for serializers.

To write a one-shot serializer, you must create a subclass of AbstractPacketSerializer and override its serialize() and deserialize() methods.

A naive implementation of JSONSerializer should look something like this:

    from __future__ import annotations

    import json
    from typing import Any, TypeAlias

    from easynetwork.serializers.abc import AbstractPacketSerializer

    SentPacket: TypeAlias = Any
    ReceivedPacket: TypeAlias = Any


    class MyJSONSerializer(AbstractPacketSerializer[SentPacket, ReceivedPacket]):
        def serialize(self, packet: SentPacket) -> bytes:
            document = json.dumps(packet)
            return document.encode()

        def deserialize(self, data: bytes) -> ReceivedPacket:
            document = data.decode()
            return json.loads(document)

Parsing Error

The deserialize() function must raise a DeserializeError to indicate that a parsing error was “expected” so that the received data is considered invalid.

    from __future__ import annotations

    import json
    from typing import Any, TypeAlias

    from easynetwork.exceptions import DeserializeError
    from easynetwork.serializers.abc import AbstractPacketSerializer

    SentPacket: TypeAlias = Any
    ReceivedPacket: TypeAlias = Any


    class MyJSONSerializer(AbstractPacketSerializer[SentPacket, ReceivedPacket]):
        def serialize(self, packet: SentPacket) -> bytes:
            document = json.dumps(packet)
            return document.encode()

        def deserialize(self, data: bytes) -> ReceivedPacket:
            try:
                document = data.decode()
                return json.loads(document)
            except (UnicodeError, json.JSONDecodeError) as exc:
                raise DeserializeError("JSON decode error") from exc

Warning

Any other exception is considered a serializer crash.
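To make the distinction concrete, here is a minimal standalone sketch of which failures the except clause turns into a DeserializeError. It does not import easynetwork: the DeserializeError class is a local stand-in, and classify() is a hypothetical helper used only for this illustration.

```python
import json


class DeserializeError(Exception):
    """Local stand-in for easynetwork.exceptions.DeserializeError (assumption)."""


def deserialize(data: bytes) -> object:
    try:
        return json.loads(data.decode("utf-8"))
    except (UnicodeError, json.JSONDecodeError) as exc:
        # "Expected" parsing failures: the received data is simply invalid.
        raise DeserializeError("JSON decode error") from exc


def classify(data: bytes) -> str:
    """Hypothetical helper: name the root cause of a parsing failure, or "ok"."""
    try:
        deserialize(data)
    except DeserializeError as exc:
        return type(exc.__cause__).__name__
    return "ok"


results = [
    classify(b'{"data": 42}'),  # valid JSON
    classify(b"\xff"),  # not valid UTF-8
    classify(b'{"data": '),  # truncated JSON document
]
```

Invalid UTF-8 and malformed JSON are both reported as DeserializeError, each keeping its original exception as the cause. An exception of any other type would propagate unchanged.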

The Use Of self

A serializer is intended to be shared by multiple protocols (and protocols are intended to be shared by multiple endpoints).

Therefore, the object should only store additional configuration used for serialization/deserialization.

For example:

    from __future__ import annotations

    import json
    from typing import Any, TypeAlias

    from easynetwork.exceptions import DeserializeError
    from easynetwork.serializers.abc import AbstractPacketSerializer

    SentPacket: TypeAlias = Any
    ReceivedPacket: TypeAlias = Any


    class MyJSONSerializer(AbstractPacketSerializer[SentPacket, ReceivedPacket]):
        def __init__(self, *, ensure_ascii: bool = True) -> None:
            self._ensure_ascii: bool = ensure_ascii

            self._encoding: str
            if self._ensure_ascii:
                self._encoding = "ascii"
            else:
                self._encoding = "utf-8"

        def serialize(self, packet: SentPacket) -> bytes:
            document = json.dumps(packet, ensure_ascii=self._ensure_ascii)
            return document.encode(self._encoding)

        def deserialize(self, data: bytes) -> ReceivedPacket:
            try:
                document = data.decode(self._encoding)
                return json.loads(document)
            except (UnicodeError, json.JSONDecodeError) as exc:
                raise DeserializeError("JSON decode error") from exc

Warning

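Do not store per-serialization data on the instance. Because the serializer is shared, state left over from one call can leak into another and produce behavior that looks like magic.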

Danger

Seriously, don’t do that.
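To see what can go wrong, here is a minimal standalone sketch of the anti-pattern. LeakySplitter is a hypothetical class written for this illustration only (it is not an EasyNetwork class): it keeps a receive buffer on self, and a single instance is used by two connections.

```python
from __future__ import annotations


class LeakySplitter:
    """Anti-pattern: keeps per-deserialization state on the shared instance."""

    def __init__(self) -> None:
        self._buffer = b""  # shared by every caller of feed(): this is the bug

    def feed(self, chunk: bytes) -> bytes | None:
        self._buffer += chunk
        line, separator, rest = self._buffer.partition(b"\r\n")
        if not separator:
            return None  # no complete line yet
        self._buffer = rest
        return line


shared = LeakySplitter()  # one instance used by two connections

from_a = shared.feed(b'{"from": "A"')  # connection A: first half of a packet
from_b = shared.feed(b'{"from": "B"}\r\n')  # connection B: a complete packet
```

Connection B ends up with bytes that belong to connection A. Keeping only configuration on self, and holding in-progress data in local variables, avoids the problem.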

Using A One-Shot Serializer

You must pass the serializer to the appropriate protocol object that is expected by the endpoint class:

    # DatagramProtocol and UDPNetworkClient must be imported from EasyNetwork;
    # MyJSONSerializer is the class defined above.
    def main() -> None:
        serializer = MyJSONSerializer()
        protocol = DatagramProtocol(serializer)

        with UDPNetworkClient(("remote_address", 12345), protocol) as endpoint:
            endpoint.send_packet({"data": 42})

            ...

Note

Using a serializer wrapper means that the transferred data can be completely different from the original output.

If this is important to you, don’t choose one of them lightly.
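As an illustration of how much a wrapper can change the bytes on the wire, here is a standalone sketch that applies base64 on top of a JSON serializer. The wrapped_serialize() and wrapped_deserialize() functions are hypothetical stand-ins written with the standard library; they are not the wrappers shipped with EasyNetwork.

```python
import base64
import json


def serialize(packet: object) -> bytes:
    return json.dumps(packet).encode("utf-8")


def wrapped_serialize(packet: object) -> bytes:
    # Hypothetical wrapper step: base64-encode whatever the inner serializer produced.
    return base64.b64encode(serialize(packet))


def wrapped_deserialize(data: bytes) -> object:
    # Reverse order: undo the wrapper first, then parse the JSON document.
    return json.loads(base64.b64decode(data).decode("utf-8"))


raw = serialize({"data": 42})
wire = wrapped_serialize({"data": 42})
```

Here raw is readable JSON while wire is not, so a peer that expects plain JSON cannot read it. Both sides have to agree on the wrapper.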

Writing An Incremental Serializer

Incremental serializers are a bit trickier to implement. They can be used directly on both StreamProtocol and DatagramProtocol instances.

To write an incremental serializer, you must create a subclass of AbstractIncrementalPacketSerializer and override its incremental_serialize() and incremental_deserialize() methods. The serialize() and deserialize() methods have a default implementation that uses the incremental serialization methods.
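The following standalone sketch shows one plausible way to derive one-shot methods from incremental ones. It is an assumption made for illustration, not necessarily how the library implements its defaults, and the ValueError messages are placeholders.

```python
from __future__ import annotations

import json
from collections.abc import Generator
from typing import Any


def incremental_serialize(packet: Any) -> Generator[bytes, None, None]:
    yield json.dumps(packet).encode("utf-8")
    yield b"\r\n"


def incremental_deserialize() -> Generator[None, bytes, tuple[Any, bytes]]:
    data = yield
    while (index := data.find(b"\r\n")) < 0:
        data += yield
    return json.loads(data[:index].decode("utf-8")), data[index + 2 :]


def serialize(packet: Any) -> bytes:
    # One-shot output: all incremental parts glued together.
    return b"".join(incremental_serialize(packet))


def deserialize(data: bytes) -> Any:
    consumer = incremental_deserialize()
    next(consumer)  # advance to the first yield so that send() is allowed
    try:
        consumer.send(data)
    except StopIteration as stop:
        packet, remainder = stop.value
    else:
        consumer.close()
        raise ValueError("incomplete packet")
    if remainder:
        raise ValueError("unexpected data after the packet")
    return packet


frame = serialize({"data": 42})
```

In this sketch the one-shot form of a packet includes the framing bytes, because it is built entirely from the incremental output.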

Option 1: Using Common Patterns

Chances are that the communication protocol uses a simple principle to determine the end of a packet. The most common cases are:

  • Every packet frame ends with a specific byte sequence (most likely a newline).

  • Each packet has a fixed size.

In these cases you can use the base classes in easynetwork.serializers.base_stream.
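For intuition, the fixed-size case can be sketched with the standard library alone. The packet layout (two 32-bit integers) is a hypothetical choice for this example, and the code deliberately does not use the EasyNetwork base classes.

```python
from __future__ import annotations

import struct
from collections.abc import Generator

POINT = struct.Struct("!ii")  # hypothetical layout: two 32-bit integers, 8 bytes


def incremental_serialize(packet: tuple[int, int]) -> Generator[bytes, None, None]:
    yield POINT.pack(*packet)


def incremental_deserialize() -> Generator[None, bytes, tuple[tuple[int, int], bytes]]:
    data = yield
    while len(data) < POINT.size:  # keep asking until one full packet is buffered
        data += yield
    return POINT.unpack(data[: POINT.size]), data[POINT.size :]


consumer = incremental_deserialize()
next(consumer)  # advance to the first yield
consumer.send(b"\x00\x00\x00\x01\x00")  # 5 bytes: not enough yet
try:
    consumer.send(b"\x00\x00\x02extra")  # completes the packet, with bytes to spare
except StopIteration as stop:
    packet, remainder = stop.value
```

No separator is needed here: the known size alone tells the deserializer where the packet ends.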

Let’s say that for the incremental part, we consider each line received to be a JSON object, separated by \r\n:

    from __future__ import annotations

    import json
    from typing import Any, TypeAlias

    from easynetwork.exceptions import DeserializeError
    from easynetwork.serializers.base_stream import AutoSeparatedPacketSerializer

    SentPacket: TypeAlias = Any
    ReceivedPacket: TypeAlias = Any


    class MyJSONSerializer(AutoSeparatedPacketSerializer[SentPacket, ReceivedPacket]):
        def __init__(self, *, ensure_ascii: bool = True) -> None:
            super().__init__(separator=b"\r\n")

            self._ensure_ascii: bool = ensure_ascii

            self._encoding: str
            if self._ensure_ascii:
                self._encoding = "ascii"
            else:
                self._encoding = "utf-8"

        def serialize(self, packet: SentPacket) -> bytes:
            document = json.dumps(packet, ensure_ascii=self._ensure_ascii)
            return document.encode(self._encoding)

        def deserialize(self, data: bytes) -> ReceivedPacket:
            try:
                document = data.decode(self._encoding)
                return json.loads(document)
            except (UnicodeError, json.JSONDecodeError) as exc:
                raise DeserializeError("JSON decode error") from exc

AutoSeparatedPacketSerializer adds the following behaviors:

  • incremental_serialize() will append \r\n to the end of the serialize() output.

  • incremental_deserialize() waits until \r\n is found in the input, removes the separator, and calls deserialize() on it.

Tip

Take a look at other available base classes in easynetwork.serializers before rewriting something that already exists.

Option 2: From Scratch

Let’s see how we can get by without using the AutoSeparatedPacketSerializer:

    from __future__ import annotations

    import json
    from collections.abc import Generator
    from typing import Any, TypeAlias

    from easynetwork.exceptions import DeserializeError, IncrementalDeserializeError
    from easynetwork.serializers.abc import AbstractIncrementalPacketSerializer

    SentPacket: TypeAlias = Any
    ReceivedPacket: TypeAlias = Any


    class MyJSONSerializer(AbstractIncrementalPacketSerializer[SentPacket, ReceivedPacket]):
        def __init__(self, *, ensure_ascii: bool = True) -> None:
            self._ensure_ascii: bool = ensure_ascii

            self._encoding: str
            if self._ensure_ascii:
                self._encoding = "ascii"
            else:
                self._encoding = "utf-8"

        def _dump(self, packet: SentPacket) -> bytes:
            document = json.dumps(packet, ensure_ascii=self._ensure_ascii)
            return document.encode(self._encoding)

        def _load(self, data: bytes) -> ReceivedPacket:
            document = data.decode(self._encoding)
            return json.loads(document)

        def serialize(self, packet: SentPacket) -> bytes:
            return self._dump(packet)

        def deserialize(self, data: bytes) -> ReceivedPacket:
            try:
                return self._load(data)
            except (UnicodeError, json.JSONDecodeError) as exc:
                raise DeserializeError("JSON decode error") from exc

        def incremental_serialize(self, packet: SentPacket) -> Generator[bytes, None, None]:
            yield self._dump(packet) + b"\r\n"

        def incremental_deserialize(self) -> Generator[None, bytes, tuple[ReceivedPacket, bytes]]:
            data = yield
            newline = b"\r\n"
            while (index := data.find(newline)) < 0:
                data += yield

            remainder = data[index + len(newline) :]
            data = data[:index]

            try:
                document = self._load(data)
            except (UnicodeError, json.JSONDecodeError) as exc:
                raise IncrementalDeserializeError("JSON decode error", remainder) from exc

            return document, remainder

This adds a lot of code! Let’s take a closer look at the implementation.

Sharing Code

To avoid duplicating code between the one-shot part and the incremental part, the actual serialization and deserialization logic is moved into private methods.

    def _dump(self, packet: SentPacket) -> bytes:
        document = json.dumps(packet, ensure_ascii=self._ensure_ascii)
        return document.encode(self._encoding)

    def _load(self, data: bytes) -> ReceivedPacket:
        document = data.decode(self._encoding)
        return json.loads(document)

And now serialize() and deserialize() use them instead:

    def serialize(self, packet: SentPacket) -> bytes:
        return self._dump(packet)

    def deserialize(self, data: bytes) -> ReceivedPacket:
        try:
            return self._load(data)
        except (UnicodeError, json.JSONDecodeError) as exc:
            raise DeserializeError("JSON decode error") from exc

The Purpose Of incremental_serialize()

incremental_serialize() must be a generator function (or at least return a generator iterator) that yields all the parts of the serialized packet. It must also add any useful metadata to help incremental_deserialize() find the end of the packet.

    def incremental_serialize(self, packet: SentPacket) -> Generator[bytes, None, None]:
        yield self._dump(packet) + b"\r\n"

Most of the time, you will have a single yield. The goal is: each yield must send as many bytes as possible without copying or concatenating.

Tip

There may be exceptions, as in this example: appending 2 bytes to a byte sequence of about 100 bytes costs almost nothing in memory. The question is worth asking, however, if the byte sequence can grow to around 4 GB.

It is up to you to find the balance between RAM explosion and performance degradation.

Note

The endpoint implementation can decide to concatenate all the pieces and do one big send. However, it may be more attractive to do something else with the returned bytes. incremental_serialize() is here to give endpoints this freedom.
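The sketch below illustrates that freedom with a standalone generator that yields the payload and the separator as two parts. The two consumption strategies are assumptions about what an endpoint might do, and io.BytesIO stands in for a real transport.

```python
import io
import json
from collections.abc import Generator


def incremental_serialize(packet: object) -> Generator[bytes, None, None]:
    yield json.dumps(packet).encode("utf-8")  # possibly large payload, yielded as is
    yield b"\r\n"  # separator yielded on its own: no concatenation, no copy


# Strategy 1: concatenate every part and do one big send.
one_big_send = b"".join(incremental_serialize({"data": 42}))

# Strategy 2: hand each part to the transport as soon as it is produced.
stream = io.BytesIO()  # stand-in for a socket or any writable transport
for part in incremental_serialize({"data": 42}):
    stream.write(part)
```

Both strategies put the same bytes on the wire. Only the second one avoids building the full frame in memory first.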

The Purpose Of incremental_deserialize()

incremental_deserialize() must be a generator function (or at least return a generator iterator) that yields None until all the data parts of the packet have been retrieved and parsed.

This generator must return a pair of (packet, remainder) where packet is the deserialized packet and remainder is any trailing bytes that were received but are not part of that packet.

At each yield checkpoint, the endpoint implementation sends to the generator the data received from the remote endpoint.

    def incremental_deserialize(self) -> Generator[None, bytes, tuple[ReceivedPacket, bytes]]:
        data = yield
        newline = b"\r\n"
        while (index := data.find(newline)) < 0:
            data += yield

        remainder = data[index + len(newline) :]
        data = data[:index]

        try:
            document = self._load(data)
        except (UnicodeError, json.JSONDecodeError) as exc:
            raise IncrementalDeserializeError("JSON decode error", remainder) from exc

        return document, remainder

Note

Even if we could create 5 more JSON packets from remainder, incremental_deserialize() must always deserialize the first one available and return the rest as is.

This allows the endpoint implementation to deserialize only the packet it needs. The rest is reused when the application asks for another packet.
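The interaction between an endpoint and the generator can be sketched as follows. receive_one() is a hypothetical helper that only imitates what an endpoint might do; it is not EasyNetwork code. It primes the generator, sends chunks at each yield checkpoint, and hands back the remainder so that the next call can start from it.

```python
from __future__ import annotations

import json
from collections.abc import Generator, Iterator
from typing import Any


def incremental_deserialize() -> Generator[None, bytes, tuple[Any, bytes]]:
    data = yield
    while (index := data.find(b"\r\n")) < 0:
        data += yield
    return json.loads(data[:index].decode("utf-8")), data[index + 2 :]


def receive_one(buffer: bytes, incoming: Iterator[bytes]) -> tuple[Any, bytes]:
    """Hypothetical endpoint loop: return one packet plus the unused bytes."""
    consumer = incremental_deserialize()
    next(consumer)  # run up to the first yield checkpoint
    chunk = buffer or next(incoming, b"")  # leftover bytes are consumed first
    while chunk:
        try:
            consumer.send(chunk)
        except StopIteration as stop:
            return stop.value  # the (packet, remainder) pair
        chunk = next(incoming, b"")
    consumer.close()
    raise EOFError("stream ended before a full packet arrived")


# Three packets arrive in two chunks, the second packet split across both.
incoming = iter([b'{"id": 1}\r\n{"id"', b': 2}\r\n{"id": 3}\r\n'])
first, rest = receive_one(b"", incoming)
second, rest = receive_one(rest, incoming)
third, rest = receive_one(rest, incoming)
```

Each call yields exactly one packet, even when the buffered bytes already contain more than one.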

See also

Serializer implementation tools

Regroups helpers for (incremental) serializer implementations.

PEP 255 — Simple Generators

The proposal for adding generators and the yield statement to Python.

PEP 342 — Coroutines via Enhanced Generators

The proposal to enhance the API and syntax of generators, making them usable as simple coroutines.

PEP 380 — Syntax for Delegating to a Subgenerator

The proposal to introduce the yield from syntax, making delegation to subgenerators easy.

Parsing Error

The incremental_deserialize() function must raise an IncrementalDeserializeError to indicate that a parsing error was “expected” so that the received data is considered invalid.

    def incremental_deserialize(self) -> Generator[None, bytes, tuple[ReceivedPacket, bytes]]:
        data = yield
        newline = b"\r\n"
        while (index := data.find(newline)) < 0:
            data += yield

        remainder = data[index + len(newline) :]
        data = data[:index]

        try:
            document = self._load(data)
        except (UnicodeError, json.JSONDecodeError) as exc:
            raise IncrementalDeserializeError("JSON decode error", remainder) from exc

        return document, remainder

Warning

Any other exception is considered a serializer crash.

If DeserializeError is raised instead of IncrementalDeserializeError, it is converted to a RuntimeError.

Note

IncrementalDeserializeError must be given the remainder, that is, the bytes that may still be valid and are not the root cause of the error. In the example, even if data is an invalid JSON object, all bytes after the \r\n token (in remainder) are not lost.
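The standalone sketch below shows why carrying the remainder matters. The exception class is a local stand-in, and both its remaining_data attribute name and the parse_all() helper are assumptions made for this illustration, not EasyNetwork code.

```python
from __future__ import annotations

import json
from collections.abc import Generator
from typing import Any


class IncrementalDeserializeError(Exception):
    """Local stand-in; the remaining_data attribute name is an assumption."""

    def __init__(self, message: str, remaining_data: bytes) -> None:
        super().__init__(message)
        self.remaining_data = remaining_data


def incremental_deserialize() -> Generator[None, bytes, tuple[Any, bytes]]:
    data = yield
    while (index := data.find(b"\r\n")) < 0:
        data += yield
    remainder = data[index + 2 :]
    try:
        packet = json.loads(data[:index].decode("utf-8"))
    except (UnicodeError, json.JSONDecodeError) as exc:
        raise IncrementalDeserializeError("JSON decode error", remainder) from exc
    return packet, remainder


def parse_all(buffer: bytes) -> tuple[list[Any], int]:
    """Hypothetical helper: parse every complete line and count the invalid ones."""
    packets: list[Any] = []
    errors = 0
    while buffer:
        consumer = incremental_deserialize()
        next(consumer)
        try:
            consumer.send(buffer)
        except StopIteration as stop:
            packet, buffer = stop.value
            packets.append(packet)
        except IncrementalDeserializeError as exc:
            errors += 1
            buffer = exc.remaining_data  # skip the bad line, keep what follows
        else:
            consumer.close()
            break  # incomplete packet: more data would be needed
    return packets, errors


packets, errors = parse_all(b'not json\r\n{"ok": true}\r\n')
```

The invalid first line is reported once, and the valid packet that follows it is still delivered.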