restflow_core/storage/
trigger.rs1use crate::models::ActiveTrigger;
2use anyhow::Result;
3use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
4use std::sync::Arc;
5
6pub const ACTIVE_TRIGGERS_TABLE: TableDefinition<&str, &[u8]> =
8 TableDefinition::new("active_triggers");
9
10pub struct TriggerStorage {
11 db: Arc<Database>,
12}
13
14impl TriggerStorage {
15 pub fn new(db: Arc<Database>) -> Result<Self> {
16 let write_txn = db.begin_write()?;
18 write_txn.open_table(ACTIVE_TRIGGERS_TABLE)?;
19 write_txn.commit()?;
20
21 Ok(Self { db })
22 }
23
24 pub fn activate_trigger(&self, trigger: &ActiveTrigger) -> Result<()> {
26 let write_txn = self.db.begin_write()?;
27 {
28 let mut table = write_txn.open_table(ACTIVE_TRIGGERS_TABLE)?;
29 let json_bytes = serde_json::to_vec(trigger)?;
30 table.insert(trigger.id.as_str(), json_bytes.as_slice())?;
31 }
32 write_txn.commit()?;
33 Ok(())
34 }
35
36 pub fn deactivate_trigger(&self, trigger_id: &str) -> Result<()> {
38 let write_txn = self.db.begin_write()?;
39 {
40 let mut table = write_txn.open_table(ACTIVE_TRIGGERS_TABLE)?;
41 table.remove(trigger_id)?;
42 }
43 write_txn.commit()?;
44 Ok(())
45 }
46
47 pub fn get_active_trigger_by_workflow(
49 &self,
50 workflow_id: &str,
51 ) -> Result<Option<ActiveTrigger>> {
52 let read_txn = self.db.begin_read()?;
53 let table = read_txn.open_table(ACTIVE_TRIGGERS_TABLE)?;
54
55 for item in table.iter()? {
56 let (_, value) = item?;
57 let trigger: ActiveTrigger = serde_json::from_slice(value.value())?;
58 if trigger.workflow_id == workflow_id {
59 return Ok(Some(trigger));
60 }
61 }
62
63 Ok(None)
64 }
65
66 pub fn get_active_trigger(&self, trigger_id: &str) -> Result<Option<ActiveTrigger>> {
68 let read_txn = self.db.begin_read()?;
69 let table = read_txn.open_table(ACTIVE_TRIGGERS_TABLE)?;
70
71 if let Some(value) = table.get(trigger_id)? {
72 let trigger: ActiveTrigger = serde_json::from_slice(value.value())?;
73 Ok(Some(trigger))
74 } else {
75 Ok(None)
76 }
77 }
78
79 pub fn get_workflow_by_webhook(&self, webhook_id: &str) -> Result<Option<String>> {
81 if let Some(trigger) = self.get_active_trigger(webhook_id)? {
82 Ok(Some(trigger.workflow_id))
83 } else {
84 Ok(None)
85 }
86 }
87
88 pub fn update_trigger(&self, trigger: &ActiveTrigger) -> Result<()> {
90 let write_txn = self.db.begin_write()?;
91 {
92 let mut table = write_txn.open_table(ACTIVE_TRIGGERS_TABLE)?;
93 let json_bytes = serde_json::to_vec(trigger)?;
94 table.insert(trigger.id.as_str(), json_bytes.as_slice())?;
95 }
96 write_txn.commit()?;
97 Ok(())
98 }
99
100 pub fn list_active_triggers(&self) -> Result<Vec<ActiveTrigger>> {
102 let read_txn = self.db.begin_read()?;
103 let table = read_txn.open_table(ACTIVE_TRIGGERS_TABLE)?;
104
105 let mut triggers = Vec::new();
106 for item in table.iter()? {
107 let (_, value) = item?;
108 let trigger: ActiveTrigger = serde_json::from_slice(value.value())?;
109 triggers.push(trigger);
110 }
111
112 Ok(triggers)
113 }
114
115 pub fn list_schedule_triggers(&self) -> Result<Vec<ActiveTrigger>> {
117 let triggers = self.list_active_triggers()?;
118 Ok(triggers
119 .into_iter()
120 .filter(|t| {
121 matches!(
122 t.trigger_config,
123 crate::models::TriggerConfig::Schedule { .. }
124 )
125 })
126 .collect())
127 }
128}
129
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{ActiveTrigger, AuthConfig, TriggerConfig};
    use tempfile::tempdir;

    /// Builds a webhook trigger fixture with API-key auth and a path derived
    /// from `id`.
    fn create_test_webhook_trigger(id: &str, workflow_id: &str) -> ActiveTrigger {
        let auth = AuthConfig::ApiKey {
            key: "test-key".to_string(),
            header_name: Some("X-API-Key".to_string()),
        };
        let trigger_config = TriggerConfig::Webhook {
            path: format!("/api/webhook/{}", id),
            method: "POST".to_string(),
            auth: Some(auth),
        };

        ActiveTrigger {
            id: id.to_string(),
            workflow_id: workflow_id.to_string(),
            trigger_config,
            trigger_count: 0,
            activated_at: chrono::Utc::now().timestamp(),
            last_triggered_at: None,
        }
    }

    /// Builds a schedule trigger fixture with a fixed cron expression, UTC
    /// timezone and a small JSON payload.
    fn create_test_schedule_trigger(id: &str, workflow_id: &str) -> ActiveTrigger {
        let trigger_config = TriggerConfig::Schedule {
            cron: "0 */5 * * * *".to_string(),
            timezone: Some("UTC".to_string()),
            payload: Some(serde_json::json!({"scheduled": true})),
        };

        ActiveTrigger {
            id: id.to_string(),
            workflow_id: workflow_id.to_string(),
            trigger_config,
            trigger_count: 0,
            activated_at: chrono::Utc::now().timestamp(),
            last_triggered_at: None,
        }
    }

    /// Creates a storage backed by a fresh database file in a temp directory.
    ///
    /// The `TempDir` is returned so the caller keeps the directory alive for
    /// the duration of the test.
    fn setup_test_storage() -> (TriggerStorage, tempfile::TempDir) {
        let temp_dir = tempdir().unwrap();
        let db = Database::create(temp_dir.path().join("test.db")).unwrap();
        let storage = TriggerStorage::new(Arc::new(db)).unwrap();
        (storage, temp_dir)
    }

    /// True when one of `triggers` has the given `id`.
    fn has_id(triggers: &[ActiveTrigger], id: &str) -> bool {
        triggers.iter().any(|t| t.id == id)
    }

    #[test]
    fn test_activate_trigger() {
        let (storage, _temp_dir) = setup_test_storage();

        storage
            .activate_trigger(&create_test_webhook_trigger("trigger-001", "workflow-001"))
            .unwrap();

        let lookup = storage.get_active_trigger("trigger-001").unwrap();
        assert!(lookup.is_some());

        let stored = lookup.unwrap();
        assert_eq!(stored.id, "trigger-001");
        assert_eq!(stored.workflow_id, "workflow-001");
    }

    #[test]
    fn test_deactivate_trigger() {
        let (storage, _temp_dir) = setup_test_storage();

        storage
            .activate_trigger(&create_test_webhook_trigger("trigger-001", "workflow-001"))
            .unwrap();
        storage.deactivate_trigger("trigger-001").unwrap();

        // The entry must be gone after deactivation.
        let lookup = storage.get_active_trigger("trigger-001").unwrap();
        assert!(lookup.is_none());
    }

    #[test]
    fn test_get_active_trigger() {
        let (storage, _temp_dir) = setup_test_storage();

        storage
            .activate_trigger(&create_test_webhook_trigger("trigger-001", "workflow-001"))
            .unwrap();

        let present = storage.get_active_trigger("trigger-001").unwrap();
        assert!(present.is_some());

        let absent = storage.get_active_trigger("nonexistent").unwrap();
        assert!(absent.is_none());
    }

    #[test]
    fn test_get_trigger_by_workflow() {
        let (storage, _temp_dir) = setup_test_storage();

        // Two of the three fixtures share "workflow-001".
        for fixture in [
            create_test_webhook_trigger("trigger-001", "workflow-001"),
            create_test_webhook_trigger("trigger-002", "workflow-002"),
            create_test_schedule_trigger("trigger-003", "workflow-001"),
        ] {
            storage.activate_trigger(&fixture).unwrap();
        }

        let hit = storage
            .get_active_trigger_by_workflow("workflow-001")
            .unwrap();
        assert!(hit.is_some());
        assert_eq!(hit.unwrap().workflow_id, "workflow-001");

        let miss = storage
            .get_active_trigger_by_workflow("workflow-999")
            .unwrap();
        assert!(miss.is_none());
    }

    #[test]
    fn test_get_workflow_by_webhook() {
        let (storage, _temp_dir) = setup_test_storage();

        storage
            .activate_trigger(&create_test_webhook_trigger("webhook-001", "workflow-001"))
            .unwrap();

        let resolved = storage.get_workflow_by_webhook("webhook-001").unwrap();
        assert!(resolved.is_some());
        assert_eq!(resolved.unwrap(), "workflow-001");

        let unresolved = storage.get_workflow_by_webhook("nonexistent").unwrap();
        assert!(unresolved.is_none());
    }

    #[test]
    fn test_update_trigger() {
        let (storage, _temp_dir) = setup_test_storage();

        let mut fixture = create_test_webhook_trigger("trigger-001", "workflow-001");
        storage.activate_trigger(&fixture).unwrap();

        // Mutate the in-memory copy and write it back.
        fixture.trigger_count = 5;
        fixture.last_triggered_at = Some(chrono::Utc::now().timestamp());
        storage.update_trigger(&fixture).unwrap();

        let stored = storage.get_active_trigger("trigger-001").unwrap().unwrap();
        assert_eq!(stored.trigger_count, 5);
        assert!(stored.last_triggered_at.is_some());
    }

    #[test]
    fn test_list_active_triggers() {
        let (storage, _temp_dir) = setup_test_storage();

        for fixture in [
            create_test_webhook_trigger("trigger-001", "workflow-001"),
            create_test_webhook_trigger("trigger-002", "workflow-002"),
            create_test_schedule_trigger("trigger-003", "workflow-003"),
        ] {
            storage.activate_trigger(&fixture).unwrap();
        }

        let listed = storage.list_active_triggers().unwrap();
        assert_eq!(listed.len(), 3);

        assert!(has_id(&listed, "trigger-001"));
        assert!(has_id(&listed, "trigger-002"));
        assert!(has_id(&listed, "trigger-003"));
    }

    #[test]
    fn test_list_schedule_triggers() {
        let (storage, _temp_dir) = setup_test_storage();

        for fixture in [
            create_test_webhook_trigger("webhook-001", "workflow-001"),
            create_test_webhook_trigger("webhook-002", "workflow-002"),
            create_test_schedule_trigger("schedule-001", "workflow-003"),
            create_test_schedule_trigger("schedule-002", "workflow-004"),
        ] {
            storage.activate_trigger(&fixture).unwrap();
        }

        let scheduled = storage.list_schedule_triggers().unwrap();
        assert_eq!(scheduled.len(), 2);

        assert!(has_id(&scheduled, "schedule-001"));
        assert!(has_id(&scheduled, "schedule-002"));
        assert!(!has_id(&scheduled, "webhook-001"));
    }

    #[test]
    fn test_trigger_persistence() {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test.db");

        // First session: create the database and store two triggers, then
        // drop everything at the end of the scope.
        {
            let storage =
                TriggerStorage::new(Arc::new(Database::create(&db_path).unwrap())).unwrap();

            storage
                .activate_trigger(&create_test_webhook_trigger("trigger-001", "workflow-001"))
                .unwrap();
            storage
                .activate_trigger(&create_test_schedule_trigger("trigger-002", "workflow-002"))
                .unwrap();
        }

        // Second session: reopen the same file and check both entries.
        {
            let storage =
                TriggerStorage::new(Arc::new(Database::open(&db_path).unwrap())).unwrap();

            let listed = storage.list_active_triggers().unwrap();
            assert_eq!(listed.len(), 2);

            let first = storage.get_active_trigger("trigger-001").unwrap();
            assert!(first.is_some());
            assert_eq!(first.unwrap().workflow_id, "workflow-001");

            let second = storage.get_active_trigger("trigger-002").unwrap();
            assert!(second.is_some());
            assert_eq!(second.unwrap().workflow_id, "workflow-002");
        }
    }
}