onETL

Latest version: v0.10.2


0.10.2

Features

- Add support of Pydantic v2. (230)

Improvements

- Improve database connections documentation:
- Add "Types" section describing mapping between Clickhouse and Spark types
- Add "Prerequisites" section describing different aspects of connecting to Clickhouse
- Separate documentation of `DBReader` and `.sql()` / `.pipeline(...)`
- Add examples for `.fetch()` and `.execute()` (211, 228, 229, 233, 234, 235, 236, 240)

- Add notes to Greenplum documentation about issues with IP resolution and building `gpfdist` URL (228)

- Allow calling `MongoDB.pipeline(...)` with passing just collection name, without explicit aggregation pipeline. (237)

- Update default `Postgres(extra={...})` to include `{"stringtype": "unspecified"}` option. This allows writing text data to a non-text column (or vice versa), relying on Postgres cast capabilities.

For example, now it is possible to read column of type `money` as Spark's `StringType()`, and write it back to the same column, without using intermediate columns or tables. (229)

Bug Fixes

- Restore handling of `DBReader(columns="string")`. This was valid syntax up to the v0.10 release, but it was removed because we assumed most users never used it. That assumption turned out to be wrong, so the behavior is back, now with a deprecation warning. (238)

- Downgrade Greenplum package version from `2.3.0` to `2.2.0`. (239)

This is because version 2.3.0 introduced issues with writing data to Greenplum 6.x. Connector can open transaction with `SELECT * FROM table LIMIT 0` query, but does not close it, which leads to deadlocks.

For using this connector with Greenplum 7.x, please pass package version explicitly:

python
maven_packages = Greenplum.get_packages(package_version="2.3.0", ...)
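
The restored `columns="string"` handling noted in the first bug fix can be pictured with a small standalone sketch. `normalize_columns` and the comma-splitting rule are assumptions made for illustration only; they are not taken from the onETL source.

```python
import warnings


def normalize_columns(columns):
    """Hypothetical helper: accept a list of names or a legacy string."""
    if isinstance(columns, str):
        # Assumption: the legacy string form is a comma-separated list.
        warnings.warn(
            "Passing columns as a string is deprecated, pass a list instead",
            DeprecationWarning,
            stacklevel=2,
        )
        return [item.strip() for item in columns.split(",") if item.strip()]
    return list(columns)


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    result = normalize_columns("id, name")

print(result)
print([w.category.__name__ for w in caught])
```

The list form passes through unchanged and emits no warning, so existing callers that already use lists are unaffected.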

0.10.1

Features

- Add support of `Incremental Strategies` for `Kafka` connection:

python
reader = DBReader(
    connection=Kafka(...),
    source="topic_name",
    hwm=AutoDetectHWM(name="some_hwm_name", expression="offset"),
)

with IncrementalStrategy():
    df = reader.run()


This lets you resume reading data from a Kafka topic starting at the last committed offset from your previous run. (202)

- Add `has_data`, `raise_if_no_data` methods to `DBReader` class. (203)

- Update VMware Greenplum connector from `2.1.4` to `2.3.0`. This implies:

- Greenplum 7.x support
- [Kubernetes support](https://docs.vmware.com/en/VMware-Greenplum-Connector-for-Apache-Spark/2.3/greenplum-connector-spark/configure.html#k8scfg)
- New read option [gpdb.matchDistributionPolicy](https://docs.vmware.com/en/VMware-Greenplum-Connector-for-Apache-Spark/2.3/greenplum-connector-spark/options.html#distpolmotion) which allows matching each Spark executor with a specific Greenplum segment, avoiding redundant data transfer between Greenplum segments
- Allows overriding [Greenplum optimizer parameters](https://docs.vmware.com/en/VMware-Greenplum-Connector-for-Apache-Spark/2.3/greenplum-connector-spark/options.html#greenplum-gucs) in read/write operations (208)

- `Greenplum.get_packages()` method now accepts an optional argument `package_version`, which allows overriding the version of the Greenplum connector package. (208)
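
The Kafka incremental read introduced in this release boils down to remembering the last offset that was read and skipping everything at or below it on the next run. The sketch below simulates that idea with plain Python objects. `topic`, `hwm_store` and `read_incrementally` are hypothetical names, and a real topic tracks offsets per partition rather than with one global counter.

```python
# In-memory stand-ins; none of these names come from onETL or a Kafka client.
topic = [{"offset": i, "value": f"msg-{i}"} for i in range(5)]
hwm_store = {}  # maps HWM name -> last offset that was read


def read_incrementally(messages, hwm_name):
    """Return only messages above the stored offset, then advance the offset."""
    last_offset = hwm_store.get(hwm_name, -1)
    new_messages = [m for m in messages if m["offset"] > last_offset]
    if new_messages:
        hwm_store[hwm_name] = max(m["offset"] for m in new_messages)
    return new_messages


first_run = read_incrementally(topic, "some_hwm_name")

# A new message arrives between runs.
topic.append({"offset": 5, "value": "msg-5"})
second_run = read_incrementally(topic, "some_hwm_name")

print([m["offset"] for m in first_run])
print([m["offset"] for m in second_run])
```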

0.10.0

Breaking Changes

- Upgrade `etl-entities` from v1 to v2 (172).

This implies that `HWM` classes now have a different internal structure than they used to.
<table>
<colgroup>
<col style="width: 50%" />
<col style="width: 50%" />
</colgroup>
<thead>
<tr class="header">
<th>etl-entities &lt; 2</th>
<th>etl-entities &gt;= 2</th>
</tr>
</thead>
<tbody>
<tr class="odd">
<td>

python
from etl_entities.old_hwm import IntHWM as OldIntHWM
from etl_entities.source import Column, Table
from etl_entities.process import Process

hwm = OldIntHWM(
    process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
    source=Table(name="schema.table", instance="postgres://host:5432/db"),
    column=Column(name="col1"),
    value=123,
)

</td>
<td>

python
from etl_entities.hwm import ColumnIntHWM

hwm = ColumnIntHWM(
    name="some_unique_name",
    description="any value you want",
    source="schema.table",
    expression="col1",
    value=123,
)

</td>
</tr>
</tbody>
</table>

**Breaking change:** If you used HWM classes from `etl_entities` module, you should rewrite your code to make it compatible with new version.

<div class="dropdown">

More details

- `HWM` classes used by previous onETL versions were moved from `etl_entities` to `etl_entities.old_hwm` submodule. They are here for compatibility reasons, but are planned to be removed in `etl-entities` v3 release.
- New `HWM` classes have flat structure instead of nested.
- New `HWM` classes have mandatory `name` attribute (it was known as `qualified_name` before).
- Type aliases used while serializing and deserializing `HWM` objects to `dict` representation were changed too: `int` -> `column_int`.

</div>

To make migration simpler, you can use new method:

python
old_hwm = OldIntHWM(...)
new_hwm = old_hwm.as_new_hwm()


This automatically converts all fields from the old structure to the new one, including `qualified_name` -> `name`.
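
To make the nested-to-flat change concrete, here is a minimal sketch of such a conversion using plain dataclasses. `NestedHWM`, `FlatHWM` and `flatten` are hypothetical stand-ins and do not reproduce the real `etl_entities` classes or the `as_new_hwm()` implementation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class NestedHWM:
    """Hypothetical stand-in for an old-style HWM with nested fields."""

    qualified_name: str
    source: dict
    column: dict
    value: int


@dataclass(frozen=True)
class FlatHWM:
    """Hypothetical stand-in for a new-style HWM with flat fields."""

    name: str
    source: str
    expression: str
    value: int


def flatten(old: NestedHWM) -> FlatHWM:
    # The old qualified_name becomes the mandatory name of the new object.
    return FlatHWM(
        name=old.qualified_name,
        source=old.source["name"],
        expression=old.column["name"],
        value=old.value,
    )


old = NestedHWM(
    qualified_name="my_old_qualified_name",
    source={"name": "schema.table", "instance": "postgres://host:5432/db"},
    column={"name": "col1"},
    value=123,
)
new = flatten(old)
print(new)
```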

- **Breaking changes:**

- Methods `BaseHWMStore.get()` and `BaseHWMStore.save()` were renamed to `get_hwm()` and `set_hwm()`.
- They now can be used only with new HWM classes from `etl_entities.hwm`, **old HWM classes are not supported**.

If you used them in your code, please update it accordingly.

- YAMLHWMStore **CANNOT read files created by older onETL versions** (0.9.x or older).

<details>
<summary>Upgrade procedure</summary>

python
# pip install onetl==0.9.5

# Get qualified_name for HWM


# Option 1. HWM is built manually
from etl_entities import IntHWM, FileListHWM
from etl_entities.source import Column, Table, RemoteFolder
from etl_entities.process import Process

# for column HWM
old_column_hwm = IntHWM(
    process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
    source=Table(name="schema.table", instance="postgres://host:5432/db"),
    column=Column(name="col1"),
)
qualified_name = old_column_hwm.qualified_name
# "col1schema.tablepostgres://host:5432/dbcde.abc.myprocessmyhost"

# for file HWM
old_file_hwm = FileListHWM(
    process=Process(name="myprocess", task="abc", dag="cde", host="myhost"),
    source=RemoteFolder(name="/absolute/path", instance="ftp://ftp.server:21"),
)
qualified_name = old_file_hwm.qualified_name
# "file_list/absolute/pathftp://ftp.server:21cde.abc.myprocessmyhost"


# Option 2. HWM is generated automatically (by DBReader/FileDownloader)
# See onETL logs and search for string like qualified_name = '...'

qualified_name = "col1schema.tablepostgres://host:5432/dbcde.abc.myprocessmyhost"


# Get .yml file path by qualified_name

import os
from pathlib import PurePosixPath
from onetl.hwm.store import YAMLHWMStore

# here you should pass the same arguments as used on production, if any
yaml_hwm_store = YAMLHWMStore()
hwm_path = yaml_hwm_store.get_file_path(qualified_name)
print(hwm_path)

# for column HWM
# LocalPosixPath('/home/maxim/.local/share/onETL/yml_hwm_store/col1__schema.table__postgres_host_5432_db__cde.abc.myprocess__myhost.yml')

# for file HWM
# LocalPosixPath('/home/maxim/.local/share/onETL/yml_hwm_store/file_list__absolute_path__ftp_ftp.server_21__cde.abc.myprocess__myhost.yml')


# Read raw .yml file content

from yaml import safe_load, dump

raw_old_hwm_items = safe_load(hwm_path.read_text())
print(raw_old_hwm_items)

# for column HWM
# [
#     {
#         "column": {"name": "col1", "partition": {}},
#         "modified_time": "2023-12-18T10:39:47.377378",
#         "process": {"dag": "cde", "host": "myhost", "name": "myprocess", "task": "abc"},
#         "source": {"instance": "postgres://host:5432/db", "name": "schema.table"},
#         "type": "int",
#         "value": "123",
#     },
# ]

# for file HWM
# [
#     {
#         "modified_time": "2023-12-18T11:15:36.478462",
#         "process": {"dag": "cde", "host": "myhost", "name": "myprocess", "task": "abc"},
#         "source": {"instance": "ftp://ftp.server:21", "name": "/absolute/path"},
#         "type": "file_list",
#         "value": ["file1.txt", "file2.txt"],
#     },
# ]


# Convert file content to new structure, compatible with onETL 0.10.x
raw_new_hwm_items = []
for old_hwm in raw_old_hwm_items:
    new_hwm = {"name": qualified_name, "modified_time": old_hwm["modified_time"]}

    # column HWMs carry an expression, file HWMs do not
    if "column" in old_hwm:
        new_hwm["expression"] = old_hwm["column"]["name"]
    new_hwm["entity"] = old_hwm["source"]["name"]
    old_hwm.pop("process", None)

    if old_hwm["type"] == "int":
        new_hwm["type"] = "column_int"
        new_hwm["value"] = old_hwm["value"]

    elif old_hwm["type"] == "date":
        new_hwm["type"] = "column_date"
        new_hwm["value"] = old_hwm["value"]

    elif old_hwm["type"] == "datetime":
        new_hwm["type"] = "column_datetime"
        new_hwm["value"] = old_hwm["value"]

    elif old_hwm["type"] == "file_list":
        new_hwm["type"] = "file_list"
        # file names were stored relative to the source folder, make them absolute
        new_hwm["value"] = [
            os.fspath(PurePosixPath(old_hwm["source"]["name"]).joinpath(path))
            for path in old_hwm["value"]
        ]

    else:
        raise ValueError(f"Unknown HWM type: {old_hwm['type']!r}")

    raw_new_hwm_items.append(new_hwm)


print(raw_new_hwm_items)
# for column HWM
# [
#     {
#         "name": "col1schema.tablepostgres://host:5432/dbcde.abc.myprocessmyhost",
#         "modified_time": "2023-12-18T10:39:47.377378",
#         "expression": "col1",
#         "entity": "schema.table",
#         "type": "column_int",
#         "value": "123",
#     },
# ]

# for file HWM
# [
#     {
#         "name": "file_list/absolute/pathftp://ftp.server:21cde.abc.myprocessmyhost",
#         "modified_time": "2023-12-18T11:15:36.478462",
#         "entity": "/absolute/path",
#         "type": "file_list",
#         "value": ["/absolute/path/file1.txt", "/absolute/path/file2.txt"],
#     },
# ]


# Save file with new content
with open(hwm_path, "w") as file:
    dump(raw_new_hwm_items, file)


# Stop Python interpreter and update onETL
# pip install onetl==0.10.0
# Check that new .yml file can be read

from onetl.hwm.store import YAMLHWMStore

qualified_name = ...

# here you should pass the same arguments as used on production, if any
yaml_hwm_store = YAMLHWMStore()
yaml_hwm_store.get_hwm(qualified_name)

# for column HWM
# ColumnIntHWM(
#     name='col1schema.tablepostgres://host:5432/dbcde.abc.myprocessmyhost',
#     description='',
#     entity='schema.table',
#     value=123,
#     expression='col1',
#     modified_time=datetime.datetime(2023, 12, 18, 10, 39, 47, 377378),
# )

# for file HWM
# FileListHWM(
#     name='file_list/absolute/pathftp://ftp.server:21cde.abc.myprocessmyhost',
#     description='',
#     entity=AbsolutePath('/absolute/path'),
#     value=frozenset({AbsolutePath('/absolute/path/file1.txt'), AbsolutePath('/absolute/path/file2.txt')}),
#     expression=None,
#     modified_time=datetime.datetime(2023, 12, 18, 11, 15, 36, 478462)
# )


# That's all!

</details>

Most users, however, use other HWM store implementations, which do not have this issue.

- Several classes and functions were moved from `onetl` to `etl_entities`:

<table>
<colgroup>
<col style="width: 50%" />
<col style="width: 50%" />
</colgroup>
<thead>
<tr class="header">
<th>onETL <code>0.9.x</code> and older</th>
<th>onETL <code>0.10.x</code> and newer</th>
</tr>
</thead>
<tbody>
<tr class="odd">
<td>

python
from onetl.hwm.store import (
    detect_hwm_store,
    BaseHWMStore,
    HWMStoreClassRegistry,
    register_hwm_store_class,
    HWMStoreManager,
    MemoryHWMStore,
)

</td>
<td>

python
from etl_entities.hwm_store import (
    detect_hwm_store,
    BaseHWMStore,
    HWMStoreClassRegistry,
    register_hwm_store_class,
    HWMStoreManager,
    MemoryHWMStore,
)

</td>
</tr>
</tbody>
</table>

They can still be imported from the old module, but this is deprecated and will be removed in the v1.0.0 release.
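
One common way to keep an old import location working during a deprecation period is a module-level `__getattr__` (PEP 562) that forwards to the new location. The sketch below builds two throwaway modules in memory to show the pattern. The module names, the `MemoryStore` class and the choice to emit a `DeprecationWarning` are illustrative assumptions; whether onETL implements the compatibility layer this way is not stated in these notes.

```python
import types
import warnings

# Hypothetical modules standing in for the new and the old import location.
new_module = types.ModuleType("new_location")
new_module.MemoryStore = type("MemoryStore", (), {})

old_module = types.ModuleType("old_location")


def _deprecated_getattr(name):
    """Forward attribute access to the new module and warn about it."""
    if hasattr(new_module, name):
        warnings.warn(
            f"Importing {name} from old_location is deprecated, use new_location",
            DeprecationWarning,
            stacklevel=2,
        )
        return getattr(new_module, name)
    raise AttributeError(f"module 'old_location' has no attribute {name!r}")


# PEP 562: a function named __getattr__ in the module namespace handles
# lookups of names the module itself does not define.
old_module.__getattr__ = _deprecated_getattr

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    store_cls = old_module.MemoryStore

print(store_cls is new_module.MemoryStore)
print(caught[0].category.__name__)
```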

- Change the way of passing `HWM` to `DBReader` and `FileDownloader` classes:

<table>
<colgroup>
<col style="width: 50%" />
<col style="width: 50%" />
</colgroup>
<thead>
<tr class="header">
<th>onETL <code>0.9.x</code> and older</th>
<th>onETL <code>0.10.x</code> and newer</th>
</tr>
</thead>
<tbody>
<tr class="odd">
<td>

python
reader = DBReader(
    connection=...,
    source=...,
    hwm_column="col1",





)

</td>
<td>

python
reader = DBReader(
    connection=...,
    source=...,
    hwm=DBReader.AutoDetectHWM(
        # name is mandatory now!
        name="my_unique_hwm_name",
        expression="col1",
    ),
)

</td>
</tr>
<tr class="even">
<td>

python
reader = DBReader(
    connection=...,
    source=...,
    hwm_column=(
        "col1",
        "cast(col1 as date)",

    ),
)

</td>
<td>

python
reader = DBReader(
    connection=...,
    source=...,
    hwm=DBReader.AutoDetectHWM(
        # name is mandatory now!
        name="my_unique_hwm_name",
        expression="cast(col1 as date)",
    ),
)

</td>
</tr>
<tr class="odd">
<td>

python
downloader = FileDownloader(
    connection=...,
    source_path=...,
    target_path=...,
    hwm_type="file_list",


)

</td>
<td>

python
downloader = FileDownloader(
    connection=...,
    source_path=...,
    target_path=...,
    hwm=FileListHWM(
        # name is mandatory now!
        name="another_unique_hwm_name",
    ),
)

</td>
</tr>
</tbody>
</table>

New HWM classes have a **mandatory** `name` attribute which should be passed explicitly, instead of being generated automatically under the hood.

Automatic `name` generation using the old `DBReader.hwm_column` / `FileDownloader.hwm_type` syntax is still supported, but will be removed in v1.0.0 release. (179)

- Read performance of Incremental and Batch strategies has been drastically improved. (182)

<details>
<summary>Before and after in details</summary>

`DBReader.run()` + incremental/batch strategy behavior in versions 0.9.x and older:

- Get table schema by making query `SELECT * FROM table WHERE 1=0` (if `DBReader.columns` has `*`)
- Expand `*` to real column names from the table, add `hwm_column` to the list, and remove duplicates (as some RDBMS do not allow them).
- Create dataframe from query like `SELECT hwm_expression AS hwm_column, ...other table columns... FROM table WHERE hwm_expression > prev_hwm.value`.
- Determine HWM class using dataframe schema: `df.schema[hwm_column].dataType`.
- Determine max HWM column value using Spark: `df.select(max(hwm_column)).collect()`.
- Use `max(hwm_column)` as next HWM value, and save it to HWM Store.
- Return dataframe to user.

This was far from ideal:

- Dataframe content (all rows or just changed ones) was loaded from the source to Spark only to get min/max values of specific column.

- Step of fetching table schema and then substituting column names in the next query caused some unexpected errors.

For example, source contains columns with mixed name case, like `"CamelColumn"` or `"spaced column"`.

Column names were *not* escaped during query generation, leading to queries that cannot be executed by database.

So users had to *explicitly* pass column names to `DBReader`, wrapping columns with mixed naming in `"`:

python
reader = DBReader(
    connection=...,
    source=...,
    columns=[  # passing '*' here leads to wrong SQL query generation
        "normal_column",
        '"CamelColumn"',
        '"spaced column"',
        ...,
    ],
)


- Using `DBReader` with `IncrementalStrategy` could lead to reading rows already read before.

Dataframe was created from query with WHERE clause like `hwm.expression > prev_hwm.value`, not `hwm.expression > prev_hwm.value AND hwm.expression <= current_hwm.value`.

So if new rows appeared in the source **after** HWM value is determined, they can be read by accessing dataframe content (because Spark dataframes are lazy), leading to inconsistencies between HWM value and dataframe content.

This may lead to issues when `DBReader.run()` reads some data and updates the HWM value, and the next call of `DBReader.run()` reads rows that were already read in the previous run.

`DBReader.run()` + incremental/batch strategy behavior in versions 0.10.x and newer:

- Detect type of HWM expression: `SELECT hwm.expression FROM table WHERE 1=0`.
- Determine corresponding Spark type `df.schema[0]` and then determine matching HWM class (if `DBReader.AutoDetectHWM` is used).
- Get min/max values by querying the source: `SELECT MAX(hwm.expression) FROM table WHERE hwm.expression >= prev_hwm.value`.
- Use `max(hwm.expression)` as next HWM value, and save it to HWM Store.
- Create dataframe from query `SELECT ... table columns ... FROM table WHERE hwm.expression > prev_hwm.value AND hwm.expression <= current_hwm.value`, baking new HWM value into the query.
- Return dataframe to user.

</details>
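
The 0.10.x flow described above can be sketched with the standard library's `sqlite3` in place of a real source and Spark. The `events` table and its columns are made up for the example; only the shape of the two queries follows the description.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER, payload TEXT)")
conn.executemany("INSERT INTO events VALUES (?, ?)", [(1, "a"), (2, "b"), (3, "c")])

prev_hwm = 1  # value saved to the HWM store by the previous run

# Step 1: let the source compute the next HWM value instead of loading rows.
(current_hwm,) = conn.execute(
    "SELECT MAX(id) FROM events WHERE id >= ?", (prev_hwm,)
).fetchone()

# A new row appears after the HWM value was determined.
conn.execute("INSERT INTO events VALUES (4, 'd')")

# Step 2: read only the window (prev_hwm, current_hwm], so the late row
# is left for the next run. The HWM column appears only in WHERE.
rows = conn.execute(
    "SELECT payload FROM events WHERE id > ? AND id <= ? ORDER BY id",
    (prev_hwm, current_hwm),
).fetchall()

print(current_hwm)
print(rows)
```

Because the upper bound is baked into the query, the row with `id = 4` is excluded even though it exists by the time the data is fetched; it falls into the window of the following run.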

Improvements:

- Allow source to calculate min/max instead of loading everything to Spark. This should be **faster** on large amounts of data (**up to 2x**), because we do not transfer all the data from the source to Spark. This can be even faster if the source has indexes for the HWM column.
- Columns list is passed to source as-is, without any resolving on `DBReader` side. So you can pass `DBReader(columns=["*"])` to read tables with mixed columns naming.
- Restrict dataframe content to always match HWM values, which leads to never reading the same row twice.

**Breaking change**: HWM column is no longer implicitly added to the dataframe. It used to be a part of the `SELECT` clause, but now it is mentioned only in the `WHERE` clause.

So if you had code like this, you have to rewrite it:

<table>
<colgroup>
<col style="width: 50%" />
<col style="width: 50%" />
</colgroup>
<thead>
<tr class="header">
<th>onETL <code>0.9.x</code> and older</th>
<th>onETL <code>0.10.x</code> and newer</th>
</tr>
</thead>
<tbody>
<tr class="odd">
<td>

python
reader = DBReader(
    connection=...,
    source=...,
    columns=[
        "col1",
        "col2",


    ],
    hwm_column="hwm_col",
)

df = reader.run()

# hwm_column value is in the dataframe
assert df.columns == ["col1", "col2", "hwm_col"]

</td>
<td>

python
reader = DBReader(
    connection=...,
    source=...,
    columns=[
        "col1",
        "col2",
        # add hwm_column explicitly
        "hwm_col",
    ],
    hwm_column="hwm_col",
)

df = reader.run()
# if columns list is not updated,
# this will fail
assert df.columns == ["col1", "col2", "hwm_col"]

</td>
</tr>
<tr class="even">
<td>

python
reader = DBReader(
    connection=...,
    source=...,
    columns=[
        "col1",
        "col2",


    ],
    hwm_column=(
        "hwm_col",
        "cast(hwm_col as int)",
    ),
)

df = reader.run()

# hwm_expression value is in the dataframe
assert df.columns == ["col1", "col2", "hwm_col"]

</td>
<td>

python
reader = DBReader(
    connection=...,
    source=...,
    columns=[
        "col1",
        "col2",
        # add hwm_expression explicitly
        "cast(hwm_col as int) as hwm_col",
    ],
    hwm_column=(
        "hwm_col",
        "cast(hwm_col as int)",
    ),
)

df = reader.run()
# if columns list is not updated,
# this will fail
assert df.columns == ["col1", "col2", "hwm_col"]

</td>
</tr>
</tbody>
</table>

But most users just use `columns=["*"]` anyway, so they won't see any changes.

- `FileDownloader.run()` now updates HWM in HWM Store once, after all files have been handled, instead of after each successfully downloaded file.

This is because:

- FileDownloader can be used with `DownloadOptions(workers=N)`, which could lead to a race condition: one thread can save one HWM value to HWM Store while another thread saves a different one.
- FileDownloader can download hundreds and thousands of files, and issuing a request to HWM Store for each file could potentially DDoS HWM Store. (189)

There is an exception handler which tries to save HWM to HWM Store if the download process is interrupted. But if it is interrupted by force, for example by a `SIGKILL` signal, HWM will not be saved to HWM Store, so some already downloaded files may be downloaded again next time.

However, an unexpected process kill may have other negative effects, such as a partially downloaded file, so this is expected behavior.
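
A minimal sketch of the "save once after all files" pattern, using a thread pool from the standard library. `download`, `run` and `hwm_store_writes` are hypothetical stand-ins rather than onETL internals; the `finally` block plays the role of the exception handler mentioned above and, like it, cannot run if the process is killed with `SIGKILL`.

```python
from concurrent.futures import ThreadPoolExecutor

hwm_store_writes = []  # records every save, standing in for a remote HWM store


def download(path):
    """Pretend to download a file and report its path on success."""
    return path


def run(paths, workers=4):
    downloaded = set()
    try:
        with ThreadPoolExecutor(max_workers=workers) as pool:
            for path in pool.map(download, paths):
                # Results are collected in the calling thread only,
                # so worker threads never touch the HWM state.
                downloaded.add(path)
    finally:
        # Single save, also attempted if the loop above raised an exception.
        hwm_store_writes.append(frozenset(downloaded))
    return downloaded


result = run([f"/data/file{i}.txt" for i in range(100)])
print(len(result), len(hwm_store_writes))
```

With 100 files and 4 workers the store receives one write instead of 100, and no two threads ever compete to save different values.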

Features

- Add Python 3.12 compatibility. (167)
- `Excel` file format now can be used with Spark 3.5.0. (187)
- `SnapshotBatchStrategy` and `IncrementalBatchStrategy` do not raise exceptions if the source does not contain any data. Instead they stop at the first iteration and return an empty dataframe. (188)
- Cache result of `connection.check()` in high-level classes like `DBReader`, `FileDownloader` and so on. This makes logs less verbose. (190)
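
The `connection.check()` caching from the last item can be illustrated with a small sketch. `Connection`, `Reader` and the `_connection_checked` flag are hypothetical names chosen for the example; the notes do not describe how onETL stores the cached result.

```python
class Connection:
    """Hypothetical connection that counts how often it was checked."""

    def __init__(self):
        self.check_calls = 0

    def check(self):
        self.check_calls += 1  # a real check would also write log lines
        return self


class Reader:
    """Hypothetical high-level class that checks its connection only once."""

    def __init__(self, connection):
        self.connection = connection
        self._connection_checked = False

    def run(self):
        if not self._connection_checked:
            self.connection.check()
            self._connection_checked = True
        return "data"


reader = Reader(Connection())
reader.run()
reader.run()
print(reader.connection.check_calls)
```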

Bug Fixes

- Fix `slot` and `hook` decorators returning methods with missing arguments in signature (Pylance, VS Code). (183)
- Kafka connector documentation said that it supports reading topic data incrementally by passing `group.id` or `groupIdPrefix`. This is not true, because Spark does not tell Kafka which messages were consumed. So currently users can only read the whole topic; incremental reads are not supported.

0.9.5

Features

- Add `XML` file format support. (163)
- Tested compatibility with Spark 3.5.0. `MongoDB` and `Excel` are not supported yet, but other packages are. (159)

Improvements

- Add check to all DB and FileDF connections that Spark session is alive. (164)

Bug Fixes

- Fix `Hive.check()` behavior when Hive Metastore is not available. (164)

0.9.4

Features

- Add `Excel` file format support. (148)
- Add `Samba` file connection. It is now possible to download and upload files to Samba shared folders using `FileDownloader`/`FileUploader`. (150)
- Add `if_exists="ignore"` and `error` to `Hive.WriteOptions` (143)
- Add `if_exists="ignore"` and `error` to `JDBC.WriteOptions` (144)
- Add `if_exists="ignore"` and `error` to `MongoDB.WriteOptions` (145)

Improvements

- Add documentation about different ways of passing packages to Spark session. (151)

- Drastically improve `Greenplum` documentation:

- Added information about network ports, grants, `pg_hba.conf` and so on.
- Added interaction schemas for reading, writing and executing statements in Greenplum.
- Added recommendations about reading data from views and `JOIN` results from Greenplum. (154)

- Make `.fetch` and `.execute` methods of DB connections thread-safe. Each thread works with its own connection. (156)

- Call `.close()` on FileConnection when it is removed by the garbage collector. (156)
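
The "each thread works with its own connection" approach mentioned for `.fetch` and `.execute` is commonly built on `threading.local`. The sketch below shows that pattern with `sqlite3`; `get_connection` and `fetch` are hypothetical helpers, and using thread-local storage is an assumption about the technique, not a description of onETL's code.

```python
import sqlite3
import threading

_local = threading.local()


def get_connection():
    """Return a connection owned by the calling thread, creating it on first use."""
    if not hasattr(_local, "conn"):
        _local.conn = sqlite3.connect(":memory:")
    return _local.conn


def fetch(query):
    return get_connection().execute(query).fetchall()


connections = []  # keeps every per-thread connection alive for inspection
connections_lock = threading.Lock()


def worker():
    fetch("SELECT 1")
    with connections_lock:
        connections.append(get_connection())


threads = [threading.Thread(target=worker) for _ in range(3)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(len({id(conn) for conn in connections}))
```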

Bug Fixes

- Fix issue where stopping the Python interpreter calls `JDBCMixin.close()` and prints exceptions to the log. (156)

0.9.3

Bug Fixes

- Fix documentation build
