1use crate::engine::executor::WorkflowExecutor;
2use crate::models::ActiveTrigger;
3use crate::storage::Storage;
4use anyhow::{Result, anyhow};
5use chrono_tz::Tz;
6use serde_json::Value;
7use std::collections::HashMap;
8use std::str::FromStr;
9use std::sync::{
10 Arc,
11 atomic::{AtomicBool, Ordering},
12};
13use tokio::sync::RwLock;
14use tokio_cron_scheduler::{Job, JobScheduler};
15use tracing::{debug, error, info};
16use uuid::Uuid;
17
/// Cron-based trigger scheduler built on top of `tokio_cron_scheduler`.
///
/// Jobs registered through `add_schedule` submit a workflow to the executor each time
/// they fire and then record the firing against the originating trigger in storage.
pub struct CronScheduler {
    /// Underlying job scheduler that the cron jobs are added to and removed from.
    scheduler: JobScheduler,
    /// Storage handle; cloned into every job so it can update trigger statistics.
    storage: Arc<Storage>,
    /// Executor handle; cloned into every job to submit workflow executions.
    executor: Arc<WorkflowExecutor>,
    /// Scheduler job UUID -> id of the trigger that registered the job.
    job_map: Arc<RwLock<HashMap<Uuid, String>>>,
    /// Set by `shutdown`; once true, `add_schedule` refuses to register new jobs.
    is_shutdown: AtomicBool,
}
37
38impl CronScheduler {
39 pub async fn new(storage: Arc<Storage>, executor: Arc<WorkflowExecutor>) -> Result<Self> {
41 let scheduler = JobScheduler::new()
42 .await
43 .map_err(|e| anyhow!("Failed to create JobScheduler: {}", e))?;
44
45 Ok(Self {
46 scheduler,
47 storage,
48 executor,
49 job_map: Arc::new(RwLock::new(HashMap::new())),
50 is_shutdown: AtomicBool::new(false),
51 })
52 }
53
54 pub async fn start(&self) -> Result<()> {
56 self.scheduler
57 .start()
58 .await
59 .map_err(|e| anyhow!("Failed to start scheduler: {}", e))?;
60
61 info!("CronScheduler started successfully");
62 Ok(())
63 }
64
65 pub async fn add_schedule(
73 &self,
74 trigger: &ActiveTrigger,
75 cron_expr: String,
76 timezone: Option<String>,
77 payload: Option<Value>,
78 ) -> Result<()> {
79 if self.is_shutdown.load(Ordering::SeqCst) {
80 return Err(anyhow!("Scheduler has been shutdown"));
81 }
82
83 let trigger_id = trigger.id.clone();
84 let workflow_id = trigger.workflow_id.clone();
85 let executor = self.executor.clone();
86 let storage = self.storage.clone();
87
88 let trigger_payload = payload.unwrap_or(Value::Object(Default::default()));
89
90 debug!(
91 trigger_id = %trigger_id,
92 workflow_id = %workflow_id,
93 cron = %cron_expr,
94 timezone = ?timezone,
95 "Adding cron schedule"
96 );
97
98 let job = if let Some(tz) = timezone {
99 let timezone: Tz =
100 Tz::from_str(&tz).map_err(|e| anyhow!("Invalid timezone {}: {}", tz, e))?;
101 Job::new_async_tz(cron_expr.as_str(), timezone, move |_uuid, _l| {
102 let workflow_id = workflow_id.clone();
103 let executor = executor.clone();
104 let storage = storage.clone();
105 let payload = trigger_payload.clone();
106 let trigger_id = trigger_id.clone();
107
108 Box::pin(async move {
109 info!(
110 trigger_id = %trigger_id,
111 workflow_id = %workflow_id,
112 "Cron job triggered"
113 );
114
115 match executor.submit(workflow_id.clone(), payload).await {
116 Ok(execution_id) => {
117 info!(
118 execution_id = %execution_id,
119 workflow_id = %workflow_id,
120 "Workflow execution submitted by cron trigger"
121 );
122
123 if let Err(e) = update_trigger_stats(&storage, &trigger_id) {
124 error!(error = ?e, "Failed to update trigger statistics");
125 }
126 }
127 Err(e) => {
128 error!(
129 error = ?e,
130 workflow_id = %workflow_id,
131 "Failed to submit workflow execution"
132 );
133 }
134 }
135 })
136 })
137 .map_err(|e| anyhow!("Failed to create cron job with timezone: {}", e))?
138 } else {
139 Job::new_async(cron_expr.as_str(), move |_uuid, _l| {
140 let workflow_id = workflow_id.clone();
141 let executor = executor.clone();
142 let storage = storage.clone();
143 let payload = trigger_payload.clone();
144 let trigger_id = trigger_id.clone();
145
146 Box::pin(async move {
147 info!(
148 trigger_id = %trigger_id,
149 workflow_id = %workflow_id,
150 "Cron job triggered (UTC)"
151 );
152
153 match executor.submit(workflow_id.clone(), payload).await {
154 Ok(execution_id) => {
155 info!(
156 execution_id = %execution_id,
157 workflow_id = %workflow_id,
158 "Workflow execution submitted by cron trigger"
159 );
160
161 if let Err(e) = update_trigger_stats(&storage, &trigger_id) {
162 error!(error = ?e, "Failed to update trigger statistics");
163 }
164 }
165 Err(e) => {
166 error!(
167 error = ?e,
168 workflow_id = %workflow_id,
169 "Failed to submit workflow execution"
170 );
171 }
172 }
173 })
174 })
175 .map_err(|e| anyhow!("Failed to create cron job: {}", e))?
176 };
177
178 let job_uuid = self
179 .scheduler
180 .add(job)
181 .await
182 .map_err(|e| anyhow!("Failed to add job to scheduler: {}", e))?;
183
184 self.job_map
185 .write()
186 .await
187 .insert(job_uuid, trigger.id.clone());
188
189 info!(
190 trigger_id = %trigger.id,
191 job_uuid = %job_uuid,
192 cron = %cron_expr,
193 "Cron schedule added successfully"
194 );
195
196 Ok(())
197 }
198
199 pub async fn remove_schedule(&self, trigger_id: &str) -> Result<bool> {
204 let job_uuid = {
205 let map = self.job_map.read().await;
206 map.iter()
207 .find_map(|(uuid, id)| if id == trigger_id { Some(*uuid) } else { None })
208 };
209
210 if let Some(uuid) = job_uuid {
211 self.scheduler
212 .remove(&uuid)
213 .await
214 .map_err(|e| anyhow!("Failed to remove job from scheduler: {}", e))?;
215
216 self.job_map.write().await.remove(&uuid);
217
218 info!(trigger_id = %trigger_id, job_uuid = %uuid, "Cron schedule removed");
219 Ok(true)
220 } else {
221 debug!(
222 trigger_id = %trigger_id,
223 "No cron job found for trigger (might already be removed)"
224 );
225 Ok(false)
226 }
227 }
228
229 pub async fn shutdown(&mut self) -> Result<()> {
231 self.is_shutdown.store(true, Ordering::SeqCst);
232
233 self.scheduler
234 .shutdown()
235 .await
236 .map_err(|e| anyhow!("Failed to shutdown scheduler: {}", e))?;
237
238 info!("CronScheduler shutdown successfully");
239 Ok(())
240 }
241
242 pub async fn active_job_count(&self) -> usize {
244 self.job_map.read().await.len()
245 }
246}
247
248fn update_trigger_stats(storage: &Storage, trigger_id: &str) -> Result<()> {
250 if let Some(mut trigger) = storage.triggers.get_active_trigger(trigger_id)? {
251 trigger.record_trigger();
252 storage.triggers.update_trigger(&trigger)?;
253 }
254 Ok(())
255}
256
257#[cfg(test)]
258mod tests {
259 use super::*;
260 use crate::models::{Node, NodeType, TriggerConfig, Workflow};
261 use crate::node::registry::NodeRegistry;
262 use tempfile::tempdir;
263
264 async fn setup_test_scheduler() -> (CronScheduler, Arc<Storage>, tempfile::TempDir) {
265 let temp_dir = tempdir().unwrap();
266 let db_path = temp_dir.path().join("test.db");
267 let storage = Arc::new(Storage::new(db_path.to_str().unwrap()).unwrap());
268 let registry = Arc::new(NodeRegistry::new());
269
270 let workflow = Workflow {
272 id: "test-workflow".to_string(),
273 name: "Test Workflow".to_string(),
274 nodes: vec![Node {
275 id: "test-node".to_string(),
276 node_type: NodeType::Print,
277 config: serde_json::json!({
278 "type": "Print",
279 "data": {
280 "message": "Test message"
281 }
282 }),
283 position: None,
284 }],
285 edges: vec![],
286 };
287 storage.workflows.create_workflow(&workflow).unwrap();
288
289 let executor = Arc::new(WorkflowExecutor::new(storage.clone(), 4, registry));
290
291 let scheduler = CronScheduler::new(storage.clone(), executor).await.unwrap();
292
293 (scheduler, storage, temp_dir)
294 }
295
296 #[tokio::test]
297 async fn test_add_and_remove_schedule() {
298 let (mut scheduler, _storage, _temp_dir) = setup_test_scheduler().await;
299
300 scheduler.start().await.unwrap();
301
302 let trigger = ActiveTrigger::new(
303 "test-workflow".to_string(),
304 TriggerConfig::Schedule {
305 cron: "0 0 * * * *".to_string(), timezone: None,
307 payload: None,
308 },
309 );
310
311 scheduler
313 .add_schedule(&trigger, "0 0 * * * *".to_string(), None, None)
314 .await
315 .unwrap();
316
317 assert_eq!(scheduler.active_job_count().await, 1);
318
319 let removed = scheduler.remove_schedule(&trigger.id).await.unwrap();
321 assert!(removed);
322
323 assert_eq!(scheduler.active_job_count().await, 0);
324
325 scheduler.shutdown().await.unwrap();
326 }
327
328 #[tokio::test]
329 async fn test_start_and_shutdown() {
330 let (mut scheduler, _storage, _temp_dir) = setup_test_scheduler().await;
331
332 scheduler.start().await.unwrap();
334
335 let trigger = ActiveTrigger::new(
337 "test-workflow".to_string(),
338 TriggerConfig::Schedule {
339 cron: "0 0 * * * *".to_string(),
340 timezone: None,
341 payload: None,
342 },
343 );
344
345 scheduler
346 .add_schedule(&trigger, "0 0 * * * *".to_string(), None, None)
347 .await
348 .unwrap();
349
350 assert_eq!(scheduler.active_job_count().await, 1);
351
352 scheduler.shutdown().await.unwrap();
354
355 assert_eq!(scheduler.active_job_count().await, 1);
357 }
358
359 #[tokio::test]
360 async fn test_invalid_cron_expression() {
361 let (mut scheduler, _storage, _temp_dir) = setup_test_scheduler().await;
362
363 scheduler.start().await.unwrap();
364
365 let trigger = ActiveTrigger::new(
366 "test-workflow".to_string(),
367 TriggerConfig::Schedule {
368 cron: "invalid cron".to_string(),
369 timezone: None,
370 payload: None,
371 },
372 );
373
374 let result = scheduler
376 .add_schedule(&trigger, "invalid cron".to_string(), None, None)
377 .await;
378
379 assert!(result.is_err());
380 assert!(
381 result
382 .unwrap_err()
383 .to_string()
384 .contains("Failed to create cron job")
385 );
386
387 assert_eq!(scheduler.active_job_count().await, 0);
389
390 scheduler.shutdown().await.unwrap();
391 }
392
393 #[tokio::test]
394 async fn test_invalid_timezone() {
395 let (mut scheduler, _storage, _temp_dir) = setup_test_scheduler().await;
396
397 scheduler.start().await.unwrap();
398
399 let trigger = ActiveTrigger::new(
400 "test-workflow".to_string(),
401 TriggerConfig::Schedule {
402 cron: "0 0 * * * *".to_string(),
403 timezone: Some("Invalid/Timezone".to_string()),
404 payload: None,
405 },
406 );
407
408 let result = scheduler
410 .add_schedule(
411 &trigger,
412 "0 0 * * * *".to_string(),
413 Some("Invalid/Timezone".to_string()),
414 None,
415 )
416 .await;
417
418 assert!(result.is_err());
419 assert!(result.unwrap_err().to_string().contains("Invalid timezone"));
420
421 assert_eq!(scheduler.active_job_count().await, 0);
423
424 scheduler.shutdown().await.unwrap();
425 }
426
427 #[tokio::test]
428 async fn test_schedule_with_timezone() {
429 let (mut scheduler, _storage, _temp_dir) = setup_test_scheduler().await;
430
431 scheduler.start().await.unwrap();
432
433 let trigger = ActiveTrigger::new(
434 "test-workflow".to_string(),
435 TriggerConfig::Schedule {
436 cron: "0 0 * * * *".to_string(),
437 timezone: Some("Asia/Shanghai".to_string()),
438 payload: None,
439 },
440 );
441
442 scheduler
444 .add_schedule(
445 &trigger,
446 "0 0 * * * *".to_string(),
447 Some("Asia/Shanghai".to_string()),
448 None,
449 )
450 .await
451 .unwrap();
452
453 assert_eq!(scheduler.active_job_count().await, 1);
454
455 scheduler.remove_schedule(&trigger.id).await.unwrap();
457 scheduler.shutdown().await.unwrap();
458 }
459
460 #[tokio::test]
461 async fn test_remove_nonexistent_schedule() {
462 let (mut scheduler, _storage, _temp_dir) = setup_test_scheduler().await;
463
464 scheduler.start().await.unwrap();
465
466 let removed = scheduler
468 .remove_schedule("nonexistent-trigger-id")
469 .await
470 .unwrap();
471
472 assert!(!removed);
474
475 scheduler.shutdown().await.unwrap();
476 }
477
478 #[tokio::test]
479 async fn test_active_job_count() {
480 let (mut scheduler, _storage, _temp_dir) = setup_test_scheduler().await;
481
482 scheduler.start().await.unwrap();
483
484 assert_eq!(scheduler.active_job_count().await, 0);
486
487 let trigger1 = ActiveTrigger::new(
489 "test-workflow".to_string(),
490 TriggerConfig::Schedule {
491 cron: "0 0 * * * *".to_string(),
492 timezone: None,
493 payload: None,
494 },
495 );
496
497 scheduler
498 .add_schedule(&trigger1, "0 0 * * * *".to_string(), None, None)
499 .await
500 .unwrap();
501
502 assert_eq!(scheduler.active_job_count().await, 1);
503
504 let trigger2 = ActiveTrigger::new(
506 "test-workflow".to_string(),
507 TriggerConfig::Schedule {
508 cron: "0 30 * * * *".to_string(),
509 timezone: None,
510 payload: None,
511 },
512 );
513
514 scheduler
515 .add_schedule(&trigger2, "0 30 * * * *".to_string(), None, None)
516 .await
517 .unwrap();
518
519 assert_eq!(scheduler.active_job_count().await, 2);
520
521 scheduler.remove_schedule(&trigger1.id).await.unwrap();
523 assert_eq!(scheduler.active_job_count().await, 1);
524
525 scheduler.remove_schedule(&trigger2.id).await.unwrap();
527 assert_eq!(scheduler.active_job_count().await, 0);
528
529 scheduler.shutdown().await.unwrap();
530 }
531
532 #[tokio::test]
533 async fn test_multiple_schedules_same_workflow() {
534 let (mut scheduler, _storage, _temp_dir) = setup_test_scheduler().await;
535
536 scheduler.start().await.unwrap();
537
538 let trigger1 = ActiveTrigger::new(
540 "test-workflow".to_string(),
541 TriggerConfig::Schedule {
542 cron: "0 0 * * * *".to_string(), timezone: None,
544 payload: None,
545 },
546 );
547
548 let trigger2 = ActiveTrigger::new(
549 "test-workflow".to_string(),
550 TriggerConfig::Schedule {
551 cron: "0 30 * * * *".to_string(), timezone: None,
553 payload: None,
554 },
555 );
556
557 let trigger3 = ActiveTrigger::new(
558 "test-workflow".to_string(),
559 TriggerConfig::Schedule {
560 cron: "0 15 * * * *".to_string(), timezone: Some("America/New_York".to_string()),
562 payload: None,
563 },
564 );
565
566 scheduler
568 .add_schedule(&trigger1, "0 0 * * * *".to_string(), None, None)
569 .await
570 .unwrap();
571
572 scheduler
573 .add_schedule(&trigger2, "0 30 * * * *".to_string(), None, None)
574 .await
575 .unwrap();
576
577 scheduler
578 .add_schedule(
579 &trigger3,
580 "0 15 * * * *".to_string(),
581 Some("America/New_York".to_string()),
582 None,
583 )
584 .await
585 .unwrap();
586
587 assert_eq!(scheduler.active_job_count().await, 3);
588
589 scheduler.remove_schedule(&trigger1.id).await.unwrap();
591 assert_eq!(scheduler.active_job_count().await, 2);
592
593 scheduler.remove_schedule(&trigger2.id).await.unwrap();
594 assert_eq!(scheduler.active_job_count().await, 1);
595
596 scheduler.remove_schedule(&trigger3.id).await.unwrap();
597 assert_eq!(scheduler.active_job_count().await, 0);
598
599 scheduler.shutdown().await.unwrap();
600 }
601
602 #[tokio::test]
603 async fn test_schedule_with_payload() {
604 let (mut scheduler, _storage, _temp_dir) = setup_test_scheduler().await;
605
606 scheduler.start().await.unwrap();
607
608 let payload = serde_json::json!({
609 "key": "value",
610 "number": 42,
611 "nested": {
612 "field": "data"
613 }
614 });
615
616 let trigger = ActiveTrigger::new(
617 "test-workflow".to_string(),
618 TriggerConfig::Schedule {
619 cron: "0 0 * * * *".to_string(),
620 timezone: None,
621 payload: Some(payload.clone()),
622 },
623 );
624
625 scheduler
627 .add_schedule(&trigger, "0 0 * * * *".to_string(), None, Some(payload))
628 .await
629 .unwrap();
630
631 assert_eq!(scheduler.active_job_count().await, 1);
632
633 scheduler.remove_schedule(&trigger.id).await.unwrap();
635 scheduler.shutdown().await.unwrap();
636 }
637
638 #[tokio::test]
639 async fn test_shutdown_prevents_new_schedules() {
640 let (mut scheduler, _storage, _temp_dir) = setup_test_scheduler().await;
641
642 scheduler.start().await.unwrap();
643
644 let trigger1 = ActiveTrigger::new(
646 "test-workflow".to_string(),
647 TriggerConfig::Schedule {
648 cron: "0 0 * * * *".to_string(),
649 timezone: None,
650 payload: None,
651 },
652 );
653
654 scheduler
655 .add_schedule(&trigger1, "0 0 * * * *".to_string(), None, None)
656 .await
657 .unwrap();
658
659 assert_eq!(scheduler.active_job_count().await, 1);
660
661 scheduler.shutdown().await.unwrap();
663
664 let trigger2 = ActiveTrigger::new(
666 "test-workflow".to_string(),
667 TriggerConfig::Schedule {
668 cron: "0 30 * * * *".to_string(),
669 timezone: None,
670 payload: None,
671 },
672 );
673
674 let result = scheduler
675 .add_schedule(&trigger2, "0 30 * * * *".to_string(), None, None)
676 .await;
677
678 assert!(result.is_err());
680 assert!(
681 result
682 .unwrap_err()
683 .to_string()
684 .contains("Scheduler has been shutdown")
685 );
686
687 assert_eq!(scheduler.active_job_count().await, 1);
689 }
690
691 #[tokio::test]
692 async fn test_schedule_execution_tracking() {
693 let (mut scheduler, storage, _temp_dir) = setup_test_scheduler().await;
694
695 scheduler.start().await.unwrap();
696
697 let trigger = ActiveTrigger::new(
698 "test-workflow".to_string(),
699 TriggerConfig::Schedule {
700 cron: "* * * * * *".to_string(), timezone: None,
702 payload: None,
703 },
704 );
705
706 storage.triggers.activate_trigger(&trigger).unwrap();
708
709 scheduler
711 .add_schedule(&trigger, "* * * * * *".to_string(), None, None)
712 .await
713 .unwrap();
714
715 let initial_trigger = storage
717 .triggers
718 .get_active_trigger(&trigger.id)
719 .unwrap()
720 .unwrap();
721 let initial_count = initial_trigger.trigger_count;
722
723 tokio::time::sleep(tokio::time::Duration::from_millis(2100)).await;
726
727 let updated_trigger = storage
729 .triggers
730 .get_active_trigger(&trigger.id)
731 .unwrap()
732 .unwrap();
733
734 assert!(
736 updated_trigger.trigger_count > initial_count,
737 "Trigger count should have increased: {} vs {}",
738 updated_trigger.trigger_count,
739 initial_count
740 );
741
742 assert!(
744 updated_trigger.last_triggered_at.is_some(),
745 "Last triggered timestamp should be set"
746 );
747
748 scheduler.remove_schedule(&trigger.id).await.unwrap();
750 scheduler.shutdown().await.unwrap();
751 }
752}