restflow_core/services/
workflow.rs1use crate::{
2 AppCore,
3 engine::context::namespace,
4 models::{NodeType, TaskStatus, Workflow},
5};
6use anyhow::{Context, Result, bail};
7use serde_json::Value;
8use std::collections::HashSet;
9use std::sync::Arc;
10use tokio::time::{Duration, Instant, sleep};
11use tracing::warn;
12
13pub async fn list_workflows(core: &Arc<AppCore>) -> Result<Vec<Workflow>> {
16 core.storage
17 .workflows
18 .list_workflows()
19 .context("Failed to list workflows")
20}
21
22pub async fn get_workflow(core: &Arc<AppCore>, id: &str) -> Result<Workflow> {
23 core.storage
24 .workflows
25 .get_workflow(id)
26 .with_context(|| format!("Failed to get workflow {}", id))
27}
28
29pub async fn create_workflow(core: &Arc<AppCore>, mut workflow: Workflow) -> Result<Workflow> {
30 if workflow.id.is_empty() {
32 workflow.id = format!("wf_{}", uuid::Uuid::new_v4());
33 }
34
35 core.storage
36 .workflows
37 .create_workflow(&workflow)
38 .with_context(|| format!("Failed to save workflow {}", workflow.name))?;
39
40 Ok(workflow)
41}
42
43pub async fn update_workflow(
44 core: &Arc<AppCore>,
45 id: &str,
46 workflow: Workflow,
47) -> Result<Workflow> {
48 core.storage
49 .workflows
50 .update_workflow(id, &workflow)
51 .with_context(|| format!("Failed to update workflow {}", id))?;
52
53 Ok(workflow)
54}
55
56pub async fn delete_workflow(core: &Arc<AppCore>, id: &str) -> Result<()> {
57 let _ = core.trigger_manager.deactivate_workflow(id).await;
59
60 core.storage
61 .workflows
62 .delete_workflow(id)
63 .with_context(|| format!("Failed to delete workflow {}", id))
64}
65
66pub async fn execute_workflow_inline(core: &Arc<AppCore>, mut workflow: Workflow) -> Result<Value> {
69 workflow.id = format!("inline-{}", uuid::Uuid::new_v4());
70
71 if workflow.nodes.iter().any(|node| node.node_type == NodeType::Python) {
72 core.get_python_manager()
73 .await
74 .context("Failed to initialize Python manager for inline execution")?;
75 }
76
77 core.storage
78 .workflows
79 .create_workflow(&workflow)
80 .with_context(|| format!("Failed to persist inline workflow {}", workflow.name))?;
81
82 let result = async {
83 let execution_id = core
84 .executor
85 .submit(workflow.id.clone(), Value::Null)
86 .await
87 .with_context(|| format!("Failed to submit workflow {}", workflow.id))?;
88
89 wait_and_collect(core, &execution_id).await
90 }
91 .await;
92
93 if let Err(e) = core.storage.workflows.delete_workflow(&workflow.id) {
94 warn!(
95 workflow_id = %workflow.id,
96 error = %e,
97 "Failed to clean up inline workflow after execution"
98 );
99 }
100
101 result
102}
103
104pub async fn execute_workflow_by_id(
105 core: &Arc<AppCore>,
106 workflow_id: &str,
107 input: Value,
108) -> Result<Value> {
109 let workflow = core.storage
111 .workflows
112 .get_workflow(workflow_id)
113 .with_context(|| format!("Failed to load workflow {}", workflow_id))?;
114
115 if workflow.nodes.iter().any(|node| node.node_type == NodeType::Python) {
117 core.get_python_manager()
118 .await
119 .context("Failed to initialize Python manager for workflow execution")?;
120 }
121
122 let execution_id = core
123 .executor
124 .submit(workflow_id.to_string(), input)
125 .await
126 .with_context(|| format!("Failed to execute workflow {}", workflow_id))?;
127
128 wait_and_collect(core, &execution_id).await
129}
130
131pub async fn submit_workflow(
132 core: &Arc<AppCore>,
133 workflow_id: &str,
134 input: Value,
135) -> Result<String> {
136 let workflow = core.storage
138 .workflows
139 .get_workflow(workflow_id)
140 .with_context(|| format!("Failed to load workflow {}", workflow_id))?;
141
142 if workflow.nodes.iter().any(|node| node.node_type == NodeType::Python) {
144 core.get_python_manager()
145 .await
146 .context("Failed to initialize Python manager for workflow submission")?;
147 }
148
149 core.executor
150 .submit(workflow_id.to_string(), input)
151 .await
152 .with_context(|| format!("Failed to submit workflow {}", workflow_id))
153}
154
155pub async fn get_execution_status(
156 core: &Arc<AppCore>,
157 execution_id: &str,
158) -> Result<Vec<crate::models::Task>> {
159 core.executor
160 .get_execution_status(execution_id)
161 .await
162 .with_context(|| format!("Failed to get execution status for {}", execution_id))
163}
164
/// Pause between two consecutive status polls in `wait_for_completion`.
const EXECUTION_POLL_INTERVAL: Duration = Duration::from_millis(100);

/// How long `wait_for_completion` keeps polling before it reports a timeout.
const EXECUTION_TIMEOUT: Duration = Duration::from_secs(60);
167
168async fn wait_and_collect(core: &Arc<AppCore>, execution_id: &str) -> Result<Value> {
169 let tasks = wait_for_completion(core, execution_id).await?;
170
171 if let Some(failed_task) = tasks.iter().find(|task| task.status == TaskStatus::Failed) {
172 let error_message = failed_task
173 .error
174 .clone()
175 .unwrap_or_else(|| "Workflow execution failed".to_string());
176 bail!(error_message);
177 }
178
179 Ok(build_execution_context(execution_id, &tasks))
180}
181
182async fn wait_for_completion(
183 core: &Arc<AppCore>,
184 execution_id: &str,
185) -> Result<Vec<crate::models::Task>> {
186 let deadline = Instant::now() + EXECUTION_TIMEOUT;
187
188 loop {
189 let tasks = core.executor.get_execution_status(execution_id).await?;
190
191 if !tasks.is_empty()
192 && tasks
193 .iter()
194 .all(|task| matches!(task.status, TaskStatus::Completed | TaskStatus::Failed))
195 {
196 return Ok(tasks);
197 }
198
199 if Instant::now() >= deadline {
200 bail!("Execution {} timed out", execution_id);
201 }
202
203 sleep(EXECUTION_POLL_INTERVAL).await;
204 }
205}
206
207fn build_execution_context(execution_id: &str, tasks: &[crate::models::Task]) -> Value {
208 let workflow_id = tasks
209 .first()
210 .map(|task| task.workflow_id.clone())
211 .unwrap_or_default();
212
213 let mut data = serde_json::Map::new();
214 let mut seen_keys: HashSet<String> = HashSet::new();
215
216 for task in tasks {
217 for (key, value) in &task.context.data {
218 if seen_keys.insert(key.clone()) {
219 data.insert(key.clone(), value.clone());
220 }
221 }
222
223 if let Some(output) = &task.output {
224 let key = namespace::node(&task.node_id);
225 if let Ok(output_value) = serde_json::to_value(output) {
227 data.insert(key, output_value);
228 }
229 }
230 }
231
232 serde_json::json!({
233 "workflow_id": workflow_id,
234 "execution_id": execution_id,
235 "data": data,
236 })
237}