restflow_core/storage/
config.rs

1use anyhow::Result;
2use redb::{Database, ReadableDatabase, TableDefinition};
3use serde::{Deserialize, Serialize};
4use std::sync::Arc;
5
6const CONFIG_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("system_config");
7
8// KISS: Default configuration constants
9const DEFAULT_WORKER_COUNT: usize = 4;
10const DEFAULT_TASK_TIMEOUT_SECONDS: u64 = 300; // 5 minutes
11const DEFAULT_STALL_TIMEOUT_SECONDS: u64 = 300; // 5 minutes  
12const DEFAULT_MAX_RETRIES: u32 = 3;
13const MIN_WORKER_COUNT: usize = 1;
14const MIN_TIMEOUT_SECONDS: u64 = 10;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct SystemConfig {
18    pub worker_count: usize,
19    pub task_timeout_seconds: u64,
20    pub stall_timeout_seconds: u64,
21    pub max_retries: u32,
22}
23
24impl Default for SystemConfig {
25    fn default() -> Self {
26        Self {
27            worker_count: DEFAULT_WORKER_COUNT,
28            task_timeout_seconds: DEFAULT_TASK_TIMEOUT_SECONDS,
29            stall_timeout_seconds: DEFAULT_STALL_TIMEOUT_SECONDS,
30            max_retries: DEFAULT_MAX_RETRIES,
31        }
32    }
33}
34
35impl SystemConfig {
36    /// KISS: Validate configuration values
37    pub fn validate(&self) -> Result<()> {
38        if self.worker_count < MIN_WORKER_COUNT {
39            return Err(anyhow::anyhow!(
40                "Worker count must be at least {}",
41                MIN_WORKER_COUNT
42            ));
43        }
44
45        if self.task_timeout_seconds < MIN_TIMEOUT_SECONDS {
46            return Err(anyhow::anyhow!(
47                "Task timeout must be at least {} seconds",
48                MIN_TIMEOUT_SECONDS
49            ));
50        }
51
52        if self.stall_timeout_seconds < MIN_TIMEOUT_SECONDS {
53            return Err(anyhow::anyhow!(
54                "Stall timeout must be at least {} seconds",
55                MIN_TIMEOUT_SECONDS
56            ));
57        }
58
59        if self.max_retries == 0 {
60            return Err(anyhow::anyhow!("Max retries must be at least 1"));
61        }
62
63        Ok(())
64    }
65}
66
67pub struct ConfigStorage {
68    db: Arc<Database>,
69}
70
71impl ConfigStorage {
72    pub fn new(db: Arc<Database>) -> Result<Self> {
73        // Create table
74        let write_txn = db.begin_write()?;
75        write_txn.open_table(CONFIG_TABLE)?;
76        write_txn.commit()?;
77
78        let storage = Self { db };
79
80        // Set default config if not exists
81        if storage.get_config()?.is_none() {
82            storage.update_config(SystemConfig::default())?;
83        }
84
85        Ok(storage)
86    }
87
88    /// Get system configuration
89    pub fn get_config(&self) -> Result<Option<SystemConfig>> {
90        let read_txn = self.db.begin_read()?;
91        let table = read_txn.open_table(CONFIG_TABLE)?;
92
93        if let Some(data) = table.get("system")? {
94            let config: SystemConfig = serde_json::from_slice(data.value())?;
95            Ok(Some(config))
96        } else {
97            Ok(None)
98        }
99    }
100
101    /// Update system configuration
102    pub fn update_config(&self, config: SystemConfig) -> Result<()> {
103        // Validate before saving
104        config.validate()?;
105
106        let write_txn = self.db.begin_write()?;
107        {
108            let mut table = write_txn.open_table(CONFIG_TABLE)?;
109            let serialized = serde_json::to_vec(&config)?;
110            table.insert("system", serialized.as_slice())?;
111        }
112        write_txn.commit()?;
113        Ok(())
114    }
115
116    /// Get a specific config value
117    pub fn get_worker_count(&self) -> Result<usize> {
118        Ok(self.get_config()?.unwrap_or_default().worker_count)
119    }
120
121    /// Update worker count
122    pub fn set_worker_count(&self, count: usize) -> Result<()> {
123        let mut config = self.get_config()?.unwrap_or_default();
124        config.worker_count = count.max(MIN_WORKER_COUNT);
125        self.update_config(config)
126    }
127}
128
129#[cfg(test)]
130mod tests {
131    use super::*;
132    use tempfile::tempdir;
133
134    fn setup_test_storage() -> (ConfigStorage, tempfile::TempDir) {
135        let temp_dir = tempdir().unwrap();
136        let db_path = temp_dir.path().join("test.db");
137        let db = Arc::new(Database::create(db_path).unwrap());
138        let storage = ConfigStorage::new(db).unwrap();
139        (storage, temp_dir)
140    }
141
142    #[test]
143    fn test_default_config() {
144        let (storage, _temp_dir) = setup_test_storage();
145
146        // Should have default config after initialization
147        let config = storage.get_config().unwrap();
148        assert!(config.is_some());
149
150        let config = config.unwrap();
151        assert_eq!(config.worker_count, DEFAULT_WORKER_COUNT);
152        assert_eq!(config.task_timeout_seconds, DEFAULT_TASK_TIMEOUT_SECONDS);
153        assert_eq!(config.stall_timeout_seconds, DEFAULT_STALL_TIMEOUT_SECONDS);
154        assert_eq!(config.max_retries, DEFAULT_MAX_RETRIES);
155    }
156
157    #[test]
158    fn test_update_config() {
159        let (storage, _temp_dir) = setup_test_storage();
160
161        let new_config = SystemConfig {
162            worker_count: 8,
163            task_timeout_seconds: 600,
164            stall_timeout_seconds: 600,
165            max_retries: 5,
166        };
167
168        storage.update_config(new_config.clone()).unwrap();
169
170        let retrieved = storage.get_config().unwrap().unwrap();
171        assert_eq!(retrieved.worker_count, 8);
172        assert_eq!(retrieved.task_timeout_seconds, 600);
173        assert_eq!(retrieved.stall_timeout_seconds, 600);
174        assert_eq!(retrieved.max_retries, 5);
175    }
176
177    #[test]
178    fn test_config_validation() {
179        let valid_config = SystemConfig {
180            worker_count: 2,
181            task_timeout_seconds: 30,
182            stall_timeout_seconds: 30,
183            max_retries: 1,
184        };
185        assert!(valid_config.validate().is_ok());
186    }
187
188    #[test]
189    fn test_invalid_worker_count() {
190        let (storage, _temp_dir) = setup_test_storage();
191
192        let invalid_config = SystemConfig {
193            worker_count: 0, // Invalid: less than MIN_WORKER_COUNT
194            task_timeout_seconds: 300,
195            stall_timeout_seconds: 300,
196            max_retries: 3,
197        };
198
199        let result = storage.update_config(invalid_config);
200        assert!(result.is_err());
201        assert!(
202            result
203                .unwrap_err()
204                .to_string()
205                .contains("Worker count must be at least")
206        );
207    }
208
209    #[test]
210    fn test_invalid_task_timeout() {
211        let (storage, _temp_dir) = setup_test_storage();
212
213        let invalid_config = SystemConfig {
214            worker_count: 4,
215            task_timeout_seconds: 5, // Invalid: less than MIN_TIMEOUT_SECONDS
216            stall_timeout_seconds: 300,
217            max_retries: 3,
218        };
219
220        let result = storage.update_config(invalid_config);
221        assert!(result.is_err());
222        assert!(
223            result
224                .unwrap_err()
225                .to_string()
226                .contains("Task timeout must be at least")
227        );
228    }
229
230    #[test]
231    fn test_invalid_stall_timeout() {
232        let (storage, _temp_dir) = setup_test_storage();
233
234        let invalid_config = SystemConfig {
235            worker_count: 4,
236            task_timeout_seconds: 300,
237            stall_timeout_seconds: 5, // Invalid: less than MIN_TIMEOUT_SECONDS
238            max_retries: 3,
239        };
240
241        let result = storage.update_config(invalid_config);
242        assert!(result.is_err());
243        assert!(
244            result
245                .unwrap_err()
246                .to_string()
247                .contains("Stall timeout must be at least")
248        );
249    }
250
251    #[test]
252    fn test_invalid_retries() {
253        let (storage, _temp_dir) = setup_test_storage();
254
255        let invalid_config = SystemConfig {
256            worker_count: 4,
257            task_timeout_seconds: 300,
258            stall_timeout_seconds: 300,
259            max_retries: 0, // Invalid: must be at least 1
260        };
261
262        let result = storage.update_config(invalid_config);
263        assert!(result.is_err());
264        assert!(
265            result
266                .unwrap_err()
267                .to_string()
268                .contains("Max retries must be at least 1")
269        );
270    }
271
272    #[test]
273    fn test_get_worker_count() {
274        let (storage, _temp_dir) = setup_test_storage();
275
276        // Should get default worker count
277        let count = storage.get_worker_count().unwrap();
278        assert_eq!(count, DEFAULT_WORKER_COUNT);
279
280        // Update and verify
281        let new_config = SystemConfig {
282            worker_count: 10,
283            task_timeout_seconds: 300,
284            stall_timeout_seconds: 300,
285            max_retries: 3,
286        };
287        storage.update_config(new_config).unwrap();
288
289        let count = storage.get_worker_count().unwrap();
290        assert_eq!(count, 10);
291    }
292
293    #[test]
294    fn test_set_worker_count() {
295        let (storage, _temp_dir) = setup_test_storage();
296
297        // Set new worker count
298        storage.set_worker_count(6).unwrap();
299
300        let config = storage.get_config().unwrap().unwrap();
301        assert_eq!(config.worker_count, 6);
302
303        // Try to set invalid count (should be clamped to minimum)
304        storage.set_worker_count(0).unwrap();
305
306        let config = storage.get_config().unwrap().unwrap();
307        assert_eq!(config.worker_count, MIN_WORKER_COUNT);
308    }
309
310    #[test]
311    fn test_config_persistence() {
312        let temp_dir = tempdir().unwrap();
313        let db_path = temp_dir.path().join("test.db");
314
315        // Create and update config
316        {
317            let db = Arc::new(Database::create(&db_path).unwrap());
318            let storage = ConfigStorage::new(db).unwrap();
319
320            let new_config = SystemConfig {
321                worker_count: 12,
322                task_timeout_seconds: 900,
323                stall_timeout_seconds: 900,
324                max_retries: 10,
325            };
326            storage.update_config(new_config).unwrap();
327        }
328
329        // Open database again and verify config persisted
330        {
331            let db = Arc::new(Database::open(&db_path).unwrap());
332            let storage = ConfigStorage::new(db).unwrap();
333
334            let config = storage.get_config().unwrap().unwrap();
335            assert_eq!(config.worker_count, 12);
336            assert_eq!(config.task_timeout_seconds, 900);
337            assert_eq!(config.stall_timeout_seconds, 900);
338            assert_eq!(config.max_retries, 10);
339        }
340    }
341}