1use crate::models::{ExecutionStatus, ExecutionSummary};
2use anyhow::{anyhow, Result};
3use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
4use serde::{Deserialize, Serialize};
5use std::sync::Arc;
6
// Primary table: execution_id -> JSON-serialized `StoredExecutionSummary`.
const EXECUTION_DATA: TableDefinition<&str, &[u8]> = TableDefinition::new("execution_history:data");
// Secondary index: "{workflow_id}:{reverse_timestamp}:{execution_id}" -> execution_id.
// The reversed, zero-padded timestamp makes a lexicographic range scan yield
// executions newest-first per workflow (see `ExecutionHistoryStorage::index_key`).
const EXECUTION_INDEX: TableDefinition<&str, &str> =
    TableDefinition::new("execution_history:index");
10
/// On-disk rollup of one execution, stored as JSON in `EXECUTION_DATA`.
///
/// NOTE(review): serialized with serde_json, so renaming/reordering fields
/// changes the stored representation — treat this as a persistence schema.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct StoredExecutionSummary {
    // Unique execution id; also the primary-table key.
    execution_id: String,
    // Owning workflow; used as the index-key prefix.
    workflow_id: String,
    // Current rollup status (Running until a terminal transition is recorded).
    status: ExecutionStatus,
    // Unix timestamp in milliseconds (converted from nanos on first task creation).
    started_at: i64,
    // Unix timestamp in milliseconds of the terminal transition, if any.
    completed_at: Option<i64>,
    total_tasks: usize,
    completed_tasks: usize,
    failed_tasks: usize,
}
22
23impl From<&StoredExecutionSummary> for ExecutionSummary {
24 fn from(value: &StoredExecutionSummary) -> Self {
25 ExecutionSummary {
26 execution_id: value.execution_id.clone(),
27 workflow_id: value.workflow_id.clone(),
28 status: value.status.clone(),
29 started_at: value.started_at,
30 completed_at: value.completed_at,
31 total_tasks: value.total_tasks,
32 completed_tasks: value.completed_tasks,
33 failed_tasks: value.failed_tasks,
34 }
35 }
36}
37
/// Persistent store of per-execution task rollups, backed by a shared redb
/// database; provides task-event recording and newest-first paginated listing.
pub struct ExecutionHistoryStorage {
    // Shared handle to the redb database; each operation opens its own transaction.
    db: Arc<Database>,
}
41
42impl ExecutionHistoryStorage {
43 pub fn new(db: Arc<Database>) -> Result<Self> {
44 let write_txn = db.begin_write()?;
45 write_txn.open_table(EXECUTION_DATA)?;
46 write_txn.open_table(EXECUTION_INDEX)?;
47 write_txn.commit()?;
48 Ok(Self { db })
49 }
50
51 pub fn record_task_created(
52 &self,
53 workflow_id: &str,
54 execution_id: &str,
55 created_at_nano: i64,
56 ) -> Result<()> {
57 let txn = self.db.begin_write()?;
58 {
59 let mut data_table = txn.open_table(EXECUTION_DATA)?;
60
61 let mut summary = if let Some(existing) = data_table.get(execution_id)? {
62 serde_json::from_slice::<StoredExecutionSummary>(existing.value())?
63 } else {
64 StoredExecutionSummary {
65 execution_id: execution_id.to_string(),
66 workflow_id: workflow_id.to_string(),
67 status: ExecutionStatus::Running,
68 started_at: nanos_to_millis(created_at_nano),
69 completed_at: None,
70 total_tasks: 0,
71 completed_tasks: 0,
72 failed_tasks: 0,
73 }
74 };
75
76 summary.total_tasks = summary.total_tasks.saturating_add(1);
77
78 let serialized = serde_json::to_vec(&summary)?;
79 data_table.insert(summary.execution_id.as_str(), serialized.as_slice())?;
80 drop(data_table);
81
82 let mut index_table = txn.open_table(EXECUTION_INDEX)?;
83 let key =
84 Self::index_key(&summary.workflow_id, summary.started_at, &summary.execution_id);
85 index_table.insert(key.as_str(), summary.execution_id.as_str())?;
86 }
87 txn.commit()?;
88 Ok(())
89 }
90
91 pub fn record_task_completed(
92 &self,
93 workflow_id: &str,
94 execution_id: &str,
95 timestamp_ms: i64,
96 ) -> Result<()> {
97 let txn = self.db.begin_write()?;
98 {
99 let mut data_table = txn.open_table(EXECUTION_DATA)?;
100
101 let mut summary = if let Some(existing) = data_table.get(execution_id)? {
102 serde_json::from_slice::<StoredExecutionSummary>(existing.value())?
103 } else {
104 return Err(anyhow!("Execution summary not found for {execution_id}"));
105 };
106
107 debug_assert_eq!(summary.workflow_id, workflow_id);
108
109 summary.completed_tasks = summary.completed_tasks.saturating_add(1);
110
111 if summary.failed_tasks == 0
112 && summary.completed_tasks + summary.failed_tasks == summary.total_tasks
113 {
114 summary.status = ExecutionStatus::Completed;
115 summary.completed_at = Some(timestamp_ms);
116 }
117
118 let serialized = serde_json::to_vec(&summary)?;
119 data_table.insert(summary.execution_id.as_str(), serialized.as_slice())?;
120 drop(data_table);
121
122 let mut index_table = txn.open_table(EXECUTION_INDEX)?;
123 let key =
124 Self::index_key(&summary.workflow_id, summary.started_at, &summary.execution_id);
125 index_table.insert(key.as_str(), summary.execution_id.as_str())?;
126 }
127 txn.commit()?;
128 Ok(())
129 }
130
131 pub fn record_task_failed(
132 &self,
133 workflow_id: &str,
134 execution_id: &str,
135 timestamp_ms: i64,
136 ) -> Result<()> {
137 let txn = self.db.begin_write()?;
138 {
139 let mut data_table = txn.open_table(EXECUTION_DATA)?;
140
141 let mut summary = if let Some(existing) = data_table.get(execution_id)? {
142 serde_json::from_slice::<StoredExecutionSummary>(existing.value())?
143 } else {
144 return Err(anyhow!("Execution summary not found for {execution_id}"));
145 };
146
147 debug_assert_eq!(summary.workflow_id, workflow_id);
148
149 summary.failed_tasks = summary.failed_tasks.saturating_add(1);
150 summary.status = ExecutionStatus::Failed;
151 summary.completed_at.get_or_insert(timestamp_ms);
152
153 let serialized = serde_json::to_vec(&summary)?;
154 data_table.insert(summary.execution_id.as_str(), serialized.as_slice())?;
155 drop(data_table);
156
157 let mut index_table = txn.open_table(EXECUTION_INDEX)?;
158 let key =
159 Self::index_key(&summary.workflow_id, summary.started_at, &summary.execution_id);
160 index_table.insert(key.as_str(), summary.execution_id.as_str())?;
161 }
162 txn.commit()?;
163 Ok(())
164 }
165
166 pub fn list_paginated(
167 &self,
168 workflow_id: &str,
169 page: usize,
170 page_size: usize,
171 ) -> Result<crate::models::ExecutionHistoryPage> {
172 let page = if page == 0 { 1 } else { page };
173 let page_size = page_size.clamp(1, 100);
174 let read_txn = self.db.begin_read()?;
175 let index = read_txn.open_table(EXECUTION_INDEX)?;
176 let data = read_txn.open_table(EXECUTION_DATA)?;
177
178 let prefix = format!("{workflow_id}:");
179
180 let mut exec_ids: Vec<String> = Vec::new();
181 let mut iter = index.range(prefix.as_str()..)?;
182 while let Some(Ok((key, value))) = iter.next() {
183 let key_str = key.value();
184 if !key_str.starts_with(&prefix) {
185 break;
186 }
187
188 let exec_id = value.value();
189 exec_ids.push(exec_id.to_string());
190 }
191
192 let total = exec_ids.len();
193
194 let total_pages = if total == 0 {
195 0
196 } else {
197 ((total - 1) / page_size) + 1
198 };
199
200 let current_page = if total_pages == 0 {
201 1
202 } else {
203 page.min(total_pages)
204 };
205
206 let start_index = (current_page - 1).saturating_mul(page_size);
207 let end_index = (start_index + page_size).min(total);
208 let mut items = Vec::new();
209 if start_index < end_index {
210 for exec_id in &exec_ids[start_index..end_index] {
211 if let Some(summary_bytes) = data.get(exec_id.as_str())? {
212 let summary: StoredExecutionSummary = serde_json::from_slice(summary_bytes.value())?;
213 items.push(ExecutionSummary::from(&summary));
214 }
215 }
216 }
217
218 Ok(crate::models::ExecutionHistoryPage {
219 items,
220 total,
221 page: current_page,
222 page_size,
223 total_pages,
224 })
225 }
226
227 fn index_key(workflow_id: &str, started_at_ms: i64, execution_id: &str) -> String {
228 let started_at_ms = started_at_ms.max(0) as u64;
229 let reverse_ts = u64::MAX - started_at_ms;
230 format!("{workflow_id}:{reverse_ts:020}:{execution_id}")
231 }
232}
233
/// Converts a unix timestamp from nanoseconds to milliseconds using integer
/// division (truncates toward zero, matching Rust's `/` on `i64`).
fn nanos_to_millis(timestamp: i64) -> i64 {
    // The old `timestamp == 0` special case was redundant: 0 / 1_000_000 == 0.
    timestamp / 1_000_000
}
241
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Builds a storage backed by a fresh redb file in a temp directory.
    ///
    /// BUGFIX: the `TempDir` guard is now returned so it outlives the open
    /// database. Previously it was dropped at the end of this helper, which
    /// deletes the directory out from under the database file (works only by
    /// accident on Unix via unlinked-fd semantics; fails/leaks on Windows).
    fn test_store() -> (ExecutionHistoryStorage, tempfile::TempDir) {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("execution_history.redb");
        let db = Arc::new(Database::create(db_path).unwrap());
        (ExecutionHistoryStorage::new(db).unwrap(), dir)
    }

    #[test]
    fn test_create_and_list_execution_history() {
        let (store, _dir) = test_store();

        // exec-1: two tasks created, one completed (execution still running).
        store
            .record_task_created("wf1", "exec-1", 1_000_000_000)
            .unwrap();
        store
            .record_task_created("wf1", "exec-1", 1_000_000_001)
            .unwrap();
        store.record_task_completed("wf1", "exec-1", 1_000).unwrap();
        // exec-2: one task created and failed.
        store
            .record_task_created("wf1", "exec-2", 2_000_000_000)
            .unwrap();
        store.record_task_failed("wf1", "exec-2", 2_000).unwrap();

        // Listing is newest-first: exec-2 started later, so it comes first.
        let page = store.list_paginated("wf1", 1, 10).unwrap();
        assert_eq!(page.total, 2);
        assert_eq!(page.page, 1);
        assert_eq!(page.items.len(), 2);
        assert_eq!(page.items[0].execution_id, "exec-2");
        assert_eq!(page.items[1].execution_id, "exec-1");
    }

    #[test]
    fn test_pagination_bounds() {
        let (store, _dir) = test_store();
        // Descending creation timestamps: exec-0 is newest and listed first.
        for i in 0..5 {
            let exec_id = format!("exec-{i}");
            store
                .record_task_created("wf1", &exec_id, 1_000_000_000 - (i as i64) * 1_000_000)
                .unwrap();
        }

        // Page 2 of size 2 over [exec-0..exec-4] is [exec-2, exec-3].
        let page = store.list_paginated("wf1", 2, 2).unwrap();
        assert_eq!(page.items.len(), 2);
        assert_eq!(page.items[0].execution_id, "exec-2");
        assert_eq!(page.items[1].execution_id, "exec-3");

        // A page past the end clamps to the last page (3 of 3).
        let page = store.list_paginated("wf1", 10, 2).unwrap();
        assert_eq!(page.page, 3);
        assert_eq!(page.items.len(), 1);
        assert_eq!(page.items[0].execution_id, "exec-4");
    }
}