What's Changed
Count-based windows
Count-based windows allow aggregating events based on their number instead of time.
They can be helpful when time is not relevant to the particular aggregation or when a large number of out-of-order events are expected in the data stream.
Count-based windows support the same aggregations as time-based windows, including `.reduce()` and `.collect()`.
Supported window types:
- `tumbling_count_window()` - slices the incoming stream into fixed-size batches.
- `hopping_count_window()` - slices the incoming stream into overlapping batches of a fixed size with a fixed step.
- `sliding_count_window()` - same as a count-based hopping window with a step of 1 (e.g., the last 10 events in the stream).
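To make the three window shapes concrete, here is a plain-Python model of how a list of events would be sliced into batches. It is a simplified sketch, not the Quix Streams implementation: the helper names are hypothetical, it works on an in-memory list instead of a Kafka stream, and it assumes that only completely filled batches are emitted (as when using `.final()`) and that the first batch starts at the first event.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def hopping_count_batches(events: Sequence[T], count: int, step: int) -> List[List[T]]:
    """Return every complete batch of `count` events, starting a new batch every `step` events."""
    return [list(events[i:i + count]) for i in range(0, len(events) - count + 1, step)]


def tumbling_count_batches(events: Sequence[T], count: int) -> List[List[T]]:
    """Tumbling = hopping with step == count: consecutive, non-overlapping batches."""
    return hopping_count_batches(events, count, step=count)


def sliding_count_batches(events: Sequence[T], count: int) -> List[List[T]]:
    """Sliding = hopping with step == 1: the last `count` events after each new event."""
    return hopping_count_batches(events, count, step=1)


events = [1, 2, 3, 4, 5, 6, 7]
print(tumbling_count_batches(events, 3))    # [[1, 2, 3], [4, 5, 6]]
print(hopping_count_batches(events, 4, 2))  # [[1, 2, 3, 4], [3, 4, 5, 6]]
print(sliding_count_batches(events, 3))     # [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6, 7]]
```

Expressing tumbling and sliding windows as special cases of the hopping model mirrors the description in the list above. In the tumbling case, event `7` is not emitted because its batch never fills up.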
**Example:**
```python
from quixstreams import Application

app = Application(...)
sdf = app.dataframe(...)

sdf = (
    # Define a count-based tumbling window of size 3
    sdf.tumbling_count_window(count=3)

    # Specify the "collect" aggregate function
    .collect()

    # Emit updates once the window is closed
    .final()
)
```
Expected output:
```
{
  "value": [<event1>, <event2>, <event3>],
  "start": <min timestamp in the batch>,
  "end": <max timestamp in the batch>
}
```
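The result shape above can be reproduced for a single closed batch in plain Python. This is an illustrative sketch only: the helper functions and the `(timestamp_ms, value)` event tuples are hypothetical, and the `reducer`/`initializer` pair only loosely mirrors the style of `.reduce()` (check the windowing docs for its exact signature). Note that `start` and `end` come from the smallest and largest timestamps in the batch, not from the first and last events, which matters when events arrive out of order.

```python
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical event shape: (timestamp_ms, value)
Event = Tuple[int, Any]


def window_bounds(batch: List[Event]) -> Dict[str, int]:
    """Window bounds are the min and max timestamps found in the batch."""
    timestamps = [ts for ts, _ in batch]
    return {"start": min(timestamps), "end": max(timestamps)}


def collect_batch(batch: List[Event]) -> Dict[str, Any]:
    """Collect every value of a closed batch, in arrival order."""
    return {"value": [value for _, value in batch], **window_bounds(batch)}


def reduce_batch(
    batch: List[Event],
    reducer: Callable[[Any, Any], Any],
    initializer: Callable[[Any], Any],
) -> Dict[str, Any]:
    """Fold the values of a closed batch into a single aggregate."""
    _, first_value = batch[0]
    aggregated = initializer(first_value)  # the first value seeds the state
    for _, value in batch[1:]:
        aggregated = reducer(aggregated, value)
    return {"value": aggregated, **window_bounds(batch)}


# Three events that arrived out of timestamp order
batch = [(1_000, 5), (1_500, 3), (1_200, 7)]
print(collect_batch(batch))
# {'value': [5, 3, 7], 'start': 1000, 'end': 1500}
print(reduce_batch(batch, reducer=lambda acc, v: acc + v, initializer=lambda v: v))
# {'value': 15, 'start': 1000, 'end': 1500}
```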
See the ["Windowed Aggregations"](https://quix.io/docs/quix-streams/windowing.html) docs page for more info.
By quentin-quix in https://github.com/quixio/quix-streams/pull/736 #739
New Connectors
* [MongoDB Sink](https://quix.io/docs/quix-streams/connectors/sinks/mongodb-sink.html)
* [Neo4j Sink](https://quix.io/docs/quix-streams/connectors/sinks/neo4j-sink.html)
By tim-quix in https://github.com/quixio/quix-streams/pull/733 #727
A callback to react to late messages in windows
Time-based windows can now accept an `on_late` callback that is called when a late message arrives in the window.
You can use this callback to customize how such messages are logged or, for example, to send them to a dead-letter queue.
**Example:**
```python
from typing import Any
from datetime import timedelta

from quixstreams import Application

app = Application(...)
sdf = app.dataframe(...)


def on_late(
    value: Any,  # Record value
    key: Any,  # Record key
    timestamp_ms: int,  # Record timestamp
    late_by_ms: int,  # How late the record is in milliseconds
    start: int,  # Start of the target window
    end: int,  # End of the target window
    name: str,  # Name of the window state store
    topic: str,  # Topic name
    partition: int,  # Topic partition
    offset: int,  # Message offset
) -> bool:
    """
    Define a callback to react to late records coming into windowed aggregations.
    Return `False` to suppress the default logging behavior.
    """
    print(f"Late message is detected at the window {(start, end)}")
    return False


# Define a 1-hour tumbling window, provide the "on_late" callback to it,
# and count the events in each window, emitting results once the window closes
sdf = sdf.tumbling_window(timedelta(hours=1), on_late=on_late).count().final()

# Start the application
if __name__ == '__main__':
    app.run()
```
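The callback can also keep late records instead of only logging them. Below is a minimal sketch, assuming a plain in-memory list stands in for a real dead-letter queue (in practice you would more likely produce these records to a separate topic). The `dead_letters` list and all sample argument values are hypothetical; the callback is called directly here so the snippet runs without Kafka, and it would be passed to a window through `on_late=` just like in the example above.

```python
from typing import Any, Dict, List

# Hypothetical stand-in for a dead-letter queue
dead_letters: List[Dict[str, Any]] = []


def on_late(
    value: Any,
    key: Any,
    timestamp_ms: int,
    late_by_ms: int,
    start: int,
    end: int,
    name: str,
    topic: str,
    partition: int,
    offset: int,
) -> bool:
    """Store late records for later inspection instead of only logging them."""
    dead_letters.append(
        {
            "value": value,
            "key": key,
            "timestamp_ms": timestamp_ms,
            "late_by_ms": late_by_ms,
            "window": (start, end),
            "store": name,
            "source": f"{topic}[{partition}]@{offset}",
        }
    )
    # Returning False suppresses the default logging of late messages
    return False


# Simulate one late record with made-up values
suppressed_logging = not on_late(
    value={"temperature": 21.5},
    key=b"sensor-1",
    timestamp_ms=1_000,
    late_by_ms=500,
    start=0,
    end=3_600_000,
    name="my-window",
    topic="sensor-readings",
    partition=0,
    offset=42,
)
print(dead_letters[0]["source"])  # sensor-readings[0]@42
```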
See more in [the docs](https://quix.io/docs/quix-streams/windowing.html#reacting-on-late-events)
By daniil-quix in #701 #732
Bugfixes
* Do not process late messages in sliding windows by gwaramadze in https://github.com/quixio/quix-streams/pull/728
Other Changes
* StreamingDataFrame.merge(): prep work by daniil-quix in https://github.com/quixio/quix-streams/pull/725
* windows: extract base class for windows and window definitions by quentin-quix in https://github.com/quixio/quix-streams/pull/730
* state: refactor collection store to not rely on timestamp by quentin-quix in https://github.com/quixio/quix-streams/pull/734
**Full Changelog**: https://github.com/quixio/quix-streams/compare/v3.7.0...v3.8.0