Fluid Messaging

Simon Maxen
5 min read · Oct 18, 2018

The aim of the Fluid Manifesto is to promote a set of techniques and approaches that allow web services to be continuously updated. Fluid Messaging describes how a similar approach can be used when working with asynchronous messaging systems.

Approach

As a publisher we want to be able to start publishing an improved version of the message as soon as an updated version of the publishing service is available for deployment.

Consider the situation shown below, where a consumer (C1) of the topic is working its way through records (left to right) that have the V3 structure it understands.

We need to ensure that the C1 consumer:

  • Does not read V4 structured data
  • Does continue to get a stream of V3 structured data until they upgrade to the new data structure.

This is done by having the consumer read from the main topic only while the data structure is V3, and then switch to reading from a temporary topic that contains the V4 structured records downgraded to V3.

On restart the consumer will follow the same procedure: read from the main topic, read a single V4 record, discard it, and then move to the downgraded topic.

When we upgrade the consumer to work with V4 structured records, we need to ensure that we do not re-process the records we already processed on the temporary topic. On system startup we therefore need to fast-forward our main topic offset before consuming.

Implementation

In this section I discuss how this is implemented using features available in Apache Kafka. It also assumes the payload is JSON backed by a JSON schema representation.

Metadata

For each topic, any change to the JSON schema that represents the payload needs to result in a new version of the fluid metadata. Each version (schema-version) of the metadata contains:

  • Which JSON schema representations are associated with each topic
  • Downgrade scripts that can be used to downgrade the latest JSON structure to an earlier one.

How this is maintained on a commit by commit basis is described in Fluid Schema Evolution.
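As a rough illustration of what one metadata version might hold, here is a minimal sketch. The `FluidMetadata` class, its field names, the integer schema-version, and the `customers` example are all assumptions made for illustration; the article does not specify how the metadata is stored or what language the downgrade scripts are written in.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Tuple

# Hypothetical: a downgrade script modelled as a plain function on a JSON object.
Downgrade = Callable[[dict], dict]


@dataclass(frozen=True)
class FluidMetadata:
    schema_version: int
    # topic name -> JSON schema describing that topic's payload
    topic_schemas: Dict[str, dict]
    # (topic, target schema-version) -> script turning the latest payload
    # structure into the target structure
    downgrades: Dict[Tuple[str, int], Downgrade] = field(default_factory=dict)

    def downgrade(self, topic: str, target_version: int, payload: dict) -> dict:
        return self.downgrades[(topic, target_version)](payload)


def v4_to_v3(payload: dict) -> dict:
    """Example script: V4 split 'name' into two fields, V3 expects one."""
    out = {k: v for k, v in payload.items() if k not in ("firstName", "lastName")}
    out["name"] = f'{payload["firstName"]} {payload["lastName"]}'
    return out


metadata_v4 = FluidMetadata(
    schema_version=4,
    topic_schemas={"customers": {"type": "object", "required": ["firstName", "lastName"]}},
    downgrades={("customers", 3): v4_to_v3},
)

downgraded = metadata_v4.downgrade(
    "customers", 3, {"id": 7, "firstName": "Ada", "lastName": "Lovelace"}
)
```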

Message Schema Version

In order to know when it is necessary to switch to a downgraded topic, every Kafka message published must contain a fluid-schema-version header, which holds the schema-version that the publisher was developed against.
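The header handling can be sketched without tying it to a particular client library. The helpers below assume headers are represented as a list of (key, bytes) pairs and that the schema-version is an integer encoded as text; both the helper names and that encoding are assumptions, not something the article specifies.

```python
import json

HEADER = "fluid-schema-version"


def build_record(payload: dict, schema_version: int):
    """Return (value_bytes, headers) ready to hand to a Kafka producer."""
    value = json.dumps(payload).encode("utf-8")
    # Assumed encoding: the version number as UTF-8 text.
    headers = [(HEADER, str(schema_version).encode("utf-8"))]
    return value, headers


def read_schema_version(headers):
    """Extract the schema-version from record headers, or None if absent."""
    for key, raw in headers or []:
        if key == HEADER:
            return int(raw.decode("utf-8"))
    return None


value, headers = build_record({"id": 1}, 4)
observed = read_schema_version(headers)
```

A consumer would call something like `read_schema_version` on each record before deciding whether the payload can be processed as-is.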

Bridge

We need to detect when a non-compatible structural change has been made to a topic payload and then populate a fallback topic with downgraded messages. The Fluid Messaging Bridge does this and is described next.

Fluid Messaging Bridge

The Fluid Messaging Bridge is configured with the metadata and the Kafka cluster being used for messaging. It provides the following web service endpoints.

Downgrade Topic

GET /fluid/downgrade-topic?
topic=<topic>&
group=<groupId>&
compatible-schema-version=<version>

Returns the name of the downgrade topic for the given topic and client schema-version. There will always be a downgrade topic, even if the compatible-schema-version is the latest schema-version or no downgrading is currently needed. This endpoint would usually be called once at startup.
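A client might assemble the request along these lines. The base URL, topic, and group names are placeholders, and since the article does not describe the response format, the sketch stops at building the URL.

```python
from urllib.parse import urlencode


def downgrade_topic_url(base_url, topic, group, compatible_schema_version):
    """Build the GET URL for the bridge's downgrade-topic endpoint."""
    query = urlencode({
        "topic": topic,
        "group": group,
        "compatible-schema-version": compatible_schema_version,
    })
    return f"{base_url.rstrip('/')}/fluid/downgrade-topic?{query}"


# Hypothetical bridge address and names, for illustration only.
url = downgrade_topic_url("http://bridge.example:8080", "customers", "billing-service", 3)
```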

Schema Version Compatible

GET /fluid/compatible?
topic=<topic>&
observed-schema-version=<version>&
compatible-schema-version=<version>

Returns true if the observed schema-version is compatible with the compatible schema-version. For example, if the observed schema-version does not change any objects published on this topic, the answer will be true. The result for a given parameter list does not change, so it can be cached. The endpoint should be called every time a new schema-version is observed on a message.
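Because the answer for a given parameter list never changes, a client can memoise it and only contact the bridge when it sees a schema-version for the first time. The sketch below wraps an injected `fetch` function (standing in for the real web service call) with `functools.lru_cache`; the `fake_fetch` rule is a stand-in for testing, not the bridge's actual compatibility logic.

```python
from functools import lru_cache


def make_compat_checker(fetch):
    """fetch(topic, observed, compatible) -> bool performs the real call."""
    @lru_cache(maxsize=None)
    def is_compatible(topic, observed, compatible):
        return fetch(topic, observed, compatible)
    return is_compatible


calls = []


def fake_fetch(topic, observed, compatible):
    # Records each simulated round trip to the bridge.
    calls.append((topic, observed, compatible))
    return observed <= compatible  # stand-in rule for the example


is_compatible = make_compat_checker(fake_fetch)
results = [is_compatible("customers", v, 3) for v in (3, 3, 3, 4, 4)]
```

Five records are checked here, but only two calls reach `fake_fetch`: one per distinct observed schema-version.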

When the schema versions are not compatible, false will be returned. At this point two things happen:

  • The client will terminate reading on the main topic and switch to the downgrade topic.
  • The Fluid Messaging Bridge will derive a starting offset by finding the maximum offset associated with any of the registered groupIds. It will then start reading from the topic, looking for the first message with the observed schema-version. Starting from that message, all messages will be passed through a downgrade script and re-published onto the downgrade topic.
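The bridge's side of this can be sketched over in-memory data. The record tuples, group names, and the `downgrade` function below are illustrative assumptions; a real bridge would read from and publish to Kafka rather than work on lists.

```python
def start_offset(group_offsets):
    """Maximum offset across the registered groupIds, as described above."""
    return max(group_offsets.values())


def downgrade_stream(records, group_offsets, observed_version, downgrade):
    """Yield (offset, downgraded payload) from the first record carrying
    observed_version onwards.

    records: list of (offset, schema_version, payload) in offset order.
    """
    begin = start_offset(group_offsets)
    found = False
    for offset, version, payload in records:
        if offset < begin:
            continue  # before the derived starting offset
        if not found and version == observed_version:
            found = True  # first message with the new structure
        if found:
            yield offset, downgrade(payload)


def to_v3(payload):
    # Hypothetical downgrade script: V3 has a single 'name' field.
    return {"name": f'{payload["firstName"]} {payload["lastName"]}'}


records = [
    (7, 3, {"name": "Grace Hopper"}),
    (8, 3, {"name": "Edsger Dijkstra"}),
    (9, 4, {"firstName": "Ada", "lastName": "Lovelace"}),
    (10, 4, {"firstName": "Alan", "lastName": "Turing"}),
]
group_offsets = {"billing-service": 8, "reporting-service": 5}

republished = list(downgrade_stream(records, group_offsets, 4, to_v3))
```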

Upgrade Schema Version

DELETE /fluid/topics/<topic>/groups/<groupId>/schema-version

The location above is set as a side effect when a downgrade topic is requested, so the bridge can keep track of which topic groups are using which schema-versions.

Deleting the schema-version will result in the offset on the topic being fast-forwarded so that it points to the message after the last downgraded message read. When the client then reads from the main topic, it will not be given messages it has already processed.
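One way the fast-forward could be computed is sketched below. It assumes the bridge remembers, for each message on the downgrade topic, the main-topic offset of the message it was derived from; that bookkeeping (`source_offsets`) is an assumption made for this example and is not described in the article.

```python
def fast_forward_offset(source_offsets, last_downgraded_read):
    """Return the main-topic offset the group should resume from.

    source_offsets[i] is the main-topic offset of the record that produced
    downgrade-topic offset i (assumed mapping kept by the bridge).
    last_downgraded_read is the last downgrade-topic offset the group read,
    or None if it never read from the downgrade topic.
    """
    if last_downgraded_read is None:
        return None  # nothing to skip; leave the main-topic offset alone
    return source_offsets[last_downgraded_read] + 1


# Downgrade-topic offsets 0, 1, 2 came from main-topic offsets 9, 10, 11.
source_offsets = [9, 10, 11]
resume_at = fast_forward_offset(source_offsets, 1)
```

Here the group last read downgrade-topic offset 1, which came from main-topic offset 10, so it resumes at offset 11 on the main topic.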

Additionally, when the bridge detects that all groups that were using the downgrade topic have been upgraded, it will stop republishing messages from the main topic onto the downgrade topic.

Finally note that it is only necessary to call this endpoint when a groupId is no longer being used. This is because the previous schema-version will be deleted and a new one put in place automatically whenever a request for a temporary topic is made with an updated schema-version.

Consumer Application Operation

On startup, a web service call is made to /fluid/downgrade-topic to get the downgrade topic for the main topic, so the application knows about two topics from the start. It begins reading messages from the main topic, but for each record checks that the fluid-schema-version record header is compatible with the version the application was developed against (this is done with a web service call to /fluid/compatible). As soon as a non-compatible message is detected, the consumer stops reading from the main topic and switches to reading from the downgrade topic established at startup.
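The switching logic can be sketched as a small function over in-memory records. The record shapes and the `is_compatible` callable are assumptions standing in for real Kafka consumers and the call to /fluid/compatible.

```python
def consume(main_records, downgrade_payloads, developed_against, is_compatible):
    """Return the payloads the application processes, in order.

    main_records: iterable of (schema_version, payload) from the main topic.
    downgrade_payloads: iterable of payloads from the downgrade topic.
    is_compatible(observed, developed_against) -> bool.
    """
    processed = []
    for version, payload in main_records:
        if not is_compatible(version, developed_against):
            break  # discard this record and stop reading the main topic
        processed.append(payload)
    else:
        return processed  # nothing incompatible seen; stay on the main topic
    # Switch to the downgrade topic obtained at startup.
    processed.extend(downgrade_payloads)
    return processed


main = [(3, "a"), (3, "b"), (4, "c-as-v4"), (4, "d-as-v4")]
downgraded = ["c-as-v3", "d-as-v3"]

# Stand-in compatibility rule for the example: versions must match exactly.
seen = consume(main, downgraded, 3, lambda observed, mine: observed == mine)
```

The consumer processes the two V3 records from the main topic, discards the first V4 record it meets, and picks up the remaining data in V3 form from the downgrade topic.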

Conclusion

The great thing about the approach and implementation described above is that the bridge comes to the rescue only when needed, and then only for as long as is needed. The clients working through their topic messages do the detection and then give the bridge a heads-up (see Schema Version Compatible) that some downgrading is required.
