restflow_core/engine/
cron_scheduler.rs

1use crate::engine::executor::WorkflowExecutor;
2use crate::models::ActiveTrigger;
3use crate::storage::Storage;
4use anyhow::{Result, anyhow};
5use chrono_tz::Tz;
6use serde_json::Value;
7use std::collections::HashMap;
8use std::str::FromStr;
9use std::sync::{
10    Arc,
11    atomic::{AtomicBool, Ordering},
12};
13use tokio::sync::RwLock;
14use tokio_cron_scheduler::{Job, JobScheduler};
15use tracing::{debug, error, info};
16use uuid::Uuid;
17
18/// Cron scheduler responsible for managing all scheduled jobs
19///
20/// Features:
21/// - Supports standard cron expressions
22/// - Supports timezone configuration
23/// - Automatically triggers workflow execution
24/// - Persists job mapping relationships
25pub struct CronScheduler {
26    /// tokio-cron-scheduler instance
27    scheduler: JobScheduler,
28    /// Storage layer used to fetch workflows
29    storage: Arc<Storage>,
30    /// Workflow executor
31    executor: Arc<WorkflowExecutor>,
32    /// job_uuid -> trigger_id mapping table (used for job removal)
33    job_map: Arc<RwLock<HashMap<Uuid, String>>>,
34    /// Tracks whether shutdown has been requested to prevent new schedules
35    is_shutdown: AtomicBool,
36}
37
38impl CronScheduler {
39    /// Create a new Cron scheduler
40    pub async fn new(storage: Arc<Storage>, executor: Arc<WorkflowExecutor>) -> Result<Self> {
41        let scheduler = JobScheduler::new()
42            .await
43            .map_err(|e| anyhow!("Failed to create JobScheduler: {}", e))?;
44
45        Ok(Self {
46            scheduler,
47            storage,
48            executor,
49            job_map: Arc::new(RwLock::new(HashMap::new())),
50            is_shutdown: AtomicBool::new(false),
51        })
52    }
53
54    /// Start the scheduler
55    pub async fn start(&self) -> Result<()> {
56        self.scheduler
57            .start()
58            .await
59            .map_err(|e| anyhow!("Failed to start scheduler: {}", e))?;
60
61        info!("CronScheduler started successfully");
62        Ok(())
63    }
64
65    /// Add a scheduled job
66    ///
67    /// # Parameters
68    /// - `trigger`: active trigger information
69    /// - `cron_expr`: 6-field cron expression (sec min hour day month weekday), e.g. "0 0 0 * * *" for midnight every day
70    /// - `timezone`: timezone (e.g. "Asia/Shanghai"), None indicates UTC
71    /// - `payload`: payload passed to the workflow when triggered
72    pub async fn add_schedule(
73        &self,
74        trigger: &ActiveTrigger,
75        cron_expr: String,
76        timezone: Option<String>,
77        payload: Option<Value>,
78    ) -> Result<()> {
79        if self.is_shutdown.load(Ordering::SeqCst) {
80            return Err(anyhow!("Scheduler has been shutdown"));
81        }
82
83        let trigger_id = trigger.id.clone();
84        let workflow_id = trigger.workflow_id.clone();
85        let executor = self.executor.clone();
86        let storage = self.storage.clone();
87
88        let trigger_payload = payload.unwrap_or(Value::Object(Default::default()));
89
90        debug!(
91            trigger_id = %trigger_id,
92            workflow_id = %workflow_id,
93            cron = %cron_expr,
94            timezone = ?timezone,
95            "Adding cron schedule"
96        );
97
98        let job = if let Some(tz) = timezone {
99            let timezone: Tz =
100                Tz::from_str(&tz).map_err(|e| anyhow!("Invalid timezone {}: {}", tz, e))?;
101            Job::new_async_tz(cron_expr.as_str(), timezone, move |_uuid, _l| {
102                let workflow_id = workflow_id.clone();
103                let executor = executor.clone();
104                let storage = storage.clone();
105                let payload = trigger_payload.clone();
106                let trigger_id = trigger_id.clone();
107
108                Box::pin(async move {
109                    info!(
110                        trigger_id = %trigger_id,
111                        workflow_id = %workflow_id,
112                        "Cron job triggered"
113                    );
114
115                    match executor.submit(workflow_id.clone(), payload).await {
116                        Ok(execution_id) => {
117                            info!(
118                                execution_id = %execution_id,
119                                workflow_id = %workflow_id,
120                                "Workflow execution submitted by cron trigger"
121                            );
122
123                            if let Err(e) = update_trigger_stats(&storage, &trigger_id) {
124                                error!(error = ?e, "Failed to update trigger statistics");
125                            }
126                        }
127                        Err(e) => {
128                            error!(
129                                error = ?e,
130                                workflow_id = %workflow_id,
131                                "Failed to submit workflow execution"
132                            );
133                        }
134                    }
135                })
136            })
137            .map_err(|e| anyhow!("Failed to create cron job with timezone: {}", e))?
138        } else {
139            Job::new_async(cron_expr.as_str(), move |_uuid, _l| {
140                let workflow_id = workflow_id.clone();
141                let executor = executor.clone();
142                let storage = storage.clone();
143                let payload = trigger_payload.clone();
144                let trigger_id = trigger_id.clone();
145
146                Box::pin(async move {
147                    info!(
148                        trigger_id = %trigger_id,
149                        workflow_id = %workflow_id,
150                        "Cron job triggered (UTC)"
151                    );
152
153                    match executor.submit(workflow_id.clone(), payload).await {
154                        Ok(execution_id) => {
155                            info!(
156                                execution_id = %execution_id,
157                                workflow_id = %workflow_id,
158                                "Workflow execution submitted by cron trigger"
159                            );
160
161                            if let Err(e) = update_trigger_stats(&storage, &trigger_id) {
162                                error!(error = ?e, "Failed to update trigger statistics");
163                            }
164                        }
165                        Err(e) => {
166                            error!(
167                                error = ?e,
168                                workflow_id = %workflow_id,
169                                "Failed to submit workflow execution"
170                            );
171                        }
172                    }
173                })
174            })
175            .map_err(|e| anyhow!("Failed to create cron job: {}", e))?
176        };
177
178        let job_uuid = self
179            .scheduler
180            .add(job)
181            .await
182            .map_err(|e| anyhow!("Failed to add job to scheduler: {}", e))?;
183
184        self.job_map
185            .write()
186            .await
187            .insert(job_uuid, trigger.id.clone());
188
189        info!(
190            trigger_id = %trigger.id,
191            job_uuid = %job_uuid,
192            cron = %cron_expr,
193            "Cron schedule added successfully"
194        );
195
196        Ok(())
197    }
198
199    /// Remove a scheduled job
200    ///
201    /// Returns Ok(true) when a job was found and removed, Ok(false) when no job existed,
202    /// and Err if the underlying scheduler operation failed.
203    pub async fn remove_schedule(&self, trigger_id: &str) -> Result<bool> {
204        let job_uuid = {
205            let map = self.job_map.read().await;
206            map.iter()
207                .find_map(|(uuid, id)| if id == trigger_id { Some(*uuid) } else { None })
208        };
209
210        if let Some(uuid) = job_uuid {
211            self.scheduler
212                .remove(&uuid)
213                .await
214                .map_err(|e| anyhow!("Failed to remove job from scheduler: {}", e))?;
215
216            self.job_map.write().await.remove(&uuid);
217
218            info!(trigger_id = %trigger_id, job_uuid = %uuid, "Cron schedule removed");
219            Ok(true)
220        } else {
221            debug!(
222                trigger_id = %trigger_id,
223                "No cron job found for trigger (might already be removed)"
224            );
225            Ok(false)
226        }
227    }
228
229    /// Shut down the scheduler
230    pub async fn shutdown(&mut self) -> Result<()> {
231        self.is_shutdown.store(true, Ordering::SeqCst);
232
233        self.scheduler
234            .shutdown()
235            .await
236            .map_err(|e| anyhow!("Failed to shutdown scheduler: {}", e))?;
237
238        info!("CronScheduler shutdown successfully");
239        Ok(())
240    }
241
242    /// Get the number of active jobs
243    pub async fn active_job_count(&self) -> usize {
244        self.job_map.read().await.len()
245    }
246}
247
248/// Update trigger statistics
249fn update_trigger_stats(storage: &Storage, trigger_id: &str) -> Result<()> {
250    if let Some(mut trigger) = storage.triggers.get_active_trigger(trigger_id)? {
251        trigger.record_trigger();
252        storage.triggers.update_trigger(&trigger)?;
253    }
254    Ok(())
255}
256
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{Node, NodeType, TriggerConfig, Workflow};
    use crate::node::registry::NodeRegistry;
    use tempfile::tempdir;

    /// Id of the fixture workflow created by `setup_test_scheduler`.
    const WORKFLOW_ID: &str = "test-workflow";
    /// 6-field cron (sec min hour day month weekday): top of every hour.
    const HOURLY: &str = "0 0 * * * *";
    /// 6-field cron: 30 minutes past every hour.
    const HALF_PAST: &str = "0 30 * * * *";

    /// Builds a scheduler backed by a fresh temporary database that already
    /// contains the fixture workflow. The `TempDir` is returned so the
    /// database outlives the test body.
    async fn setup_test_scheduler() -> (CronScheduler, Arc<Storage>, tempfile::TempDir) {
        let temp_dir = tempdir().unwrap();
        let db_file = temp_dir.path().join("test.db");
        let storage = Arc::new(Storage::new(db_file.to_str().unwrap()).unwrap());
        let registry = Arc::new(NodeRegistry::new());

        // A single Print node is enough for a submittable workflow.
        let print_node = Node {
            id: "test-node".to_string(),
            node_type: NodeType::Print,
            config: serde_json::json!({
                "type": "Print",
                "data": {
                    "message": "Test message"
                }
            }),
            position: None,
        };
        let workflow = Workflow {
            id: WORKFLOW_ID.to_string(),
            name: "Test Workflow".to_string(),
            nodes: vec![print_node],
            edges: vec![],
        };
        storage.workflows.create_workflow(&workflow).unwrap();

        let executor = Arc::new(WorkflowExecutor::new(storage.clone(), 4, registry));
        let scheduler = CronScheduler::new(storage.clone(), executor).await.unwrap();

        (scheduler, storage, temp_dir)
    }

    /// Creates a schedule trigger for the fixture workflow.
    fn schedule_trigger(
        cron: &str,
        timezone: Option<&str>,
        payload: Option<Value>,
    ) -> ActiveTrigger {
        ActiveTrigger::new(
            WORKFLOW_ID.to_string(),
            TriggerConfig::Schedule {
                cron: cron.to_string(),
                timezone: timezone.map(String::from),
                payload,
            },
        )
    }

    /// Forwards to `CronScheduler::add_schedule`, converting the borrowed
    /// arguments into the owned values it expects.
    async fn register(
        scheduler: &CronScheduler,
        trigger: &ActiveTrigger,
        cron: &str,
        timezone: Option<&str>,
        payload: Option<Value>,
    ) -> Result<()> {
        scheduler
            .add_schedule(trigger, cron.to_string(), timezone.map(String::from), payload)
            .await
    }

    #[tokio::test]
    async fn test_add_and_remove_schedule() {
        let (mut scheduler, _storage, _temp_dir) = setup_test_scheduler().await;
        scheduler.start().await.unwrap();

        let trigger = schedule_trigger(HOURLY, None, None);
        register(&scheduler, &trigger, HOURLY, None, None)
            .await
            .unwrap();
        assert_eq!(scheduler.active_job_count().await, 1);

        let removed = scheduler.remove_schedule(&trigger.id).await.unwrap();
        assert!(removed);
        assert_eq!(scheduler.active_job_count().await, 0);

        scheduler.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_start_and_shutdown() {
        let (mut scheduler, _storage, _temp_dir) = setup_test_scheduler().await;
        scheduler.start().await.unwrap();

        // Registering a job shows the started scheduler accepts work.
        let trigger = schedule_trigger(HOURLY, None, None);
        register(&scheduler, &trigger, HOURLY, None, None)
            .await
            .unwrap();
        assert_eq!(scheduler.active_job_count().await, 1);

        scheduler.shutdown().await.unwrap();

        // Shutdown leaves job_map untouched, so the count is unchanged.
        assert_eq!(scheduler.active_job_count().await, 1);
    }

    #[tokio::test]
    async fn test_invalid_cron_expression() {
        let (mut scheduler, _storage, _temp_dir) = setup_test_scheduler().await;
        scheduler.start().await.unwrap();

        let trigger = schedule_trigger("invalid cron", None, None);

        // The malformed expression must be rejected at job creation.
        let result = register(&scheduler, &trigger, "invalid cron", None, None).await;

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Failed to create cron job")
        );

        // Nothing may have been registered.
        assert_eq!(scheduler.active_job_count().await, 0);

        scheduler.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_invalid_timezone() {
        let (mut scheduler, _storage, _temp_dir) = setup_test_scheduler().await;
        scheduler.start().await.unwrap();

        let trigger = schedule_trigger(HOURLY, Some("Invalid/Timezone"), None);

        // The unknown timezone name must be rejected.
        let result = register(&scheduler, &trigger, HOURLY, Some("Invalid/Timezone"), None).await;

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Invalid timezone"));

        // Nothing may have been registered.
        assert_eq!(scheduler.active_job_count().await, 0);

        scheduler.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_schedule_with_timezone() {
        let (mut scheduler, _storage, _temp_dir) = setup_test_scheduler().await;
        scheduler.start().await.unwrap();

        let trigger = schedule_trigger(HOURLY, Some("Asia/Shanghai"), None);

        // A valid timezone name is accepted.
        register(&scheduler, &trigger, HOURLY, Some("Asia/Shanghai"), None)
            .await
            .unwrap();
        assert_eq!(scheduler.active_job_count().await, 1);

        // Clean up
        scheduler.remove_schedule(&trigger.id).await.unwrap();
        scheduler.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_remove_nonexistent_schedule() {
        let (mut scheduler, _storage, _temp_dir) = setup_test_scheduler().await;
        scheduler.start().await.unwrap();

        // Removing an unknown trigger reports "nothing removed" rather than failing.
        let removed = scheduler
            .remove_schedule("nonexistent-trigger-id")
            .await
            .unwrap();
        assert!(!removed);

        scheduler.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_active_job_count() {
        let (mut scheduler, _storage, _temp_dir) = setup_test_scheduler().await;
        scheduler.start().await.unwrap();

        // Starts out empty.
        assert_eq!(scheduler.active_job_count().await, 0);

        let trigger1 = schedule_trigger(HOURLY, None, None);
        register(&scheduler, &trigger1, HOURLY, None, None)
            .await
            .unwrap();
        assert_eq!(scheduler.active_job_count().await, 1);

        let trigger2 = schedule_trigger(HALF_PAST, None, None);
        register(&scheduler, &trigger2, HALF_PAST, None, None)
            .await
            .unwrap();
        assert_eq!(scheduler.active_job_count().await, 2);

        scheduler.remove_schedule(&trigger1.id).await.unwrap();
        assert_eq!(scheduler.active_job_count().await, 1);

        scheduler.remove_schedule(&trigger2.id).await.unwrap();
        assert_eq!(scheduler.active_job_count().await, 0);

        scheduler.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_multiple_schedules_same_workflow() {
        let (mut scheduler, _storage, _temp_dir) = setup_test_scheduler().await;
        scheduler.start().await.unwrap();

        // Three independent triggers that all target the fixture workflow.
        let trigger1 = schedule_trigger(HOURLY, None, None); // Every hour
        let trigger2 = schedule_trigger(HALF_PAST, None, None); // Every hour at 30 minutes
        let trigger3 = schedule_trigger("0 15 * * * *", Some("America/New_York"), None); // Every hour at 15 minutes

        register(&scheduler, &trigger1, HOURLY, None, None)
            .await
            .unwrap();
        register(&scheduler, &trigger2, HALF_PAST, None, None)
            .await
            .unwrap();
        register(
            &scheduler,
            &trigger3,
            "0 15 * * * *",
            Some("America/New_York"),
            None,
        )
        .await
        .unwrap();

        assert_eq!(scheduler.active_job_count().await, 3);

        // Each removal drops exactly one job.
        scheduler.remove_schedule(&trigger1.id).await.unwrap();
        assert_eq!(scheduler.active_job_count().await, 2);

        scheduler.remove_schedule(&trigger2.id).await.unwrap();
        assert_eq!(scheduler.active_job_count().await, 1);

        scheduler.remove_schedule(&trigger3.id).await.unwrap();
        assert_eq!(scheduler.active_job_count().await, 0);

        scheduler.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_schedule_with_payload() {
        let (mut scheduler, _storage, _temp_dir) = setup_test_scheduler().await;
        scheduler.start().await.unwrap();

        let payload = serde_json::json!({
            "key": "value",
            "number": 42,
            "nested": {
                "field": "data"
            }
        });

        let trigger = schedule_trigger(HOURLY, None, Some(payload.clone()));

        // A custom payload is accepted alongside the schedule.
        register(&scheduler, &trigger, HOURLY, None, Some(payload))
            .await
            .unwrap();
        assert_eq!(scheduler.active_job_count().await, 1);

        // Clean up
        scheduler.remove_schedule(&trigger.id).await.unwrap();
        scheduler.shutdown().await.unwrap();
    }

    #[tokio::test]
    async fn test_shutdown_prevents_new_schedules() {
        let (mut scheduler, _storage, _temp_dir) = setup_test_scheduler().await;
        scheduler.start().await.unwrap();

        // One job is registered while the scheduler is still running.
        let trigger1 = schedule_trigger(HOURLY, None, None);
        register(&scheduler, &trigger1, HOURLY, None, None)
            .await
            .unwrap();
        assert_eq!(scheduler.active_job_count().await, 1);

        scheduler.shutdown().await.unwrap();

        // Any registration attempted after shutdown must be refused.
        let trigger2 = schedule_trigger(HALF_PAST, None, None);
        let result = register(&scheduler, &trigger2, HALF_PAST, None, None).await;

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Scheduler has been shutdown")
        );

        // Only the job added before shutdown is counted.
        assert_eq!(scheduler.active_job_count().await, 1);
    }

    #[tokio::test]
    async fn test_schedule_execution_tracking() {
        let (mut scheduler, storage, _temp_dir) = setup_test_scheduler().await;
        scheduler.start().await.unwrap();

        // Fires every second so the test observes an execution quickly.
        let trigger = schedule_trigger("* * * * * *", None, None);

        // The trigger has to exist in storage for its statistics to be updated.
        storage.triggers.activate_trigger(&trigger).unwrap();

        register(&scheduler, &trigger, "* * * * * *", None, None)
            .await
            .unwrap();

        let initial_count = storage
            .triggers
            .get_active_trigger(&trigger.id)
            .unwrap()
            .unwrap()
            .trigger_count;

        // Wait for at least 2 full seconds to ensure at least one execution
        // (cron runs every second, so 2100ms guarantees at least one execution)
        tokio::time::sleep(tokio::time::Duration::from_millis(2100)).await;

        let updated_trigger = storage
            .triggers
            .get_active_trigger(&trigger.id)
            .unwrap()
            .unwrap();

        assert!(
            updated_trigger.trigger_count > initial_count,
            "Trigger count should have increased: {} vs {}",
            updated_trigger.trigger_count,
            initial_count
        );

        assert!(
            updated_trigger.last_triggered_at.is_some(),
            "Last triggered timestamp should be set"
        );

        // Clean up
        scheduler.remove_schedule(&trigger.id).await.unwrap();
        scheduler.shutdown().await.unwrap();
    }
}