Skip to content
Snippets Groups Projects
Commit 1704d74c authored by Riccardo Boero's avatar Riccardo Boero :innocent:
Browse files

Clean files and update results reference

parent 81becca3
No related branches found
No related tags found
No related merge requests found
......@@ -2,7 +2,7 @@
julia_version = "1.11.2"
manifest_format = "2.0"
project_hash = "5fa6dc8800c00deef572fadeff39db8ef9ce1b6b"
project_hash = "a00cdef610575cb3c86bd1777059c9b4d4bcdd34"
[[deps.ANSIColoredPrinters]]
git-tree-sha1 = "574baf8110975760d391c710b6341da1afa48d8c"
......
......@@ -10,9 +10,11 @@ Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4"
FACT_data_API_reader = "77ed0678-dbc6-4e3d-9156-895052ddb5a7"
FACT_unified_data_IO = "ec8d5dc6-0dfe-41d7-8c2c-855ff50b7b42"
MySQL = "39abe10b-433b-5dbd-92d4-e302a9df00cd"
OrchestratorRegistry = "b9d03754-910a-4bb6-b2d8-aec5d72e1905"
Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c"
TOML = "fa267f1f-6049-4f14-aa54-33bafae1ed76"
[compat]
FACT_data_API_reader = "0.0.1"
FACT_unified_data_IO = "0.0.1"
OrchestratorRegistry = "0.0.1"
"""
startResultsContainer(auth_code::String) -> Dict
Starts the results container using the provided authentication code.
# Arguments
- `auth_code::String`: The authentication code required to access the container registry.
# Returns
- `Dict`: A dictionary containing the configuration and status of the started results container.
# Example
> results_container = startResultsContainer("your_auth_code_here")
This function retrieves the configuration for the results container based on the provided authentication code, then starts the container and returns its configuration and status.
"""
function startResultsContainer(auth_code::String)
results_container = get_results_container(auth_code)
return results_container
end
"""
saveResultsContainer(auth_code::String)
Saves the current state of the results container to an image for later use or distribution.
# Arguments
- `auth_code::String`: The authentication code required to access the container registry and perform operations.
# Example
> saveResultsContainer("your_auth_code_here")
This function stops the results container, commits its current state to an image, and pushes this image to a registry using the provided authentication code.
"""
function saveResultsContainer(auth_code::String)
stop_container_update_image(auth_code)
end
"""
stopResultsContainer(auth_code::String)
Stops the results container identified by the authentication code.
# Arguments
- `auth_code::String`: The authentication code required to access and perform operations on the container.
# Example
> stopResultsContainer("your_auth_code_here")
This function stops the running results container using the provided authentication code.
"""
function stopResultsContainer(auth_code::String)
stop_container("fact_results", auth_token=auth_code)
end
"""
startDataContainers(services::Vector{String}, auth_code::String)
Starts data containers for the services listed in the `services` argument using the provided authentication code.
# Arguments
- `services::Vector{String}`: A vector of service names for which data containers should be started.
- `auth_code::String`: The authentication code required to access the container registry and perform operations.
# Example
> startDataContainers(["service1", "service2"], "your_auth_code_here")
This function iterates over the given services, retrieves their respective container configurations, and starts each container using the provided authentication code.
"""
function startDataContainers(services::Vector{String}, auth_code::String)
for service in services
lower_case_service=lowercase(service)
container_config = get_data_container_config(lower_case_service)
create_container(container_config, lower_case_service, auth_token=auth_code)
end
end
"""
stopDataContainers(services::Vector{String}, auth_code::String)
Stops data containers for the services listed in the `services` argument using the provided authentication code.
# Arguments
- `services::Vector{String}`: A vector of service names for which data containers should be stopped.
- `auth_code::String`: The authentication code required to access and perform operations on the container.
# Example
> stopDataContainers(["service1", "service2"], "your_auth_code_here")
This function iterates over the given services and stops each corresponding data container using the provided authentication code.
"""
function stopDataContainers(services::Vector{String}, auth_code::String)
for service in services
lower_case_service=lowercase(service)
stop_container(lower_case_service, auth_token=auth_code)
end
end
"""
collectServices(tasks::Dict) -> Vector{String}
Collects and returns a list of unique service names from the provided tasks dictionary.
# Arguments
- `tasks::Dict`: A dictionary containing task configurations, each with a service name.
# Returns
- `Vector{String}`: A vector of unique service names extracted from the tasks.
# Example
> services = collectServices(tasks)
This function parses the tasks dictionary to extract the service names associated with each task, removes duplicates, and returns the list of unique service names for further processing.
"""
function collectServices(tasks::Dict)
services = String[] # Initialize an empty array to store service values
for (task_name, task_info) in tasks
push!(services, task_info["service"]) # Collect each service value
end
return unique(services) # Remove duplicates and return the result
end
......@@ -32,10 +32,9 @@ This will initiate the workflow execution process, handling task dependencies, D
module FACT_workflow_manager
using Base.Threads, TOML, MySQL, DataFrames, Random, Dates
using FACT_unified_data_IO, FACT_data_API_reader
using FACT_unified_data_IO, FACT_data_API_reader, OrchestratorRegistry
include("TOMLParser.jl")
include("Containers.jl")
include("Task.jl")
include("Runner.jl")
......
......@@ -28,41 +28,15 @@ To run a workflow defined in a file located at `/path/to/workflow.yml`, call:
This will execute the workflow as defined, handling all necessary container operations behind the scenes.
"""
function run_file(workflowFilePath::String)
function run_file(workflowFilePath::String, system::OrchestratorRegistry.ServiceRegistry)
# Parse the workflow file
tasks, globals = parseWorkflow(workflowFilePath)
tasks = parseWorkflow(workflowFilePath)
# Extract the 'overwrite' key with a default of false
overwrite = get(globals, "overwrite", false)
# Ensure 'overwrite' is a boolean
overwrite = Bool(overwrite)
# Get a list of unique services we need for the workflow
unique_services = collectServices(tasks)
# Login to image registry
encoded_auth = registry_login("registry.nilu.no", "FACT_token", "glpat-1zG-TQx___xLsPjj2vsG")
# Start the results container & get a connection to it
results_container = startResultsContainer(encoded_auth)
# Start the data containers
startDataContainers(unique_services, encoded_auth)
# Create a reference to results container
results_container = OrchestratorRegistry.get_services_by_name(system, "FACTResultsIO")[1]
# Execute the workflow
need_to_update_results = manage_workflow(tasks, globals["platform"], results_container, overwrite)
# Stop the results container
if need_to_update_results
saveResultsContainer(encoded_auth)
else
stopResultsContainer(encoded_auth)
end
# Stop the data containers
stopDataContainers(unique_services, encoded_auth)
manage_workflow(tasks, globals["platform"], results_container)
end
"""
......@@ -85,7 +59,7 @@ To execute a workflow, define `tasks`, `platform`, and `results_container` accor
This function ensures that tasks are executed respecting their dependencies and available resources, aiming to maximize efficiency and minimize resource contention.
"""
function manage_workflow(tasks::Dict, platform::Dict, results_container::Dict, overwrite::Bool)
function manage_workflow(tasks::Dict, platform::Dict, results_container::Dict)
# Flag to return whether there is a need to save an updated image of FACTResultsIO
update_condition = Atomic{Bool}(false)
......@@ -140,5 +114,4 @@ function manage_workflow(tasks::Dict, platform::Dict, results_container::Dict, o
foreach(wait, task_handles) # This waits for each task to complete
println("All tasks completed.")
return update_condition[]
end
......@@ -17,7 +17,6 @@ This function reads the specified TOML file, parsing the defined tasks and globa
function parseWorkflow(filePath::String)
# Parse the workflow file
config = TOML.parsefile(filePath)
globals = config["globals"]
tasks = config["tasks"]
return tasks, globals
return tasks
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment