Quixstreams

Latest version: v3.10.0


2.10.0

What's Changed

Schema Registry Support
Introduced Schema Registry support for JSONSchema, Avro, and Protobuf formats.

To learn how to use Schema Registry, please follow the docs on the [Schema Registry page](https://quix.io/docs/quix-streams/advanced/schema-registry.html).

PRs: 447, 449, 451, 454, 458, 472, 476.

Dependencies updates
* Support confluent-kafka versions 2.5.x by gwaramadze in https://github.com/quixio/quix-streams/pull/459
* Bump testcontainers from 4.5.1 to 4.8.0 by dependabot in https://github.com/quixio/quix-streams/pull/462
* Update pydantic requirement from <2.8,>=2.7 to >=2.7,<2.9 by dependabot in https://github.com/quixio/quix-streams/pull/463
* Update pydantic-settings requirement from <2.4,>=2.3 to >=2.3,<2.5 by dependabot in https://github.com/quixio/quix-streams/pull/464
* Update pre-commit requirement from <3.5,>=3.4 to >=3.4,<3.9 by dependabot in https://github.com/quixio/quix-streams/pull/465
* Update black requirement from <24.4,>=24.3.0 to >=24.3.0,<24.9 by dependabot in https://github.com/quixio/quix-streams/pull/466


Documentation updates

* fix(docs): minor correction in an example by shrutimantri in https://github.com/quixio/quix-streams/pull/444
* fix(docs): correcting the output showcased for word count with other minor corrections by shrutimantri in https://github.com/quixio/quix-streams/pull/445
* Update docs headers structure by daniil-quix in https://github.com/quixio/quix-streams/pull/456


Other changes
* Application config API by quentin-quix in https://github.com/quixio/quix-streams/pull/470


**Full Changelog**: https://github.com/quixio/quix-streams/compare/v2.9.0...v2.10.0

2.9.0

What's Changed
NEW: Optional installs (extras)

With this release, we have introduced optional requirements ("extras") for various features. Each extra is outlined alongside the feature that requires it.

To install one, simply do `pip install quixstreams[{extra}]` (or a comma-separated list like `extra1,extra2`).

There is also an option to install all extras at once (`pip install quixstreams[all]`).

Features
More Message Serialization Options

Additional serialization options have been added:

- `JSON Schema` (the original plain `JSON` option is still supported)
- `Avro` (requires the `avro` extra)
- `Protobuf` (requires the `protobuf` extra)

For more details on their usage, [see the Serialization docs](https://quix.io/docs/quix-streams/advanced/serialization.html).


Sinks (beta)

>NOTE: This feature is in beta; functionality may change at any time!

We have introduced a new `Sink` API/framework for sending data from Kafka to an external destination in a robust manner. It additionally includes a template/class for users to generate their own sink implementations!

We have also included two fully implemented sinks for use out of the box:

- `InfluxDB v3`
- `CSV`

Example usage with `InfluxDB v3`:

```python
from quixstreams import Application
from quixstreams.sinks.influxdb3 import InfluxDB3Sink

app = Application(broker_address="localhost:9092")
topic = app.topic("numbers-topic")

# Initialize InfluxDB3Sink
influx_sink = InfluxDB3Sink(...params...)

sdf = app.dataframe(topic)
# Do some processing here ...

# Sink data to InfluxDB
sdf.sink(influx_sink)
```


For more details on their usage, [see the Sinks docs](https://quix.io/docs/quix-streams/connectors/sinks/index.html)

`commit_every` option for `Applications`

Applications can now commit every `M` consumed messages in addition to every `N` seconds (whichever occurs first for that checkpoint).

By default, it is `0`, which means there is no message-count limit (the behavior before this setting was introduced).

For more details, [see the Checkpoint docs](https://quix.io/docs/quix-streams/advanced/checkpointing.html#configuring-the-checkpointing)

```python
app = Application(commit_every=10000)
```
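The "whichever occurs first" rule can be sketched in a few lines of plain Python. This is an illustrative model only; the class and names below are hypothetical, not Quix Streams internals:

```python
import time

class CommitPolicy:
    """Sketch of "commit every M messages OR every N seconds, whichever
    comes first". Illustrative only, not the library's implementation."""

    def __init__(self, commit_every: int = 0, commit_interval: float = 5.0):
        self.commit_every = commit_every        # 0 means "no message-count limit"
        self.commit_interval = commit_interval  # seconds between time-based commits
        self._processed = 0
        self._started = time.monotonic()

    def observe(self) -> bool:
        """Record one processed message; return True when a commit is due."""
        self._processed += 1
        reached_count = 0 < self.commit_every <= self._processed
        reached_time = time.monotonic() - self._started >= self.commit_interval
        return reached_count or reached_time

    def reset(self) -> None:
        """Start a new checkpoint after committing."""
        self._processed = 0
        self._started = time.monotonic()
```

With `commit_every=3` and a long interval, the third observed message triggers the commit; with `commit_every=0`, only the timer does.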


`errors` option for `StreamingDataFrame.drop()`

By passing `errors="ignore"`, you can now suppress the exception that is raised by default when the specified column(s) are missing.

```python
app = Application()
sdf = app.dataframe()
sdf = sdf.drop(["col_a", "col_b"], errors="ignore")
```
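The semantics can be mimicked in plain Python; `drop_keys` below is a hypothetical helper for illustration, not part of the library:

```python
def drop_keys(value: dict, columns, errors: str = "raise") -> dict:
    """Drop keys from a dict-like record value, mimicking the
    errors="raise"/"ignore" behavior of StreamingDataFrame.drop()."""
    # Accept a single column name or a list of names
    columns = [columns] if isinstance(columns, str) else columns
    for col in columns:
        if col in value:
            del value[col]
        elif errors == "raise":
            # Default behavior: a missing column raises an error
            raise KeyError(f"Key {col!r} not found in the value")
    return value

record = {"col_a": 1, "col_c": 2}
# "col_b" is missing, but errors="ignore" suppresses the exception
drop_keys(record, ["col_a", "col_b"], errors="ignore")
```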



Enhancements
* README updates
* Various Documentation improvements



Changelog
**Full Changelog**: https://github.com/quixio/quix-streams/compare/v2.8.1...v2.9.0

2.8.1

What's Changed
Bugfixes
* fix `Topic.deserialize` not using the correct value deserializer by tim-quix in https://github.com/quixio/quix-streams/pull/413

Other changes
* Update docs by daniil-quix in https://github.com/quixio/quix-streams/pull/406
* fix bad timestamp values in groupby tests by tim-quix in https://github.com/quixio/quix-streams/pull/412


**Full Changelog**: https://github.com/quixio/quix-streams/compare/v2.8.0...v2.8.1

2.8.0

What's Changed

`.update()` and `.to_topic()` now modify the `StreamingDataFrame` objects in-place.
In previous versions, methods like `StreamingDataFrame.update()` and `StreamingDataFrame.to_topic()` always returned a new SDF object.
We were getting feedback that this behavior is not always obvious, and it's easy to forget to re-assign the result of `.update()` or `.to_topic()` calls to the variable.
Now, both of these methods modify the SDF object in place and return itself, so the previous usage will still work:


```python
sdf = app.dataframe(...)

# The SDF is now modified in place, and it's not necessary to re-assign it to the variable
sdf.update(...)
sdf.to_topic(...)

# This code will keep working as before
sdf = sdf.update(...)
sdf = sdf.to_topic(...)
# or
sdf = sdf.update(...).to_topic(...)
```


⚠️ **Note:** If there's an `sdf.update()` or `sdf.to_topic()` call in the code that is not assigned back to a variable, it will now modify the SDF instance in place.
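The pattern behind this change is "mutate in place and return self". A toy illustration (not the real `StreamingDataFrame` class):

```python
class ToySDF:
    """Toy model of the 2.8.0 behavior: .update() and .to_topic()
    mutate the instance AND return it, so chained and non-assigned
    usage both affect the same object."""

    def __init__(self):
        self.operations = []

    def update(self, func):
        self.operations.append(("update", func))
        return self  # return self so chaining keeps working

    def to_topic(self, topic):
        self.operations.append(("to_topic", topic))
        return self

sdf = ToySDF()
# A non-assigned call still registers the operation on the same object
sdf.update(print)
# Re-assigning also works, because the same instance is returned
sdf = sdf.to_topic("output-topic")
```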

----
New method `StreamingDataFrame.print()`

Users can now easily debug the value and metadata of the current record in the stream with `StreamingDataFrame.print()`.
By default, it will print the current record value wrapped into a dictionary.
If called with `metadata=True`, it will also print the record's key, timestamp, and headers.

It's aimed to be a shortcut for the previous workaround with `StreamingDataFrame.update(print)`.

Example:

```python
sdf = app.dataframe(...)

# Print only the value of the current record
sdf.print()
# >>> {'value': {'number': 163937}}
# {'value': {'number': 163938}}

# Print the value and metadata of the current record
sdf.print(metadata=True)
# >>> {'value': {'number': 12175},
#      'key': b'd22d884a-eb88-44de-b22f-abfdc0b215f6',
#      'timestamp': 1721129697926,
#      'headers': [('header', b'123'), ('header', b'1234')]}
```

-----
New method `StreamingDataFrame.drop()`
With `StreamingDataFrame.drop()`, users can easily drop the unnecessary keys from the dictionary-like values.

`StreamingDataFrame.drop()` mutates data in place by deleting the keys from the original dictionary.


```python
sdf = app.dataframe(...)

sdf.drop('A')  # Drop a key "A" from the value, assuming it's a dict

sdf.drop(['A', 'B'])  # Drop multiple keys from the value
```


----
Other changes

* Fix doc formatting for processing guarantees by stereosky in https://github.com/quixio/quix-streams/pull/396
* Update doc reference to exactly once delivery guarantees by stereosky in https://github.com/quixio/quix-streams/pull/397
* Optimize tree() method of Stream class to reduce time complexity by aparnadotk in https://github.com/quixio/quix-streams/pull/400
* Consumer docstring cleanup by tim-quix in https://github.com/quixio/quix-streams/pull/394

New Contributors
* aparnadotk made their first contribution in https://github.com/quixio/quix-streams/pull/400

**Full Changelog**: https://github.com/quixio/quix-streams/compare/v2.7.0...v2.8.0

2.7.0

What's Changed
[New] Support for Exactly-once processing guarantees using Kafka transactions.

With exactly-once processing guarantees enabled, each Kafka message is processed exactly once, without duplicated outputs.

It is especially helpful when consistency of data in the output topics is crucial, and the downstream consumers don't handle duplicated data gracefully.

To learn more about the exactly-once processing and configuration, see the ["Processing Guarantees" section here](https://quix.io/docs/quix-streams/configuration.html#processing-guarantees).
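Conceptually, exactly-once couples the produced outputs and the consumer offsets into a single atomic commit. The toy model below illustrates that idea in plain Python; it is not the Kafka transactions API itself:

```python
class ToyTransaction:
    """Toy model of exactly-once: produced records and the consumed
    offset become visible together on commit, or not at all on abort."""

    def __init__(self, output_log: list, offsets: dict):
        self._output_log = output_log  # the visible, "committed" output
        self._offsets = offsets        # the committed consumer offsets
        self._pending = []
        self._pending_offset = None

    def produce(self, record, offset):
        # Buffer outputs; nothing is visible to downstream consumers yet
        self._pending.append(record)
        self._pending_offset = offset

    def commit(self):
        # Outputs and the consumer offset are published atomically
        self._output_log.extend(self._pending)
        self._offsets["committed"] = self._pending_offset
        self._pending = []

    def abort(self):
        # On failure nothing was published, so reprocessing the same
        # messages cannot create duplicates downstream
        self._pending = []
```

A failed batch is aborted and leaves no partial output, so retrying it produces each result exactly once.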


Other Improvements
* Removed `column_name` parameter from the `Deserializer` class by tim-quix in 392
* Update quickstart code and copy to match by stereosky in 389

2.6.0

In this release we introduce new features as well as several breaking changes.
Please read the notes carefully before upgrading to 2.6.0.

------

What's changed
[BREAKING] The original Timestamps and Headers are now passed to the output when using `StreamingDataFrame.to_topic()`

Previously, `StreamingDataFrame.to_topic()` used the current epoch timestamp when producing output messages, and message headers were omitted.

Since version 2.6.0, Quix Streams passes the original timestamps and headers of the messages to the output topics for more consistent data processing.

This change affects the data in the output topics, therefore it is marked as a breaking change.

If you want to keep the previous behavior, you may set the timestamp to the current epoch and drop message headers before producing the output message:

```python
import time

output_topic = app.topic(...)

sdf = app.dataframe(...)
# Do some processing here ...

# Set the timestamp to the current epoch
sdf = sdf.set_timestamp(lambda value, key, timestamp, headers: int(time.time() * 1000))

# Set empty message headers
sdf = sdf.set_headers(lambda value, key, timestamp, headers: [])

# Produce the message to the output topic
sdf = sdf.to_topic(output_topic)
```




[BREAKING] Window results timestamps are set to the window start by default
Since 2.6.0, the results of the windowed aggregations use the window start timestamp as a message timestamp.

You may adjust the timestamps using the new `StreamingDataFrame.set_timestamp()` API.
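For a tumbling window, the window-start timestamp is simply the message timestamp aligned down to a multiple of the window duration; a quick sketch of that alignment:

```python
def tumbling_window_start(timestamp_ms: int, duration_ms: int) -> int:
    """Align a message timestamp (ms) down to the start of its tumbling
    window. Since 2.6.0, window results carry this timestamp."""
    return timestamp_ms - (timestamp_ms % duration_ms)

# A message at 00:01:05.250 in a 60-second window belongs to the
# window starting at 00:01:00.000
print(tumbling_window_start(65_250, 60_000))  # prints 60000
```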


[BREAKING] Removed `key` and `timestamp` attributes from the `MessageContext` class
To access the message keys and timestamps, please use the new API described below

[BREAKING] `final()` and `current()` methods of Windows don't have the `expand` parameter anymore



[NEW] New APIs to access and update message metadata in `StreamingDataFrame`

Accessing message metadata during processing

Docs:

- https://quix.io/docs/quix-streams/processing.html#accessing-kafka-keys-timestamps-and-headers

Previously, the Kafka message metadata resided in a separate `MessageContext` instance.
For example, to access a message key or a timestamp, users needed to import a `quixstreams.message_context` function, which is not straightforward:
```python
from quixstreams import message_context

sdf = app.dataframe(...)
# Previous way of getting a message key in versions < 2.6.0
sdf['message_key'] = sdf.apply(lambda value: message_context().key)
```


Now, the `.apply()`, `.filter()`, and `.update()` methods of `StreamingDataFrame` accept a new parameter - `metadata=True`.
Passing `metadata=True` to any of the functions above will inform `StreamingDataFrame` to provide additional positional arguments with the message metadata to the callback.

Example:


```python
from quixstreams import Application

sdf = app.dataframe(...)  # a StreamingDataFrame instance

# Using a message key to filter incoming messages.
# Note that the callback now must accept four positional arguments instead of one.
sdf = sdf.filter(lambda value, key, timestamp, headers: key != b'BAD_KEY', metadata=True)
```


This way, you may access metadata without additional imports.

Updating timestamps and headers with `StreamingDataFrame.set_timestamp()` and `StreamingDataFrame.set_headers()`

Docs:

- https://quix.io/docs/quix-streams/processing.html#updating-kafka-timestamps
- https://quix.io/docs/quix-streams/processing.html#updating-kafka-headers

Since version 2.6.0, you can update timestamps and message headers during processing using the `StreamingDataFrame.set_timestamp()` and `StreamingDataFrame.set_headers()` methods.
These methods accept callbacks similar to other operations, so you can use the current value, key, timestamp, and headers to compute the new ones.

The new timestamps will be used in windowed aggregations and when producing messages to the output topics using the `StreamingDataFrame.to_topic()` method.

The new headers will be set for the output messages as well.

Examples:
```python
import time

sdf = app.dataframe(...)

# Update the timestamp to be the current epoch using the "set_timestamp" API.
# "set_timestamp()" requires a callback accepting four positional arguments:
# value, key, current timestamp, and headers.
# The callback must return a new timestamp as an integer in milliseconds.
sdf = sdf.set_timestamp(lambda value, key, timestamp, headers: int(time.time() * 1000))

# Add the value of APP_VERSION to the message headers for debugging purposes
# using the "set_headers()" API.
# "set_headers()" also requires a callback accepting four positional arguments:
# value, key, timestamp, and current headers.
# It must return a new set of headers as a list of (header, value) tuples.
# If the incoming message doesn't have headers attached, the "headers"
# parameter will be None.
# (The APP_VERSION value below is illustrative.)
sdf = sdf.set_headers(
    lambda value, key, timestamp, headers: (headers or []) + [("APP_VERSION", b"1.0.0")]
)
```
