restflow_core/storage/
config.rs1use anyhow::Result;
2use redb::{Database, ReadableDatabase, TableDefinition};
3use serde::{Deserialize, Serialize};
4use std::sync::Arc;
5
6const CONFIG_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("system_config");
7
8const DEFAULT_WORKER_COUNT: usize = 4;
10const DEFAULT_TASK_TIMEOUT_SECONDS: u64 = 300; const DEFAULT_STALL_TIMEOUT_SECONDS: u64 = 300; const DEFAULT_MAX_RETRIES: u32 = 3;
13const MIN_WORKER_COUNT: usize = 1;
14const MIN_TIMEOUT_SECONDS: u64 = 10;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct SystemConfig {
18 pub worker_count: usize,
19 pub task_timeout_seconds: u64,
20 pub stall_timeout_seconds: u64,
21 pub max_retries: u32,
22}
23
24impl Default for SystemConfig {
25 fn default() -> Self {
26 Self {
27 worker_count: DEFAULT_WORKER_COUNT,
28 task_timeout_seconds: DEFAULT_TASK_TIMEOUT_SECONDS,
29 stall_timeout_seconds: DEFAULT_STALL_TIMEOUT_SECONDS,
30 max_retries: DEFAULT_MAX_RETRIES,
31 }
32 }
33}
34
35impl SystemConfig {
36 pub fn validate(&self) -> Result<()> {
38 if self.worker_count < MIN_WORKER_COUNT {
39 return Err(anyhow::anyhow!(
40 "Worker count must be at least {}",
41 MIN_WORKER_COUNT
42 ));
43 }
44
45 if self.task_timeout_seconds < MIN_TIMEOUT_SECONDS {
46 return Err(anyhow::anyhow!(
47 "Task timeout must be at least {} seconds",
48 MIN_TIMEOUT_SECONDS
49 ));
50 }
51
52 if self.stall_timeout_seconds < MIN_TIMEOUT_SECONDS {
53 return Err(anyhow::anyhow!(
54 "Stall timeout must be at least {} seconds",
55 MIN_TIMEOUT_SECONDS
56 ));
57 }
58
59 if self.max_retries == 0 {
60 return Err(anyhow::anyhow!("Max retries must be at least 1"));
61 }
62
63 Ok(())
64 }
65}
66
67pub struct ConfigStorage {
68 db: Arc<Database>,
69}
70
71impl ConfigStorage {
72 pub fn new(db: Arc<Database>) -> Result<Self> {
73 let write_txn = db.begin_write()?;
75 write_txn.open_table(CONFIG_TABLE)?;
76 write_txn.commit()?;
77
78 let storage = Self { db };
79
80 if storage.get_config()?.is_none() {
82 storage.update_config(SystemConfig::default())?;
83 }
84
85 Ok(storage)
86 }
87
88 pub fn get_config(&self) -> Result<Option<SystemConfig>> {
90 let read_txn = self.db.begin_read()?;
91 let table = read_txn.open_table(CONFIG_TABLE)?;
92
93 if let Some(data) = table.get("system")? {
94 let config: SystemConfig = serde_json::from_slice(data.value())?;
95 Ok(Some(config))
96 } else {
97 Ok(None)
98 }
99 }
100
101 pub fn update_config(&self, config: SystemConfig) -> Result<()> {
103 config.validate()?;
105
106 let write_txn = self.db.begin_write()?;
107 {
108 let mut table = write_txn.open_table(CONFIG_TABLE)?;
109 let serialized = serde_json::to_vec(&config)?;
110 table.insert("system", serialized.as_slice())?;
111 }
112 write_txn.commit()?;
113 Ok(())
114 }
115
116 pub fn get_worker_count(&self) -> Result<usize> {
118 Ok(self.get_config()?.unwrap_or_default().worker_count)
119 }
120
121 pub fn set_worker_count(&self, count: usize) -> Result<()> {
123 let mut config = self.get_config()?.unwrap_or_default();
124 config.worker_count = count.max(MIN_WORKER_COUNT);
125 self.update_config(config)
126 }
127}
128
129#[cfg(test)]
130mod tests {
131 use super::*;
132 use tempfile::tempdir;
133
134 fn setup_test_storage() -> (ConfigStorage, tempfile::TempDir) {
135 let temp_dir = tempdir().unwrap();
136 let db_path = temp_dir.path().join("test.db");
137 let db = Arc::new(Database::create(db_path).unwrap());
138 let storage = ConfigStorage::new(db).unwrap();
139 (storage, temp_dir)
140 }
141
142 #[test]
143 fn test_default_config() {
144 let (storage, _temp_dir) = setup_test_storage();
145
146 let config = storage.get_config().unwrap();
148 assert!(config.is_some());
149
150 let config = config.unwrap();
151 assert_eq!(config.worker_count, DEFAULT_WORKER_COUNT);
152 assert_eq!(config.task_timeout_seconds, DEFAULT_TASK_TIMEOUT_SECONDS);
153 assert_eq!(config.stall_timeout_seconds, DEFAULT_STALL_TIMEOUT_SECONDS);
154 assert_eq!(config.max_retries, DEFAULT_MAX_RETRIES);
155 }
156
157 #[test]
158 fn test_update_config() {
159 let (storage, _temp_dir) = setup_test_storage();
160
161 let new_config = SystemConfig {
162 worker_count: 8,
163 task_timeout_seconds: 600,
164 stall_timeout_seconds: 600,
165 max_retries: 5,
166 };
167
168 storage.update_config(new_config.clone()).unwrap();
169
170 let retrieved = storage.get_config().unwrap().unwrap();
171 assert_eq!(retrieved.worker_count, 8);
172 assert_eq!(retrieved.task_timeout_seconds, 600);
173 assert_eq!(retrieved.stall_timeout_seconds, 600);
174 assert_eq!(retrieved.max_retries, 5);
175 }
176
177 #[test]
178 fn test_config_validation() {
179 let valid_config = SystemConfig {
180 worker_count: 2,
181 task_timeout_seconds: 30,
182 stall_timeout_seconds: 30,
183 max_retries: 1,
184 };
185 assert!(valid_config.validate().is_ok());
186 }
187
188 #[test]
189 fn test_invalid_worker_count() {
190 let (storage, _temp_dir) = setup_test_storage();
191
192 let invalid_config = SystemConfig {
193 worker_count: 0, task_timeout_seconds: 300,
195 stall_timeout_seconds: 300,
196 max_retries: 3,
197 };
198
199 let result = storage.update_config(invalid_config);
200 assert!(result.is_err());
201 assert!(
202 result
203 .unwrap_err()
204 .to_string()
205 .contains("Worker count must be at least")
206 );
207 }
208
209 #[test]
210 fn test_invalid_task_timeout() {
211 let (storage, _temp_dir) = setup_test_storage();
212
213 let invalid_config = SystemConfig {
214 worker_count: 4,
215 task_timeout_seconds: 5, stall_timeout_seconds: 300,
217 max_retries: 3,
218 };
219
220 let result = storage.update_config(invalid_config);
221 assert!(result.is_err());
222 assert!(
223 result
224 .unwrap_err()
225 .to_string()
226 .contains("Task timeout must be at least")
227 );
228 }
229
230 #[test]
231 fn test_invalid_stall_timeout() {
232 let (storage, _temp_dir) = setup_test_storage();
233
234 let invalid_config = SystemConfig {
235 worker_count: 4,
236 task_timeout_seconds: 300,
237 stall_timeout_seconds: 5, max_retries: 3,
239 };
240
241 let result = storage.update_config(invalid_config);
242 assert!(result.is_err());
243 assert!(
244 result
245 .unwrap_err()
246 .to_string()
247 .contains("Stall timeout must be at least")
248 );
249 }
250
251 #[test]
252 fn test_invalid_retries() {
253 let (storage, _temp_dir) = setup_test_storage();
254
255 let invalid_config = SystemConfig {
256 worker_count: 4,
257 task_timeout_seconds: 300,
258 stall_timeout_seconds: 300,
259 max_retries: 0, };
261
262 let result = storage.update_config(invalid_config);
263 assert!(result.is_err());
264 assert!(
265 result
266 .unwrap_err()
267 .to_string()
268 .contains("Max retries must be at least 1")
269 );
270 }
271
272 #[test]
273 fn test_get_worker_count() {
274 let (storage, _temp_dir) = setup_test_storage();
275
276 let count = storage.get_worker_count().unwrap();
278 assert_eq!(count, DEFAULT_WORKER_COUNT);
279
280 let new_config = SystemConfig {
282 worker_count: 10,
283 task_timeout_seconds: 300,
284 stall_timeout_seconds: 300,
285 max_retries: 3,
286 };
287 storage.update_config(new_config).unwrap();
288
289 let count = storage.get_worker_count().unwrap();
290 assert_eq!(count, 10);
291 }
292
293 #[test]
294 fn test_set_worker_count() {
295 let (storage, _temp_dir) = setup_test_storage();
296
297 storage.set_worker_count(6).unwrap();
299
300 let config = storage.get_config().unwrap().unwrap();
301 assert_eq!(config.worker_count, 6);
302
303 storage.set_worker_count(0).unwrap();
305
306 let config = storage.get_config().unwrap().unwrap();
307 assert_eq!(config.worker_count, MIN_WORKER_COUNT);
308 }
309
310 #[test]
311 fn test_config_persistence() {
312 let temp_dir = tempdir().unwrap();
313 let db_path = temp_dir.path().join("test.db");
314
315 {
317 let db = Arc::new(Database::create(&db_path).unwrap());
318 let storage = ConfigStorage::new(db).unwrap();
319
320 let new_config = SystemConfig {
321 worker_count: 12,
322 task_timeout_seconds: 900,
323 stall_timeout_seconds: 900,
324 max_retries: 10,
325 };
326 storage.update_config(new_config).unwrap();
327 }
328
329 {
331 let db = Arc::new(Database::open(&db_path).unwrap());
332 let storage = ConfigStorage::new(db).unwrap();
333
334 let config = storage.get_config().unwrap().unwrap();
335 assert_eq!(config.worker_count, 12);
336 assert_eq!(config.task_timeout_seconds, 900);
337 assert_eq!(config.stall_timeout_seconds, 900);
338 assert_eq!(config.max_retries, 10);
339 }
340 }
341}