1pub mod engine;
2pub mod models;
3pub mod node;
4pub mod paths;
5pub mod python;
6pub mod services;
7pub mod storage;
8pub mod tools;
9
10pub use models::*;
11
12use engine::cron_scheduler::CronScheduler;
13use engine::executor::WorkflowExecutor;
14use engine::trigger_manager::TriggerManager;
15use node::registry::NodeRegistry;
16use once_cell::sync::OnceCell;
17use std::sync::Arc;
18use storage::Storage;
19use tracing::{error, info};
20
21pub struct AppCore {
23 pub storage: Arc<Storage>,
24 pub executor: Arc<WorkflowExecutor>,
25 pub trigger_manager: Arc<TriggerManager>,
26 pub cron_scheduler: Arc<CronScheduler>,
27 pub python_manager: OnceCell<Arc<python::PythonManager>>,
28 pub registry: Arc<NodeRegistry>,
29}
30
31impl AppCore {
32 pub async fn new(db_path: &str) -> anyhow::Result<Self> {
33 let storage = Arc::new(Storage::new(db_path)?);
34
35 let num_workers = storage.config.get_worker_count().unwrap_or(4);
36
37 info!(num_workers, "Initializing RestFlow");
38
39 let registry = Arc::new(NodeRegistry::new());
40
41 let executor = Arc::new(WorkflowExecutor::new(
42 storage.clone(),
43 num_workers,
44 registry.clone(),
45 ));
46 executor.start().await;
47
48 let cron_scheduler = Arc::new(
49 CronScheduler::new(storage.clone(), executor.clone())
50 .await
51 .map_err(|e| {
52 error!(error = %e, "Failed to create CronScheduler");
53 e
54 })?,
55 );
56
57 if let Err(e) = cron_scheduler.start().await {
58 error!(error = %e, "Failed to start CronScheduler");
59 } else {
60 info!("CronScheduler started successfully");
61 }
62
63 let trigger_manager = Arc::new(TriggerManager::new(
64 storage.clone(),
65 executor.clone(),
66 cron_scheduler.clone(),
67 ));
68
69 if let Err(e) = trigger_manager.init().await {
70 error!(error = %e, "Failed to initialize trigger manager");
71 }
72
73 Ok(Self {
74 storage,
75 executor,
76 trigger_manager,
77 cron_scheduler,
78 python_manager: OnceCell::new(),
79 registry,
80 })
81 }
82
83 pub async fn get_python_manager(&self) -> anyhow::Result<Arc<python::PythonManager>> {
84 if let Some(manager) = self.python_manager.get() {
85 return Ok(manager.clone());
86 }
87
88 let manager = python::PythonManager::new().await?;
89
90 let _ = self.python_manager.set(manager.clone());
91
92 self.executor.set_python_manager(manager.clone()).await;
93
94 Ok(self.python_manager.get().unwrap().clone())
95 }
96
97 pub fn is_python_ready(&self) -> bool {
98 self.python_manager
99 .get()
100 .map(|m| m.is_ready())
101 .unwrap_or(false)
102 }
103}