Nats-py

Latest version: v2.10.0


Page 1 of 7

2.10.0

Added

- Added `KeysWithFilters` method for key filtering in KV bucket (by somratdutta in https://github.com/nats-io/nats.py/pull/602)

python
# Retrieve keys with filters
filtered_keys = await kv.keys(filters=['hello', 'greet'])
print(f'Filtered Keys: {filtered_keys}')


- Add `discard_new_per_subject` to `StreamConfig` (by caspervonb in https://github.com/nats-io/nats.py/pull/609)

python
config = nats.js.api.StreamConfig(
    name=stream_name,
    discard=nats.js.api.DiscardPolicy.NEW,
    discard_new_per_subject=True,
    max_msgs_per_subject=100,
)

await js.add_stream(config)


- Added support for passing `pathlib.Path` derived types to `user_credentials` (by johnweldon in https://github.com/nats-io/nats.py/pull/623)

- Add an `is_acked` property to `nats.aio.msg.Msg` (by charles-dyfis-net in https://github.com/nats-io/nats.py/pull/672)
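
The `pathlib.Path` support listed above means a credentials location can be built with `pathlib` and handed over unchanged. A minimal sketch, assuming a hypothetical directory layout and helper name (`creds_path` is not part of nats.py); the `nats.connect` call appears only as a comment because it needs a reachable server:

```python
import os
from pathlib import Path


def creds_path(directory: str, user: str) -> Path:
    """Build the path to a user's .creds file (hypothetical layout)."""
    return Path(directory).expanduser() / f"{user}.creds"


path = creds_path("~/.nkeys/creds", "demo")

# Path objects implement the os.PathLike protocol, so code that accepts
# "str or path-like" values can convert them with os.fspath().
assert isinstance(path, os.PathLike)
print(os.fspath(path))

# With nats.py 2.10.0 or later the Path can be passed directly
# (left as a comment here because it requires a running server):
#   nc = await nats.connect(user_credentials=path)
```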

Fixed

- Fixed typing of `JetStreamContext.publish` (by rijenkii in https://github.com/nats-io/nats.py/pull/605)
- Fixed supporting `REQUEST_TIMEOUT` status code for a batch fetch with `no_wait=True` (by diorcety in https://github.com/nats-io/nats.py/pull/618)
- Fixed `deliver_subject` in implicit subscription creation (by m3nowak in https://github.com/nats-io/nats.py/pull/615)
- Fixed issue where flusher task stops running (by debbyglance in https://github.com/nats-io/nats.py/pull/636)
- Fixed service start times to be UTC (by apollo13 in https://github.com/nats-io/nats.py/pull/640)
- Fixed discovered server callback not being awaited (by caspervonb in https://github.com/nats-io/nats.py/pull/660)
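
The discovered server callback fix in the list above touches a general asyncio pitfall: calling an async function without awaiting it never runs its body. The sketch below illustrates the pitfall with plain asyncio; `on_discovered_server` is a hypothetical callback name, and this is not the code changed in the pull request.

```python
import asyncio

discovered = []


async def on_discovered_server() -> None:
    """Example async callback; records that it actually ran."""
    discovered.append("server")


async def main() -> None:
    # Calling an async function only creates a coroutine object;
    # nothing runs until it is awaited (or scheduled as a task).
    coro = on_discovered_server()
    assert discovered == []

    # Awaiting the coroutine executes the callback body.
    await coro
    assert discovered == ["server"]


asyncio.run(main())
print(discovered)  # ['server']
```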

Improved

- Validate stream name in `add_stream` method (by ff137 in https://github.com/nats-io/nats.py/pull/607)
- Timezone-aware datetime (by diorcety in https://github.com/nats-io/nats.py/pull/648)
- Improved server version semver handling (by robinbowes in https://github.com/nats-io/nats.py/pull/679)
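
For readers unfamiliar with the distinction behind the timezone-aware datetime item above, here is a short standard-library sketch of naive versus aware values. It illustrates the concept only and does not reproduce the library's code; the dates are arbitrary.

```python
from datetime import datetime, timezone

# A naive datetime has no tzinfo, so it cannot say which zone it is in.
naive = datetime(2024, 1, 1, 12, 0, 0)

# An aware datetime carries its UTC offset explicitly.
aware = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)

print(naive.tzinfo)       # None
print(aware.isoformat())  # 2024-01-01T12:00:00+00:00

# The current time as an aware UTC timestamp.
now = datetime.now(timezone.utc)
assert now.utcoffset().total_seconds() == 0

# Ordering a naive value against an aware one raises TypeError,
# which is why mixing the two kinds tends to surface as bugs.
try:
    naive < aware
    mixed_comparison_failed = False
except TypeError:
    mixed_comparison_failed = True

print(mixed_comparison_failed)  # True
```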

2.9.1

Bugfix release which includes:

Improved

- Improved server version semver handling (by robinbowes in https://github.com/nats-io/nats.py/pull/679)
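
To show why server versions need semver-aware parsing rather than string comparison, here is a minimal sketch. The names `Version`, `parse_version`, and `at_least` are hypothetical and are not the nats.py implementation; the comparison deliberately ignores pre-release ordering.

```python
import re
from typing import NamedTuple, Optional

_SEMVER = re.compile(
    r"^v?(\d+)\.(\d+)\.(\d+)"    # major.minor.patch, optional leading "v"
    r"(?:-([0-9A-Za-z.-]+))?"    # optional pre-release, e.g. "beta.1"
    r"(?:\+([0-9A-Za-z.-]+))?$"  # optional build metadata
)


class Version(NamedTuple):
    major: int
    minor: int
    patch: int
    prerelease: Optional[str] = None


def parse_version(text: str) -> Version:
    """Parse a semantic version string into its numeric parts."""
    match = _SEMVER.match(text)
    if match is None:
        raise ValueError(f"not a semantic version: {text!r}")
    major, minor, patch, prerelease, _build = match.groups()
    return Version(int(major), int(minor), int(patch), prerelease)


def at_least(text: str, major: int, minor: int, patch: int = 0) -> bool:
    """Compare only major.minor.patch (pre-release tags are ignored here)."""
    version = parse_version(text)
    return (version.major, version.minor, version.patch) >= (major, minor, patch)


# Plain string comparison orders "2.10.0" before "2.9.0", which is wrong.
print("2.10.0" < "2.9.0")        # True
print(at_least("2.10.0", 2, 9))  # True
print(parse_version("2.10.0-beta.1"))
```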

2.9.0

Added

- Added `micro` module implementing services [ADR-32](https://github.com/nats-io/nats-architecture-and-design/blob/main/adr/ADR-32.md) (https://github.com/nats-io/nats.py/pull/566)

Thank you to charbonnierg for the community implementation that served as a kick-off point.

python
import asyncio
import contextlib
import signal

import nats
import nats.micro


async def echo(req) -> None:
    """Echo the request data back to the client."""
    await req.respond(req.data)


async def main():
    # Define an event to signal when to quit
    quit_event = asyncio.Event()

    # Attach signal handler to the event loop
    loop = asyncio.get_event_loop()
    for sig in (signal.Signals.SIGINT, signal.Signals.SIGTERM):
        loop.add_signal_handler(sig, lambda *_: quit_event.set())

    # Create an async exit stack
    async with contextlib.AsyncExitStack() as stack:
        # Connect to NATS
        nc = await stack.enter_async_context(await nats.connect())

        # Add the service
        service = await stack.enter_async_context(
            await nats.micro.add_service(nc, name="demo_service", version="0.0.1")
        )

        group = service.add_group(name="demo")
        # Add an endpoint to the service
        await group.add_endpoint(
            name="echo",
            handler=echo,
        )
        # Wait for the quit event
        await quit_event.wait()


if __name__ == "__main__":
    asyncio.run(main())


- Added pagination to JetStream stream info (https://github.com/nats-io/nats.py/pull/594)

python
nc = await nats.connect()
js = nc.jetstream()
jsm = nc.jsm()

for i in range(300):
    await jsm.add_stream(name=f"stream_{i}")

streams_page_1 = await jsm.streams_info(offset=0)
streams_page_2 = await jsm.streams_info(offset=256)
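
The offsets in the example above suggest pages of up to 256 entries. Walking every page could be sketched with a generic helper like the one below. This assumes `fetch_page(offset)` returns a list and that a short or empty page marks the end; `collect_all`, `fake_streams_info`, and the page size are illustrative assumptions, not nats.py API.

```python
import asyncio
from typing import Awaitable, Callable, List

PAGE_SIZE = 256  # assumed page size, inferred from offset=256 in the example


async def collect_all(
    fetch_page: Callable[[int], Awaitable[List[str]]],
    page_size: int = PAGE_SIZE,
) -> List[str]:
    """Call fetch_page with increasing offsets until a short page is returned."""
    items: List[str] = []
    offset = 0
    while True:
        page = await fetch_page(offset)
        items.extend(page)
        if len(page) < page_size:
            return items
        offset += len(page)


async def fake_streams_info(offset: int) -> List[str]:
    """Stand-in for a paginated API that holds 300 stream names."""
    names = [f"stream_{i}" for i in range(300)]
    return names[offset:offset + PAGE_SIZE]


all_streams = asyncio.run(collect_all(fake_streams_info))
print(len(all_streams))  # 300
```

With a real connection, a wrapper such as `lambda offset: jsm.streams_info(offset=offset)` could presumably be passed in place of the stand-in, provided the call returns a list per page.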


Fixed

- Fixed resource leak in JetStream push subscription (https://github.com/nats-io/nats.py/pull/597)

2.8.0

Added

- Added `publish_async` method to JetStream
python
ack_future = await js.publish_async("foo", b'bar')
await ack_future


- Added the ability to pass file contents as `user_credentials` (https://github.com/nats-io/nats.py/pull/546)

python
from nats.aio.client import RawCredentials
...
await nats.connect(user_credentials=RawCredentials("<creds file contents as string>"))


Fixed

- Fixed a race condition that could trigger error callbacks if a timeout and a response from a request were in conflict (https://github.com/nats-io/nats.py/pull/573).
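
The race described above, where a response arrives just as the request times out, can be illustrated with plain asyncio. This is a sketch of one common guard (checking `future.done()` before setting a result), not necessarily how the pull request fixed it; `request_with_timeout` and its delays are hypothetical.

```python
import asyncio


async def request_with_timeout(delay: float, timeout: float) -> str:
    """Wait for a simulated response, giving up after `timeout` seconds."""
    loop = asyncio.get_running_loop()
    future: asyncio.Future = loop.create_future()

    def on_response() -> None:
        # The response may arrive after the caller timed out and the future
        # was cancelled; setting a result then would raise InvalidStateError.
        if not future.done():
            future.set_result("response")

    loop.call_later(delay, on_response)
    try:
        return await asyncio.wait_for(future, timeout)
    except asyncio.TimeoutError:
        return "timeout"


async def main() -> list:
    fast = await request_with_timeout(delay=0.01, timeout=0.5)
    slow = await request_with_timeout(delay=0.05, timeout=0.01)
    # Give the late response a chance to fire; the guard keeps it harmless.
    await asyncio.sleep(0.1)
    return [fast, slow]


results = asyncio.run(main())
print(results)  # ['response', 'timeout']
```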

2.7.2

- Added `heartbeat` option to pull subscribers `fetch` API

python
await sub.fetch(1, timeout=1, heartbeat=0.1)


It can be useful to help distinguish API timeouts from not receiving messages:

python
try:
    await sub.fetch(100, timeout=1, heartbeat=0.2)
except nats.js.errors.FetchTimeoutError:
    # timeout due to not receiving messages
    pass
except asyncio.TimeoutError:
    # unexpected timeout
    pass


- Added `subject_transform` to `add_stream`

python
await js.add_stream(
    name="TRANSFORMS",
    subjects=["test", "foo"],
    subject_transform=nats.js.api.SubjectTransform(
        src=">", dest="transformed.>"
    ),
)


- Added `subject_transforms` to sources as well:

python
transformed_source = nats.js.api.StreamSource(
    name="TRANSFORMS",
    # The source filters cannot overlap.
    subject_transforms=[
        nats.js.api.SubjectTransform(
            src="transformed.>", dest="fromtest.transformed.>"
        ),
        nats.js.api.SubjectTransform(
            src="foo.>", dest="fromtest.foo.>"
        ),
    ],
)
await js.add_stream(
    name="SOURCING",
    sources=[transformed_source],
)


- Added `backoff` option to `add_consumer`

python
await js.add_consumer(
    "events",
    durable_name="a",
    max_deliver=3,  # has to be greater than the length of the backoff array
    backoff=[1, 2],  # defined in seconds
    ack_wait=999999,  # ignored once using backoff
    max_ack_pending=3,
    filter_subject="events.>",
)


- Added `compression` to `add_stream`

python
await js.add_stream(
    name="COMPRESSION",
    subjects=["test", "foo"],
    compression="s2",
)


- Added `metadata` to `add_stream`

python
await js.add_stream(
    name="META",
    subjects=["test", "foo"],
    metadata={'foo': 'bar'},
)

2.7.0

Added

- Added support for multiple filter consumers when using nats-server v2.10+.
  This is only supported when using the `pull_subscribe_bind` API:

python
await jsm.add_stream(name="multi", subjects=["a", "b", "c.>"])
cinfo = await jsm.add_consumer(
    "multi",
    name="myconsumer",
    filter_subjects=["a", "b"],
)
psub = await js.pull_subscribe_bind("multi", "myconsumer")
msgs = await psub.fetch(2)
for msg in msgs:
    await msg.ack()


- Added `subjects_filter` option to `js.stream_info()` API

python
stream = await js.add_stream(name="foo", subjects=["foo.>"])
for i in range(0, 5):
    await js.publish("foo.%d" % i, b'A')

si = await js.stream_info("foo", subjects_filter=">")
print(si.state.subjects)
# => {'foo.0': 1, 'foo.1': 1, 'foo.2': 1, 'foo.3': 1, 'foo.4': 1}


Changed

- Changed kv.watch default `inactive_threshold` cleanup timeout to be 5 minutes.
It can now be customized as well by passing `inactive_threshold` as argument in seconds:


w = await kv.watchall(inactive_threshold=30.0)


- Changed `pull_subscribe_bind` first argument to be called `consumer` instead of `durable`
since it also supports ephemeral consumers. This should be backwards compatible.

python
psub = await js.pull_subscribe_bind(consumer="myconsumer", stream="test")
