Kaflow

Latest version: v0.2.0



0.2.0

:sparkles: Features

- Add a `TestClient` that allows writing unit tests for consumer functions
```python
import pytest

from kaflow import FromValue, Kaflow, Message
from kaflow.testclient import TestClient

app = Kaflow(client_id="test", brokers=["localhost:9092"])


class RandomDependency:
    def __init__(self, random: str = "random") -> None:
        self.random = random


# Consume from "test_topic" and publish the returned message to "test_topic_2".
@app.consume(topic="test_topic", sink_topics=["test_topic_2"])
async def consume_test_topic(
    value: FromValue[bytes], dependency: RandomDependency
) -> Message:
    print("Value", value)
    print("Override dependency:", dependency.random)
    return Message(value=b"Hello Unit Test!")


@pytest.fixture
def test_client() -> TestClient:
    client = TestClient(app=app)

    # Replace RandomDependency for the duration of the test.
    with app.dependency_overrides:
        app.dependency_overrides[RandomDependency] = lambda: RandomDependency(
            random="not so random"
        )
        yield client


def test_consume_test_topic(test_client: TestClient) -> None:
    message = test_client.publish(
        topic="test_topic",
        value=b"Hello World!",
    )
    assert message.value == b"Hello Unit Test!"
```

0.1.4

:bug: Fixes

- `ssl_context` was not being built if `security_protocol=="SASL_SSL"`
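
To illustrate the kind of check this fix concerns, here is a minimal sketch using only the standard library. The helper name `build_ssl_context`, its `cafile` parameter, and the `TLS_PROTOCOLS` set are assumptions made for illustration; they are not taken from Kaflow's source, which may structure this differently.

```python
import ssl
from typing import Optional

# Protocols that run over TLS and therefore need an SSL context.
# Hypothetical constant for this sketch, not a Kaflow name.
TLS_PROTOCOLS = {"SSL", "SASL_SSL"}


def build_ssl_context(
    security_protocol: str, cafile: Optional[str] = None
) -> Optional[ssl.SSLContext]:
    """Hypothetical helper: return an SSL context only when TLS is in use."""
    if security_protocol not in TLS_PROTOCOLS:
        # PLAINTEXT and SASL_PLAINTEXT do not use TLS.
        return None
    return ssl.create_default_context(cafile=cafile)
```

A check that only compares against `"SSL"` would skip `"SASL_SSL"`, which is the sort of omission the fix above describes.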

0.1.3

:sparkles: Features

- Add all possible values for `security_protocol`
- Serialize and deserialize Avro messages including schema

0.1.2

:bug: Fixes

- Fix uncaught child exceptions
- Fix `security_protocol` allowed values

:fire: Removed code

- Remove `"Programming Language :: Python :: 3.7"` classifier

0.1.1

Minor release to remove a deprecated argument in `aiokafka`.

:fire: Removed code

- Remove passing `loop` argument to `aiokafka.{AIOKafkaConsumer, AIOKafkaProducer}`

0.1.0

:sparkles: Features

- Better data extraction from message:
  - Extract data from the value of the message using `FromValue[...]`
  - Extract data from the key of the message using `FromKey[...]`
  - Extract data from the headers of the message using `FromHeader[...]`
```python
from pydantic import BaseModel

from kaflow import FromHeader, FromKey, FromValue, Json, Kaflow


class Stock(BaseModel):
    name: str
    price: float


class Key(BaseModel):
    key: str


class Header1(BaseModel):
    header: str


class Header2(BaseModel):
    header: str


app = Kaflow(client_id="kaflow", brokers="localhost:9092")


# Each parameter is extracted from a different part of the message
# and deserialized from JSON into the annotated model.
@app.consume(topic="stock")
async def consume_stock(
    value: FromValue[Json[Stock]],
    key: FromKey[Json[Key]],
    header_1: FromHeader[Json[Header1]],
    header_2: FromHeader[Json[Header2]],
) -> None:
    print("value", value)
    print("key", key)
    print("header 1", header_1)
    print("header 2", header_2)


app.run()
```

- Add `String` serializer
- Add `aiokafka.{AIOKafkaProducer, AIOKafkaConsumer}` parameters to `Kaflow.__init__` method
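
Conceptually, a string serializer converts between `str` and the raw bytes carried in a Kafka message. The sketch below shows that round trip with a hypothetical `Utf8StringSerializer` class; the class name, its method names, and the UTF-8 encoding are assumptions for illustration, and Kaflow's own `String` serializer may be implemented differently.

```python
class Utf8StringSerializer:
    """Hypothetical stand-in, not Kaflow's `String` serializer."""

    @staticmethod
    def serialize(data: str) -> bytes:
        # Encode text to the bytes that would be sent to the broker.
        return data.encode("utf-8")

    @staticmethod
    def deserialize(data: bytes) -> str:
        # Decode bytes received from the broker back into text.
        return data.decode("utf-8")


payload = Utf8StringSerializer.serialize("stock: ACME")
restored = Utf8StringSerializer.deserialize(payload)
```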
