Skip to content
Snippets Groups Projects
Commit fdcd1288 authored by Riccardo Boero's avatar Riccardo Boero :innocent:
Browse files

Fix workflow task parsing

parent d15ca1d0
No related branches found
No related tags found
No related merge requests found
name = "FACT_workflow_manager"
uuid = "34b7aff1-f91f-4b8b-9a3d-d0a54f07d855"
authors = ["Riccardo Boero <ribo@nilu.no>"]
version = "0.0.7"
version = "0.0.8"
[deps]
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
......
......@@ -52,75 +52,90 @@ function run_file(workflowFilePath::String, system::OrchestratorRegistry.Service
end
"""
manage_workflow(tasks::Dict, results_container::Dict, system::OrchestratorRegistry.ServiceRegistry)
manage_workflow(tasks_list::Vector{Dict}, results_container::Dict, system::OrchestratorRegistry.ServiceRegistry)
Executes a workflow by managing the execution of tasks based on their dependencies and the provided service registry.
This function orchestrates the execution of tasks in a workflow, ensuring that each task waits for its dependencies to complete before starting. It uses multithreading to execute tasks in parallel while respecting dependencies and resource constraints. Each task interacts with the provided results container and may trigger updates to it.
This function handles a workflow where tasks are represented as a list of dictionaries. Each task is executed in parallel using multithreading, respecting dependencies between tasks. Tasks communicate their completion status via channels, and their execution may update the results container.
# Arguments
- `tasks::Dict`: A dictionary where keys are task names and values are dictionaries containing task details. Task details include:
- `dependencies`: A list of other task names that must complete before this task can run.
- Other relevant task-specific information needed for execution.
- `results_container::Dict`: A dictionary representing the results container, used by tasks to store or retrieve results.
- `system::OrchestratorRegistry.ServiceRegistry`: The service registry used to manage services required by tasks during execution.
- tasks_list::Vector{Dict}: A list of task dictionaries. Each dictionary represents a task and includes the following keys:
- id (String): The unique identifier for the task.
- dependencies (Vector{String}): A list of task IDs that must complete before this task can run.
- Other relevant task-specific information (e.g., service, function, arguments).
- results_container::Dict: A dictionary representing the results container, used by tasks to store or retrieve results.
- system::OrchestratorRegistry.ServiceRegistry: The service registry used to manage services required by tasks during execution.
# Returns
- `Ref{Bool}`: A reference to a boolean flag indicating whether the results container requires saving due to updates during task execution.
- None
# Behavior
- Initializes channels for each task to signal dependency completion.
- Spawns a thread for each task, ensuring it waits for its dependencies to signal completion.
- Uses the `system` parameter to provide access to necessary services during task execution.
- Updates the `results_container` and tracks whether it requires saving using an atomic boolean flag.
1. Converts the tasks_list into a dictionary (Dict{String, Dict}) where the task id is the key for efficient access.
2. Initializes channels for each task to signal dependency completion.
3. Spawns a thread for each task:
- Waits for dependencies to complete before execution.
- Executes the task, updating the results_container and other relevant data structures.
4. Ensures all tasks are completed before the function exits.
# Example
To execute a workflow, define the tasks, results container, and service registry:
```julia
tasks = Dict(
"task1" => Dict("dependencies" => [], "info" => "example task 1"),
"task2" => Dict("dependencies" => ["task1"], "info" => "example task 2")
)
tasks_list = [
Dict(
"id" => "task1",
"dependencies" => [],
"service" => "example_service",
"function" => "run_task1",
"arguments" => "arg1",
"parameters_dict" => Dict()
),
Dict(
"id" => "task2",
"dependencies" => ["task1"],
"service" => "example_service",
"function" => "run_task2",
"arguments" => "arg2",
"parameters_dict" => Dict()
)
]
results_container = Dict("user" => "root", "password" => "devops", "host" => "localhost", "port" => 8080)
results_container = Dict(
"user" => "root",
"password" => "password",
"host" => "localhost",
"port" => 5432
)
system = ServiceRegistry()
# Add required services to the registry
add_service!(system, Service("123", "example-service", "http://localhost", 9000))
system = ServiceRegistry() # Assuming ServiceRegistry is preconfigured
# Execute the workflow
update_needed = manage_workflow(tasks, results_container, system)
println("Results container needs saving: ", update_needed[])
manage_workflow(tasks_list, results_container, system)
```
This example demonstrates how tasks are executed respecting dependencies, with access to services provided by the registry.
"""
function manage_workflow(tasks::Dict, results_container::Dict, system::OrchestratorRegistry.ServiceRegistry)
function manage_workflow(tasks_list::Vector{Dict}, results_container::Dict, system::OrchestratorRegistry.ServiceRegistry)
overwrite = true
# Flag to return whether there is a need to save an updated image of FACTResultsIO
update_condition = Atomic{Bool}(false)
update_condition = Atomic{Bool}(false) # Flag for saving updates
# Convert tasks_list (Vector) into a Dict for easy access by task id
tasks = Dict(task["id"] => task for task in tasks_list)
# Initialize a Dict to hold Channels for each task
task_channels = Dict{String, Channel{Bool}}()
for task_name in keys(tasks)
# For simplicity, use an unbuffered channel
task_channels[task_name] = Channel{Bool}(1)
for task_id in keys(tasks)
task_channels[task_id] = Channel{Bool}(1)
end
task_handles = [] # Initialize an array to hold task handles
for (task_name, task_info) in tasks
for (task_id, task_info) in tasks
# Validate task_info
if !(task_info isa Dict)
error("Invalid task_info for task $task_name: Expected Dict but got ", typeof(task_info))
error("Invalid task_info for task $task_id: Expected Dict but got ", typeof(task_info))
end
t = Threads.@spawn begin
try
dependencies_info = Dict{String, Dict}()
# Check and process dependencies
if haskey(task_info, "dependencies") && !isempty(task_info["dependencies"])
for dep in task_info["dependencies"]
......@@ -131,7 +146,7 @@ function manage_workflow(tasks::Dict, results_container::Dict, system::Orchestra
end
end
end
# Wait for dependencies
if haskey(task_info, "dependencies") && !isempty(task_info["dependencies"])
for dep in task_info["dependencies"]
......@@ -139,23 +154,21 @@ function manage_workflow(tasks::Dict, results_container::Dict, system::Orchestra
put!(task_channels[dep], true) # Reput the signal for other tasks
end
end
# Execute the task
run_task(task_name, task_info, dependencies_info, system, results_container, update_condition, overwrite)
run_task(task_id, task_info, dependencies_info, system, results_container, update_condition, overwrite)
catch ex
println("Error occurred in task $task_name: ", ex)
println("Error occurred in task $task_id: ", ex)
println("Stacktrace: ", stacktrace(catch_backtrace()))
finally
put!(task_channels[task_name], true)
put!(task_channels[task_id], true)
end
end
push!(task_handles, t)
end
# Wait on each task to complete
#println("Waiting for all tasks to complete...")
foreach(wait, task_handles) # This waits for each task to complete
# Wait on each task to complete
foreach(wait, task_handles)
println("All tasks completed.")
end
"""
parseWorkflow(filePath::String) -> Dict{String, Any}
parseWorkflow(filePath::String)
Parses a workflow configuration file in TOML format to extract the tasks defined in the workflow.
Parses a workflow configuration file in TOML format into a flat list of dictionaries. Each task is represented as a dictionary, including an `id` key for its name.
# Arguments
- `filePath::String`: The path to the workflow configuration file in TOML format.
- filePath::String: The path to the TOML file containing the workflow configuration.
# Returns
- `Dict{String, Any}`: A dictionary containing the tasks defined in the workflow file. Each key represents a task name, and the value is another dictionary containing the task's properties and configurations.
- Vector{Dict}: A list of task dictionaries. Each dictionary represents a task and includes:
- id (String): The unique identifier for the task (extracted from the task name).
- Other key-value pairs corresponding to task-specific details from the TOML file.
# Behavior
- Reads the specified TOML file and checks for a `tasks` section.
- Returns the `tasks` dictionary if it exists.
- Throws an error if the `tasks` section is missing.
# Raises
- `Error`: If the `tasks` section is not found in the TOML file.
1. Reads and parses the specified TOML file.
2. Ensures the file contains a "tasks" section.
3. Converts each task entry into a dictionary, adding an `id` field corresponding to the task name.
4. Returns a list of these dictionaries.
# Example
```julia
# Example TOML file structure:
# [tasks.task1]
# dependencies = ["task2"]
# function = "my_function"
# arguments = "some_args"
# parameters_dict = {}
# [tasks.task2]
# dependencies = []
# function = "another_function"
# arguments = "other_args"
# parameters_dict = {}
# Parse the workflow file
tasks = parseWorkflow("path/to/workflow.toml")
[tasks.no_counties]
service = "FACT_geo"
function = "get_geo_objects"
arguments = "no_counties"
parameters_dict = {}
dependencies = []
# Access tasks
println(tasks["task1"]["function"]) # Output: "my_function"
``
[tasks.another_task]
service = "Another_service"
function = "another_function"
arguments = "arg_value"
parameters_dict = {key1 = "value1"}
dependencies = ["no_counties"]
This function reads the specified TOML file, ensuring the presence of a `tasks` section and parsing it into a dictionary for further processing in the workflow management system.
# Usage:
tasks = parseWorkflow("path/to/workflow.toml")
for task in tasks
println("Task ID: ", task["id"])
println("Service: ", task["service"])
end
``
"""
function parseWorkflow(filePath::String)
# Parse the TOML file
data = TOML.parsefile(filePath)
if haskey(data, "tasks")
return data["tasks"] # Return the tasks dictionary
else
# Ensure the "tasks" section exists
if !haskey(data, "tasks")
error("Invalid workflow file: 'tasks' section not found.")
end
# Flatten tasks into a list of dictionaries
tasks = []
for (task_name, task_details) in data["tasks"]
if task_details isa Dict
# Add the task name as the "id" and flatten into a single dictionary
task_details["id"] = task_name
push!(tasks, task_details)
else
error("Invalid task structure for task '$task_name': Expected a dictionary.")
end
end
return tasks
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment