This month's flytekit release continues to emphasize user-experience improvements, specifically support for user documentation and control plane access, which we've rebranded as "FlyteRemote".
Redesigned Flyte Remote
The `FlyteRemote` class introduced a programmatic way for users to interact with Flyte entities through a Flyte backend. From past experience with legacy versions of flytekit, we've found that programmatic access to the control plane was a welcome addition to the ecosystem alongside CLI access (`flytectl` and `flyte-cli`), web (Flyte Console), and raw gRPC clients. See https://github.com/flyteorg/flytekit/pull/547 for the recent code changes.
Some examples:
Fetching
python
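from flytekit.remote import FlyteRemote

# Build a client from the local flytekit configuration, with default project/domain.
remote = FlyteRemote.from_config(default_project="flytesnacks", default_domain="development")
flyte_workflow = remote.fetch_workflow(name="my_workflow", version="v1")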
Executing and waiting
python
flyte_launch_plan = remote.fetch_launch_plan(name="workflows.basic.basic_workflow.my_wf", version="123abc")
execution = remote.execute(flyte_launch_plan, {"a": 10, "b": "foobar"}, wait=True)
The `FlyteRemote` object even supports real-time registration and execution of new tasks. This only applies to tasks that don't require an image-build step, since flytekit doesn't handle image building.
python
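from flytekit import kwtypes
from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task
from flytekit.types.schema import FlyteSchema

# Public sample database shipped as a zip archive, hence compressed=True below.
example_db = "https://cdn.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip"
interactive_sql_task = SQLite3Task(
    "basic_querying",
    query_template="select TrackId, Name from tracks limit {{.inputs.limit}}",
    inputs=kwtypes(limit=int),
    output_schema_type=FlyteSchema[kwtypes(TrackId=int, Name=str)],
    task_config=SQLite3Config(
        uri=example_db,
        compressed=True,
    ),
)
# Register the task with the backend, then execute it and block until it completes.
registered_sql_task = remote.register(interactive_sql_task)
execution = remote.execute(registered_sql_task, inputs={"limit": 10}, wait=True)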
This feature should still be considered beta, though the primary interface is unlikely to change. Please see the [documentation](https://docs.flyte.org/projects/flytekit/en/latest/remote.html) for additional information, and the [integration test suite](https://github.com/flyteorg/flytekit/blob/master/tests/flytekit/integration/remote/test_remote.py) for more examples.
The `FlyteRemote` object can also be used in an interactive setting, such as a Python REPL or a Jupyter notebook:
![image](https://user-images.githubusercontent.com/2816689/127570795-e98b3215-62c7-4e2c-8c3f-f8ea4e1585ff.png)
![image](https://user-images.githubusercontent.com/2816689/127570827-8ed19f6c-1e75-4199-8a77-80621eda6da9.png)
Please join the upcoming OSS meeting on Aug. 10th for a demo as well.
Task/Workflow Comments
With https://github.com/flyteorg/flytekit/pull/557, user comments for task and workflow parameters will again be stored in Admin and shown in the UI on the launch form. If you have a task like so,
from flytekit import task

@task
def t1(a: int):
    """
    Comments here will not be read yet, but we hope to support this soon.

    :param a: This is an integer input.
    """
The comment will show up on the launch form like so:
![image](https://user-images.githubusercontent.com/2896568/127560288-cb625ea2-abb2-4ba0-94af-0e44826226be.png)
flytekit supports all three common docstring parameter styles: Sphinx (reStructuredText), Google, and NumPy.
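As a rough illustration, the same parameter comment could be written in each of the three styles as shown below. The function names and bodies are hypothetical, and the `@task` decorator is left off so that the snippet runs without flytekit installed. In a real project each function would be decorated as a task.

```python
def t1_sphinx(a: int) -> int:
    """
    Double the input.

    :param a: This is an integer input.
    """
    return a * 2


def t1_google(a: int) -> int:
    """
    Double the input.

    Args:
        a: This is an integer input.
    """
    return a * 2


def t1_numpy(a: int) -> int:
    """
    Double the input.

    Parameters
    ----------
    a : int
        This is an integer input.
    """
    return a * 2
```

Only the layout of the parameter description differs between the three. The description text itself is what would be expected to surface on the launch form.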
Other Changes
Exception handling
Exception handling has been slightly improved with https://github.com/flyteorg/flytekit/pull/543. Errors raised while running a task should now more accurately reflect where they came from: flytekit library code or user code. Recoverable and non-recoverable errors can now also be distinguished (with Propeller versions v0.12.12 and later). A 'recoverable' error triggers a retry and counts against the task's retries. By default, all system exceptions are recoverable and all user exceptions are not. If you want to raise a recoverable exception as a user, subclass the `FlyteRecoverableException` class and raise that.
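To make the retry rule concrete, here is a small simulation of it in plain Python. It is only a sketch of the behavior described above, not Propeller's or flytekit's implementation. The local `FlyteRecoverableException` is a stand-in defined so the snippet runs without flytekit (in real code you would import the class from flytekit instead), and `TransientServiceError`, `run_with_retries`, `flaky`, and `broken` are hypothetical names.

```python
class FlyteRecoverableException(Exception):
    """Stand-in for flytekit's class of the same name (assumption: real code imports it)."""


class TransientServiceError(FlyteRecoverableException):
    """Hypothetical user-defined error that is worth retrying."""


def run_with_retries(fn, retries):
    """Call fn, retrying only on recoverable errors.

    Returns a (result, attempts) tuple. Any other exception propagates
    on the first failure without consuming a retry.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return fn(), attempts
        except FlyteRecoverableException:
            # Each recoverable failure counts against the retry budget.
            if attempts > retries:
                raise


calls = {"n": 0}


def flaky():
    """Fails twice with a recoverable error, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientServiceError("temporary outage")
    return "ok"


def broken():
    """Fails with an ordinary (non-recoverable) user error."""
    raise ValueError("bad input")


result, attempts = run_with_retries(flaky, retries=3)
```

In this sketch `flaky` succeeds on its third attempt, while calling `run_with_retries(broken, retries=3)` raises `ValueError` immediately because that error is not a subclass of the recoverable class.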
Reference launch plan support in dynamic tasks
Thanks jeevb for [fixing](https://github.com/flyteorg/flytekit/pull/553) support for reference launch plans in dynamic tasks. This was previously not possible.
python
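import typing

from flytekit import dynamic, reference_launch_plan

@dynamic
def my_subwf(a: int) -> typing.List[int]:
    # Declare a reference to a launch plan that is already registered on the backend.
    @reference_launch_plan(project="project", domain="domain", name="name", version="version")
    def ref_lp1(p1: str, p2: str) -> int:
        ...

    s = []
    for i in range(a):
        s.append(ref_lp1(p1="hello", p2=str(a)))
    return s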
Please note that _reference_ tasks and sub-workflows are still not allowed in dynamic tasks.
--
by wild-endeavor