// restflow_core/engine/executor.rs

1use crate::engine::context::ExecutionContext;
2use crate::engine::scheduler::Scheduler;
3use crate::models::{Node, NodeType};
4use crate::python::PythonManager;
5use crate::storage::Storage;
6use anyhow::{anyhow, Result};
7use serde_json::Value;
8use std::sync::Arc;
9use tokio::sync::Mutex;
10use tokio::time::Duration;
11use tracing::{debug, error, info, warn};
12
13const QUEUE_POLL_INTERVAL_MS: u64 = 100;
14
15pub struct WorkflowExecutor {
16    storage: Arc<Storage>,
17    scheduler: Arc<Scheduler>,
18    num_workers: usize,
19    registry: Arc<crate::node::registry::NodeRegistry>,
20    running: Arc<Mutex<bool>>,
21    python_manager: Arc<Mutex<Option<Arc<PythonManager>>>>,
22}
23
24impl WorkflowExecutor {
25    /// Create an asynchronous executor with storage and workers
26    pub fn new(
27        storage: Arc<Storage>,
28        num_workers: usize,
29        registry: Arc<crate::node::registry::NodeRegistry>,
30    ) -> Self {
31        let scheduler = Arc::new(Scheduler::new(storage.queue.clone(), storage.clone()));
32
33        Self {
34            storage,
35            scheduler,
36            num_workers,
37            registry,
38            running: Arc::new(Mutex::new(false)),
39            python_manager: Arc::new(Mutex::new(None)),
40        }
41    }
42
43    /// Set the Python manager for script execution
44    pub async fn set_python_manager(&self, manager: Arc<PythonManager>) {
45        let mut pm = self.python_manager.lock().await;
46        *pm = Some(manager);
47    }
48
49    /// Get the Python manager if available
50    pub async fn get_python_manager(&self) -> Option<Arc<PythonManager>> {
51        let pm = self.python_manager.lock().await;
52        pm.clone()
53    }
54
55    pub async fn submit(&self, workflow_id: String, input: Value) -> Result<String> {
56        self.submit_async(workflow_id, input).await
57    }
58
59    pub async fn submit_with_execution_id(
60        &self,
61        workflow_id: String,
62        input: Value,
63        execution_id: String,
64    ) -> Result<String> {
65        self.submit_async_with_id(workflow_id, input, execution_id)
66            .await
67    }
68
69    /// Submit a single node for execution
70    pub async fn submit_node(&self, node: Node, input: Value) -> Result<String> {
71        self.scheduler
72            .push_single_node(node, input)
73            .map_err(|e| anyhow::anyhow!("Failed to submit node: {}", e))
74    }
75
76    pub async fn start(&self) {
77        if !self.try_start().await {
78            return;
79        }
80
81        self.recover_stalled_tasks();
82        self.spawn_workers(self.num_workers).await;
83    }
84
85    async fn submit_async(&self, workflow_id: String, input: Value) -> Result<String> {
86        self.scheduler.submit_workflow_by_id(&workflow_id, input)
87    }
88
89    async fn submit_async_with_id(
90        &self,
91        workflow_id: String,
92        input: Value,
93        execution_id: String,
94    ) -> Result<String> {
95        self.scheduler
96            .submit_workflow_by_id_with_execution_id(&workflow_id, input, execution_id)
97    }
98
99    async fn try_start(&self) -> bool {
100        let mut running = self.running.lock().await;
101        if *running {
102            return false;
103        }
104        *running = true;
105        true
106    }
107
108    fn recover_stalled_tasks(&self) {
109        if let Err(e) = self.scheduler.recover_stalled_tasks() {
110            error!(error = %e, "Failed to recover stalled tasks");
111        }
112    }
113
114    async fn spawn_workers(&self, num_workers: usize) {
115        info!(num_workers, "Starting workers");
116
117        for worker_id in 0..num_workers {
118            let registry = self.registry.clone();
119            let running = self.running.clone();
120            let python_manager = self.python_manager.clone();
121
122            let worker = Worker::new(
123                worker_id,
124                self.storage.clone(),
125                self.scheduler.clone(),
126                registry,
127                running,
128                python_manager,
129            );
130
131            tokio::spawn(async move {
132                worker.run_worker_loop().await;
133            });
134        }
135    }
136
137    /// Resolves Templated<T> fields in NodeInput using execution context,
138    /// then delegates to node-specific executor for actual execution.
139    async fn execute_node(
140        node: &Node,
141        input: &crate::models::NodeInput,
142        context: &mut ExecutionContext,
143        registry: Arc<crate::node::registry::NodeRegistry>,
144    ) -> Result<crate::models::NodeOutput> {
145        use crate::models::NodeInput;
146
147        debug!(node_id = %node.id, node_type = ?node.node_type, "Executing node");
148
149        let executor = registry.get(&node.node_type).ok_or_else(|| {
150            anyhow::anyhow!("No executor found for node type: {:?}", node.node_type)
151        })?;
152
153        // Resolve templates and convert to Value for existing executors
154        let config = match input {
155            NodeInput::HttpRequest(http_input) => {
156                let url = http_input.url.resolve(context)?;
157                let headers = http_input.headers.as_ref()
158                    .map(|h| h.resolve(context))
159                    .transpose()?;
160                let body = http_input.body.as_ref()
161                    .map(|b| b.resolve(context))
162                    .transpose()?;
163
164                serde_json::json!({
165                    "url": url,
166                    "method": http_input.method,
167                    "headers": headers,
168                    "body": body,
169                    "timeout_ms": http_input.timeout_ms,
170                })
171            }
172            NodeInput::Agent(agent_input) => {
173                let prompt = agent_input.prompt.resolve(context)?;
174                serde_json::json!({
175                    "model": agent_input.model,
176                    "prompt": prompt.clone(),
177                    "temperature": agent_input.temperature,
178                    "api_key_config": agent_input.api_key_config,
179                    "tools": agent_input.tools,
180                    "input": prompt,  // For AgentExecutor compatibility
181                })
182            }
183            NodeInput::Python(python_input) => {
184                // Note: code is not templated to avoid conflicts with Python f-strings
185                let code = python_input.code.clone();
186                let input_data = python_input.input.as_ref()
187                    .map(|i| i.resolve(context))
188                    .transpose()?;
189
190                serde_json::json!({
191                    "code": code,
192                    "input": input_data,
193                })
194            }
195            NodeInput::Print(print_input) => {
196                let message = print_input.message.resolve(context)?;
197                serde_json::json!({
198                    "message": message,
199                })
200            }
201            NodeInput::ManualTrigger(manual_input) => {
202                // Manual triggers don't need input resolution - they provide data to the workflow
203                serde_json::to_value(manual_input)
204                    .map_err(|e| anyhow::anyhow!("Failed to serialize manual trigger input: {}", e))?
205            }
206            NodeInput::WebhookTrigger(webhook_input) => {
207                // Webhook triggers don't need input resolution - they provide data to the workflow
208                serde_json::to_value(webhook_input)
209                    .map_err(|e| anyhow::anyhow!("Failed to serialize webhook trigger input: {}", e))?
210            }
211            NodeInput::ScheduleTrigger(schedule_input) => {
212                serde_json::to_value(schedule_input)
213                    .map_err(|e| anyhow::anyhow!("Failed to serialize schedule input: {}", e))?
214            }
215        };
216
217        executor.execute(&node.node_type, &config, context).await
218    }
219
220    pub async fn get_task_status(&self, task_id: &str) -> Result<crate::models::Task> {
221        self.scheduler
222            .get_task(task_id)
223            .map_err(|e| anyhow::anyhow!("Failed to get task status: {}", e))?
224            .ok_or_else(|| anyhow::anyhow!("Task {} not found", task_id))
225    }
226
227    pub async fn get_execution_status(
228        &self,
229        execution_id: &str,
230    ) -> Result<Vec<crate::models::Task>> {
231        self.scheduler
232            .get_tasks_by_execution(execution_id)
233            .map_err(|e| anyhow::anyhow!("Failed to get execution status: {}", e))
234    }
235
236    pub async fn list_tasks(
237        &self,
238        workflow_id: Option<&str>,
239        status: Option<crate::models::TaskStatus>,
240    ) -> Result<Vec<crate::models::Task>> {
241        self.scheduler
242            .list_tasks(workflow_id, status)
243            .map_err(|e| anyhow::anyhow!("Failed to list tasks: {}", e))
244    }
245}
246
247struct Worker {
248    id: usize,
249    storage: Arc<Storage>,
250    scheduler: Arc<Scheduler>,
251    registry: Arc<crate::node::registry::NodeRegistry>,
252    running: Arc<Mutex<bool>>,
253    python_manager: Arc<Mutex<Option<Arc<PythonManager>>>>,
254}
255
256impl Worker {
257    fn new(
258        id: usize,
259        storage: Arc<Storage>,
260        scheduler: Arc<Scheduler>,
261        registry: Arc<crate::node::registry::NodeRegistry>,
262        running: Arc<Mutex<bool>>,
263        python_manager: Arc<Mutex<Option<Arc<PythonManager>>>>,
264    ) -> Self {
265        Self {
266            id,
267            storage,
268            scheduler,
269            registry,
270            running,
271            python_manager,
272        }
273    }
274
275    async fn run_worker_loop(&self) {
276        info!(worker_id = self.id, "Worker started");
277
278        while *self.running.lock().await {
279            if let Err(e) = self.process_next_task().await {
280                let error_msg = e.to_string();
281                if !error_msg.contains("Failed to get task") {
282                    error!(worker_id = self.id, error = %error_msg, "Worker error");
283                }
284                tokio::time::sleep(tokio::time::Duration::from_millis(QUEUE_POLL_INTERVAL_MS))
285                    .await;
286            }
287        }
288
289        info!(worker_id = self.id, "Worker stopped");
290    }
291
292    async fn process_next_task(&self) -> Result<()> {
293        let task = self
294            .scheduler
295            .pop_task()
296            .await
297            .map_err(|e| anyhow::anyhow!("Failed to get task: {}", e))?;
298
299        debug!(worker_id = self.id, task_id = %task.id, node_id = %task.node_id, "Processing task");
300
301        let node = task.get_node(&self.storage)?;
302
303        let mut context = task.context.clone();
304        context.ensure_secret_storage(&self.storage);
305
306        // Handle Python manager based on node type
307        if node.node_type == NodeType::Python {
308            // For Python nodes, MUST have manager - wait if initializing
309            debug!(worker_id = self.id, task_id = %task.id, "Python node detected, waiting for Python manager");
310
311            let manager = {
312                let mut retries = 0;
313                const MAX_RETRIES: u32 = 300; // 30 seconds (100ms * 300)
314
315                loop {
316                    if let Some(m) = self.python_manager.lock().await.clone() {
317                        break m;
318                    }
319
320                    if retries >= MAX_RETRIES {
321                        return Err(anyhow!(
322                            "Python manager not available after {}s. \
323                             Ensure Python manager is initialized before submitting Python tasks.",
324                            MAX_RETRIES / 10
325                        ));
326                    }
327
328                    retries += 1;
329                    tokio::time::sleep(Duration::from_millis(100)).await;
330                }
331            };
332
333            debug!(worker_id = self.id, task_id = %task.id, "Python manager acquired");
334            context = context.with_python_manager(manager);
335        } else {
336            // For non-Python nodes, use if available (optional)
337            if let Some(manager) = self.python_manager.lock().await.clone() {
338                context = context.with_python_manager(manager);
339            }
340        }
341
342        let result =
343            WorkflowExecutor::execute_node(node, &task.input, &mut context, self.registry.clone()).await;
344
345        match result {
346            Ok(output) => match self.scheduler.push_downstream_tasks(&task, output.clone()) {
347                Ok(_) => {
348                    if let Err(e) = self.scheduler.complete_task(&task.id, output) {
349                        warn!(task_id = %task.id, error = %e, "Failed to persist task completion");
350                    } else {
351                        info!(task_id = %task.id, node_id = %task.node_id, "Task completed");
352                    }
353                }
354                Err(e) => {
355                    let error_msg = format!("Task succeeded but failed to push downstream: {}", e);
356                    if let Err(persist_err) = self.scheduler.fail_task(&task.id, error_msg.clone())
357                    {
358                        warn!(task_id = %task.id, error = %persist_err, "Failed to persist task failure");
359                    }
360                    error!(task_id = %task.id, error = %e, "Failed to push downstream tasks");
361                    return Err(anyhow::anyhow!(error_msg));
362                }
363            },
364            Err(error) => {
365                if let Err(e) = self.scheduler.fail_task(&task.id, error.to_string()) {
366                    warn!(task_id = %task.id, error = %e, "Failed to persist task failure");
367                }
368                error!(task_id = %task.id, error = %error, "Task execution failed");
369            }
370        }
371
372        Ok(())
373    }
374}
375
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{Node, NodeType, Task, TaskStatus, Workflow};
    use crate::node::registry::NodeRegistry;
    use crate::storage::Storage;
    use tempfile::tempdir;

    /// Delay between task-status polls in the worker tests.
    const POLL_INTERVAL: Duration = Duration::from_millis(100);

    /// Builds a two-worker executor over a fresh on-disk database.
    ///
    /// The `TempDir` is returned so that the database outlives the test body.
    fn create_test_executor() -> (WorkflowExecutor, tempfile::TempDir) {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let storage = Arc::new(Storage::new(db_path.to_str().unwrap()).unwrap());
        let registry = Arc::new(NodeRegistry::new());
        let executor = WorkflowExecutor::new(storage, 2, registry);
        (executor, temp_dir)
    }

    fn create_test_node(id: &str, node_type: NodeType, config: Value) -> Node {
        Node {
            id: id.to_string(),
            node_type,
            config,
            position: None,
        }
    }

    /// A Print node whose config deserializes into `NodeInput::Print`.
    fn create_test_print_node(id: &str, message: &str) -> Node {
        create_test_node(
            id,
            NodeType::Print,
            serde_json::json!({
                "type": "Print",
                "data": {
                    "message": message
                }
            }),
        )
    }

    fn create_test_workflow(id: &str, nodes: Vec<Node>) -> Workflow {
        Workflow {
            id: id.to_string(),
            name: format!("Test Workflow {}", id),
            nodes,
            edges: vec![],
        }
    }

    /// Polls `task_id` up to `max_attempts` times, `POLL_INTERVAL` apart,
    /// until `done` accepts the task.
    ///
    /// Returns `Ok` with the first accepted snapshot. If the attempts run out,
    /// returns `Err` with a fresh snapshot.
    async fn poll_task<F>(
        executor: &WorkflowExecutor,
        task_id: &str,
        max_attempts: usize,
        done: F,
    ) -> std::result::Result<Task, Task>
    where
        F: Fn(&Task) -> bool,
    {
        for _ in 0..max_attempts {
            let task = executor.get_task_status(task_id).await.unwrap();
            if done(&task) {
                return Ok(task);
            }
            tokio::time::sleep(POLL_INTERVAL).await;
        }
        Err(executor.get_task_status(task_id).await.unwrap())
    }

    #[tokio::test]
    async fn test_executor_creation() {
        let (executor, _tmp) = create_test_executor();

        // Verify executor is not running initially
        assert!(!*executor.running.lock().await);
    }

    #[tokio::test]
    async fn test_submit_single_node() {
        let (executor, _tmp) = create_test_executor();

        let node = create_test_print_node("print1", "Hello World");
        let task_id = executor.submit_node(node, serde_json::json!({})).await.unwrap();
        assert!(!task_id.is_empty());

        // Verify task was queued
        let task = executor.get_task_status(&task_id).await.unwrap();
        assert_eq!(task.node_id, "print1");
    }

    #[tokio::test]
    async fn test_submit_workflow() {
        let (executor, _tmp) = create_test_executor();

        let workflow = create_test_workflow("wf-001", vec![create_test_print_node("print1", "Test")]);
        // Store workflow first
        executor.storage.workflows.create_workflow(&workflow).unwrap();

        let execution_id = executor
            .submit("wf-001".to_string(), serde_json::json!({}))
            .await
            .unwrap();

        assert!(!execution_id.is_empty());
    }

    #[tokio::test]
    async fn test_submit_with_custom_execution_id() {
        let (executor, _tmp) = create_test_executor();

        let workflow = create_test_workflow("wf-001", vec![create_test_print_node("print1", "Test")]);
        executor.storage.workflows.create_workflow(&workflow).unwrap();

        let custom_id = "custom-exec-001".to_string();
        let execution_id = executor
            .submit_with_execution_id("wf-001".to_string(), serde_json::json!({}), custom_id.clone())
            .await
            .unwrap();

        assert_eq!(execution_id, custom_id);
    }

    #[tokio::test]
    async fn test_executor_start_idempotent() {
        let (executor, _tmp) = create_test_executor();

        // First start should succeed
        executor.start().await;
        assert!(*executor.running.lock().await);

        // Second start should be no-op (try_start returns false)
        assert!(!executor.try_start().await);
    }

    #[tokio::test]
    async fn test_get_task_status() {
        let (executor, _tmp) = create_test_executor();

        let node = create_test_print_node("print1", "Test");
        let task_id = executor.submit_node(node, serde_json::json!({})).await.unwrap();

        let task = executor.get_task_status(&task_id).await.unwrap();
        assert_eq!(task.id, task_id);
        assert_eq!(task.node_id, "print1");
    }

    #[tokio::test]
    async fn test_get_task_status_not_found() {
        let (executor, _tmp) = create_test_executor();

        let result = executor.get_task_status("nonexistent-task").await;
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    #[tokio::test]
    async fn test_get_execution_status() {
        let (executor, _tmp) = create_test_executor();

        let workflow = create_test_workflow("wf-001", vec![create_test_print_node("print1", "Test")]);
        executor.storage.workflows.create_workflow(&workflow).unwrap();

        let execution_id = executor
            .submit("wf-001".to_string(), serde_json::json!({}))
            .await
            .unwrap();

        let tasks = executor.get_execution_status(&execution_id).await.unwrap();
        assert_eq!(tasks.len(), 1);
        assert_eq!(tasks[0].execution_id, execution_id);
    }

    #[tokio::test]
    async fn test_list_tasks() {
        let (executor, _tmp) = create_test_executor();

        for (id, message) in [("print1", "Test1"), ("print2", "Test2")] {
            executor
                .submit_node(create_test_print_node(id, message), serde_json::json!({}))
                .await
                .unwrap();
        }

        let tasks = executor.list_tasks(None, None).await.unwrap();
        assert_eq!(tasks.len(), 2);
    }

    #[tokio::test]
    async fn test_list_tasks_filtered_by_workflow() {
        let (executor, _tmp) = create_test_executor();

        let workflow = create_test_workflow("wf-001", vec![create_test_print_node("print1", "Test")]);
        executor.storage.workflows.create_workflow(&workflow).unwrap();

        executor.submit("wf-001".to_string(), serde_json::json!({})).await.unwrap();

        let tasks = executor.list_tasks(Some("wf-001"), None).await.unwrap();
        assert_eq!(tasks.len(), 1);
        assert_eq!(tasks[0].workflow_id, "wf-001");
    }

    #[tokio::test]
    async fn test_execute_node_with_template_resolution() {
        let (executor, _tmp) = create_test_executor();

        // Create context with variables
        let mut context = ExecutionContext::new("wf-001".to_string());
        context.set_var("name", serde_json::json!("Alice"));

        // Create print node with template
        let node = create_test_node(
            "print1",
            NodeType::Print,
            serde_json::json!({
                "type": "Print",
                "data": {
                    "message": "Hello {{var.name}}!"
                }
            }),
        );

        // Parse NodeInput
        let node_input: crate::models::NodeInput =
            serde_json::from_value(node.config.clone()).unwrap();

        let output = WorkflowExecutor::execute_node(
            &node,
            &node_input,
            &mut context,
            executor.registry.clone(),
        )
        .await
        .expect("print node should execute");

        // Verify the message was interpolated
        match output {
            crate::models::NodeOutput::Print(print_output) => {
                assert_eq!(print_output.printed, "Hello Alice!");
            }
            _ => panic!("Expected Print output"),
        }
    }

    #[tokio::test]
    async fn test_worker_picks_up_task() {
        let (executor, _tmp) = create_test_executor();

        // Start the executor with workers, and give them time to start
        executor.start().await;
        tokio::time::sleep(Duration::from_millis(200)).await;

        let node = create_test_print_node("print1", "Worker test");
        let task_id = executor.submit_node(node, serde_json::json!({})).await.unwrap();

        // Wait up to ~2 seconds for the task to leave the Pending state
        let picked_up = poll_task(&executor, &task_id, 20, |task| {
            task.status != TaskStatus::Pending
        })
        .await
        .is_ok();

        assert!(picked_up, "Worker should have picked up the task");
    }

    // TODO: Fix worker task completion - tasks are picked up but not completing
    // This is a known issue that needs investigation
    #[tokio::test]
    #[ignore = "Worker completion logic needs fixing - task stuck in Running state"]
    async fn test_worker_completes_task() {
        let (executor, _tmp) = create_test_executor();

        executor.start().await;
        tokio::time::sleep(Duration::from_millis(200)).await;

        let node = create_test_print_node("print1", "Worker test");
        let task_id = executor.submit_node(node, serde_json::json!({})).await.unwrap();

        // Wait up to ~10 seconds for completion
        match poll_task(&executor, &task_id, 100, |task| task.status == TaskStatus::Completed).await {
            Ok(task) => assert!(task.output.is_some()),
            Err(task) => panic!("Task did not complete. Final status: {:?}", task.status),
        }
    }

    #[tokio::test]
    async fn test_python_manager_injection() {
        let (executor, _tmp) = create_test_executor();

        // Verify no Python manager initially
        assert!(executor.get_python_manager().await.is_none());

        // Create a mock Python manager without network/filesystem operations
        executor.set_python_manager(PythonManager::new_mock()).await;

        // Verify Python manager is set
        assert!(executor.get_python_manager().await.is_some());
    }
}
687}