restflow_core/node/
registry.rs

1use crate::engine::context::ExecutionContext;
2use crate::models::{
3    AgentOutput, HttpOutput, NodeOutput, NodeType, PrintOutput, PythonOutput,
4};
5use crate::node::trigger::TriggerExecutor;
6use anyhow::Result;
7use async_trait::async_trait;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::sync::Arc;
11
12#[async_trait]
13pub trait NodeExecutor: Send + Sync {
14    async fn execute(
15        &self,
16        node_type: &NodeType,
17        config: &Value,
18        context: &mut ExecutionContext,
19    ) -> Result<NodeOutput>;
20}
21
22pub struct NodeRegistry {
23    executors: HashMap<NodeType, Arc<dyn NodeExecutor>>,
24}
25
26impl NodeRegistry {
27    pub fn new() -> Self {
28        let mut registry = Self {
29            executors: HashMap::new(),
30        };
31
32        // Register trigger executor (using unified TriggerExecutor)
33        let trigger_executor = Arc::new(TriggerExecutor);
34        registry.register(NodeType::ManualTrigger, trigger_executor.clone());
35        registry.register(NodeType::WebhookTrigger, trigger_executor.clone());
36        registry.register(NodeType::ScheduleTrigger, trigger_executor);
37
38        // Register other node executors
39        registry.register(NodeType::HttpRequest, Arc::new(HttpRequestExecutor));
40        registry.register(NodeType::Print, Arc::new(PrintExecutor));
41        registry.register(NodeType::Agent, Arc::new(AgentExecutor));
42        registry.register(NodeType::Python, Arc::new(PythonExecutor));
43
44        registry
45    }
46
47    pub fn register(&mut self, node_type: NodeType, executor: Arc<dyn NodeExecutor>) {
48        self.executors.insert(node_type, executor);
49    }
50
51    pub fn get(&self, node_type: &NodeType) -> Option<Arc<dyn NodeExecutor>> {
52        self.executors.get(node_type).cloned()
53    }
54}
55
56impl Default for NodeRegistry {
57    fn default() -> Self {
58        Self::new()
59    }
60}
61
62struct HttpRequestExecutor;
63
64#[async_trait]
65impl NodeExecutor for HttpRequestExecutor {
66    async fn execute(
67        &self,
68        _node_type: &NodeType,
69        config: &Value,
70        _context: &mut ExecutionContext,
71    ) -> Result<NodeOutput> {
72        let url = config["url"]
73            .as_str()
74            .ok_or_else(|| anyhow::anyhow!("URL not found in config"))?;
75        let method = config["method"].as_str().unwrap_or("GET");
76
77        // Parse timeout (default: 30 seconds)
78        let timeout_ms = config["timeout_ms"].as_u64().unwrap_or(30000);
79        let timeout = std::time::Duration::from_millis(timeout_ms);
80
81        // Build client with timeout
82        let client = reqwest::Client::builder()
83            .timeout(timeout)
84            .build()
85            .map_err(|e| anyhow::anyhow!("Failed to build HTTP client: {}", e))?;
86
87        // Build request
88        let mut request_builder = match method.to_uppercase().as_str() {
89            "GET" => client.get(url),
90            "POST" => client.post(url),
91            "PUT" => client.put(url),
92            "DELETE" => client.delete(url),
93            "PATCH" => client.patch(url),
94            _ => return Err(anyhow::anyhow!("Unsupported HTTP method: {}", method)),
95        };
96
97        // Add headers if present
98        if let Some(headers) = config.get("headers").and_then(|h| h.as_object()) {
99            for (key, value) in headers {
100                if let Some(value_str) = value.as_str() {
101                    request_builder = request_builder.header(key, value_str);
102                }
103            }
104        }
105
106        // Add body if present (for POST, PUT, PATCH)
107        if matches!(method.to_uppercase().as_str(), "POST" | "PUT" | "PATCH")
108            && let Some(body) = config.get("body")
109        {
110            if body.is_string() {
111                // String body
112                request_builder = request_builder.body(body.as_str().unwrap().to_string());
113            } else {
114                // JSON body
115                request_builder = request_builder.json(body);
116            }
117        }
118
119        // Send request
120        let response = request_builder
121            .send()
122            .await
123            .map_err(|e| anyhow::anyhow!("HTTP request failed: {}", e))?;
124
125        // Extract status and headers
126        let status = response.status().as_u16();
127        let headers: HashMap<String, String> = response
128            .headers()
129            .iter()
130            .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
131            .collect();
132
133        // Read body
134        let body_text = response
135            .text()
136            .await
137            .map_err(|e| anyhow::anyhow!("Failed to read response body: {}", e))?;
138
139        // Try to parse as JSON, fallback to string
140        let body = serde_json::from_str::<Value>(&body_text).unwrap_or(Value::String(body_text));
141
142        Ok(NodeOutput::Http(HttpOutput {
143            status,
144            headers,
145            body,
146        }))
147    }
148}
149
150struct PrintExecutor;
151
152#[async_trait]
153impl NodeExecutor for PrintExecutor {
154    async fn execute(
155        &self,
156        _node_type: &NodeType,
157        config: &Value,
158        _context: &mut ExecutionContext,
159    ) -> Result<NodeOutput> {
160        let message = config["message"].as_str().unwrap_or("No message provided");
161        println!("{}", message);
162
163        Ok(NodeOutput::Print(PrintOutput {
164            printed: message.to_string(),
165        }))
166    }
167}
168
169struct AgentExecutor;
170
171#[async_trait]
172impl NodeExecutor for AgentExecutor {
173    async fn execute(
174        &self,
175        _node_type: &NodeType,
176        config: &Value,
177        context: &mut ExecutionContext,
178    ) -> Result<NodeOutput> {
179        use crate::node::agent::AgentNode;
180
181        let agent = AgentNode::from_config(config)?;
182        let input = config["input"].as_str().unwrap_or("Hello");
183
184        let secret_storage = context.secret_storage.as_ref().map(|s| s.as_ref());
185        let response = agent
186            .execute(input, secret_storage)
187            .await
188            .map_err(|e| anyhow::anyhow!("Agent execution failed: {}", e))?;
189
190        Ok(NodeOutput::Agent(AgentOutput { response }))
191    }
192}
193
194struct PythonExecutor;
195
196#[async_trait]
197impl NodeExecutor for PythonExecutor {
198    async fn execute(
199        &self,
200        _node_type: &NodeType,
201        config: &Value,
202        context: &mut ExecutionContext,
203    ) -> Result<NodeOutput> {
204        use crate::node::python::PythonNode;
205
206        let python = PythonNode::from_config(config)?;
207        let script = python.build_script();
208
209        // Get input from config or use empty object
210        let input = config.get("input").cloned().unwrap_or_else(|| serde_json::json!({}));
211
212        // Get PythonManager from context
213        let manager = context
214            .python_manager
215            .as_ref()
216            .ok_or_else(|| anyhow::anyhow!("Python manager not available"))?;
217
218        // Read common AI API keys from Secret Manager
219        let mut env_vars = std::collections::HashMap::new();
220        if let Some(secret_storage) = &context.secret_storage {
221            // Try to load OPENAI_API_KEY
222            if let Ok(Some(key)) = secret_storage.get_secret("OPENAI_API_KEY") {
223                env_vars.insert("OPENAI_API_KEY".to_string(), key);
224            }
225            // Add other AI providers as needed
226            if let Ok(Some(key)) = secret_storage.get_secret("ANTHROPIC_API_KEY") {
227                env_vars.insert("ANTHROPIC_API_KEY".to_string(), key);
228            }
229        }
230
231        let result = manager.execute_inline_code(&script, input, env_vars).await?;
232
233        Ok(NodeOutput::Python(PythonOutput { result }))
234    }
235}
236
237#[cfg(test)]
238mod tests {
239    use super::*;
240    use serde_json::json;
241
242    #[test]
243    fn test_registry_creation() {
244        let registry = NodeRegistry::new();
245
246        // Verify all node types are registered
247        assert!(registry.get(&NodeType::HttpRequest).is_some());
248        assert!(registry.get(&NodeType::Print).is_some());
249        assert!(registry.get(&NodeType::Agent).is_some());
250        assert!(registry.get(&NodeType::Python).is_some());
251        assert!(registry.get(&NodeType::ManualTrigger).is_some());
252        assert!(registry.get(&NodeType::WebhookTrigger).is_some());
253        assert!(registry.get(&NodeType::ScheduleTrigger).is_some());
254    }
255
256    #[test]
257    fn test_registry_get_nonexistent() {
258        let registry = NodeRegistry::new();
259
260        // All known types should be registered, so we can't easily test nonexistent
261        // This test just verifies the registry works
262        assert!(registry.get(&NodeType::Print).is_some());
263    }
264
265    #[tokio::test]
266    async fn test_print_executor_basic() {
267        let executor = PrintExecutor;
268        let mut context = ExecutionContext::new("test".to_string());
269
270        let config = json!({
271            "message": "Hello World"
272        });
273
274        let result = executor
275            .execute(&NodeType::Print, &config, &mut context)
276            .await;
277
278        assert!(result.is_ok());
279        if let NodeOutput::Print(output) = result.unwrap() {
280            assert_eq!(output.printed, "Hello World");
281        } else {
282            panic!("Expected Print output");
283        }
284    }
285
286    #[tokio::test]
287    async fn test_print_executor_empty_message() {
288        let executor = PrintExecutor;
289        let mut context = ExecutionContext::new("test".to_string());
290
291        let config = json!({});
292
293        let result = executor
294            .execute(&NodeType::Print, &config, &mut context)
295            .await;
296
297        assert!(result.is_ok());
298        if let NodeOutput::Print(output) = result.unwrap() {
299            assert_eq!(output.printed, "No message provided");
300        } else {
301            panic!("Expected Print output");
302        }
303    }
304
305    #[tokio::test]
306    async fn test_http_executor_missing_url() {
307        let executor = HttpRequestExecutor;
308        let mut context = ExecutionContext::new("test".to_string());
309
310        let config = json!({
311            "method": "GET"
312        });
313
314        let result = executor
315            .execute(&NodeType::HttpRequest, &config, &mut context)
316            .await;
317
318        assert!(result.is_err());
319        assert!(result.unwrap_err().to_string().contains("URL not found"));
320    }
321
322    #[tokio::test]
323    async fn test_http_executor_unsupported_method() {
324        let executor = HttpRequestExecutor;
325        let mut context = ExecutionContext::new("test".to_string());
326
327        let config = json!({
328            "url": "http://example.com",
329            "method": "INVALID"
330        });
331
332        let result = executor
333            .execute(&NodeType::HttpRequest, &config, &mut context)
334            .await;
335
336        assert!(result.is_err());
337        assert!(result
338            .unwrap_err()
339            .to_string()
340            .contains("Unsupported HTTP method"));
341    }
342
343    #[tokio::test]
344    #[ignore = "Requires network access"]
345    async fn test_http_executor_get_request() {
346        let executor = HttpRequestExecutor;
347        let mut context = ExecutionContext::new("test".to_string());
348
349        let config = json!({
350            "url": "https://httpbin.org/get",
351            "method": "GET"
352        });
353
354        let result = executor
355            .execute(&NodeType::HttpRequest, &config, &mut context)
356            .await;
357
358        assert!(result.is_ok());
359        if let NodeOutput::Http(output) = result.unwrap() {
360            assert_eq!(output.status, 200);
361            assert!(output.body.is_object());
362        } else {
363            panic!("Expected Http output");
364        }
365    }
366
367    #[tokio::test]
368    #[ignore = "Requires network access"]
369    async fn test_http_executor_post_with_json() {
370        let executor = HttpRequestExecutor;
371        let mut context = ExecutionContext::new("test".to_string());
372
373        let config = json!({
374            "url": "https://httpbin.org/post",
375            "method": "POST",
376            "body": {"test": "data"},
377            "headers": {"Content-Type": "application/json"}
378        });
379
380        let result = executor
381            .execute(&NodeType::HttpRequest, &config, &mut context)
382            .await;
383
384        assert!(result.is_ok());
385        if let NodeOutput::Http(output) = result.unwrap() {
386            assert_eq!(output.status, 200);
387        } else {
388            panic!("Expected Http output");
389        }
390    }
391
392    #[tokio::test]
393    async fn test_python_executor_without_manager() {
394        let executor = PythonExecutor;
395        let mut context = ExecutionContext::new("test".to_string());
396
397        let config = json!({
398            "code": "print('test')"
399        });
400
401        let result = executor
402            .execute(&NodeType::Python, &config, &mut context)
403            .await;
404
405        assert!(result.is_err());
406        assert!(result
407            .unwrap_err()
408            .to_string()
409            .contains("Python manager not available"));
410    }
411}