Skip to content
Snippets Groups Projects
Commit ec9ce410 authored by Riccardo Boero's avatar Riccardo Boero :innocent:
Browse files

Fix workflow task parsing

parent 38b16695
No related branches found
No related tags found
No related merge requests found
name = "FACT_workflow_manager"
uuid = "34b7aff1-f91f-4b8b-9a3d-d0a54f07d855"
authors = ["Riccardo Boero <ribo@nilu.no>"]
version = "0.0.11"
version = "0.0.12"
[deps]
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
......
......@@ -111,66 +111,64 @@ manage_workflow(tasks_list, results_container, system)
```
"""
function manage_workflow(tasks_list::Vector{Dict}, results_container::Dict, system::OrchestratorRegistry.ServiceRegistry)
# Convert tasks_list into a dictionary for dependency resolution
# Convert tasks_list into a dictionary keyed by the "id" field for easy dependency resolution
# Each element of tasks_list is assumed to have a unique "id" key.
tasks = Dict(task["id"] => task for task in tasks_list)
overwrite = true
update_condition = Atomic{Bool}(false) # Flag for saving updates
# Initialize a Dict to hold Channels for each task
update_condition = Atomic{Bool}(false) # Flag for signaling updates
# Initialize Channels for each task to signal completion
task_channels = Dict{String, Channel{Bool}}()
for task_id in keys(tasks)
task_channels[task_id] = Channel{Bool}(1)
end
task_handles = [] # Initialize an array to hold task handles
task_handles = Task[] # Array of task handles for spawned threads
for (task_id, task_info) in tasks
# Validate task_info
if !(task_info isa Dict)
error("Invalid task_info for task $task_id: Expected Dict but got ", typeof(task_info))
end
t = Threads.@spawn begin
t = @spawn begin
try
# Prepare dependencies information if present
dependencies_info = Dict{String, Dict}()
# Check and process dependencies
if haskey(task_info, "dependencies") && !isempty(task_info["dependencies"])
for dep in task_info["dependencies"]
if haskey(tasks, dep)
dependencies_info[dep] = tasks[dep]
else
println("Warning: Dependency $dep not found in tasks.")
# Dependency not found warning (optional)
# println("Warning: Dependency $dep not found in tasks.")
end
end
end
# Wait for dependencies
# Wait for dependencies to complete if there are any
if haskey(task_info, "dependencies") && !isempty(task_info["dependencies"])
for dep in task_info["dependencies"]
# Check that the dependency exists in task_channels
if !haskey(task_channels, dep)
error("Dependency $dep not found in task channels for task $task_id.")
end
# Wait until the dependent task signals completion
take!(task_channels[dep])
put!(task_channels[dep], true) # Reput the signal for other tasks
# Put the signal back for other tasks that might depend on the same dependency
put!(task_channels[dep], true)
end
end
# Execute the task
run_task(task_id, task_info, dependencies_info, system, results_container, update_condition, overwrite)
catch ex
# Handle exceptions by printing error messages and stacktrace
println("Error occurred in task $task_id: ", ex)
println("Stacktrace: ", stacktrace(catch_backtrace()))
finally
# Signal that this task is complete
put!(task_channels[task_id], true)
end
end
push!(task_handles, t)
end
# Wait on each task to complete
# Wait for all tasks to complete
foreach(wait, task_handles)
println("All tasks completed.")
end
end
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment