Quixstreams

Latest version: v3.3.0


2.8.0

What's Changed

`.update()` and `.to_topic()` now modify the `StreamingDataFrame` objects in-place.
In previous versions, methods like `StreamingDataFrame.update()` and `StreamingDataFrame.to_topic()` always returned a new SDF object.
We received feedback that this behavior was not always obvious, and that it was easy to forget to re-assign the result of `.update()` or `.to_topic()` calls to the variable.
Now, both methods modify the SDF object in place and return it, so the previous usage still works:


```python
sdf = app.dataframe(...)

# The SDF is now modified in place; re-assigning it to the variable is not necessary
sdf.update(...)
sdf.to_topic(...)

# This code will keep working as before
sdf = sdf.update(...)
sdf = sdf.to_topic(...)
# or
sdf = sdf.update(...).to_topic(...)
```


⚠️ **Note:** If there's an `sdf.update()` or `sdf.to_topic()` call in the code that is not assigned back to a variable, it will now modify the SDF instance.

----
New method `StreamingDataFrame.print()`

Users can now easily debug the value and metadata of the current record in the stream with `StreamingDataFrame.print()`.
By default, it prints the current record value wrapped in a dictionary.
If called with `metadata=True`, it also prints the record's key, timestamp, and headers.

It is intended as a shortcut for the previous workaround of calling `StreamingDataFrame.update(print)`.

Example:

```python
sdf = app.dataframe(...)

# Print only the value of the current record
sdf.print()
# >>> {'value': {'number': 163937}}
# {'value': {'number': 163938}}

# Print value and metadata of the current record
sdf.print(metadata=True)
# >>> {'value': {'number': 12175},
#      'key': b'd22d884a-eb88-44de-b22f-abfdc0b215f6',
#      'timestamp': 1721129697926,
#      'headers': [('header', b'123'), ('header', b'1234')]}
```
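The printed shape can be reproduced in plain Python. A hypothetical helper (`format_record` is not part of the library) that builds the same dictionary:

```python
def format_record(value, key=None, timestamp=None, headers=None, metadata=False):
    # Wrap the record value in a dict; optionally attach the message metadata
    row = {"value": value}
    if metadata:
        row["key"] = key
        row["timestamp"] = timestamp
        row["headers"] = headers
    return row

print(format_record({"number": 163937}))
# → {'value': {'number': 163937}}
```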

-----
New method `StreamingDataFrame.drop()`
With `StreamingDataFrame.drop()`, users can easily drop unnecessary keys from dictionary-like values.

`StreamingDataFrame.drop()` mutates data in place by deleting the keys from the original dictionary.


```python
sdf = app.dataframe(...)

# Drop the key "A" from the value, assuming it's a dict
sdf.drop('A')

# Drop multiple keys from the value
sdf.drop(['A', 'B'])
```
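Since the value is a plain dictionary, the drop amounts to in-place `del`; a minimal plain-Python sketch (the helper name `drop_keys` is illustrative, not the library API):

```python
def drop_keys(value, keys):
    # Delete one key (str) or several keys (list) from the dict in place
    for k in ([keys] if isinstance(keys, str) else keys):
        del value[k]  # raises KeyError if a key is missing
    return value

record = {"A": 1, "B": 2, "C": 3}
drop_keys(record, ["A", "B"])
print(record)  # → {'C': 3}
```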


----
Other changes

* Fix doc formatting for processing guarantees by stereosky in https://github.com/quixio/quix-streams/pull/396
* Update doc reference to exactly once delivery guarantees by stereosky in https://github.com/quixio/quix-streams/pull/397
* Optimize tree() method of Stream class to reduce time complexity by aparnadotk in https://github.com/quixio/quix-streams/pull/400
* Consumer docstring cleanup by tim-quix in https://github.com/quixio/quix-streams/pull/394

New Contributors
* aparnadotk made their first contribution in https://github.com/quixio/quix-streams/pull/400

**Full Changelog**: https://github.com/quixio/quix-streams/compare/v2.7.0...v2.8.0

2.7.0

What's Changed
[New] Support for Exactly-once processing guarantees using Kafka transactions.

With exactly-once processing guarantees enabled, each Kafka message is processed only one time and without duplicated outputs.

It is especially helpful when consistency of data in the output topics is crucial, and the downstream consumers don't handle duplicated data gracefully.

To learn more about the exactly-once processing and configuration, see the ["Processing Guarantees" section here](https://quix.io/docs/quix-streams/configuration.html#processing-guarantees).
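A minimal sketch of enabling it, assuming the `processing_guarantee` option described in the linked docs and an example broker address:

```python
from quixstreams import Application

# "exactly-once" enables Kafka transactions for producing and committing;
# the default guarantee is "at-least-once"
app = Application(
    broker_address="localhost:9092",  # example value
    processing_guarantee="exactly-once",
)
```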


Other Improvements
* Removed `column_name` parameter from the `Deserializer` class by tim-quix in 392
* Update quickstart code and copy to match by stereosky in 389

2.6.0

In this release we introduce new features as well as several breaking changes.
Please read the notes carefully before upgrading to 2.6.0.

------

What's Changed
[BREAKING] The original Timestamps and Headers are now passed to the output when using `StreamingDataFrame.to_topic()`

Previously, `StreamingDataFrame.to_topic()` used the current epoch as the timestamp when producing output messages, and headers were omitted.

Since version 2.6.0, Quix Streams passes the original timestamps and headers of the messages to the output topics for more consistent data processing.

This change affects the data in the output topics, so it is marked as breaking.

If you want to keep the previous behavior, you may set the timestamp to the current epoch and drop message headers before producing the output message:

```python
import time

output_topic = app.topic(...)

sdf = app.dataframe(...)
# Do some processing here ...

# Set the timestamp to the current epoch
sdf = sdf.set_timestamp(lambda value, key, timestamp, headers: int(time.time() * 1000))

# Set empty message headers
sdf = sdf.set_headers(lambda value, key, timestamp, headers: [])

# Produce messages to the output topic
sdf = sdf.to_topic(output_topic)
```




[BREAKING] Window results timestamps are set to the window start by default
Since 2.6.0, the results of windowed aggregations use the window start timestamp as the message timestamp.

You may adjust the timestamps using the new `StreamingDataFrame.set_timestamp()` API.
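For a tumbling window aligned to the epoch, the window start is simple arithmetic; an illustrative calculation (not the library internals):

```python
def window_start(timestamp_ms: int, duration_ms: int) -> int:
    # The window containing a timestamp starts at the nearest lower
    # multiple of the window duration
    return timestamp_ms - (timestamp_ms % duration_ms)

print(window_start(10_500, 1_000))  # → 10000
```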


[BREAKING] Removed `key` and `timestamp` attributes from the `MessageContext` class
To access the message keys and timestamps, please use the new API described below.

[BREAKING] `final()` and `current()` methods of Windows don't have the `expand` parameter anymore



[NEW] New APIs to access and update message metadata in `StreamingDataFrame`

Accessing message metadata during processing

Docs:

- https://quix.io/docs/quix-streams/processing.html#accessing-kafka-keys-timestamps-and-headers

Previously, the Kafka message metadata resided in a separate `MessageContext` instance.
For example, to access a message key or a timestamp, users needed to import the `quixstreams.message_context` function, which was not straightforward:

```python
from quixstreams import message_context

sdf = app.dataframe(...)

# Previous way of getting a message key in versions < 2.6.0
sdf['message_key'] = sdf.apply(lambda value: message_context().key)
```


Now, the `.apply()`, `.filter()`, and `.update()` methods of `StreamingDataFrame` accept a new parameter - `metadata=True`.
Passing `metadata=True` to any of the functions above will inform `StreamingDataFrame` to provide additional positional arguments with the message metadata to the callback.

Example:


```python
from quixstreams import Application

sdf = app.dataframe(...)  # a StreamingDataFrame instance

# Using a message key to filter incoming messages.
# Note that the callback now must accept four positional arguments instead of one.
sdf = sdf.filter(lambda value, key, timestamp, headers: key != b'BAD_KEY', metadata=True)
```


This way, you may access metadata without additional imports.
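Conceptually, the `metadata` flag decides how many positional arguments reach your callback; a plain-Python sketch of that dispatch (`make_invoker` is hypothetical, not the library internals):

```python
def make_invoker(callback, metadata=False):
    # With metadata=True the callback receives value, key, timestamp, headers;
    # otherwise it receives only the value
    if metadata:
        return callback
    return lambda value, key, timestamp, headers: callback(value)

keep = make_invoker(lambda value, key, timestamp, headers: key != b"BAD_KEY", metadata=True)
print(keep({"n": 1}, b"GOOD_KEY", 1721129697926, None))  # → True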

Updating timestamps and headers with `StreamingDataFrame.set_timestamp()` and `StreamingDataFrame.set_headers()`

Docs:

- https://quix.io/docs/quix-streams/processing.html#updating-kafka-timestamps
- https://quix.io/docs/quix-streams/processing.html#updating-kafka-headers

Since version 2.6.0, you can update timestamps and message headers during processing using the `StreamingDataFrame.set_timestamp()` and `StreamingDataFrame.set_headers()` methods.
These methods accept callbacks similar to other operations, so you can use the current value, key, timestamp, and headers to compute the new values.

The new timestamps will be used in windowed aggregations and when producing messages to the output topics using the `StreamingDataFrame.to_topic()` method.

The new headers will be set for the output messages as well.

Examples:
```python
import time

sdf = app.dataframe(...)

# Update the timestamp to be the current epoch using the "set_timestamp" API.
# "set_timestamp()" requires a callback accepting four positional arguments:
# value, key, current timestamp, and headers.
# The callback must return a new timestamp as an integer in milliseconds.
sdf = sdf.set_timestamp(lambda value, key, timestamp, headers: int(time.time() * 1000))

# Add the value of APP_VERSION to the message headers for debugging purposes
# using the "set_headers()" API.
# "set_headers()" also requires a callback accepting four positional arguments:
# value, key, timestamp, and current headers.
# It must return a new set of headers as a list of (header, value) tuples.
# If the incoming message doesn't have headers attached, the "headers" parameter will be None.
APP_VERSION = b"1.0.0"  # example value; use your application's own version
sdf = sdf.set_headers(lambda value, key, timestamp, headers: [("APP_VERSION", APP_VERSION)])
```

2.5.1

What's Changed

**Fixes**:
* Correcting the Topic parameter definition by shrutimantri in https://github.com/quixio/quix-streams/pull/364
* Don't set default topics params in Quix apps by daniil-quix in https://github.com/quixio/quix-streams/pull/365
* Fix admin timeouts by tim-quix in https://github.com/quixio/quix-streams/pull/342
* Correcting the error message to be more explicit by shrutimantri in https://github.com/quixio/quix-streams/pull/348
* Fix Quix changelog topics failing first validation by tim-quix in https://github.com/quixio/quix-streams/pull/366
* Update exception messages for TopicConfigurationMismatch by tim-quix in https://github.com/quixio/quix-streams/pull/370



**Docs updates**
- fix tutorial bullets for correct rendering in website version by tim-quix in https://github.com/quixio/quix-streams/pull/362
- Update README Community section by stereosky in https://github.com/quixio/quix-streams/pull/368
- update docs to include a local pattern, adjust SDK token language by tim-quix in https://github.com/quixio/quix-streams/pull/369

New Contributors
* shrutimantri made their first contribution in https://github.com/quixio/quix-streams/pull/364

**Full Changelog**: https://github.com/quixio/quix-streams/compare/v2.5.0...v2.5.1

2.5.0

What's Changed
Features
Checkpointing

`Checkpointing` is an overhaul of the previous commit structure. It is meant to better synchronize processing progress (i.e. committing topic offsets) and state updates to ensure consistency of the state.

It should also increase processing speed anywhere from 1.3x-2.5x due to its new batched commit approach.

To adjust this new commit frequency, users can set a (new) `commit_interval` (Default: 5 seconds):

```python
app = Application(commit_interval=5)
```


For more details, [see the `Checkpoint` docs](https://quix.io/docs/quix-streams/advanced/checkpointing.html).
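The batched-commit idea behind checkpointing can be sketched in plain Python (`run_with_checkpoints` is a hypothetical illustration, not the library's implementation):

```python
import time

def run_with_checkpoints(messages, process, commit, commit_interval=5.0):
    # Process messages, but commit accumulated offsets only once per
    # commit_interval (seconds) instead of after every message
    pending = []
    last_commit = time.monotonic()
    for offset, msg in enumerate(messages):
        process(msg)
        pending.append(offset)
        if time.monotonic() - last_commit >= commit_interval:
            commit(list(pending))
            pending.clear()
            last_commit = time.monotonic()
    if pending:  # commit whatever is left at shutdown
        commit(list(pending))

batches = []
run_with_checkpoints(["a", "b", "c"], process=lambda m: None,
                     commit=batches.append, commit_interval=60)
print(batches)  # → [[0, 1, 2]]
```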


GroupBy

`GroupBy` enables users to "group" or "re-key" their messages based on the message value, typically to perform (stateful) aggregations on them (much like SQL).

With the new `StreamingDataFrame.group_by()`, you can do this while including other `StreamingDataFrame` operations before or after (so only one `Application` is needed):

```python
# data: {"user_id": "abc", "int_field": 5}
app = Application()
sdf = app.dataframe()
sdf["new_col"] = sdf["int_field"] + 1
sdf = sdf.group_by("user_id")
sdf = sdf.apply(lambda r: r["new_col"])
sdf = sdf.tumbling_window(duration_ms=3600).sum().final()
# ...etc...
```


Users can group by a column name, or provide a custom grouping function.
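The re-keying idea can be illustrated in plain Python (the library actually routes records through an internal topic; `regroup` below is only a hypothetical illustration):

```python
from collections import defaultdict

def regroup(records, key_func):
    # Route each record to a new key derived from its value
    groups = defaultdict(list)
    for rec in records:
        groups[key_func(rec)].append(rec)
    return dict(groups)

records = [
    {"user_id": "abc", "int_field": 5},
    {"user_id": "xyz", "int_field": 7},
    {"user_id": "abc", "int_field": 9},
]
by_user = regroup(records, lambda r: r["user_id"])
print(sorted(by_user))  # → ['abc', 'xyz']
```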

For more details, [see the `GroupBy` docs](https://quix.io/docs/quix-streams/groupby.html).



Enhancements
* Docs updates by stereosky in https://github.com/quixio/quix-streams/pull/344, https://github.com/quixio/quix-streams/pull/352
* add default error cb to Admin by tim-quix in https://github.com/quixio/quix-streams/pull/343


**Full Changelog**: https://github.com/quixio/quix-streams/compare/v2.4.2...v2.5.0

2.4.2

What's Changed

* Fix handling of topics created outside of Quix Cloud by tim-quix in https://github.com/quixio/quix-streams/pull/338
* Add clearer error messages for invalid SDF column name references by tim-quix in https://github.com/quixio/quix-streams/pull/322
* Better handling of topic creation errors in Quix Cloud by tim-quix in https://github.com/quixio/quix-streams/pull/337
* Use pyproject.toml instead of setup.cfg by tim-quix in https://github.com/quixio/quix-streams/pull/339

* Update docs by tbedford in https://github.com/quixio/quix-streams/pull/325 and https://github.com/quixio/quix-streams/pull/331
* Update README by stereosky in https://github.com/quixio/quix-streams/pull/332 and https://github.com/quixio/quix-streams/pull/335

**Full Changelog**: https://github.com/quixio/quix-streams/compare/v2.4.1...v2.4.2
