In this release we introduce new features as well as several breaking changes.
Please read the notes carefully before upgrading to 2.6.0.
------
What's changed
[BREAKING] The original timestamps and headers are now passed to the output when using `StreamingDataFrame.to_topic()`
Previously, `StreamingDataFrame.to_topic()` used the current epoch as the timestamp of the output messages, and headers were omitted.
Since version 2.6.0, Quix Streams passes the original timestamps and headers of the messages to the output topics for more consistent data processing.
This change affects the data in the output topics, so it is marked as breaking.
If you want to keep the previous behavior, you may set the timestamp to the current epoch and drop message headers before producing the output message:
```python
import time

from quixstreams import Application

app = Application(...)
output_topic = app.topic(...)
sdf = app.dataframe(...)

# Do some processing here ...

# Set the timestamp to the current epoch (in milliseconds)
sdf = sdf.set_timestamp(lambda value, key, timestamp, headers: int(time.time() * 1000))

# Set empty message headers
sdf = sdf.set_headers(lambda value, key, timestamp, headers: [])

# Produce messages to the output topic
sdf = sdf.to_topic(output_topic)
```
[BREAKING] Window result timestamps are set to the window start by default
Since 2.6.0, the results of the windowed aggregations use the window start timestamp as a message timestamp.
You may adjust the timestamps using the new `StreamingDataFrame.set_timestamp()` API.
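For example, if downstream consumers expect the window end rather than the window start, a `set_timestamp()` callback can swap it in. The sketch below assumes that each windowed result is a dictionary with `"start"`, `"end"`, and `"value"` keys holding epoch milliseconds; the function name `window_end_timestamp` is hypothetical, and the wiring lines are shown as comments only.

```python
from typing import Any, List, Optional, Tuple

Headers = Optional[List[Tuple[str, bytes]]]


def window_end_timestamp(value: dict, key: Any, timestamp: int, headers: Headers) -> int:
    """Return the window end as the new message timestamp.

    ASSUMPTION: windowed results look like
    {"start": <epoch ms>, "end": <epoch ms>, "value": <aggregate>}.
    """
    return int(value["end"])


# Intended wiring (assumed, not executed here; requires a configured Application):
# sdf = sdf.tumbling_window(duration_ms=60_000).sum().final()
# sdf = sdf.set_timestamp(window_end_timestamp)

if __name__ == "__main__":
    result = {"start": 1_700_000_000_000, "end": 1_700_000_060_000, "value": 42}
    print(window_end_timestamp(result, b"sensor-1", result["start"], None))  # prints 1700000060000
```

The callback receives the same four positional arguments as any other `set_timestamp()` callback, so it can also derive the timestamp from the key or headers if needed.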
[BREAKING] Removed `key` and `timestamp` attributes from the `MessageContext` class
To access the message keys and timestamps, please use the new API described below.
[BREAKING] `final()` and `current()` methods of Windows don't have the `expand` parameter anymore
[NEW] New APIs to access and update message metadata in `StreamingDataFrame`
Accessing message metadata during processing
Docs:
- https://quix.io/docs/quix-streams/processing.html#accessing-kafka-keys-timestamps-and-headers
Previously, the Kafka message metadata resided in a separate `MessageContext` instance.
For example, to access a message key or a timestamp, users needed to import the `quixstreams.message_context` function, which was not straightforward:
```python
from quixstreams import Application, message_context

app = Application(...)
sdf = app.dataframe(...)

# Previous way of getting a message key in versions < 2.6.0
sdf['message_key'] = sdf.apply(lambda value: message_context().key)
```
Now, the `.apply()`, `.filter()`, and `.update()` methods of `StreamingDataFrame` accept a new parameter, `metadata`.
Passing `metadata=True` to any of these methods tells `StreamingDataFrame` to pass the message key, timestamp, and headers to the callback as additional positional arguments after the value.
Example:
```python
from quixstreams import Application

app = Application(...)
sdf = app.dataframe(...)  # a StreamingDataFrame instance

# Using a message key to filter incoming messages.
# Note that the callback now must accept four positional arguments instead of one.
sdf = sdf.filter(lambda value, key, timestamp, headers: key != b'BAD_KEY', metadata=True)
```
This way, you may access metadata without additional imports.
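The `message_context()` snippet for versions < 2.6.0 can be migrated to the new API. Below is a minimal sketch of an `.update()` callback, assuming dictionary message values; the function name and the `message_key`/`message_timestamp` field names are illustrative only. Because `.update()` ignores the callback's return value and passes the original (mutated) value downstream, the metadata ends up in the outgoing message.

```python
from typing import Any, List, Optional, Tuple

Headers = Optional[List[Tuple[str, bytes]]]


def copy_metadata_into_value(value: dict, key: Any, timestamp: int, headers: Headers) -> None:
    # Mutate the value in place; ".update()" ignores the return value
    # and passes the (now modified) original value downstream.
    value["message_key"] = key
    value["message_timestamp"] = timestamp


# Intended wiring (assumed, not executed here):
# sdf = sdf.update(copy_metadata_into_value, metadata=True)

if __name__ == "__main__":
    value = {"temperature": 21.5}
    copy_metadata_into_value(value, b"sensor-1", 1_700_000_000_000, None)
    print(value)
    # prints {'temperature': 21.5, 'message_key': b'sensor-1', 'message_timestamp': 1700000000000}
```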
Updating timestamps and headers with `StreamingDataFrame.set_timestamp()` and `StreamingDataFrame.set_headers()`
Docs:
- https://quix.io/docs/quix-streams/processing.html#updating-kafka-timestamps
- https://quix.io/docs/quix-streams/processing.html#updating-kafka-headers
Since version 2.6.0, you can update timestamps and message headers during processing using the `StreamingDataFrame.set_timestamp()` and `StreamingDataFrame.set_headers()` methods.
These methods accept callbacks similar to other operations, so you can use the message value, key, current timestamp, and headers to compute the new values.
The new timestamps will be used in windowed aggregations and when producing messages to the output topics using the `StreamingDataFrame.to_topic()` method.
The new headers will be set for the output messages as well.
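To make the callback contract concrete, here is a minimal sketch of one callback for each method. It assumes dictionary values carrying a hypothetical `event_time_ms` field and adds a hypothetical `processed_by` header; neither name comes from Quix Streams. Because `headers` can be `None` for messages without headers, the header callback normalizes it before appending.

```python
from typing import Any, List, Optional, Tuple

Headers = Optional[List[Tuple[str, bytes]]]


def timestamp_from_value(value: dict, key: Any, timestamp: int, headers: Headers) -> int:
    # Use the event time carried in the payload (hypothetical "event_time_ms" field),
    # falling back to the current message timestamp when the field is missing.
    return int(value.get("event_time_ms", timestamp))


def add_processed_by_header(value: Any, key: Any, timestamp: int, headers: Headers) -> List[Tuple[str, bytes]]:
    # "headers" is None when the incoming message has no headers attached.
    existing = list(headers) if headers is not None else []
    # Keep the original headers and append one more (name, value) tuple.
    return existing + [("processed_by", b"my-service")]


# Intended wiring (assumed, not executed here):
# sdf = sdf.set_timestamp(timestamp_from_value)
# sdf = sdf.set_headers(add_processed_by_header)

if __name__ == "__main__":
    print(timestamp_from_value({"event_time_ms": 1_700_000_000_000}, None, 0, None))  # prints 1700000000000
    print(add_processed_by_header({}, None, 0, None))  # prints [('processed_by', b'my-service')]
```

Returning a new list instead of mutating the incoming one keeps the original headers object untouched.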
Examples:
```python
import time

from quixstreams import Application

app = Application(...)
sdf = app.dataframe(...)

# Update the timestamp to be the current epoch using the "set_timestamp()" API.
# "set_timestamp()" requires a callback that accepts four positional arguments: value, key, current timestamp, and headers.
# The callback must return the new timestamp as an integer in milliseconds.
sdf = sdf.set_timestamp(lambda value, key, timestamp, headers: int(time.time() * 1000))
```
Add the value of `APP_VERSION` to the message headers for debugging purposes using the `set_headers()` API.
`set_headers()` also requires a callback that accepts four positional arguments: value, key, timestamp, and current headers.
It must return a new set of headers as a list of `(header, value)` tuples.
If an incoming message doesn't have headers attached, the `headers` parameter will be `None`.