Flowpipe

Latest version: v1.0.1

Safety actively analyzes 642283 Python packages for vulnerabilities to keep your Python projects secure.

Scan your dependencies

Page 2 of 5

0.8.1

A major bug in the threading Graph evaluation was fixed, that would lead to nodes not being submitted for evaluation reliably.

0.8.0

When evaluating a Graph in "threading" mode, the higher-level API from `concurrent.futures` will be used, providing an easy option to control the number of threads being spawned, as well as providing more useful information in case a Node causes an Exception during evaluation.

Note that the API for the "threading" mode changed and instead of a "submission_delay", the "max_workers" keyword controls the behavior of threading evaluations.

0.7.0

Nested graphs are supported in flowpipe.

python
from flowpipe import Graph, Node


Node(outputs=['file'])
def MyNode(file):
Something is done in here ...
return {'file': file}


A graph that fixes an incoming file, cleaning up messy names etc.

+-----------------------+ +-------------------------+
| Cleanup Filename | | Change Lineendings |
|-----------------------| |-------------------------|
o file<> | +--->o file<> |
| file o-----+ | file o
+-----------------------+ +-------------------------+
fix_file = Graph(name="fix_file")
cleanup_filename = MyNode(name="Cleanup Filename", graph=fix_file)
change_lineendings = MyNode(name="Change Lineendings", graph=fix_file)
cleanup_filename.outputs["file"].connect(change_lineendings.inputs["file"])


A second graph reads finds files, and extracts their contents into a database
+----------------+ +----------------------------+ +----------------+
| Find File | | Read Values from File | | Update DB |
|----------------| |----------------------------| |----------------|
o file<> | +--->o file<> | +--->o file<> |
| file o-----+ | file o-----+ | file o
+----------------+ +----------------------------+ +----------------+
udpate_db_from_file = Graph(name="udpate_db_from_file")
find_file = MyNode(name="Find File", graph=udpate_db_from_file)
values_from_file = MyNode(name="Read Values from File", graph=udpate_db_from_file)
update_db = MyNode(name="Update DB", graph=udpate_db_from_file)
find_file.outputs["file"].connect(values_from_file.inputs["file"])
values_from_file.outputs["file"].connect(update_db.inputs["file"])


The second graph however relies on clean input files so the first graph can
be used within the second "udpate db" graph.
For this purpose, graphs can promote input and output plugs from their nodes
to the graph level, making other graphs aware of them:
fix_file["Cleanup Filename"].inputs["file"].promote_to_graph(name="file_to_clean")
fix_file["Change Lineendings"].outputs["file"].promote_to_graph(name="clean_file")

Now the update_db graph can connect nodes to the fix_file graph
find_file.outputs["file"].connect(fix_file.inputs["file_to_clean"])
fix_file.outputs["clean_file"].connect(udpate_db_from_file["Read Values from File"].inputs["file"])


The result now looks like this:

+---udpate_db_from_file----+ +-------fix_file--------+ +--------fix_file---------+ +----udpate_db_from_file-----+ +---udpate_db_from_file----+
| Find File | | Cleanup Filename | | Change Lineendings | | Read Values from File | | Update DB |
|--------------------------| |-----------------------| |-------------------------| |----------------------------| |--------------------------|
o file<> | +--->o file<> | +--->o file<> | +--->o file<> | +--->o file<> |
| file o-----+ | file o-----+ | file o-----+ | file o-----+ | file o
+--------------------------+ +-----------------------+ +-------------------------+ +----------------------------+ +--------------------------+
print(fix_file)


Subgraphs can be accessed by their name from any participating graph
assert udpate_db_from_file.subgraphs["fix_file"] is fix_file
assert fix_file.subgraphs["udpate_db_from_file"] is udpate_db_from_file

0.6.0

StatsReporter and LogObserver are replaced by a unified and simplified Event system.

Original Issue: https://github.com/PaulSchweizer/flowpipe/issues/97

How to use:
python
Simple functions serve as listeners

def omitted_listener(node):
print("Omitted:", node.name)

def started_listener(node):
print("Started:", node.name)

def finished_listener(node):
print("Finished:", node.name)


Connect the listeners
INode.EVENTS['evaluation-omitted'].register(omitted_listener)
INode.EVENTS['evaluation-started'].register(started_listener)
INode.EVENTS['evaluation-finished'].register(finished_listener)

my_node = MyNode()

my_node.evaluate()

Inspect the node's stats after evaluation
print(my_node.stats)

>> {"eval_time": 0.123, "start_time": 1573121193}

0.5.7

0.5.6

Threading will now wait for all nodes to finish evaluation

Page 2 of 5

© 2024 Safety CLI Cybersecurity Inc. All Rights Reserved.