restflow_core/services/
workflow.rs

1use crate::{
2    AppCore,
3    engine::context::namespace,
4    models::{NodeType, TaskStatus, Workflow},
5};
6use anyhow::{Context, Result, bail};
7use serde_json::Value;
8use std::collections::HashSet;
9use std::sync::Arc;
10use tokio::time::{Duration, Instant, sleep};
11use tracing::warn;
12
13// Core workflow functions that can be used by both Axum and Tauri
14
15pub async fn list_workflows(core: &Arc<AppCore>) -> Result<Vec<Workflow>> {
16    core.storage
17        .workflows
18        .list_workflows()
19        .context("Failed to list workflows")
20}
21
22pub async fn get_workflow(core: &Arc<AppCore>, id: &str) -> Result<Workflow> {
23    core.storage
24        .workflows
25        .get_workflow(id)
26        .with_context(|| format!("Failed to get workflow {}", id))
27}
28
29pub async fn create_workflow(core: &Arc<AppCore>, mut workflow: Workflow) -> Result<Workflow> {
30    // Generate ID if not provided
31    if workflow.id.is_empty() {
32        workflow.id = format!("wf_{}", uuid::Uuid::new_v4());
33    }
34
35    core.storage
36        .workflows
37        .create_workflow(&workflow)
38        .with_context(|| format!("Failed to save workflow {}", workflow.name))?;
39
40    Ok(workflow)
41}
42
43pub async fn update_workflow(
44    core: &Arc<AppCore>,
45    id: &str,
46    workflow: Workflow,
47) -> Result<Workflow> {
48    core.storage
49        .workflows
50        .update_workflow(id, &workflow)
51        .with_context(|| format!("Failed to update workflow {}", id))?;
52
53    Ok(workflow)
54}
55
56pub async fn delete_workflow(core: &Arc<AppCore>, id: &str) -> Result<()> {
57    // Try to deactivate any active triggers for this workflow
58    let _ = core.trigger_manager.deactivate_workflow(id).await;
59
60    core.storage
61        .workflows
62        .delete_workflow(id)
63        .with_context(|| format!("Failed to delete workflow {}", id))
64}
65
66// Execution functions (also in workflows API)
67
68pub async fn execute_workflow_inline(core: &Arc<AppCore>, mut workflow: Workflow) -> Result<Value> {
69    workflow.id = format!("inline-{}", uuid::Uuid::new_v4());
70
71    if workflow.nodes.iter().any(|node| node.node_type == NodeType::Python) {
72        core.get_python_manager()
73            .await
74            .context("Failed to initialize Python manager for inline execution")?;
75    }
76
77    core.storage
78        .workflows
79        .create_workflow(&workflow)
80        .with_context(|| format!("Failed to persist inline workflow {}", workflow.name))?;
81
82    let result = async {
83        let execution_id = core
84            .executor
85            .submit(workflow.id.clone(), Value::Null)
86            .await
87            .with_context(|| format!("Failed to submit workflow {}", workflow.id))?;
88
89        wait_and_collect(core, &execution_id).await
90    }
91    .await;
92
93    if let Err(e) = core.storage.workflows.delete_workflow(&workflow.id) {
94        warn!(
95            workflow_id = %workflow.id,
96            error = %e,
97            "Failed to clean up inline workflow after execution"
98        );
99    }
100
101    result
102}
103
104pub async fn execute_workflow_by_id(
105    core: &Arc<AppCore>,
106    workflow_id: &str,
107    input: Value,
108) -> Result<Value> {
109    // Load workflow to check if Python manager initialization is needed
110    let workflow = core.storage
111        .workflows
112        .get_workflow(workflow_id)
113        .with_context(|| format!("Failed to load workflow {}", workflow_id))?;
114
115    // Ensure Python manager is initialized if workflow contains Python nodes
116    if workflow.nodes.iter().any(|node| node.node_type == NodeType::Python) {
117        core.get_python_manager()
118            .await
119            .context("Failed to initialize Python manager for workflow execution")?;
120    }
121
122    let execution_id = core
123        .executor
124        .submit(workflow_id.to_string(), input)
125        .await
126        .with_context(|| format!("Failed to execute workflow {}", workflow_id))?;
127
128    wait_and_collect(core, &execution_id).await
129}
130
131pub async fn submit_workflow(
132    core: &Arc<AppCore>,
133    workflow_id: &str,
134    input: Value,
135) -> Result<String> {
136    // Load workflow to check if Python manager initialization is needed
137    let workflow = core.storage
138        .workflows
139        .get_workflow(workflow_id)
140        .with_context(|| format!("Failed to load workflow {}", workflow_id))?;
141
142    // Ensure Python manager is initialized if workflow contains Python nodes
143    if workflow.nodes.iter().any(|node| node.node_type == NodeType::Python) {
144        core.get_python_manager()
145            .await
146            .context("Failed to initialize Python manager for workflow submission")?;
147    }
148
149    core.executor
150        .submit(workflow_id.to_string(), input)
151        .await
152        .with_context(|| format!("Failed to submit workflow {}", workflow_id))
153}
154
155pub async fn get_execution_status(
156    core: &Arc<AppCore>,
157    execution_id: &str,
158) -> Result<Vec<crate::models::Task>> {
159    core.executor
160        .get_execution_status(execution_id)
161        .await
162        .with_context(|| format!("Failed to get execution status for {}", execution_id))
163}
164
/// Upper bound on how long `wait_for_completion` keeps polling before it
/// reports a timeout.
const EXECUTION_TIMEOUT: Duration = Duration::from_secs(60);
/// Pause between two consecutive status polls in `wait_for_completion`.
const EXECUTION_POLL_INTERVAL: Duration = Duration::from_millis(100);
167
168async fn wait_and_collect(core: &Arc<AppCore>, execution_id: &str) -> Result<Value> {
169    let tasks = wait_for_completion(core, execution_id).await?;
170
171    if let Some(failed_task) = tasks.iter().find(|task| task.status == TaskStatus::Failed) {
172        let error_message = failed_task
173            .error
174            .clone()
175            .unwrap_or_else(|| "Workflow execution failed".to_string());
176        bail!(error_message);
177    }
178
179    Ok(build_execution_context(execution_id, &tasks))
180}
181
182async fn wait_for_completion(
183    core: &Arc<AppCore>,
184    execution_id: &str,
185) -> Result<Vec<crate::models::Task>> {
186    let deadline = Instant::now() + EXECUTION_TIMEOUT;
187
188    loop {
189        let tasks = core.executor.get_execution_status(execution_id).await?;
190
191        if !tasks.is_empty()
192            && tasks
193                .iter()
194                .all(|task| matches!(task.status, TaskStatus::Completed | TaskStatus::Failed))
195        {
196            return Ok(tasks);
197        }
198
199        if Instant::now() >= deadline {
200            bail!("Execution {} timed out", execution_id);
201        }
202
203        sleep(EXECUTION_POLL_INTERVAL).await;
204    }
205}
206
207fn build_execution_context(execution_id: &str, tasks: &[crate::models::Task]) -> Value {
208    let workflow_id = tasks
209        .first()
210        .map(|task| task.workflow_id.clone())
211        .unwrap_or_default();
212
213    let mut data = serde_json::Map::new();
214    let mut seen_keys: HashSet<String> = HashSet::new();
215
216    for task in tasks {
217        for (key, value) in &task.context.data {
218            if seen_keys.insert(key.clone()) {
219                data.insert(key.clone(), value.clone());
220            }
221        }
222
223        if let Some(output) = &task.output {
224            let key = namespace::node(&task.node_id);
225            // Serialize NodeOutput to Value for context storage
226            if let Ok(output_value) = serde_json::to_value(output) {
227                data.insert(key, output_value);
228            }
229        }
230    }
231
232    serde_json::json!({
233        "workflow_id": workflow_id,
234        "execution_id": execution_id,
235        "data": data,
236    })
237}