use crate::engine::context::ExecutionContext;
use crate::engine::scheduler::Scheduler;
use crate::models::{Node, NodeType};
use crate::python::PythonManager;
use crate::storage::Storage;
use anyhow::{anyhow, Result};
use serde_json::Value;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::Duration;
use tracing::{debug, error, info, warn};

const QUEUE_POLL_INTERVAL_MS: u64 = 100;

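/// Coordinates workflow execution: submits workflows and single nodes to the
/// scheduler, spawns worker tasks, and holds the optional Python manager used
/// by Python nodes.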
pub struct WorkflowExecutor {
    storage: Arc<Storage>,
    scheduler: Arc<Scheduler>,
    num_workers: usize,
    registry: Arc<crate::node::registry::NodeRegistry>,
    running: Arc<Mutex<bool>>,
    python_manager: Arc<Mutex<Option<Arc<PythonManager>>>>,
}

impl WorkflowExecutor {
    pub fn new(
        storage: Arc<Storage>,
        num_workers: usize,
        registry: Arc<crate::node::registry::NodeRegistry>,
    ) -> Self {
        let scheduler = Arc::new(Scheduler::new(storage.queue.clone(), storage.clone()));

        Self {
            storage,
            scheduler,
            num_workers,
            registry,
            running: Arc::new(Mutex::new(false)),
            python_manager: Arc::new(Mutex::new(None)),
        }
    }

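    /// Sets the Python manager used by Python nodes. Workers read it on each
    /// task, so it can be injected after the executor has started.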
    pub async fn set_python_manager(&self, manager: Arc<PythonManager>) {
        let mut pm = self.python_manager.lock().await;
        *pm = Some(manager);
    }

    pub async fn get_python_manager(&self) -> Option<Arc<PythonManager>> {
        let pm = self.python_manager.lock().await;
        pm.clone()
    }

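    /// Submits a workflow by id for asynchronous execution and returns the
    /// generated execution id.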
    pub async fn submit(&self, workflow_id: String, input: Value) -> Result<String> {
        self.submit_async(workflow_id, input).await
    }

    pub async fn submit_with_execution_id(
        &self,
        workflow_id: String,
        input: Value,
        execution_id: String,
    ) -> Result<String> {
        self.submit_async_with_id(workflow_id, input, execution_id)
            .await
    }

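    /// Submits a single node for one-off execution and returns the created
    /// task id.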
    pub async fn submit_node(&self, node: Node, input: Value) -> Result<String> {
        self.scheduler
            .push_single_node(node, input)
            .map_err(|e| anyhow::anyhow!("Failed to submit node: {}", e))
    }

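    /// Starts the worker pool. Subsequent calls are no-ops: only the first
    /// call flips the `running` flag, recovers stalled tasks, and spawns
    /// workers.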
    pub async fn start(&self) {
        if !self.try_start().await {
            return;
        }

        self.recover_stalled_tasks();
        self.spawn_workers(self.num_workers).await;
    }

    async fn submit_async(&self, workflow_id: String, input: Value) -> Result<String> {
        self.scheduler.submit_workflow_by_id(&workflow_id, input)
    }

    async fn submit_async_with_id(
        &self,
        workflow_id: String,
        input: Value,
        execution_id: String,
    ) -> Result<String> {
        self.scheduler
            .submit_workflow_by_id_with_execution_id(&workflow_id, input, execution_id)
    }

    async fn try_start(&self) -> bool {
        let mut running = self.running.lock().await;
        if *running {
            return false;
        }
        *running = true;
        true
    }

    fn recover_stalled_tasks(&self) {
        if let Err(e) = self.scheduler.recover_stalled_tasks() {
            error!(error = %e, "Failed to recover stalled tasks");
        }
    }

    async fn spawn_workers(&self, num_workers: usize) {
        info!(num_workers, "Starting workers");

        for worker_id in 0..num_workers {
            let registry = self.registry.clone();
            let running = self.running.clone();
            let python_manager = self.python_manager.clone();

            let worker = Worker::new(
                worker_id,
                self.storage.clone(),
                self.scheduler.clone(),
                registry,
                running,
                python_manager,
            );

            tokio::spawn(async move {
                worker.run_worker_loop().await;
            });
        }
    }

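    /// Resolves the node's templated inputs against the execution context and
    /// dispatches the resulting config to the executor registered for the
    /// node type.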
    async fn execute_node(
        node: &Node,
        input: &crate::models::NodeInput,
        context: &mut ExecutionContext,
        registry: Arc<crate::node::registry::NodeRegistry>,
    ) -> Result<crate::models::NodeOutput> {
        use crate::models::NodeInput;

        debug!(node_id = %node.id, node_type = ?node.node_type, "Executing node");

        let executor = registry.get(&node.node_type).ok_or_else(|| {
            anyhow::anyhow!("No executor found for node type: {:?}", node.node_type)
        })?;

        let config = match input {
            NodeInput::HttpRequest(http_input) => {
                let url = http_input.url.resolve(context)?;
                let headers = http_input
                    .headers
                    .as_ref()
                    .map(|h| h.resolve(context))
                    .transpose()?;
                let body = http_input
                    .body
                    .as_ref()
                    .map(|b| b.resolve(context))
                    .transpose()?;

                serde_json::json!({
                    "url": url,
                    "method": http_input.method,
                    "headers": headers,
                    "body": body,
                    "timeout_ms": http_input.timeout_ms,
                })
            }
            NodeInput::Agent(agent_input) => {
                let prompt = agent_input.prompt.resolve(context)?;
                serde_json::json!({
                    "model": agent_input.model,
                    "prompt": prompt.clone(),
                    "temperature": agent_input.temperature,
                    "api_key_config": agent_input.api_key_config,
                    "tools": agent_input.tools,
                    "input": prompt,
                })
            }
            NodeInput::Python(python_input) => {
                let code = python_input.code.clone();
                let input_data = python_input
                    .input
                    .as_ref()
                    .map(|i| i.resolve(context))
                    .transpose()?;

                serde_json::json!({
                    "code": code,
                    "input": input_data,
                })
            }
            NodeInput::Print(print_input) => {
                let message = print_input.message.resolve(context)?;
                serde_json::json!({
                    "message": message,
                })
            }
            NodeInput::ManualTrigger(manual_input) => {
                serde_json::to_value(manual_input)
                    .map_err(|e| anyhow::anyhow!("Failed to serialize manual trigger input: {}", e))?
            }
            NodeInput::WebhookTrigger(webhook_input) => {
                serde_json::to_value(webhook_input)
                    .map_err(|e| anyhow::anyhow!("Failed to serialize webhook trigger input: {}", e))?
            }
            NodeInput::ScheduleTrigger(schedule_input) => {
                serde_json::to_value(schedule_input)
                    .map_err(|e| anyhow::anyhow!("Failed to serialize schedule input: {}", e))?
            }
        };

        executor.execute(&node.node_type, &config, context).await
    }

    pub async fn get_task_status(&self, task_id: &str) -> Result<crate::models::Task> {
        self.scheduler
            .get_task(task_id)
            .map_err(|e| anyhow::anyhow!("Failed to get task status: {}", e))?
            .ok_or_else(|| anyhow::anyhow!("Task {} not found", task_id))
    }

    pub async fn get_execution_status(
        &self,
        execution_id: &str,
    ) -> Result<Vec<crate::models::Task>> {
        self.scheduler
            .get_tasks_by_execution(execution_id)
            .map_err(|e| anyhow::anyhow!("Failed to get execution status: {}", e))
    }

    pub async fn list_tasks(
        &self,
        workflow_id: Option<&str>,
        status: Option<crate::models::TaskStatus>,
    ) -> Result<Vec<crate::models::Task>> {
        self.scheduler
            .list_tasks(workflow_id, status)
            .map_err(|e| anyhow::anyhow!("Failed to list tasks: {}", e))
    }
}

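/// A single worker: polls the scheduler queue, executes nodes, and reports
/// completion or failure back to the scheduler.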
struct Worker {
    id: usize,
    storage: Arc<Storage>,
    scheduler: Arc<Scheduler>,
    registry: Arc<crate::node::registry::NodeRegistry>,
    running: Arc<Mutex<bool>>,
    python_manager: Arc<Mutex<Option<Arc<PythonManager>>>>,
}

impl Worker {
    fn new(
        id: usize,
        storage: Arc<Storage>,
        scheduler: Arc<Scheduler>,
        registry: Arc<crate::node::registry::NodeRegistry>,
        running: Arc<Mutex<bool>>,
        python_manager: Arc<Mutex<Option<Arc<PythonManager>>>>,
    ) -> Self {
        Self {
            id,
            storage,
            scheduler,
            registry,
            running,
            python_manager,
        }
    }

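    /// Polls the queue until the shared `running` flag is cleared, sleeping
    /// briefly between polls that return an error (including an empty queue).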
    async fn run_worker_loop(&self) {
        info!(worker_id = self.id, "Worker started");

        while *self.running.lock().await {
            if let Err(e) = self.process_next_task().await {
                let error_msg = e.to_string();
                if !error_msg.contains("Failed to get task") {
                    error!(worker_id = self.id, error = %error_msg, "Worker error");
                }
                tokio::time::sleep(Duration::from_millis(QUEUE_POLL_INTERVAL_MS)).await;
            }
        }

        info!(worker_id = self.id, "Worker stopped");
    }

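    /// Pops the next task, executes its node, and records the outcome: on
    /// success downstream tasks are pushed and the task is completed; on
    /// failure the task is marked failed. Python nodes wait up to 30 seconds
    /// for the Python manager to become available.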
    async fn process_next_task(&self) -> Result<()> {
        let task = self
            .scheduler
            .pop_task()
            .await
            .map_err(|e| anyhow::anyhow!("Failed to get task: {}", e))?;

        debug!(worker_id = self.id, task_id = %task.id, node_id = %task.node_id, "Processing task");

        let node = task.get_node(&self.storage)?;

        let mut context = task.context.clone();
        context.ensure_secret_storage(&self.storage);

        if node.node_type == NodeType::Python {
            debug!(worker_id = self.id, task_id = %task.id, "Python node detected, waiting for Python manager");

            let manager = {
                let mut retries = 0;
                const MAX_RETRIES: u32 = 300;

                loop {
                    if let Some(m) = self.python_manager.lock().await.clone() {
                        break m;
                    }

                    if retries >= MAX_RETRIES {
                        return Err(anyhow!(
                            "Python manager not available after {}s. \
                             Ensure Python manager is initialized before submitting Python tasks.",
                            MAX_RETRIES / 10
                        ));
                    }

                    retries += 1;
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }
            };

            debug!(worker_id = self.id, task_id = %task.id, "Python manager acquired");
            context = context.with_python_manager(manager);
        } else if let Some(manager) = self.python_manager.lock().await.clone() {
            context = context.with_python_manager(manager);
        }

        let result =
            WorkflowExecutor::execute_node(node, &task.input, &mut context, self.registry.clone())
                .await;

        match result {
            Ok(output) => match self.scheduler.push_downstream_tasks(&task, output.clone()) {
                Ok(_) => {
                    if let Err(e) = self.scheduler.complete_task(&task.id, output) {
                        warn!(task_id = %task.id, error = %e, "Failed to persist task completion");
                    } else {
                        info!(task_id = %task.id, node_id = %task.node_id, "Task completed");
                    }
                }
                Err(e) => {
                    let error_msg = format!("Task succeeded but failed to push downstream: {}", e);
                    if let Err(persist_err) = self.scheduler.fail_task(&task.id, error_msg.clone()) {
                        warn!(task_id = %task.id, error = %persist_err, "Failed to persist task failure");
                    }
                    error!(task_id = %task.id, error = %e, "Failed to push downstream tasks");
                    return Err(anyhow::anyhow!(error_msg));
                }
            },
            Err(error) => {
                if let Err(e) = self.scheduler.fail_task(&task.id, error.to_string()) {
                    warn!(task_id = %task.id, error = %e, "Failed to persist task failure");
                }
                error!(task_id = %task.id, error = %error, "Task execution failed");
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{Node, NodeType, Workflow};
    use crate::node::registry::NodeRegistry;
    use crate::storage::Storage;
    use tempfile::tempdir;

    fn create_test_executor() -> (WorkflowExecutor, tempfile::TempDir) {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let storage = Arc::new(Storage::new(db_path.to_str().unwrap()).unwrap());
        let registry = Arc::new(NodeRegistry::new());
        let executor = WorkflowExecutor::new(storage, 2, registry);
        (executor, temp_dir)
    }

    fn create_test_node(id: &str, node_type: NodeType, config: Value) -> Node {
        Node {
            id: id.to_string(),
            node_type,
            config,
            position: None,
        }
    }

    fn create_test_print_node(id: &str, message: &str) -> Node {
        create_test_node(
            id,
            NodeType::Print,
            serde_json::json!({
                "type": "Print",
                "data": {
                    "message": message
                }
            }),
        )
    }

    fn create_test_workflow(id: &str, nodes: Vec<Node>) -> Workflow {
        Workflow {
            id: id.to_string(),
            name: format!("Test Workflow {}", id),
            nodes,
            edges: vec![],
        }
    }

    #[tokio::test]
    async fn test_executor_creation() {
        let (executor, _tmp) = create_test_executor();

        let running = executor.running.lock().await;
        assert!(!*running);
    }

    #[tokio::test]
    async fn test_submit_single_node() {
        let (executor, _tmp) = create_test_executor();

        let node = create_test_print_node("print1", "Hello World");
        let input = serde_json::json!({});

        let task_id = executor.submit_node(node, input).await.unwrap();
        assert!(!task_id.is_empty());

        let task = executor.get_task_status(&task_id).await.unwrap();
        assert_eq!(task.node_id, "print1");
    }

    #[tokio::test]
    async fn test_submit_workflow() {
        let (executor, _tmp) = create_test_executor();

        let node = create_test_print_node("print1", "Test");
        let workflow = create_test_workflow("wf-001", vec![node]);

        executor.storage.workflows.create_workflow(&workflow).unwrap();

        let execution_id = executor
            .submit("wf-001".to_string(), serde_json::json!({}))
            .await
            .unwrap();

        assert!(!execution_id.is_empty());
    }

    #[tokio::test]
    async fn test_submit_with_custom_execution_id() {
        let (executor, _tmp) = create_test_executor();

        let node = create_test_print_node("print1", "Test");
        let workflow = create_test_workflow("wf-001", vec![node]);
        executor.storage.workflows.create_workflow(&workflow).unwrap();

        let custom_id = "custom-exec-001".to_string();
        let execution_id = executor
            .submit_with_execution_id("wf-001".to_string(), serde_json::json!({}), custom_id.clone())
            .await
            .unwrap();

        assert_eq!(execution_id, custom_id);
    }

    #[tokio::test]
    async fn test_executor_start_idempotent() {
        let (executor, _tmp) = create_test_executor();

        executor.start().await;
        let running = *executor.running.lock().await;
        assert!(running);

        let try_start_result = executor.try_start().await;
        assert!(!try_start_result);
    }

    #[tokio::test]
    async fn test_get_task_status() {
        let (executor, _tmp) = create_test_executor();

        let node = create_test_print_node("print1", "Test");
        let task_id = executor.submit_node(node, serde_json::json!({})).await.unwrap();

        let task = executor.get_task_status(&task_id).await.unwrap();
        assert_eq!(task.id, task_id);
        assert_eq!(task.node_id, "print1");
    }

    #[tokio::test]
    async fn test_get_task_status_not_found() {
        let (executor, _tmp) = create_test_executor();

        let result = executor.get_task_status("nonexistent-task").await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    #[tokio::test]
    async fn test_get_execution_status() {
        let (executor, _tmp) = create_test_executor();

        let node = create_test_print_node("print1", "Test");
        let workflow = create_test_workflow("wf-001", vec![node]);
        executor.storage.workflows.create_workflow(&workflow).unwrap();

        let execution_id = executor
            .submit("wf-001".to_string(), serde_json::json!({}))
            .await
            .unwrap();

        let tasks = executor.get_execution_status(&execution_id).await.unwrap();
        assert_eq!(tasks.len(), 1);
        assert_eq!(tasks[0].execution_id, execution_id);
    }

    #[tokio::test]
    async fn test_list_tasks() {
        let (executor, _tmp) = create_test_executor();

        let node1 = create_test_print_node("print1", "Test1");
        let node2 = create_test_print_node("print2", "Test2");

        executor.submit_node(node1, serde_json::json!({})).await.unwrap();
        executor.submit_node(node2, serde_json::json!({})).await.unwrap();

        let tasks = executor.list_tasks(None, None).await.unwrap();
        assert_eq!(tasks.len(), 2);
    }

    #[tokio::test]
    async fn test_list_tasks_filtered_by_workflow() {
        let (executor, _tmp) = create_test_executor();

        let node = create_test_print_node("print1", "Test");
        let workflow = create_test_workflow("wf-001", vec![node]);
        executor.storage.workflows.create_workflow(&workflow).unwrap();

        executor.submit("wf-001".to_string(), serde_json::json!({})).await.unwrap();

        let tasks = executor.list_tasks(Some("wf-001"), None).await.unwrap();
        assert_eq!(tasks.len(), 1);
        assert_eq!(tasks[0].workflow_id, "wf-001");
    }

    #[tokio::test]
    async fn test_execute_node_with_template_resolution() {
        let (executor, _tmp) = create_test_executor();

        let mut context = ExecutionContext::new("wf-001".to_string());
        context.set_var("name", serde_json::json!("Alice"));

        let node = create_test_node(
            "print1",
            NodeType::Print,
            serde_json::json!({
                "type": "Print",
                "data": {
                    "message": "Hello {{var.name}}!"
                }
            }),
        );

        let node_input: crate::models::NodeInput =
            serde_json::from_value(node.config.clone()).unwrap();

        let result = WorkflowExecutor::execute_node(
            &node,
            &node_input,
            &mut context,
            executor.registry.clone(),
        )
        .await;

        assert!(result.is_ok());
        let output = result.unwrap();

        if let crate::models::NodeOutput::Print(print_output) = output {
            assert_eq!(print_output.printed, "Hello Alice!");
        } else {
            panic!("Expected Print output");
        }
    }

    #[tokio::test]
    async fn test_worker_picks_up_task() {
        let (executor, _tmp) = create_test_executor();

        executor.start().await;

        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;

        let node = create_test_print_node("print1", "Worker test");
        let task_id = executor.submit_node(node, serde_json::json!({})).await.unwrap();

        let mut attempts = 0;
        let max_attempts = 20;
        let mut task_picked_up = false;

        loop {
            if attempts >= max_attempts {
                break;
            }

            let task = executor.get_task_status(&task_id).await.unwrap();
            if task.status != crate::models::TaskStatus::Pending {
                task_picked_up = true;
                break;
            }

            attempts += 1;
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }

        assert!(task_picked_up, "Worker should have picked up the task");
    }

    #[tokio::test]
    #[ignore = "Worker completion logic needs fixing - task stuck in Running state"]
    async fn test_worker_completes_task() {
        let (executor, _tmp) = create_test_executor();

        executor.start().await;
        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;

        let node = create_test_print_node("print1", "Worker test");
        let task_id = executor.submit_node(node, serde_json::json!({})).await.unwrap();

        let mut attempts = 0;
        loop {
            if attempts >= 100 {
                let task = executor.get_task_status(&task_id).await.unwrap();
                panic!("Task did not complete. Final status: {:?}", task.status);
            }

            let task = executor.get_task_status(&task_id).await.unwrap();
            if task.status == crate::models::TaskStatus::Completed {
                assert!(task.output.is_some());
                break;
            }

            attempts += 1;
            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
        }
    }

    #[tokio::test]
    async fn test_python_manager_injection() {
        let (executor, _tmp) = create_test_executor();

        assert!(executor.get_python_manager().await.is_none());

        let manager = PythonManager::new_mock();
        executor.set_python_manager(manager).await;

        assert!(executor.get_python_manager().await.is_some());
    }
}