restflow_core/node/
trigger.rs

1use crate::engine::context::{ExecutionContext, namespace};
2use crate::models::{NodeOutput, NodeType, ScheduleOutput, ManualTriggerOutput, WebhookTriggerOutput};
3use crate::node::registry::NodeExecutor;
4use anyhow::Result;
5use async_trait::async_trait;
6use serde_json::Value;
7use std::collections::HashMap;
8
9/// Unified Trigger node executor
10///
11/// All Trigger types (Webhook, Manual, Schedule) use the same logic:
12/// Read trigger.payload from context and output it for downstream nodes
13pub struct TriggerExecutor;
14
15#[async_trait]
16impl NodeExecutor for TriggerExecutor {
17    async fn execute(
18        &self,
19        node_type: &NodeType,
20        _config: &Value,
21        context: &mut ExecutionContext,
22    ) -> Result<NodeOutput> {
23        let payload = context
24            .get(namespace::trigger::PAYLOAD)
25            .cloned()
26            .ok_or_else(|| anyhow::anyhow!("Trigger payload not found in context"))?;
27
28        match node_type {
29            NodeType::ScheduleTrigger => {
30                // Schedule trigger: extract triggered_at and payload
31                let triggered_at = chrono::Utc::now().timestamp_millis();
32                Ok(NodeOutput::ScheduleTrigger(ScheduleOutput {
33                    triggered_at,
34                    payload,
35                }))
36            }
37            NodeType::ManualTrigger => {
38                // Manual trigger: simple triggered_at + payload
39                let triggered_at = chrono::Utc::now().timestamp_millis();
40                Ok(NodeOutput::ManualTrigger(ManualTriggerOutput {
41                    triggered_at,
42                    payload,
43                }))
44            }
45            NodeType::WebhookTrigger => {
46                // Webhook trigger: extract HTTP request information from context
47                let triggered_at = chrono::Utc::now().timestamp_millis();
48                let method = context
49                    .get("trigger.method")
50                    .and_then(|v| v.as_str())
51                    .unwrap_or("POST")
52                    .to_string();
53
54                let headers = context
55                    .get("trigger.headers")
56                    .and_then(|v| serde_json::from_value::<HashMap<String, String>>(v.clone()).ok())
57                    .unwrap_or_default();
58
59                let query = context
60                    .get("trigger.query")
61                    .and_then(|v| serde_json::from_value::<HashMap<String, String>>(v.clone()).ok())
62                    .unwrap_or_default();
63
64                Ok(NodeOutput::WebhookTrigger(WebhookTriggerOutput {
65                    triggered_at,
66                    method,
67                    headers,
68                    body: payload,
69                    query,
70                }))
71            }
72            _ => Err(anyhow::anyhow!(
73                "TriggerExecutor called with non-trigger node type: {:?}",
74                node_type
75            )),
76        }
77    }
78}
79
80#[cfg(test)]
81mod tests {
82    use super::*;
83    use serde_json::json;
84
85    fn create_context_with_payload(payload: Value) -> ExecutionContext {
86        let mut context = ExecutionContext::new("test-exec".to_string());
87        context.set(namespace::trigger::PAYLOAD, payload);
88        context
89    }
90
91    #[tokio::test]
92    async fn test_manual_trigger_execution() {
93        let executor = TriggerExecutor;
94        let node_type = NodeType::ManualTrigger;
95        let config = json!({});
96        let payload = json!({"message": "User triggered workflow"});
97        let mut context = create_context_with_payload(payload.clone());
98
99        let result = executor.execute(&node_type, &config, &mut context).await;
100        assert!(result.is_ok());
101
102        let output = result.unwrap();
103        match output {
104            NodeOutput::ManualTrigger(manual_output) => {
105                assert!(manual_output.triggered_at > 0);
106                assert_eq!(manual_output.payload, payload);
107            }
108            _ => panic!("Expected ManualTriggerOutput"),
109        }
110    }
111
112    #[tokio::test]
113    async fn test_webhook_trigger_execution() {
114        let executor = TriggerExecutor;
115        let node_type = NodeType::WebhookTrigger;
116        let config = json!({});
117        let payload = json!({"data": "Webhook request body"});
118
119        let mut context = create_context_with_payload(payload.clone());
120        context.set("trigger.method", json!("POST"));
121        context.set("trigger.headers", json!({"content-type": "application/json"}));
122        context.set("trigger.query", json!({"key": "value"}));
123
124        let result = executor.execute(&node_type, &config, &mut context).await;
125        assert!(result.is_ok());
126
127        let output = result.unwrap();
128        match output {
129            NodeOutput::WebhookTrigger(webhook_output) => {
130                assert!(webhook_output.triggered_at > 0);
131                assert_eq!(webhook_output.method, "POST");
132                assert_eq!(webhook_output.body, payload);
133                assert_eq!(webhook_output.headers.get("content-type"), Some(&"application/json".to_string()));
134                assert_eq!(webhook_output.query.get("key"), Some(&"value".to_string()));
135            }
136            _ => panic!("Expected WebhookTriggerOutput"),
137        }
138    }
139
140    #[tokio::test]
141    async fn test_webhook_trigger_defaults() {
142        let executor = TriggerExecutor;
143        let node_type = NodeType::WebhookTrigger;
144        let config = json!({});
145        let payload = json!({"data": "test"});
146        let mut context = create_context_with_payload(payload.clone());
147        // Don't set method, headers, query - test defaults
148
149        let result = executor.execute(&node_type, &config, &mut context).await;
150        assert!(result.is_ok());
151
152        let output = result.unwrap();
153        match output {
154            NodeOutput::WebhookTrigger(webhook_output) => {
155                assert_eq!(webhook_output.method, "POST"); // Default
156                assert!(webhook_output.headers.is_empty());
157                assert!(webhook_output.query.is_empty());
158            }
159            _ => panic!("Expected WebhookTriggerOutput"),
160        }
161    }
162
163    #[tokio::test]
164    async fn test_schedule_trigger_execution() {
165        let executor = TriggerExecutor;
166        let node_type = NodeType::ScheduleTrigger;
167        let config = json!({});
168        let payload = json!({"scheduled": true});
169        let mut context = create_context_with_payload(payload.clone());
170
171        let result = executor.execute(&node_type, &config, &mut context).await;
172        assert!(result.is_ok());
173
174        let output = result.unwrap();
175        match output {
176            NodeOutput::ScheduleTrigger(schedule_output) => {
177                assert!(schedule_output.triggered_at > 0);
178                assert_eq!(schedule_output.payload, payload);
179            }
180            _ => panic!("Expected ScheduleOutput"),
181        }
182    }
183
184    #[tokio::test]
185    async fn test_trigger_missing_payload() {
186        let executor = TriggerExecutor;
187        let node_type = NodeType::ManualTrigger;
188        let config = json!({});
189        let mut context = ExecutionContext::new("test-exec".to_string());
190        // Don't set payload
191
192        let result = executor.execute(&node_type, &config, &mut context).await;
193        assert!(result.is_err());
194        assert!(result.unwrap_err().to_string().contains("Trigger payload not found"));
195    }
196
197    #[tokio::test]
198    async fn test_trigger_with_non_trigger_node_type() {
199        let executor = TriggerExecutor;
200        let node_type = NodeType::Agent;
201        let config = json!({});
202        let mut context = create_context_with_payload(json!({}));
203
204        let result = executor.execute(&node_type, &config, &mut context).await;
205        assert!(result.is_err());
206        assert!(result.unwrap_err().to_string().contains("non-trigger node type"));
207    }
208}