restflow_core/
lib.rs

1pub mod engine;
2pub mod models;
3pub mod node;
4pub mod paths;
5pub mod python;
6pub mod services;
7pub mod storage;
8pub mod tools;
9
10pub use models::*;
11
12use engine::cron_scheduler::CronScheduler;
13use engine::executor::WorkflowExecutor;
14use engine::trigger_manager::TriggerManager;
15use node::registry::NodeRegistry;
16use once_cell::sync::OnceCell;
17use std::sync::Arc;
18use storage::Storage;
19use tracing::{error, info};
20
21/// Core application state shared between server and Tauri modes
22pub struct AppCore {
23    pub storage: Arc<Storage>,
24    pub executor: Arc<WorkflowExecutor>,
25    pub trigger_manager: Arc<TriggerManager>,
26    pub cron_scheduler: Arc<CronScheduler>,
27    pub python_manager: OnceCell<Arc<python::PythonManager>>,
28    pub registry: Arc<NodeRegistry>,
29}
30
31impl AppCore {
32    pub async fn new(db_path: &str) -> anyhow::Result<Self> {
33        let storage = Arc::new(Storage::new(db_path)?);
34
35        let num_workers = storage.config.get_worker_count().unwrap_or(4);
36
37        info!(num_workers, "Initializing RestFlow");
38
39        let registry = Arc::new(NodeRegistry::new());
40
41        let executor = Arc::new(WorkflowExecutor::new(
42            storage.clone(),
43            num_workers,
44            registry.clone(),
45        ));
46        executor.start().await;
47
48        let cron_scheduler = Arc::new(
49            CronScheduler::new(storage.clone(), executor.clone())
50                .await
51                .map_err(|e| {
52                    error!(error = %e, "Failed to create CronScheduler");
53                    e
54                })?,
55        );
56
57        if let Err(e) = cron_scheduler.start().await {
58            error!(error = %e, "Failed to start CronScheduler");
59        } else {
60            info!("CronScheduler started successfully");
61        }
62
63        let trigger_manager = Arc::new(TriggerManager::new(
64            storage.clone(),
65            executor.clone(),
66            cron_scheduler.clone(),
67        ));
68
69        if let Err(e) = trigger_manager.init().await {
70            error!(error = %e, "Failed to initialize trigger manager");
71        }
72
73        Ok(Self {
74            storage,
75            executor,
76            trigger_manager,
77            cron_scheduler,
78            python_manager: OnceCell::new(),
79            registry,
80        })
81    }
82
83    pub async fn get_python_manager(&self) -> anyhow::Result<Arc<python::PythonManager>> {
84        if let Some(manager) = self.python_manager.get() {
85            return Ok(manager.clone());
86        }
87
88        let manager = python::PythonManager::new().await?;
89
90        let _ = self.python_manager.set(manager.clone());
91
92        self.executor.set_python_manager(manager.clone()).await;
93
94        Ok(self.python_manager.get().unwrap().clone())
95    }
96
97    pub fn is_python_ready(&self) -> bool {
98        self.python_manager
99            .get()
100            .map(|m| m.is_ready())
101            .unwrap_or(false)
102    }
103}