restflow_core/storage/
workflow.rs1use crate::models::Workflow;
2use anyhow::Result;
3use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
4use std::sync::Arc;
5
6pub const WORKFLOW_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("workflow");
7
8pub struct WorkflowStorage {
9 db: Arc<Database>,
10}
11
12impl WorkflowStorage {
13 pub fn new(db: Arc<Database>) -> Result<Self> {
14 let write_txn = db.begin_write()?;
16 write_txn.open_table(WORKFLOW_TABLE)?;
17 write_txn.commit()?;
18
19 Ok(Self { db })
20 }
21
22 pub fn create_workflow(&self, workflow: &Workflow) -> Result<()> {
23 let write_txn = self.db.begin_write()?;
24 {
25 let mut table = write_txn.open_table(WORKFLOW_TABLE)?;
26 let json_bytes = serde_json::to_vec(workflow)?;
27 table.insert(workflow.id.as_str(), json_bytes.as_slice())?;
28 }
29 write_txn.commit()?;
30 Ok(())
31 }
32
33 pub fn get_workflow(&self, id: &str) -> Result<Workflow> {
34 let read_txn = self.db.begin_read()?;
35 let table = read_txn.open_table(WORKFLOW_TABLE)?;
36
37 if let Some(value) = table.get(id)? {
38 let workflow: Workflow = serde_json::from_slice(value.value())?;
39 Ok(workflow)
40 } else {
41 Err(anyhow::anyhow!("Workflow {} not found", id))
42 }
43 }
44
45 pub fn list_workflows(&self) -> Result<Vec<Workflow>> {
46 let read_txn = self.db.begin_read()?;
47 let table = read_txn.open_table(WORKFLOW_TABLE)?;
48
49 let mut workflows = Vec::new();
50 for item in table.iter()? {
51 let (_, value) = item?;
52 let workflow: Workflow = serde_json::from_slice(value.value())?;
53 workflows.push(workflow);
54 }
55
56 Ok(workflows)
57 }
58
59 pub fn update_workflow(&self, id: &str, workflow: &Workflow) -> Result<()> {
60 let write_txn = self.db.begin_write()?;
61 {
62 let mut table = write_txn.open_table(WORKFLOW_TABLE)?;
63
64 if table.get(id)?.is_none() {
65 return Err(anyhow::anyhow!("Workflow not found"));
66 }
67
68 let json_bytes = serde_json::to_vec(workflow)?;
69 table.insert(id, json_bytes.as_slice())?;
70 }
71 write_txn.commit()?;
72 Ok(())
73 }
74
75 pub fn delete_workflow(&self, id: &str) -> Result<()> {
76 let write_txn = self.db.begin_write()?;
77 {
78 let mut table = write_txn.open_table(WORKFLOW_TABLE)?;
79
80 if table.get(id)?.is_none() {
81 return Err(anyhow::anyhow!("Workflow not found"));
82 }
83
84 table.remove(id)?;
85 }
86 write_txn.commit()?;
87 Ok(())
88 }
89}
90
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{Edge, Node, NodeType, Workflow};
    use tempfile::{tempdir, TempDir};

    /// Creates a database file inside a fresh temporary directory and wraps
    /// it in a `WorkflowStorage`.
    ///
    /// The `TempDir` guard is returned too: the caller must keep it bound for
    /// the whole test, because the directory is removed when it is dropped.
    fn open_test_storage() -> (TempDir, WorkflowStorage) {
        let temp_dir = tempdir().unwrap();
        let db_path = temp_dir.path().join("test.db");
        let db = Arc::new(Database::create(db_path).unwrap());
        let storage = WorkflowStorage::new(db).unwrap();
        (temp_dir, storage)
    }

    /// Builds a two-node, one-edge workflow whose id is `id` and whose name
    /// is `"Test Workflow {id}"`.
    fn create_test_workflow(id: &str) -> Workflow {
        let agent_node = Node {
            id: "node1".to_string(),
            node_type: NodeType::Agent,
            config: serde_json::json!({
                "model": "gpt-4",
                "prompt": "Test prompt"
            }),
            position: None,
        };
        let http_node = Node {
            id: "node2".to_string(),
            node_type: NodeType::HttpRequest,
            config: serde_json::json!({
                "url": "https://api.example.com",
                "method": "GET"
            }),
            position: None,
        };

        Workflow {
            id: id.to_string(),
            name: format!("Test Workflow {}", id),
            nodes: vec![agent_node, http_node],
            edges: vec![Edge {
                from: "node1".to_string(),
                to: "node2".to_string(),
            }],
        }
    }

    #[test]
    fn test_create_and_get_workflow() {
        let (_temp_dir, storage) = open_test_storage();

        storage
            .create_workflow(&create_test_workflow("wf-001"))
            .unwrap();

        let retrieved = storage.get_workflow("wf-001").unwrap();
        assert_eq!(retrieved.id, "wf-001");
        assert_eq!(retrieved.name, "Test Workflow wf-001");
        assert_eq!(retrieved.nodes.len(), 2);
        assert_eq!(retrieved.edges.len(), 1);
    }

    #[test]
    fn test_list_workflows() {
        let (_temp_dir, storage) = open_test_storage();

        for i in 1..=3 {
            let workflow = create_test_workflow(&format!("wf-{:03}", i));
            storage.create_workflow(&workflow).unwrap();
        }

        let workflows = storage.list_workflows().unwrap();
        assert_eq!(workflows.len(), 3);

        let ids: Vec<&str> = workflows.iter().map(|w| w.id.as_str()).collect();
        for expected in ["wf-001", "wf-002", "wf-003"] {
            assert!(ids.contains(&expected));
        }
    }

    #[test]
    fn test_update_workflow() {
        let (_temp_dir, storage) = open_test_storage();

        let mut workflow = create_test_workflow("wf-001");
        storage.create_workflow(&workflow).unwrap();

        // Change the name and append a third node before writing it back.
        workflow.name = "Updated Workflow".to_string();
        workflow.nodes.push(Node {
            id: "node3".to_string(),
            node_type: NodeType::DataTransform,
            config: serde_json::json!({"transform": "x + 10"}),
            position: None,
        });

        storage.update_workflow("wf-001", &workflow).unwrap();

        let retrieved = storage.get_workflow("wf-001").unwrap();
        assert_eq!(retrieved.name, "Updated Workflow");
        assert_eq!(retrieved.nodes.len(), 3);
    }

    #[test]
    fn test_update_nonexistent_workflow() {
        let (_temp_dir, storage) = open_test_storage();

        let workflow = create_test_workflow("nonexistent");
        let result = storage.update_workflow("nonexistent", &workflow);

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    #[test]
    fn test_delete_workflow() {
        let (_temp_dir, storage) = open_test_storage();

        storage
            .create_workflow(&create_test_workflow("wf-001"))
            .unwrap();
        storage.delete_workflow("wf-001").unwrap();

        // Once deleted, a lookup must report the workflow as missing.
        let result = storage.get_workflow("wf-001");
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    #[test]
    fn test_delete_nonexistent_workflow() {
        let (_temp_dir, storage) = open_test_storage();

        let result = storage.delete_workflow("nonexistent");
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }

    #[test]
    fn test_get_nonexistent_workflow() {
        let (_temp_dir, storage) = open_test_storage();

        let result = storage.get_workflow("nonexistent");
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("not found"));
    }
}