restflow_core/storage/
workflow.rs

1use crate::models::Workflow;
2use anyhow::Result;
3use redb::{Database, ReadableDatabase, ReadableTable, TableDefinition};
4use std::sync::Arc;
5
6pub const WORKFLOW_TABLE: TableDefinition<&str, &[u8]> = TableDefinition::new("workflow");
7
8pub struct WorkflowStorage {
9    db: Arc<Database>,
10}
11
12impl WorkflowStorage {
13    pub fn new(db: Arc<Database>) -> Result<Self> {
14        // Create table if not exists
15        let write_txn = db.begin_write()?;
16        write_txn.open_table(WORKFLOW_TABLE)?;
17        write_txn.commit()?;
18
19        Ok(Self { db })
20    }
21
22    pub fn create_workflow(&self, workflow: &Workflow) -> Result<()> {
23        let write_txn = self.db.begin_write()?;
24        {
25            let mut table = write_txn.open_table(WORKFLOW_TABLE)?;
26            let json_bytes = serde_json::to_vec(workflow)?;
27            table.insert(workflow.id.as_str(), json_bytes.as_slice())?;
28        }
29        write_txn.commit()?;
30        Ok(())
31    }
32
33    pub fn get_workflow(&self, id: &str) -> Result<Workflow> {
34        let read_txn = self.db.begin_read()?;
35        let table = read_txn.open_table(WORKFLOW_TABLE)?;
36
37        if let Some(value) = table.get(id)? {
38            let workflow: Workflow = serde_json::from_slice(value.value())?;
39            Ok(workflow)
40        } else {
41            Err(anyhow::anyhow!("Workflow {} not found", id))
42        }
43    }
44
45    pub fn list_workflows(&self) -> Result<Vec<Workflow>> {
46        let read_txn = self.db.begin_read()?;
47        let table = read_txn.open_table(WORKFLOW_TABLE)?;
48
49        let mut workflows = Vec::new();
50        for item in table.iter()? {
51            let (_, value) = item?;
52            let workflow: Workflow = serde_json::from_slice(value.value())?;
53            workflows.push(workflow);
54        }
55
56        Ok(workflows)
57    }
58
59    pub fn update_workflow(&self, id: &str, workflow: &Workflow) -> Result<()> {
60        let write_txn = self.db.begin_write()?;
61        {
62            let mut table = write_txn.open_table(WORKFLOW_TABLE)?;
63
64            if table.get(id)?.is_none() {
65                return Err(anyhow::anyhow!("Workflow not found"));
66            }
67
68            let json_bytes = serde_json::to_vec(workflow)?;
69            table.insert(id, json_bytes.as_slice())?;
70        }
71        write_txn.commit()?;
72        Ok(())
73    }
74
75    pub fn delete_workflow(&self, id: &str) -> Result<()> {
76        let write_txn = self.db.begin_write()?;
77        {
78            let mut table = write_txn.open_table(WORKFLOW_TABLE)?;
79
80            if table.get(id)?.is_none() {
81                return Err(anyhow::anyhow!("Workflow not found"));
82            }
83
84            table.remove(id)?;
85        }
86        write_txn.commit()?;
87        Ok(())
88    }
89}
90
91#[cfg(test)]
92mod tests {
93    use super::*;
94    use crate::models::{Edge, Node, NodeType, Workflow};
95    use tempfile::tempdir;
96
97    fn create_test_workflow(id: &str) -> Workflow {
98        Workflow {
99            id: id.to_string(),
100            name: format!("Test Workflow {}", id),
101            nodes: vec![
102                Node {
103                    id: "node1".to_string(),
104                    node_type: NodeType::Agent,
105                    config: serde_json::json!({
106                        "model": "gpt-4",
107                        "prompt": "Test prompt"
108                    }),
109                    position: None,
110                },
111                Node {
112                    id: "node2".to_string(),
113                    node_type: NodeType::HttpRequest,
114                    config: serde_json::json!({
115                        "url": "https://api.example.com",
116                        "method": "GET"
117                    }),
118                    position: None,
119                },
120            ],
121            edges: vec![Edge {
122                from: "node1".to_string(),
123                to: "node2".to_string(),
124            }],
125        }
126    }
127
128    #[test]
129    fn test_create_and_get_workflow() {
130        let temp_dir = tempdir().unwrap();
131        let db_path = temp_dir.path().join("test.db");
132        let db = Arc::new(Database::create(db_path).unwrap());
133        let storage = WorkflowStorage::new(db).unwrap();
134
135        let workflow = create_test_workflow("wf-001");
136
137        // Create workflow
138        storage.create_workflow(&workflow).unwrap();
139
140        // Get workflow
141        let retrieved = storage.get_workflow("wf-001").unwrap();
142        assert_eq!(retrieved.id, "wf-001");
143        assert_eq!(retrieved.name, "Test Workflow wf-001");
144        assert_eq!(retrieved.nodes.len(), 2);
145        assert_eq!(retrieved.edges.len(), 1);
146    }
147
148    #[test]
149    fn test_list_workflows() {
150        let temp_dir = tempdir().unwrap();
151        let db_path = temp_dir.path().join("test.db");
152        let db = Arc::new(Database::create(db_path).unwrap());
153        let storage = WorkflowStorage::new(db).unwrap();
154
155        // Create multiple workflows
156        for i in 1..=3 {
157            let workflow = create_test_workflow(&format!("wf-{:03}", i));
158            storage.create_workflow(&workflow).unwrap();
159        }
160
161        // List workflows
162        let workflows = storage.list_workflows().unwrap();
163        assert_eq!(workflows.len(), 3);
164
165        let ids: Vec<String> = workflows.iter().map(|w| w.id.clone()).collect();
166        assert!(ids.contains(&"wf-001".to_string()));
167        assert!(ids.contains(&"wf-002".to_string()));
168        assert!(ids.contains(&"wf-003".to_string()));
169    }
170
171    #[test]
172    fn test_update_workflow() {
173        let temp_dir = tempdir().unwrap();
174        let db_path = temp_dir.path().join("test.db");
175        let db = Arc::new(Database::create(db_path).unwrap());
176        let storage = WorkflowStorage::new(db).unwrap();
177
178        // Create initial workflow
179        let mut workflow = create_test_workflow("wf-001");
180        storage.create_workflow(&workflow).unwrap();
181
182        // Update workflow
183        workflow.name = "Updated Workflow".to_string();
184        workflow.nodes.push(Node {
185            id: "node3".to_string(),
186            node_type: NodeType::DataTransform,
187            config: serde_json::json!({"transform": "x + 10"}),
188            position: None,
189        });
190
191        storage.update_workflow("wf-001", &workflow).unwrap();
192
193        // Verify update
194        let retrieved = storage.get_workflow("wf-001").unwrap();
195        assert_eq!(retrieved.name, "Updated Workflow");
196        assert_eq!(retrieved.nodes.len(), 3);
197    }
198
199    #[test]
200    fn test_update_nonexistent_workflow() {
201        let temp_dir = tempdir().unwrap();
202        let db_path = temp_dir.path().join("test.db");
203        let db = Arc::new(Database::create(db_path).unwrap());
204        let storage = WorkflowStorage::new(db).unwrap();
205
206        let workflow = create_test_workflow("nonexistent");
207        let result = storage.update_workflow("nonexistent", &workflow);
208
209        assert!(result.is_err());
210        assert!(result.unwrap_err().to_string().contains("not found"));
211    }
212
213    #[test]
214    fn test_delete_workflow() {
215        let temp_dir = tempdir().unwrap();
216        let db_path = temp_dir.path().join("test.db");
217        let db = Arc::new(Database::create(db_path).unwrap());
218        let storage = WorkflowStorage::new(db).unwrap();
219
220        // Create workflow
221        let workflow = create_test_workflow("wf-001");
222        storage.create_workflow(&workflow).unwrap();
223
224        // Delete workflow
225        storage.delete_workflow("wf-001").unwrap();
226
227        // Verify deletion
228        let result = storage.get_workflow("wf-001");
229        assert!(result.is_err());
230        assert!(result.unwrap_err().to_string().contains("not found"));
231    }
232
233    #[test]
234    fn test_delete_nonexistent_workflow() {
235        let temp_dir = tempdir().unwrap();
236        let db_path = temp_dir.path().join("test.db");
237        let db = Arc::new(Database::create(db_path).unwrap());
238        let storage = WorkflowStorage::new(db).unwrap();
239
240        let result = storage.delete_workflow("nonexistent");
241        assert!(result.is_err());
242        assert!(result.unwrap_err().to_string().contains("not found"));
243    }
244
245    #[test]
246    fn test_get_nonexistent_workflow() {
247        let temp_dir = tempdir().unwrap();
248        let db_path = temp_dir.path().join("test.db");
249        let db = Arc::new(Database::create(db_path).unwrap());
250        let storage = WorkflowStorage::new(db).unwrap();
251
252        let result = storage.get_workflow("nonexistent");
253        assert!(result.is_err());
254        assert!(result.unwrap_err().to_string().contains("not found"));
255    }
256}