- Custom Serialization for Managed State
- Using custom state serializers
- State serializers and schema evolution
- The TypeSerializerSnapshot abstraction
- How Flink interacts with the TypeSerializer and TypeSerializerSnapshot abstractions
- Off-heap state backends (e.g. RocksDBStateBackend)
- Heap state backends (e.g. MemoryStateBackend, FsStateBackend)
- Predefined convenient TypeSerializerSnapshot classes
- Implementing a SimpleTypeSerializerSnapshot
- Implementing a CompositeTypeSerializerSnapshot
- Implementation notes and best practices
- 1. Flink restores serializer snapshots by instantiating them with their classname
- 2. Avoid sharing the same TypeSerializerSnapshot class across different serializers
- 3. Avoid using Java serialization for serializer snapshot content
- Migrating from deprecated serializer snapshot APIs before Flink 1.7
Custom Serialization for Managed State
This page is intended as a guideline for users who require custom serialization for their state, covering how to provide a custom state serializer as well as guidelines and best practices for implementing serializers that allow state schema evolution.
If you’re simply using Flink’s own serializers, this page is irrelevant and can be ignored.
Using custom state serializers
When registering a managed operator or keyed state, a StateDescriptor is required to specify the state’s name, as well as information about the type of the state. The type information is used by Flink’s type serialization framework to create appropriate serializers for the state.
It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states, simply by directly instantiating the StateDescriptor with your own TypeSerializer implementation:
public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, Integer>> {...}
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "state-name",
        new CustomTypeSerializer());
checkpointedState = getRuntimeContext().getListState(descriptor);
class CustomTypeSerializer extends TypeSerializer[(String, Integer)] {...}
val descriptor = new ListStateDescriptor[(String, Integer)](
  "state-name",
  new CustomTypeSerializer)
checkpointedState = getRuntimeContext.getListState(descriptor)
State serializers and schema evolution
This section explains the user-facing abstractions related to state serialization and schema evolution, and necessary internal details about how Flink interacts with these abstractions.
When restoring from savepoints, Flink allows changing the serializers used to read and write previously registered state, so that users are not locked into any specific serialization schema. When state is restored, a new serializer will be registered for the state (i.e., the serializer that comes with the StateDescriptor used to access the state in the restored job). This new serializer may have a different schema than that of the previous serializer. Therefore, when implementing state serializers, besides the basic logic of reading / writing data, another important thing to keep in mind is how the serialization schema can be changed in the future.
When speaking of schema, in this context the term is interchangeable between referring to the data model of a state type and the serialized binary format of a state type. The schema, generally speaking, can change in a few cases:
- Data schema of the state type has evolved, e.g. adding or removing a field from a POJO that is used as state.
- Generally speaking, after a change to the data schema, the serialization format of the serializer will need to be upgraded.
- Configuration of the serializer has changed.
In order for the new execution to have information about the written schema of state and detect whether or not the schema has changed, upon taking a savepoint of an operator’s state, a snapshot of the state serializer needs to be written along with the state bytes. This is abstracted as a TypeSerializerSnapshot, explained in the next subsection.
The TypeSerializerSnapshot abstraction
public interface TypeSerializerSnapshot<T> {
    int getCurrentVersion();
    void writeSnapshot(DataOutputView out) throws IOException;
    void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException;
    TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer);
    TypeSerializer<T> restoreSerializer();
}
public abstract class TypeSerializer<T> {
    // ...
    public abstract TypeSerializerSnapshot<T> snapshotConfiguration();
}
A serializer’s TypeSerializerSnapshot is point-in-time information that serves as the single source of truth about the state serializer’s write schema, as well as any additional information mandatory to restore a serializer that would be identical to the given point-in-time. The logic about what should be written and read at restore time as the serializer snapshot is defined in the writeSnapshot and readSnapshot methods.
Note that the snapshot’s own write schema may also need to change over time (e.g. when you wish to add more information about the serializer to the snapshot). To facilitate this, snapshots are versioned, with the current version number defined in the getCurrentVersion method. On restore, when the serializer snapshot is read from savepoints, the version of the schema in which the snapshot was written will be provided to the readSnapshot method so that the read implementation can handle different versions.
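The versioning scheme can be illustrated with a minimal, self-contained Java sketch that does not use Flink's classes. VersionedConfigSnapshot is a hypothetical snapshot, java.io.DataOutputStream / DataInputStream stand in for Flink's DataOutputView / DataInputView, and the writeVersioned / readVersioned helpers are assumed stand-ins for the framework recording the version in front of the snapshot bytes. Version 2 of the hypothetical format adds a field, and readSnapshot branches on the version it is given.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical snapshot of a serializer configured by a charset name.
// Version 1 of the snapshot format wrote only the charset name;
// version 2 additionally writes a "nullable" flag.
final class VersionedConfigSnapshot {
    private static final int CURRENT_VERSION = 2;

    String charsetName;
    boolean nullable;

    // Nullary constructor used when the snapshot is restored.
    VersionedConfigSnapshot() {}

    VersionedConfigSnapshot(String charsetName, boolean nullable) {
        this.charsetName = charsetName;
        this.nullable = nullable;
    }

    int getCurrentVersion() {
        return CURRENT_VERSION;
    }

    // Always writes the newest format.
    void writeSnapshot(DataOutputStream out) throws IOException {
        out.writeUTF(charsetName);
        out.writeBoolean(nullable);
    }

    // Reads whichever format the snapshot was originally written in.
    void readSnapshot(int readVersion, DataInputStream in) throws IOException {
        if (readVersion < 1 || readVersion > CURRENT_VERSION) {
            throw new IOException("Unrecognized snapshot version: " + readVersion);
        }
        charsetName = in.readUTF();
        // The flag did not exist in version 1, so fall back to a default there.
        nullable = readVersion >= 2 && in.readBoolean();
    }

    // Assumed stand-in for the framework: record the version before the snapshot bytes.
    static byte[] writeVersioned(VersionedConfigSnapshot snapshot) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(snapshot.getCurrentVersion());
            snapshot.writeSnapshot(out);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Assumed stand-in for the framework: read the recorded version, pass it to readSnapshot.
    static VersionedConfigSnapshot readVersioned(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            int version = in.readInt();
            VersionedConfigSnapshot snapshot = new VersionedConfigSnapshot();
            snapshot.readSnapshot(version, in);
            return snapshot;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Bytes written with the current version round-trip both fields, while bytes written in version 1 are still readable and simply get the default for the missing flag.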
At restore time, the logic that detects whether or not the new serializer’s schema has changed should be implemented in the resolveSchemaCompatibility method. When previously registered state is registered again with new serializers in the restored execution of an operator, the new serializer is provided to the previous serializer’s snapshot via this method. This method returns a TypeSerializerSchemaCompatibility representing the result of the compatibility resolution, which can be one of the following:
- TypeSerializerSchemaCompatibility.compatibleAsIs(): this result signals that the new serializer is compatible, meaning that the new serializer has an identical schema to the previous serializer. It is possible that the new serializer has been reconfigured in the resolveSchemaCompatibility method so that it is compatible.
- TypeSerializerSchemaCompatibility.compatibleAfterMigration(): this result signals that the new serializer has a different serialization schema, and it is possible to migrate from the old schema by using the previous serializer (which recognizes the old schema) to read bytes into state objects, and then rewriting the objects back to bytes with the new serializer (which recognizes the new schema).
- TypeSerializerSchemaCompatibility.incompatible(): this result signals that the new serializer has a different serialization schema, but it is not possible to migrate from the old schema.
The last bit of detail is how the previous serializer is obtained in the case that migration is required. Another important role of a serializer’s TypeSerializerSnapshot is that it serves as a factory to restore the previous serializer. More specifically, the TypeSerializerSnapshot should implement the restoreSerializer method to instantiate a serializer instance that recognizes the previous serializer’s schema and configuration, and can therefore safely read data written by the previous serializer.
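A compatibility check and the factory role of the snapshot can be sketched in plain Java. Everything here is hypothetical and simplified, not Flink's API: RecordSerializer describes its schema as an ordered list of field names, SchemaCompatibility mirrors the three results listed above, and the policy (appending fields is migratable, any other change is not) is an assumption chosen for illustration rather than a Flink rule.

```java
import java.util.List;

// Simplified stand-in for the three possible resolution results.
enum SchemaCompatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

// Hypothetical serializer whose schema is an ordered list of field names.
final class RecordSerializer {
    final List<String> fields;

    RecordSerializer(List<String> fields) {
        this.fields = List.copyOf(fields);
    }

    RecordSerializerSnapshot snapshotConfiguration() {
        return new RecordSerializerSnapshot(fields);
    }
}

// Hypothetical snapshot that remembers the schema the old state was written with.
final class RecordSerializerSnapshot {
    private final List<String> writtenFields;

    RecordSerializerSnapshot(List<String> writtenFields) {
        this.writtenFields = List.copyOf(writtenFields);
    }

    SchemaCompatibility resolveSchemaCompatibility(RecordSerializer newSerializer) {
        List<String> newFields = newSerializer.fields;
        if (newFields.equals(writtenFields)) {
            return SchemaCompatibility.COMPATIBLE_AS_IS;
        }
        // Policy assumed for this sketch: only appending new fields is migratable;
        // removing, reordering or renaming fields is treated as incompatible.
        boolean onlyAppended = newFields.size() > writtenFields.size()
                && newFields.subList(0, writtenFields.size()).equals(writtenFields);
        return onlyAppended
                ? SchemaCompatibility.COMPATIBLE_AFTER_MIGRATION
                : SchemaCompatibility.INCOMPATIBLE;
    }

    // Factory for a serializer that can still read bytes in the old schema.
    RecordSerializer restoreSerializer() {
        return new RecordSerializer(writtenFields);
    }
}
```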
How Flink interacts with the TypeSerializer and TypeSerializerSnapshot abstractions
To wrap up, this section describes how Flink, or more specifically the state backends, interact with the abstractions. The interaction is slightly different depending on the state backend, but this is orthogonal to the implementation of state serializers and their serializer snapshots.
Off-heap state backends (e.g. RocksDBStateBackend)
1. Register new state with a state serializer that has schema A
   - The registered TypeSerializer for the state is used to read / write state on every state access.
   - State is written in schema A.
2. Take a savepoint
   - The serializer snapshot is extracted via the TypeSerializer#snapshotConfiguration method.
   - The serializer snapshot is written to the savepoint, as well as the already-serialized state bytes (with schema A).
3. Restored execution re-accesses restored state bytes with new state serializer that has schema B
   - The previous state serializer’s snapshot is restored.
   - State bytes are not deserialized on restore, only loaded back to the state backends (therefore, still in schema A).
   - Upon receiving the new serializer, it is provided to the restored previous serializer’s snapshot via TypeSerializerSnapshot#resolveSchemaCompatibility to check for schema compatibility.
4. Migrate state bytes in backend from schema A to schema B
   - If the compatibility resolution reflects that the schema has changed and migration is possible, schema migration is performed. The previous state serializer, which recognizes schema A, is obtained from the serializer snapshot via TypeSerializerSnapshot#restoreSerializer() and is used to deserialize state bytes to objects, which in turn are re-written with the new serializer, which recognizes schema B, to complete the migration. All entries of the accessed state are migrated together before processing continues.
   - If the resolution signals incompatibility, then the state access fails with an exception.
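The off-heap flow, where state stays in serialized form and migration eagerly rewrites every entry, can be sketched as below. This is an illustration under stated assumptions, not Flink's implementation: BytesSerializer is a hypothetical minimal serializer interface, a HashMap of byte arrays stands in for RocksDB, schema A stores a counter as a 4-byte int, and schema B stores it as an 8-byte long.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal serializer abstraction (not Flink's TypeSerializer).
interface BytesSerializer<T> {
    byte[] serialize(T value);
    T deserialize(byte[] bytes);
}

// Schema A: counters stored as 4-byte ints.
final class IntCounterSerializer implements BytesSerializer<Long> {
    public byte[] serialize(Long value) {
        return ByteBuffer.allocate(4).putInt(Math.toIntExact(value)).array();
    }

    public Long deserialize(byte[] bytes) {
        return (long) ByteBuffer.wrap(bytes).getInt();
    }
}

// Schema B: counters stored as 8-byte longs.
final class LongCounterSerializer implements BytesSerializer<Long> {
    public byte[] serialize(Long value) {
        return ByteBuffer.allocate(8).putLong(value).array();
    }

    public Long deserialize(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }
}

// Stand-in for an off-heap backend: it only ever holds serialized bytes.
final class BytesStateStore {
    final Map<String, byte[]> entries = new HashMap<>();

    // Rewrites every entry: read with the restored previous serializer,
    // then write with the newly registered serializer.
    <T> void migrate(BytesSerializer<T> previous, BytesSerializer<T> current) {
        for (Map.Entry<String, byte[]> entry : entries.entrySet()) {
            T value = previous.deserialize(entry.getValue());
            entry.setValue(current.serialize(value));
        }
    }
}
```

In this model, migration is only needed because the backend never holds objects; without the previous serializer the 4-byte entries could not be interpreted at all.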
Heap state backends (e.g. MemoryStateBackend, FsStateBackend)
1. Register new state with a state serializer that has schema A
   - The registered TypeSerializer is maintained by the state backend.
2. Take a savepoint, serializing all state with schema A
   - The serializer snapshot is extracted via the TypeSerializer#snapshotConfiguration method.
   - The serializer snapshot is written to the savepoint.
   - State objects are now serialized to the savepoint, written in schema A.
3. On restore, deserialize state into objects in heap
   - The previous state serializer’s snapshot is restored.
   - The previous serializer, which recognizes schema A, is obtained from the serializer snapshot via TypeSerializerSnapshot#restoreSerializer(), and is used to deserialize state bytes to objects.
   - From now on, all of the state is already deserialized.
4. Restored execution re-accesses previous state with new state serializer that has schema B
   - Upon receiving the new serializer, it is provided to the restored previous serializer’s snapshot via TypeSerializerSnapshot#resolveSchemaCompatibility to check for schema compatibility.
   - If the compatibility check signals that migration is required, nothing happens in this case, since for heap backends all state is already deserialized into objects.
   - If the resolution signals incompatibility, then the state access fails with an exception.
5. Take another savepoint, serializing all state with schema B
   - Same as step 2, but now state bytes are all in schema B.
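For contrast, a heap backend keeps live state as objects, so serializers are only involved when restoring and when taking savepoints. The sketch below is a simplified model, not Flink code: ValueSerializer and HeapStateStore are hypothetical names, and two string encodings (UTF-8 as schema A, UTF-16BE as schema B) are used purely as an example of a changed binary format.

```java
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal serializer abstraction (not Flink's TypeSerializer).
interface ValueSerializer<T> {
    byte[] write(T value);
    T read(byte[] bytes);
}

// A string serializer whose binary format is determined by its charset.
final class CharsetStringSerializer implements ValueSerializer<String> {
    private final Charset charset;

    CharsetStringSerializer(Charset charset) {
        this.charset = charset;
    }

    public byte[] write(String value) {
        return value.getBytes(charset);
    }

    public String read(byte[] bytes) {
        return new String(bytes, charset);
    }
}

// Stand-in for a heap backend: live state is kept as objects, not bytes.
final class HeapStateStore<T> {
    final Map<String, T> objects = new HashMap<>();

    // Restore: eagerly deserialize every entry with the previous serializer.
    static <T> HeapStateStore<T> restore(Map<String, byte[]> savepoint, ValueSerializer<T> previous) {
        HeapStateStore<T> store = new HeapStateStore<>();
        savepoint.forEach((key, bytes) -> store.objects.put(key, previous.read(bytes)));
        return store;
    }

    // Savepoint: serialize every object with the currently registered serializer.
    Map<String, byte[]> savepoint(ValueSerializer<T> current) {
        Map<String, byte[]> snapshot = new HashMap<>();
        objects.forEach((key, value) -> snapshot.put(key, current.write(value)));
        return snapshot;
    }
}
```

Because the state between restore and the next savepoint consists of plain objects, switching from schema A to schema B needs no explicit migration step in this model; the next savepoint is simply written with the new serializer.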
Predefined convenient TypeSerializerSnapshot classes
Flink provides two abstract base TypeSerializerSnapshot classes that can be used for typical scenarios: SimpleTypeSerializerSnapshot and CompositeTypeSerializerSnapshot.
Serializers that provide these predefined snapshots as their serializer snapshot must always have their own, independent subclass implementation. This corresponds to the best practice of not sharing snapshot classes across different serializers, which is more thoroughly explained in the next section.
Implementing a SimpleTypeSerializerSnapshot
The SimpleTypeSerializerSnapshot is intended for serializers that do not have any state or configuration, essentially meaning that the serialization schema of the serializer is solely defined by the serializer’s class.
There will only be 2 possible results of the compatibility resolution when using the SimpleTypeSerializerSnapshot as your serializer’s snapshot class:
- TypeSerializerSchemaCompatibility.compatibleAsIs(), if the new serializer class remains identical, or
- TypeSerializerSchemaCompatibility.incompatible(), if the new serializer class is different than the previous one.
Below is an example of how the SimpleTypeSerializerSnapshot is used, taking Flink’s IntSerializer as an example:
public class IntSerializerSnapshot extends SimpleTypeSerializerSnapshot<Integer> {
    public IntSerializerSnapshot() {
        super(() -> IntSerializer.INSTANCE);
    }
}
The IntSerializer has no state or configuration. Its serialization format is solely defined by the serializer class itself, and can only be read by another IntSerializer. Therefore, it suits the use case of the SimpleTypeSerializerSnapshot.
The base super constructor of the SimpleTypeSerializerSnapshot expects a Supplier of instances of the corresponding serializer, regardless of whether the snapshot is currently being restored or being written during snapshots. That supplier is used to create the restore serializer, as well as for type checks to verify that the new serializer is of the same expected serializer class.
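The behaviour described for the SimpleTypeSerializerSnapshot can be approximated in plain Java. ClassBasedSnapshot, IntCodec, LongCodec and IntCodecSnapshot are hypothetical names, not Flink classes; the sketch only shows the idea that one Supplier both restores the serializer and provides the class against which a new serializer is checked.

```java
import java.util.function.Supplier;

// Simplified stand-in for the two possible results of a class-based check.
enum SimpleCompatibility { COMPATIBLE_AS_IS, INCOMPATIBLE }

// Hypothetical base class: the serializer's class alone defines its schema.
abstract class ClassBasedSnapshot<S> {
    private final Supplier<? extends S> serializerSupplier;

    protected ClassBasedSnapshot(Supplier<? extends S> serializerSupplier) {
        this.serializerSupplier = serializerSupplier;
    }

    // The supplier doubles as the factory for the restore serializer.
    S restoreSerializer() {
        return serializerSupplier.get();
    }

    // Only the serializer class is compared; there is no configuration to check.
    SimpleCompatibility resolveSchemaCompatibility(Object newSerializer) {
        Class<?> expected = serializerSupplier.get().getClass();
        return newSerializer.getClass() == expected
                ? SimpleCompatibility.COMPATIBLE_AS_IS
                : SimpleCompatibility.INCOMPATIBLE;
    }
}

// Hypothetical stateless serializers.
final class IntCodec {
    static final IntCodec INSTANCE = new IntCodec();
}

final class LongCodec {
    static final LongCodec INSTANCE = new LongCodec();
}

// Mirrors the IntSerializerSnapshot pattern above, with hypothetical names.
final class IntCodecSnapshot extends ClassBasedSnapshot<IntCodec> {
    public IntCodecSnapshot() {
        super(() -> IntCodec.INSTANCE);
    }
}
```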
Implementing a CompositeTypeSerializerSnapshot
The CompositeTypeSerializerSnapshot is intended for serializers that rely on multiple nested serializers for serialization.
Before further explanation, we refer to the serializer that relies on nested serializers as the “outer” serializer in this context. Examples for this could be MapSerializer, ListSerializer, GenericArraySerializer, etc. Consider the MapSerializer, for example: the key and value serializers would be the nested serializers, while MapSerializer itself is the “outer” serializer.
In this case, the snapshot of the outer serializer should also contain snapshots of the nested serializers, so that the compatibility of the nested serializers can be independently checked. When resolving the compatibility of the outer serializer, the compatibility of each nested serializer needs to be considered.
CompositeTypeSerializerSnapshot is provided to assist in the implementation of snapshots for these kinds of composite serializers. It deals with reading and writing the nested serializer snapshots, as well as resolving the final compatibility result taking into account the compatibility of all nested serializers.
Below is an example of how the CompositeTypeSerializerSnapshot is used, taking Flink’s MapSerializer as an example:
public class MapSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<Map<K, V>, MapSerializer> {
    private static final int CURRENT_VERSION = 1;
    // Nullary constructor, used when the snapshot is restored.
    public MapSerializerSnapshot() {
        super(MapSerializer.class);
    }
    // Used when taking a snapshot of an existing serializer.
    public MapSerializerSnapshot(MapSerializer<K, V> mapSerializer) {
        super(mapSerializer);
    }
    @Override
    protected int getCurrentOuterSnapshotVersion() {
        return CURRENT_VERSION;
    }
    @Override
    @SuppressWarnings("unchecked")
    protected MapSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
        // The nested serializers arrive in the order returned by getNestedSerializers.
        TypeSerializer<K> keySerializer = (TypeSerializer<K>) nestedSerializers[0];
        TypeSerializer<V> valueSerializer = (TypeSerializer<V>) nestedSerializers[1];
        return new MapSerializer<>(keySerializer, valueSerializer);
    }
    @Override
    protected TypeSerializer<?>[] getNestedSerializers(MapSerializer outerSerializer) {
        return new TypeSerializer<?>[] { outerSerializer.getKeySerializer(), outerSerializer.getValueSerializer() };
    }
}
When implementing a new serializer snapshot as a subclass of CompositeTypeSerializerSnapshot, the following three methods must be implemented:
- #getCurrentOuterSnapshotVersion(): This method defines the version of the current outer serializer snapshot’s serialized binary format.
- #getNestedSerializers(TypeSerializer): Given the outer serializer, returns its nested serializers.
- #createOuterSerializerWithNestedSerializers(TypeSerializer[]): Given the nested serializers, creates an instance of the outer serializer.
The above example is a CompositeTypeSerializerSnapshot where there is no extra information to be snapshotted apart from the nested serializers’ snapshots. Therefore, its outer snapshot version can be expected to never require an increment. Some other serializers, however, contain additional static configuration that needs to be persisted along with the nested component serializers. An example for this would be Flink’s GenericArraySerializer, which contains as configuration the class of the array element type, besides the nested element serializer.
In these cases, an additional three methods need to be implemented on the CompositeTypeSerializerSnapshot:
- #writeOuterSnapshot(DataOutputView): defines how the outer snapshot information is written.
- #readOuterSnapshot(int, DataInputView, ClassLoader): defines how the outer snapshot information is read.
- #isOuterSnapshotCompatible(TypeSerializer): checks whether the outer snapshot information remains identical.
By default, the CompositeTypeSerializerSnapshot assumes that there isn’t any outer snapshot information to read / write, and therefore has empty default implementations for the above methods. If the subclass has outer snapshot information, then all three methods must be implemented.
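How the outer check and the nested results can combine into one result may be sketched as a small pure function. This is a simplified model assumed for illustration, not Flink's implementation (it ignores, for example, reconfigured serializers): CompositeResolution and NestedCompatibility are hypothetical names, a changed outer snapshot makes the whole serializer incompatible, and otherwise the outer result is the least compatible nested result.

```java
import java.util.List;

// Simplified stand-in for the possible resolution results.
enum NestedCompatibility { COMPATIBLE_AS_IS, COMPATIBLE_AFTER_MIGRATION, INCOMPATIBLE }

final class CompositeResolution {
    private CompositeResolution() {}

    static NestedCompatibility resolve(boolean outerSnapshotCompatible, List<NestedCompatibility> nestedResults) {
        // A changed outer configuration (e.g. a different array component class)
        // is treated as incompatible in this sketch.
        if (!outerSnapshotCompatible) {
            return NestedCompatibility.INCOMPATIBLE;
        }
        NestedCompatibility result = NestedCompatibility.COMPATIBLE_AS_IS;
        for (NestedCompatibility nested : nestedResults) {
            if (nested == NestedCompatibility.INCOMPATIBLE) {
                // One incompatible nested serializer makes the outer one incompatible.
                return NestedCompatibility.INCOMPATIBLE;
            }
            if (nested == NestedCompatibility.COMPATIBLE_AFTER_MIGRATION) {
                // Any nested migration means the outer serializer needs migration too.
                result = NestedCompatibility.COMPATIBLE_AFTER_MIGRATION;
            }
        }
        return result;
    }
}
```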
Below is an example of how the CompositeTypeSerializerSnapshot is used for composite serializer snapshots that do have outer snapshot information, taking Flink’s GenericArraySerializer as an example:
public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerializerSnapshot<C[], GenericArraySerializer> {
    private static final int CURRENT_VERSION = 1;
    // Outer snapshot information: the class of the array elements.
    private Class<C> componentClass;
    // Nullary constructor, used when the snapshot is restored.
    public GenericArraySerializerSnapshot() {
        super(GenericArraySerializer.class);
    }
    // Used when taking a snapshot of an existing serializer.
    public GenericArraySerializerSnapshot(GenericArraySerializer<C> genericArraySerializer) {
        super(genericArraySerializer);
        this.componentClass = genericArraySerializer.getComponentClass();
    }
    @Override
    protected int getCurrentOuterSnapshotVersion() {
        return CURRENT_VERSION;
    }
    @Override
    protected void writeOuterSnapshot(DataOutputView out) throws IOException {
        // Only the class name is written, not a Java-serialized Class object.
        out.writeUTF(componentClass.getName());
    }
    @Override
    protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
        // Resolve the class from its name using the user code class loader.
        this.componentClass = InstantiationUtil.resolveClassByName(in, userCodeClassLoader);
    }
    @Override
    protected boolean isOuterSnapshotCompatible(GenericArraySerializer newSerializer) {
        return this.componentClass == newSerializer.getComponentClass();
    }
    @Override
    @SuppressWarnings("unchecked")
    protected GenericArraySerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
        TypeSerializer<C> componentSerializer = (TypeSerializer<C>) nestedSerializers[0];
        return new GenericArraySerializer<>(componentClass, componentSerializer);
    }
    @Override
    protected TypeSerializer<?>[] getNestedSerializers(GenericArraySerializer outerSerializer) {
        return new TypeSerializer<?>[] { outerSerializer.getComponentSerializer() };
    }
}
There are two important things to notice in the above code snippet. First of all, since this CompositeTypeSerializerSnapshot implementation has outer snapshot information that is written as part of the snapshot, the outer snapshot version, as defined by getCurrentOuterSnapshotVersion(), must be incremented whenever the serialization format of the outer snapshot information changes.
Second, notice how we avoid using Java serialization when writing the component class, by only writing the class name and dynamically loading it when reading back the snapshot. Avoiding Java serialization for writing contents of serializer snapshots is in general a good practice to follow. More details about this are covered in the next section.
Implementation notes and best practices
1. Flink restores serializer snapshots by instantiating them with their classname
A serializer’s snapshot, being the single source of truth for how a registered state was serialized, serves as an entry point to reading state in savepoints. In order to be able to restore and access previous state, the previous state serializer’s snapshot must be restorable.
Flink restores serializer snapshots by first instantiating the TypeSerializerSnapshot with its classname (written along with the snapshot bytes). Therefore, to avoid being subject to unintended classname changes or instantiation failures, TypeSerializerSnapshot classes should:
- avoid being implemented as anonymous classes or nested classes,
- have a public, nullary constructor for instantiation.
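Why these two rules matter can be seen in a short reflection sketch. It is an assumed simplification of the restore step, not Flink's code: only the class name is persisted, the class is loaded with Class.forName, and an instance is created through its public no-argument constructor. RestorableSnapshot, TopLevelSnapshot and SnapshotLoader are hypothetical names. An anonymous class would get a compiler-generated name such as Outer$1, which can shift when surrounding code is edited, and it cannot declare the public nullary constructor this loader relies on.

```java
// Hypothetical snapshot base type, standing in for TypeSerializerSnapshot.
interface RestorableSnapshot {
    String describe();
}

// Top-level class with a public nullary constructor: restorable by name.
final class TopLevelSnapshot implements RestorableSnapshot {
    public TopLevelSnapshot() {}

    @Override
    public String describe() {
        return "top-level";
    }
}

final class SnapshotLoader {
    private SnapshotLoader() {}

    // Mimics the restore step: only the class name was persisted, so the class
    // must still exist under that name and expose a public no-argument constructor.
    static RestorableSnapshot instantiate(String className) throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(className);
        // getConstructor() only finds public constructors.
        return (RestorableSnapshot) clazz.getConstructor().newInstance();
    }
}
```

If the snapshot class is later renamed, the persisted name no longer resolves and Class.forName throws ClassNotFoundException, which is exactly the restore failure the guideline is meant to prevent.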
2. Avoid sharing the same TypeSerializerSnapshot class across different serializers
Since schema compatibility checks go through the serializer snapshots, having multiple serializers returning the same TypeSerializerSnapshot class as their snapshot would complicate the implementation of the TypeSerializerSnapshot#resolveSchemaCompatibility and TypeSerializerSnapshot#restoreSerializer() methods.
This would also be a bad separation of concerns; a single serializer’s serialization schema, configuration, as well as how to restore it, should be consolidated in its own dedicated TypeSerializerSnapshot class.
3. Avoid using Java serialization for serializer snapshot content
Java serialization should not be used at all when writing contents of a persisted serializer snapshot. Take, for example, a serializer which needs to persist a class of its target type as part of its snapshot. Information about the class should be persisted by writing the class name, instead of directly serializing the class using Java serialization. When reading the snapshot, the class name is read and used to dynamically load the class.
This practice ensures that serializer snapshots can always be safely read. In the above example, if the type class was persisted using Java serialization, the snapshot may no longer be readable once the class implementation has changed and is no longer binary compatible according to Java serialization specifics.
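Persisting a class by name can be sketched with plain java.io streams. ClassNameCodec is a hypothetical helper, and Flink code would use its own DataOutputView / DataInputView and the user code class loader instead; the point is only that the written bytes contain the class name, not a Java-serialized Class object.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class ClassNameCodec {
    private ClassNameCodec() {}

    // Persist only the class name: this stays readable as long as the class keeps its name.
    static void writeClass(DataOutputStream out, Class<?> clazz) throws IOException {
        out.writeUTF(clazz.getName());
    }

    // Resolve the class by name at read time, using the given class loader.
    static Class<?> readClass(DataInputStream in, ClassLoader classLoader) throws IOException {
        String className = in.readUTF();
        try {
            return Class.forName(className, false, classLoader);
        } catch (ClassNotFoundException e) {
            throw new IOException("Could not find class " + className, e);
        }
    }
}
```

One caveat of this sketch: primitive types such as int.class cannot be resolved through Class.forName, so a real implementation would need special handling for them.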
Migrating from deprecated serializer snapshot APIs before Flink 1.7
This section is a guide for API migration from serializers and serializer snapshots that existed before Flink 1.7.
Before Flink 1.7, serializer snapshots were implemented as a TypeSerializerConfigSnapshot (which is now deprecated, and will eventually be removed to be fully replaced by the new TypeSerializerSnapshot interface). Moreover, the responsibility of serializer schema compatibility checks lived within the TypeSerializer, implemented in the TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot) method.
Another major difference between the new and old abstractions is that the deprecated TypeSerializerConfigSnapshot did not have the capability of instantiating the previous serializer. Therefore, in the case where your serializer still returns a subclass of TypeSerializerConfigSnapshot as its snapshot, the serializer instance itself will always be written to savepoints using Java serialization so that the previous serializer may be available at restore time. This is very undesirable, since whether or not restoring the job will be successful depends on the availability of the previous serializer’s class, or in general, whether or not the serializer instance can be read back at restore time using Java serialization. This means that you will be limited to the same serializer for your state, which could be problematic once you want to upgrade serializer classes or perform schema migration.
To be future-proof and have flexibility to migrate your state serializers and schema, it is highly recommended to migrate from the old abstractions. The steps to do this are as follows:
1. Implement a new subclass of TypeSerializerSnapshot. This will be the new snapshot for your serializer.
2. Return the new TypeSerializerSnapshot as the serializer snapshot for your serializer in the TypeSerializer#snapshotConfiguration() method.
3. Restore the job from the savepoint that existed before Flink 1.7, and then take a savepoint again. Note that at this step, the old TypeSerializerConfigSnapshot of the serializer must still exist in the classpath, and the implementation for the TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot) method must not be removed. The purpose of this process is to replace the TypeSerializerConfigSnapshot written in old savepoints with the newly implemented TypeSerializerSnapshot for the serializer.
4. Once you have a savepoint taken with Flink 1.7, the savepoint will contain TypeSerializerSnapshot as the state serializer snapshot, and the serializer instance will no longer be written in the savepoint. At this point, it is safe to remove all implementations of the old abstraction (remove the old TypeSerializerConfigSnapshot implementation as well as the TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot) method from the serializer).