restflow_core/storage/
execution_history.rs

1use crate::models::{ExecutionStatus, ExecutionSummary};
2use anyhow::{anyhow, Result};
3use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
4use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6
7const EXECUTION_DATA: TableDefinition<&str, &[u8]> = TableDefinition::new("execution_history:data");
8const EXECUTION_INDEX: TableDefinition<&str, &str> =
9    TableDefinition::new("execution_history:index");
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
12struct StoredExecutionSummary {
13    execution_id: String,
14    workflow_id: String,
15    status: ExecutionStatus,
16    started_at: i64,
17    completed_at: Option<i64>,
18    total_tasks: usize,
19    completed_tasks: usize,
20    failed_tasks: usize,
21}
22
23impl From<&StoredExecutionSummary> for ExecutionSummary {
24    fn from(value: &StoredExecutionSummary) -> Self {
25        ExecutionSummary {
26            execution_id: value.execution_id.clone(),
27            workflow_id: value.workflow_id.clone(),
28            status: value.status.clone(),
29            started_at: value.started_at,
30            completed_at: value.completed_at,
31            total_tasks: value.total_tasks,
32            completed_tasks: value.completed_tasks,
33            failed_tasks: value.failed_tasks,
34        }
35    }
36}
37
38pub struct ExecutionHistoryStorage {
39    db: Arc<Database>,
40}
41
42impl ExecutionHistoryStorage {
43    pub fn new(db: Arc<Database>) -> Result<Self> {
44        let write_txn = db.begin_write()?;
45        write_txn.open_table(EXECUTION_DATA)?;
46        write_txn.open_table(EXECUTION_INDEX)?;
47        write_txn.commit()?;
48        Ok(Self { db })
49    }
50
51    pub fn record_task_created(
52        &self,
53        workflow_id: &str,
54        execution_id: &str,
55        created_at_nano: i64,
56    ) -> Result<()> {
57        let txn = self.db.begin_write()?;
58        {
59            let mut data_table = txn.open_table(EXECUTION_DATA)?;
60
61            let mut summary = if let Some(existing) = data_table.get(execution_id)? {
62                serde_json::from_slice::<StoredExecutionSummary>(existing.value())?
63            } else {
64                StoredExecutionSummary {
65                    execution_id: execution_id.to_string(),
66                    workflow_id: workflow_id.to_string(),
67                    status: ExecutionStatus::Running,
68                    started_at: nanos_to_millis(created_at_nano),
69                    completed_at: None,
70                    total_tasks: 0,
71                    completed_tasks: 0,
72                    failed_tasks: 0,
73                }
74            };
75
76            summary.total_tasks = summary.total_tasks.saturating_add(1);
77
78            let serialized = serde_json::to_vec(&summary)?;
79            data_table.insert(summary.execution_id.as_str(), serialized.as_slice())?;
80            drop(data_table);
81
82            let mut index_table = txn.open_table(EXECUTION_INDEX)?;
83            let key =
84                Self::index_key(&summary.workflow_id, summary.started_at, &summary.execution_id);
85            index_table.insert(key.as_str(), summary.execution_id.as_str())?;
86        }
87        txn.commit()?;
88        Ok(())
89    }
90
91    pub fn record_task_completed(
92        &self,
93        workflow_id: &str,
94        execution_id: &str,
95        timestamp_ms: i64,
96    ) -> Result<()> {
97        let txn = self.db.begin_write()?;
98        {
99            let mut data_table = txn.open_table(EXECUTION_DATA)?;
100
101            let mut summary = if let Some(existing) = data_table.get(execution_id)? {
102                serde_json::from_slice::<StoredExecutionSummary>(existing.value())?
103            } else {
104                return Err(anyhow!("Execution summary not found for {execution_id}"));
105            };
106
107            debug_assert_eq!(summary.workflow_id, workflow_id);
108
109            summary.completed_tasks = summary.completed_tasks.saturating_add(1);
110
111            if summary.failed_tasks == 0
112                && summary.completed_tasks + summary.failed_tasks == summary.total_tasks
113            {
114                summary.status = ExecutionStatus::Completed;
115                summary.completed_at = Some(timestamp_ms);
116            }
117
118            let serialized = serde_json::to_vec(&summary)?;
119            data_table.insert(summary.execution_id.as_str(), serialized.as_slice())?;
120            drop(data_table);
121
122            let mut index_table = txn.open_table(EXECUTION_INDEX)?;
123            let key =
124                Self::index_key(&summary.workflow_id, summary.started_at, &summary.execution_id);
125            index_table.insert(key.as_str(), summary.execution_id.as_str())?;
126        }
127        txn.commit()?;
128        Ok(())
129    }
130
131    pub fn record_task_failed(
132        &self,
133        workflow_id: &str,
134        execution_id: &str,
135        timestamp_ms: i64,
136    ) -> Result<()> {
137        let txn = self.db.begin_write()?;
138        {
139            let mut data_table = txn.open_table(EXECUTION_DATA)?;
140
141            let mut summary = if let Some(existing) = data_table.get(execution_id)? {
142                serde_json::from_slice::<StoredExecutionSummary>(existing.value())?
143            } else {
144                return Err(anyhow!("Execution summary not found for {execution_id}"));
145            };
146
147            debug_assert_eq!(summary.workflow_id, workflow_id);
148
149            summary.failed_tasks = summary.failed_tasks.saturating_add(1);
150            summary.status = ExecutionStatus::Failed;
151            summary.completed_at.get_or_insert(timestamp_ms);
152
153            let serialized = serde_json::to_vec(&summary)?;
154            data_table.insert(summary.execution_id.as_str(), serialized.as_slice())?;
155            drop(data_table);
156
157            let mut index_table = txn.open_table(EXECUTION_INDEX)?;
158            let key =
159                Self::index_key(&summary.workflow_id, summary.started_at, &summary.execution_id);
160            index_table.insert(key.as_str(), summary.execution_id.as_str())?;
161        }
162        txn.commit()?;
163        Ok(())
164    }
165
166    pub fn list_paginated(
167        &self,
168        workflow_id: &str,
169        page: usize,
170        page_size: usize,
171    ) -> Result<crate::models::ExecutionHistoryPage> {
172        let page = if page == 0 { 1 } else { page };
173        let page_size = page_size.clamp(1, 100);
174        let read_txn = self.db.begin_read()?;
175        let index = read_txn.open_table(EXECUTION_INDEX)?;
176        let data = read_txn.open_table(EXECUTION_DATA)?;
177
178        let prefix = format!("{workflow_id}:");
179
180        let mut exec_ids: Vec<String> = Vec::new();
181        let mut iter = index.range(prefix.as_str()..)?;
182        while let Some(Ok((key, value))) = iter.next() {
183            let key_str = key.value();
184            if !key_str.starts_with(&prefix) {
185                break;
186            }
187
188            let exec_id = value.value();
189            exec_ids.push(exec_id.to_string());
190        }
191
192        let total = exec_ids.len();
193
194        let total_pages = if total == 0 {
195            0
196        } else {
197            ((total - 1) / page_size) + 1
198        };
199
200        let current_page = if total_pages == 0 {
201            1
202        } else {
203            page.min(total_pages)
204        };
205
206        let start_index = (current_page - 1).saturating_mul(page_size);
207        let end_index = (start_index + page_size).min(total);
208        let mut items = Vec::new();
209        if start_index < end_index {
210            for exec_id in &exec_ids[start_index..end_index] {
211                if let Some(summary_bytes) = data.get(exec_id.as_str())? {
212                    let summary: StoredExecutionSummary = serde_json::from_slice(summary_bytes.value())?;
213                    items.push(ExecutionSummary::from(&summary));
214                }
215            }
216        }
217
218        Ok(crate::models::ExecutionHistoryPage {
219            items,
220            total,
221            page: current_page,
222            page_size,
223            total_pages,
224        })
225    }
226
227    fn index_key(workflow_id: &str, started_at_ms: i64, execution_id: &str) -> String {
228        let started_at_ms = started_at_ms.max(0) as u64;
229        let reverse_ts = u64::MAX - started_at_ms;
230        format!("{workflow_id}:{reverse_ts:020}:{execution_id}")
231    }
232}
233
/// Converts a timestamp in nanoseconds to milliseconds.
///
/// Uses integer division, so the result is truncated toward zero and a
/// zero input yields zero.
fn nanos_to_millis(timestamp: i64) -> i64 {
    const NANOS_PER_MILLI: i64 = 1_000_000;
    timestamp / NANOS_PER_MILLI
}
241
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::{tempdir, TempDir};

    /// Creates a store backed by a fresh database file in a temporary
    /// directory.
    ///
    /// The `TempDir` guard is returned first so that callers binding
    /// `(_dir, store)` drop the store before the directory is removed;
    /// dropping the guard deletes the directory, so it must outlive the store.
    fn test_store() -> (TempDir, ExecutionHistoryStorage) {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("execution_history.redb");
        let db = Arc::new(Database::create(db_path).unwrap());
        let store = ExecutionHistoryStorage::new(db).unwrap();
        (dir, store)
    }

    /// Two executions of one workflow are listed newest-first.
    #[test]
    fn test_create_and_list_execution_history() {
        let (_dir, store) = test_store();

        store
            .record_task_created("wf1", "exec-1", 1_000_000_000)
            .unwrap();
        store
            .record_task_created("wf1", "exec-1", 1_000_000_001)
            .unwrap();
        store.record_task_completed("wf1", "exec-1", 1_000).unwrap();
        store
            .record_task_created("wf1", "exec-2", 2_000_000_000)
            .unwrap();
        store.record_task_failed("wf1", "exec-2", 2_000).unwrap();

        let page = store.list_paginated("wf1", 1, 10).unwrap();
        assert_eq!(page.total, 2);
        assert_eq!(page.page, 1);
        assert_eq!(page.items.len(), 2);
        assert_eq!(page.items[0].execution_id, "exec-2");
        assert_eq!(page.items[1].execution_id, "exec-1");
    }

    /// Page numbers past the end are clamped to the last page.
    #[test]
    fn test_pagination_bounds() {
        let (_dir, store) = test_store();
        for i in 0..5 {
            let exec_id = format!("exec-{i}");
            // Each execution starts one millisecond earlier than the previous
            // one, so `exec-0` is the newest.
            store
                .record_task_created("wf1", &exec_id, 1_000_000_000 - (i as i64) * 1_000_000)
                .unwrap();
        }

        let page = store.list_paginated("wf1", 2, 2).unwrap();
        assert_eq!(page.items.len(), 2);
        assert_eq!(page.items[0].execution_id, "exec-2");
        assert_eq!(page.items[1].execution_id, "exec-3");

        let page = store.list_paginated("wf1", 10, 2).unwrap();
        assert_eq!(page.page, 3);
        assert_eq!(page.items.len(), 1);
        assert_eq!(page.items[0].execution_id, "exec-4");
    }

    /// Updates for an execution that was never created are rejected and
    /// leave the listing empty.
    #[test]
    fn test_unknown_execution_is_rejected() {
        let (_dir, store) = test_store();

        assert!(store.record_task_completed("wf1", "missing", 1).is_err());
        assert!(store.record_task_failed("wf1", "missing", 1).is_err());

        let page = store.list_paginated("wf1", 1, 10).unwrap();
        assert_eq!(page.total, 0);
        assert_eq!(page.total_pages, 0);
        assert_eq!(page.page, 1);
        assert!(page.items.is_empty());
    }

    /// Index keys sort later start times first and treat negative timestamps
    /// as zero.
    #[test]
    fn test_index_key_ordering() {
        let newer = ExecutionHistoryStorage::index_key("wf", 2_000, "a");
        let older = ExecutionHistoryStorage::index_key("wf", 1_000, "b");
        assert!(newer < older);

        assert_eq!(
            ExecutionHistoryStorage::index_key("wf", -5, "a"),
            ExecutionHistoryStorage::index_key("wf", 0, "a"),
        );
    }
}