Skip to content
Snippets Groups Projects
Commit d15ca1d0 authored by Riccardo Boero's avatar Riccardo Boero :innocent:
Browse files

Fix workflow task parsing

parent 3b9b8e21
No related branches found
No related tags found
No related merge requests found
name = "FACT_workflow_manager"
uuid = "34b7aff1-f91f-4b8b-9a3d-d0a54f07d855"
authors = ["Riccardo Boero <ribo@nilu.no>"]
version = "0.0.6"
version = "0.0.7"
[deps]
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
......
......@@ -111,48 +111,48 @@ function manage_workflow(tasks::Dict, results_container::Dict, system::Orchestra
task_handles = [] # Initialize an array to hold task handles
# Start the tasks logic
for (task_name, task_info) in tasks
#println("Spawning task: $task_name")
# Validate task_info
if !(task_info isa Dict)
error("Invalid task_info for task $task_name: Expected Dict but got ", typeof(task_info))
end
t = Threads.@spawn begin
try
# Prepare the dependencies information beforehand
dependencies_info = Dict{String, Dict}()
# Check and process dependencies
if haskey(task_info, "dependencies") && !isempty(task_info["dependencies"])
for dep in task_info["dependencies"]
if haskey(tasks, dep)
dependencies_info[dep] = tasks[dep]
else
#println("Warning: Dependency $dep not found in tasks.")
println("Warning: Dependency $dep not found in tasks.")
end
end
end
# Check if the task has dependencies and wait for them if it does
# Wait for dependencies
if haskey(task_info, "dependencies") && !isempty(task_info["dependencies"])
for dep in task_info["dependencies"]
# Wait for the dependency task to signal completion
#println("[$task_name] waiting on dependency $dep")
take!(task_channels[dep])
# put back the signal into the channel because somebody else may need it!!!
put!(task_channels[dep],true)
#println("[$task_name] dependency $dep completed")
put!(task_channels[dep], true) # Reput the signal for other tasks
end
end
# Execute the task
#println("Executing task: $task_name")
run_task(task_name, task_info, dependencies_info, system, results_container, update_condition, overwrite)
#println("Finished executing task: $task_name")
catch ex
println("Error occurred in task $task_name: ", ex)
println("Stacktrace: ", stacktrace(catch_backtrace()))
finally
put!(task_channels[task_name], true)
#println("Finished executing task: $task_name")
end
end
push!(task_handles, t) # Collect the task handle
push!(task_handles, t)
end
# Wait on each task to complete
#println("Waiting for all tasks to complete...")
foreach(wait, task_handles) # This waits for each task to complete
......
"""
parseWorkflow(filePath::String) -> (Dict, Dict)
parseWorkflow(filePath::String) -> Dict{String, Any}
Parses a workflow configuration file in TOML format to extract the tasks and global settings.
Parses a workflow configuration file in TOML format to extract the tasks defined in the workflow.
# Arguments
- `filePath::String`: The path to the workflow configuration file in TOML format.
# Returns
- `(Dict, Dict)`: A tuple containing two dictionaries. The first dictionary contains the tasks defined in the workflow file, with each task's properties and configurations. The second dictionary contains global settings that apply to the entire workflow.
- `Dict{String, Any}`: A dictionary containing the tasks defined in the workflow file. Each key represents a task name, and the value is another dictionary containing the task's properties and configurations.
# Behavior
- Reads the specified TOML file and checks for a `tasks` section.
- Returns the `tasks` dictionary if it exists.
- Throws an error if the `tasks` section is missing.
# Raises
- `Error`: If the `tasks` section is not found in the TOML file.
# Example
> tasks, globals = parseWorkflow("path/to/workflow.toml")
```julia
# Example TOML file structure:
# [tasks.task1]
# dependencies = ["task2"]
# function = "my_function"
# arguments = "some_args"
# parameters_dict = {}
# [tasks.task2]
# dependencies = []
# function = "another_function"
# arguments = "other_args"
# parameters_dict = {}
# Parse the workflow file
tasks = parseWorkflow("path/to/workflow.toml")
# Access tasks
println(tasks["task1"]["function"]) # Output: "my_function"
``
This function reads the specified TOML file, parsing the defined tasks and global settings into separate dictionaries for further processing in the workflow management system.
This function reads the specified TOML file, ensuring the presence of a `tasks` section and parsing it into a dictionary for further processing in the workflow management system.
"""
function parseWorkflow(filePath::String)
# Parse the workflow file
config = TOML.parsefile(filePath)
tasks = config["tasks"]
return tasks
data = TOML.parsefile(filePath)
if haskey(data, "tasks")
return data["tasks"] # Return the tasks dictionary
else
error("Invalid workflow file: 'tasks' section not found.")
end
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment