1use crate::engine::context::ExecutionContext;
2use crate::models::{
3 AgentOutput, HttpOutput, NodeOutput, NodeType, PrintOutput, PythonOutput,
4};
5use crate::node::trigger::TriggerExecutor;
6use anyhow::Result;
7use async_trait::async_trait;
8use serde_json::Value;
9use std::collections::HashMap;
10use std::sync::Arc;
11
12#[async_trait]
13pub trait NodeExecutor: Send + Sync {
14 async fn execute(
15 &self,
16 node_type: &NodeType,
17 config: &Value,
18 context: &mut ExecutionContext,
19 ) -> Result<NodeOutput>;
20}
21
22pub struct NodeRegistry {
23 executors: HashMap<NodeType, Arc<dyn NodeExecutor>>,
24}
25
26impl NodeRegistry {
27 pub fn new() -> Self {
28 let mut registry = Self {
29 executors: HashMap::new(),
30 };
31
32 let trigger_executor = Arc::new(TriggerExecutor);
34 registry.register(NodeType::ManualTrigger, trigger_executor.clone());
35 registry.register(NodeType::WebhookTrigger, trigger_executor.clone());
36 registry.register(NodeType::ScheduleTrigger, trigger_executor);
37
38 registry.register(NodeType::HttpRequest, Arc::new(HttpRequestExecutor));
40 registry.register(NodeType::Print, Arc::new(PrintExecutor));
41 registry.register(NodeType::Agent, Arc::new(AgentExecutor));
42 registry.register(NodeType::Python, Arc::new(PythonExecutor));
43
44 registry
45 }
46
47 pub fn register(&mut self, node_type: NodeType, executor: Arc<dyn NodeExecutor>) {
48 self.executors.insert(node_type, executor);
49 }
50
51 pub fn get(&self, node_type: &NodeType) -> Option<Arc<dyn NodeExecutor>> {
52 self.executors.get(node_type).cloned()
53 }
54}
55
56impl Default for NodeRegistry {
57 fn default() -> Self {
58 Self::new()
59 }
60}
61
62struct HttpRequestExecutor;
63
64#[async_trait]
65impl NodeExecutor for HttpRequestExecutor {
66 async fn execute(
67 &self,
68 _node_type: &NodeType,
69 config: &Value,
70 _context: &mut ExecutionContext,
71 ) -> Result<NodeOutput> {
72 let url = config["url"]
73 .as_str()
74 .ok_or_else(|| anyhow::anyhow!("URL not found in config"))?;
75 let method = config["method"].as_str().unwrap_or("GET");
76
77 let timeout_ms = config["timeout_ms"].as_u64().unwrap_or(30000);
79 let timeout = std::time::Duration::from_millis(timeout_ms);
80
81 let client = reqwest::Client::builder()
83 .timeout(timeout)
84 .build()
85 .map_err(|e| anyhow::anyhow!("Failed to build HTTP client: {}", e))?;
86
87 let mut request_builder = match method.to_uppercase().as_str() {
89 "GET" => client.get(url),
90 "POST" => client.post(url),
91 "PUT" => client.put(url),
92 "DELETE" => client.delete(url),
93 "PATCH" => client.patch(url),
94 _ => return Err(anyhow::anyhow!("Unsupported HTTP method: {}", method)),
95 };
96
97 if let Some(headers) = config.get("headers").and_then(|h| h.as_object()) {
99 for (key, value) in headers {
100 if let Some(value_str) = value.as_str() {
101 request_builder = request_builder.header(key, value_str);
102 }
103 }
104 }
105
106 if matches!(method.to_uppercase().as_str(), "POST" | "PUT" | "PATCH")
108 && let Some(body) = config.get("body")
109 {
110 if body.is_string() {
111 request_builder = request_builder.body(body.as_str().unwrap().to_string());
113 } else {
114 request_builder = request_builder.json(body);
116 }
117 }
118
119 let response = request_builder
121 .send()
122 .await
123 .map_err(|e| anyhow::anyhow!("HTTP request failed: {}", e))?;
124
125 let status = response.status().as_u16();
127 let headers: HashMap<String, String> = response
128 .headers()
129 .iter()
130 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
131 .collect();
132
133 let body_text = response
135 .text()
136 .await
137 .map_err(|e| anyhow::anyhow!("Failed to read response body: {}", e))?;
138
139 let body = serde_json::from_str::<Value>(&body_text).unwrap_or(Value::String(body_text));
141
142 Ok(NodeOutput::Http(HttpOutput {
143 status,
144 headers,
145 body,
146 }))
147 }
148}
149
150struct PrintExecutor;
151
152#[async_trait]
153impl NodeExecutor for PrintExecutor {
154 async fn execute(
155 &self,
156 _node_type: &NodeType,
157 config: &Value,
158 _context: &mut ExecutionContext,
159 ) -> Result<NodeOutput> {
160 let message = config["message"].as_str().unwrap_or("No message provided");
161 println!("{}", message);
162
163 Ok(NodeOutput::Print(PrintOutput {
164 printed: message.to_string(),
165 }))
166 }
167}
168
169struct AgentExecutor;
170
171#[async_trait]
172impl NodeExecutor for AgentExecutor {
173 async fn execute(
174 &self,
175 _node_type: &NodeType,
176 config: &Value,
177 context: &mut ExecutionContext,
178 ) -> Result<NodeOutput> {
179 use crate::node::agent::AgentNode;
180
181 let agent = AgentNode::from_config(config)?;
182 let input = config["input"].as_str().unwrap_or("Hello");
183
184 let secret_storage = context.secret_storage.as_ref().map(|s| s.as_ref());
185 let response = agent
186 .execute(input, secret_storage)
187 .await
188 .map_err(|e| anyhow::anyhow!("Agent execution failed: {}", e))?;
189
190 Ok(NodeOutput::Agent(AgentOutput { response }))
191 }
192}
193
194struct PythonExecutor;
195
196#[async_trait]
197impl NodeExecutor for PythonExecutor {
198 async fn execute(
199 &self,
200 _node_type: &NodeType,
201 config: &Value,
202 context: &mut ExecutionContext,
203 ) -> Result<NodeOutput> {
204 use crate::node::python::PythonNode;
205
206 let python = PythonNode::from_config(config)?;
207 let script = python.build_script();
208
209 let input = config.get("input").cloned().unwrap_or_else(|| serde_json::json!({}));
211
212 let manager = context
214 .python_manager
215 .as_ref()
216 .ok_or_else(|| anyhow::anyhow!("Python manager not available"))?;
217
218 let mut env_vars = std::collections::HashMap::new();
220 if let Some(secret_storage) = &context.secret_storage {
221 if let Ok(Some(key)) = secret_storage.get_secret("OPENAI_API_KEY") {
223 env_vars.insert("OPENAI_API_KEY".to_string(), key);
224 }
225 if let Ok(Some(key)) = secret_storage.get_secret("ANTHROPIC_API_KEY") {
227 env_vars.insert("ANTHROPIC_API_KEY".to_string(), key);
228 }
229 }
230
231 let result = manager.execute_inline_code(&script, input, env_vars).await?;
232
233 Ok(NodeOutput::Python(PythonOutput { result }))
234 }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Fresh execution context shared by the executor tests.
    fn test_context() -> ExecutionContext {
        ExecutionContext::new("test".to_string())
    }

    /// `NodeRegistry::new` registers an executor for each of these node types.
    #[test]
    fn test_registry_creation() {
        let registry = NodeRegistry::new();

        assert!(registry.get(&NodeType::HttpRequest).is_some());
        assert!(registry.get(&NodeType::Print).is_some());
        assert!(registry.get(&NodeType::Agent).is_some());
        assert!(registry.get(&NodeType::Python).is_some());
        assert!(registry.get(&NodeType::ManualTrigger).is_some());
        assert!(registry.get(&NodeType::WebhookTrigger).is_some());
        assert!(registry.get(&NodeType::ScheduleTrigger).is_some());
    }

    /// Looking up a type that was never registered yields `None`.
    #[test]
    fn test_registry_get_nonexistent() {
        // `new()` already registers `Print`, so the miss path is exercised on
        // a registry built directly around an empty map.
        let mut registry = NodeRegistry {
            executors: HashMap::new(),
        };
        assert!(registry.get(&NodeType::Print).is_none());

        // Registering one type makes it (and only it) resolvable.
        registry.register(NodeType::Print, Arc::new(PrintExecutor));
        assert!(registry.get(&NodeType::Print).is_some());
        assert!(registry.get(&NodeType::HttpRequest).is_none());
    }

    /// The configured message is echoed back in the output.
    #[tokio::test]
    async fn test_print_executor_basic() {
        let executor = PrintExecutor;
        let mut context = test_context();

        let config = json!({
            "message": "Hello World"
        });

        let result = executor
            .execute(&NodeType::Print, &config, &mut context)
            .await;

        assert!(result.is_ok());
        let NodeOutput::Print(output) = result.unwrap() else {
            panic!("Expected Print output");
        };
        assert_eq!(output.printed, "Hello World");
    }

    /// A missing message falls back to the placeholder text.
    #[tokio::test]
    async fn test_print_executor_empty_message() {
        let executor = PrintExecutor;
        let mut context = test_context();

        let config = json!({});

        let result = executor
            .execute(&NodeType::Print, &config, &mut context)
            .await;

        assert!(result.is_ok());
        let NodeOutput::Print(output) = result.unwrap() else {
            panic!("Expected Print output");
        };
        assert_eq!(output.printed, "No message provided");
    }

    /// A config without `url` is rejected before any request is made.
    #[tokio::test]
    async fn test_http_executor_missing_url() {
        let executor = HttpRequestExecutor;
        let mut context = test_context();

        let config = json!({
            "method": "GET"
        });

        let result = executor
            .execute(&NodeType::HttpRequest, &config, &mut context)
            .await;

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("URL not found"));
    }

    /// An unknown HTTP method is rejected with a descriptive error.
    #[tokio::test]
    async fn test_http_executor_unsupported_method() {
        let executor = HttpRequestExecutor;
        let mut context = test_context();

        let config = json!({
            "url": "http://example.com",
            "method": "INVALID"
        });

        let result = executor
            .execute(&NodeType::HttpRequest, &config, &mut context)
            .await;

        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("Unsupported HTTP method"));
    }

    /// Live GET against httpbin; ignored by default because it needs network.
    #[tokio::test]
    #[ignore = "Requires network access"]
    async fn test_http_executor_get_request() {
        let executor = HttpRequestExecutor;
        let mut context = test_context();

        let config = json!({
            "url": "https://httpbin.org/get",
            "method": "GET"
        });

        let result = executor
            .execute(&NodeType::HttpRequest, &config, &mut context)
            .await;

        assert!(result.is_ok());
        let NodeOutput::Http(output) = result.unwrap() else {
            panic!("Expected Http output");
        };
        assert_eq!(output.status, 200);
        assert!(output.body.is_object());
    }

    /// Live POST with a JSON body; ignored by default because it needs network.
    #[tokio::test]
    #[ignore = "Requires network access"]
    async fn test_http_executor_post_with_json() {
        let executor = HttpRequestExecutor;
        let mut context = test_context();

        let config = json!({
            "url": "https://httpbin.org/post",
            "method": "POST",
            "body": {"test": "data"},
            "headers": {"Content-Type": "application/json"}
        });

        let result = executor
            .execute(&NodeType::HttpRequest, &config, &mut context)
            .await;

        assert!(result.is_ok());
        let NodeOutput::Http(output) = result.unwrap() else {
            panic!("Expected Http output");
        };
        assert_eq!(output.status, 200);
    }

    /// Without a Python manager in the context the executor reports an error.
    #[tokio::test]
    async fn test_python_executor_without_manager() {
        let executor = PythonExecutor;
        let mut context = test_context();

        let config = json!({
            "code": "print('test')"
        });

        let result = executor
            .execute(&NodeType::Python, &config, &mut context)
            .await;

        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("Python manager not available"));
    }
}