// restflow_core/storage/trigger.rs

1use crate::models::ActiveTrigger;
2use anyhow::Result;
3use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
4use std::sync::Arc;
5
/// redb table that stores the active triggers.
///
/// Keys are trigger ids (`&str`); values are the JSON-serialized bytes of the
/// corresponding `ActiveTrigger`, as written by `TriggerStorage`.
pub const ACTIVE_TRIGGERS_TABLE: TableDefinition<&str, &[u8]> =
    TableDefinition::new("active_triggers");
9
/// Storage layer for active triggers, persisted in the `ACTIVE_TRIGGERS_TABLE` table.
pub struct TriggerStorage {
    /// Shared handle to the redb database; each operation opens its own transaction on it.
    db: Arc<Database>,
}
13
14impl TriggerStorage {
15    pub fn new(db: Arc<Database>) -> Result<Self> {
16        // Create table if not exists
17        let write_txn = db.begin_write()?;
18        write_txn.open_table(ACTIVE_TRIGGERS_TABLE)?;
19        write_txn.commit()?;
20
21        Ok(Self { db })
22    }
23
24    // Activate trigger
25    pub fn activate_trigger(&self, trigger: &ActiveTrigger) -> Result<()> {
26        let write_txn = self.db.begin_write()?;
27        {
28            let mut table = write_txn.open_table(ACTIVE_TRIGGERS_TABLE)?;
29            let json_bytes = serde_json::to_vec(trigger)?;
30            table.insert(trigger.id.as_str(), json_bytes.as_slice())?;
31        }
32        write_txn.commit()?;
33        Ok(())
34    }
35
36    // Deactivate trigger
37    pub fn deactivate_trigger(&self, trigger_id: &str) -> Result<()> {
38        let write_txn = self.db.begin_write()?;
39        {
40            let mut table = write_txn.open_table(ACTIVE_TRIGGERS_TABLE)?;
41            table.remove(trigger_id)?;
42        }
43        write_txn.commit()?;
44        Ok(())
45    }
46
47    // Find active trigger by workflow_id
48    pub fn get_active_trigger_by_workflow(
49        &self,
50        workflow_id: &str,
51    ) -> Result<Option<ActiveTrigger>> {
52        let read_txn = self.db.begin_read()?;
53        let table = read_txn.open_table(ACTIVE_TRIGGERS_TABLE)?;
54
55        for item in table.iter()? {
56            let (_, value) = item?;
57            let trigger: ActiveTrigger = serde_json::from_slice(value.value())?;
58            if trigger.workflow_id == workflow_id {
59                return Ok(Some(trigger));
60            }
61        }
62
63        Ok(None)
64    }
65
66    // Find active trigger by trigger_id
67    pub fn get_active_trigger(&self, trigger_id: &str) -> Result<Option<ActiveTrigger>> {
68        let read_txn = self.db.begin_read()?;
69        let table = read_txn.open_table(ACTIVE_TRIGGERS_TABLE)?;
70
71        if let Some(value) = table.get(trigger_id)? {
72            let trigger: ActiveTrigger = serde_json::from_slice(value.value())?;
73            Ok(Some(trigger))
74        } else {
75            Ok(None)
76        }
77    }
78
79    // Find workflow_id by webhook_id (trigger_id)
80    pub fn get_workflow_by_webhook(&self, webhook_id: &str) -> Result<Option<String>> {
81        if let Some(trigger) = self.get_active_trigger(webhook_id)? {
82            Ok(Some(trigger.workflow_id))
83        } else {
84            Ok(None)
85        }
86    }
87
88    // Update trigger (record trigger count, etc.)
89    pub fn update_trigger(&self, trigger: &ActiveTrigger) -> Result<()> {
90        let write_txn = self.db.begin_write()?;
91        {
92            let mut table = write_txn.open_table(ACTIVE_TRIGGERS_TABLE)?;
93            let json_bytes = serde_json::to_vec(trigger)?;
94            table.insert(trigger.id.as_str(), json_bytes.as_slice())?;
95        }
96        write_txn.commit()?;
97        Ok(())
98    }
99
100    // List all active triggers
101    pub fn list_active_triggers(&self) -> Result<Vec<ActiveTrigger>> {
102        let read_txn = self.db.begin_read()?;
103        let table = read_txn.open_table(ACTIVE_TRIGGERS_TABLE)?;
104
105        let mut triggers = Vec::new();
106        for item in table.iter()? {
107            let (_, value) = item?;
108            let trigger: ActiveTrigger = serde_json::from_slice(value.value())?;
109            triggers.push(trigger);
110        }
111
112        Ok(triggers)
113    }
114
115    // Get all Schedule type triggers (for scheduler)
116    pub fn list_schedule_triggers(&self) -> Result<Vec<ActiveTrigger>> {
117        let triggers = self.list_active_triggers()?;
118        Ok(triggers
119            .into_iter()
120            .filter(|t| {
121                matches!(
122                    t.trigger_config,
123                    crate::models::TriggerConfig::Schedule { .. }
124                )
125            })
126            .collect())
127    }
128}
129
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{ActiveTrigger, AuthConfig, TriggerConfig};
    use tempfile::tempdir;

    /// Builds a freshly activated trigger (zero count, never fired) around `trigger_config`.
    fn build_trigger(
        id: &str,
        workflow_id: &str,
        trigger_config: TriggerConfig,
    ) -> ActiveTrigger {
        ActiveTrigger {
            id: id.to_string(),
            workflow_id: workflow_id.to_string(),
            trigger_config,
            trigger_count: 0,
            activated_at: chrono::Utc::now().timestamp(),
            last_triggered_at: None,
        }
    }

    /// Webhook trigger fixture: POST endpoint protected by an API key header.
    fn create_test_webhook_trigger(id: &str, workflow_id: &str) -> ActiveTrigger {
        let auth = AuthConfig::ApiKey {
            key: "test-key".to_string(),
            header_name: Some("X-API-Key".to_string()),
        };
        let config = TriggerConfig::Webhook {
            path: format!("/api/webhook/{}", id),
            method: "POST".to_string(),
            auth: Some(auth),
        };
        build_trigger(id, workflow_id, config)
    }

    /// Schedule trigger fixture with a fixed cron expression, UTC timezone and a small payload.
    fn create_test_schedule_trigger(id: &str, workflow_id: &str) -> ActiveTrigger {
        let config = TriggerConfig::Schedule {
            cron: "0 */5 * * * *".to_string(),
            timezone: Some("UTC".to_string()),
            payload: Some(serde_json::json!({"scheduled": true})),
        };
        build_trigger(id, workflow_id, config)
    }

    /// Opens a storage on a database file inside a new temp dir.
    /// The `TempDir` is returned so the directory outlives the storage.
    fn setup_test_storage() -> (TriggerStorage, tempfile::TempDir) {
        let temp_dir = tempdir().unwrap();
        let db = Database::create(temp_dir.path().join("test.db")).unwrap();
        let storage = TriggerStorage::new(Arc::new(db)).unwrap();
        (storage, temp_dir)
    }

    /// Activates every trigger in `triggers`, in order.
    fn activate_all(storage: &TriggerStorage, triggers: &[ActiveTrigger]) {
        for trigger in triggers {
            storage.activate_trigger(trigger).unwrap();
        }
    }

    /// Collects the ids of `triggers` as string slices.
    fn ids_of(triggers: &[ActiveTrigger]) -> Vec<&str> {
        triggers.iter().map(|t| t.id.as_str()).collect()
    }

    #[test]
    fn test_activate_trigger() {
        let (storage, _temp_dir) = setup_test_storage();

        let trigger = create_test_webhook_trigger("trigger-001", "workflow-001");
        storage.activate_trigger(&trigger).unwrap();

        // The stored entry must exist and carry the same identifiers.
        let stored = storage.get_active_trigger("trigger-001").unwrap();
        let stored = stored.as_ref();
        assert_eq!(stored.map(|t| t.id.as_str()), Some("trigger-001"));
        assert_eq!(
            stored.map(|t| t.workflow_id.as_str()),
            Some("workflow-001")
        );
    }

    #[test]
    fn test_deactivate_trigger() {
        let (storage, _temp_dir) = setup_test_storage();

        // Activate, then remove again.
        let trigger = create_test_webhook_trigger("trigger-001", "workflow-001");
        storage.activate_trigger(&trigger).unwrap();
        storage.deactivate_trigger("trigger-001").unwrap();

        // Nothing may be left under that id.
        assert!(storage
            .get_active_trigger("trigger-001")
            .unwrap()
            .is_none());
    }

    #[test]
    fn test_get_active_trigger() {
        let (storage, _temp_dir) = setup_test_storage();

        let trigger = create_test_webhook_trigger("trigger-001", "workflow-001");
        storage.activate_trigger(&trigger).unwrap();

        let present = storage.get_active_trigger("trigger-001").unwrap();
        assert!(present.is_some());

        // Unknown ids yield `None`, not an error.
        let absent = storage.get_active_trigger("nonexistent").unwrap();
        assert!(absent.is_none());
    }

    #[test]
    fn test_get_trigger_by_workflow() {
        let (storage, _temp_dir) = setup_test_storage();

        // Two of the three triggers share workflow-001.
        let fixtures = [
            create_test_webhook_trigger("trigger-001", "workflow-001"),
            create_test_webhook_trigger("trigger-002", "workflow-002"),
            create_test_schedule_trigger("trigger-003", "workflow-001"),
        ];
        activate_all(&storage, &fixtures);

        // Lookup by workflow id returns the first match.
        let hit = storage
            .get_active_trigger_by_workflow("workflow-001")
            .unwrap();
        assert_eq!(
            hit.as_ref().map(|t| t.workflow_id.as_str()),
            Some("workflow-001")
        );

        // A workflow without triggers yields `None`.
        let miss = storage
            .get_active_trigger_by_workflow("workflow-999")
            .unwrap();
        assert!(miss.is_none());
    }

    #[test]
    fn test_get_workflow_by_webhook() {
        let (storage, _temp_dir) = setup_test_storage();

        let trigger = create_test_webhook_trigger("webhook-001", "workflow-001");
        storage.activate_trigger(&trigger).unwrap();

        // The webhook id resolves to the owning workflow id.
        let resolved = storage.get_workflow_by_webhook("webhook-001").unwrap();
        assert_eq!(resolved.as_deref(), Some("workflow-001"));

        // Unknown webhook ids resolve to nothing.
        let unresolved = storage.get_workflow_by_webhook("nonexistent").unwrap();
        assert!(unresolved.is_none());
    }

    #[test]
    fn test_update_trigger() {
        let (storage, _temp_dir) = setup_test_storage();

        let original = create_test_webhook_trigger("trigger-001", "workflow-001");
        storage.activate_trigger(&original).unwrap();

        // Same trigger with a bumped count and a "last fired" timestamp.
        let updated = ActiveTrigger {
            trigger_count: 5,
            last_triggered_at: Some(chrono::Utc::now().timestamp()),
            ..original
        };
        storage.update_trigger(&updated).unwrap();

        // The stored entry must reflect the update.
        let stored = storage.get_active_trigger("trigger-001").unwrap().unwrap();
        assert_eq!(stored.trigger_count, 5);
        assert!(stored.last_triggered_at.is_some());
    }

    #[test]
    fn test_list_active_triggers() {
        let (storage, _temp_dir) = setup_test_storage();

        let fixtures = [
            create_test_webhook_trigger("trigger-001", "workflow-001"),
            create_test_webhook_trigger("trigger-002", "workflow-002"),
            create_test_schedule_trigger("trigger-003", "workflow-003"),
        ];
        activate_all(&storage, &fixtures);

        // Every activated trigger must be listed, and nothing else.
        let listed = storage.list_active_triggers().unwrap();
        assert_eq!(listed.len(), 3);

        let ids = ids_of(&listed);
        for expected in ["trigger-001", "trigger-002", "trigger-003"] {
            assert!(ids.contains(&expected));
        }
    }

    #[test]
    fn test_list_schedule_triggers() {
        let (storage, _temp_dir) = setup_test_storage();

        // Two webhooks and two schedules.
        let fixtures = [
            create_test_webhook_trigger("webhook-001", "workflow-001"),
            create_test_webhook_trigger("webhook-002", "workflow-002"),
            create_test_schedule_trigger("schedule-001", "workflow-003"),
            create_test_schedule_trigger("schedule-002", "workflow-004"),
        ];
        activate_all(&storage, &fixtures);

        // Only the schedule triggers may come back.
        let schedules = storage.list_schedule_triggers().unwrap();
        assert_eq!(schedules.len(), 2);

        let ids = ids_of(&schedules);
        assert!(ids.contains(&"schedule-001"));
        assert!(ids.contains(&"schedule-002"));
        assert!(!ids.contains(&"webhook-001"));
    }

    #[test]
    fn test_trigger_persistence() {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test.db");

        // First session: create the database and store two triggers.
        {
            let db = Database::create(&db_path).unwrap();
            let storage = TriggerStorage::new(Arc::new(db)).unwrap();

            let fixtures = [
                create_test_webhook_trigger("trigger-001", "workflow-001"),
                create_test_schedule_trigger("trigger-002", "workflow-002"),
            ];
            activate_all(&storage, &fixtures);
        }

        // Second session: reopen the same file and check both triggers survived.
        {
            let db = Database::open(&db_path).unwrap();
            let storage = TriggerStorage::new(Arc::new(db)).unwrap();

            assert_eq!(storage.list_active_triggers().unwrap().len(), 2);

            let expectations = [
                ("trigger-001", "workflow-001"),
                ("trigger-002", "workflow-002"),
            ];
            for (trigger_id, workflow_id) in expectations {
                let stored = storage.get_active_trigger(trigger_id).unwrap();
                assert!(stored.is_some());
                assert_eq!(stored.unwrap().workflow_id, workflow_id);
            }
        }
    }
}