restflow_core/node/
trigger.rs1use crate::engine::context::{ExecutionContext, namespace};
2use crate::models::{NodeOutput, NodeType, ScheduleOutput, ManualTriggerOutput, WebhookTriggerOutput};
3use crate::node::registry::NodeExecutor;
4use anyhow::Result;
5use async_trait::async_trait;
6use serde_json::Value;
7use std::collections::HashMap;
8
9pub struct TriggerExecutor;
14
15#[async_trait]
16impl NodeExecutor for TriggerExecutor {
17 async fn execute(
18 &self,
19 node_type: &NodeType,
20 _config: &Value,
21 context: &mut ExecutionContext,
22 ) -> Result<NodeOutput> {
23 let payload = context
24 .get(namespace::trigger::PAYLOAD)
25 .cloned()
26 .ok_or_else(|| anyhow::anyhow!("Trigger payload not found in context"))?;
27
28 match node_type {
29 NodeType::ScheduleTrigger => {
30 let triggered_at = chrono::Utc::now().timestamp_millis();
32 Ok(NodeOutput::ScheduleTrigger(ScheduleOutput {
33 triggered_at,
34 payload,
35 }))
36 }
37 NodeType::ManualTrigger => {
38 let triggered_at = chrono::Utc::now().timestamp_millis();
40 Ok(NodeOutput::ManualTrigger(ManualTriggerOutput {
41 triggered_at,
42 payload,
43 }))
44 }
45 NodeType::WebhookTrigger => {
46 let triggered_at = chrono::Utc::now().timestamp_millis();
48 let method = context
49 .get("trigger.method")
50 .and_then(|v| v.as_str())
51 .unwrap_or("POST")
52 .to_string();
53
54 let headers = context
55 .get("trigger.headers")
56 .and_then(|v| serde_json::from_value::<HashMap<String, String>>(v.clone()).ok())
57 .unwrap_or_default();
58
59 let query = context
60 .get("trigger.query")
61 .and_then(|v| serde_json::from_value::<HashMap<String, String>>(v.clone()).ok())
62 .unwrap_or_default();
63
64 Ok(NodeOutput::WebhookTrigger(WebhookTriggerOutput {
65 triggered_at,
66 method,
67 headers,
68 body: payload,
69 query,
70 }))
71 }
72 _ => Err(anyhow::anyhow!(
73 "TriggerExecutor called with non-trigger node type: {:?}",
74 node_type
75 )),
76 }
77 }
78}
79
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a fresh execution context whose trigger payload slot holds `payload`.
    fn create_context_with_payload(payload: Value) -> ExecutionContext {
        let mut ctx = ExecutionContext::new("test-exec".to_string());
        ctx.set(namespace::trigger::PAYLOAD, payload);
        ctx
    }

    /// Runs a `TriggerExecutor` for `node_type` against `ctx` with an empty config.
    async fn run_trigger(node_type: NodeType, ctx: &mut ExecutionContext) -> Result<NodeOutput> {
        let executor = TriggerExecutor;
        let config = json!({});
        let outcome = executor.execute(&node_type, &config, ctx).await;
        outcome
    }

    // A manual trigger echoes the payload and stamps a positive timestamp.
    #[tokio::test]
    async fn test_manual_trigger_execution() {
        let payload = json!({"message": "User triggered workflow"});
        let mut ctx = create_context_with_payload(payload.clone());

        let result = run_trigger(NodeType::ManualTrigger, &mut ctx).await;
        assert!(result.is_ok());

        let manual_output = match result.unwrap() {
            NodeOutput::ManualTrigger(inner) => inner,
            _ => panic!("Expected ManualTriggerOutput"),
        };
        assert!(manual_output.triggered_at > 0);
        assert_eq!(manual_output.payload, payload);
    }

    // A webhook trigger surfaces method, headers, query and body from the context.
    #[tokio::test]
    async fn test_webhook_trigger_execution() {
        let payload = json!({"data": "Webhook request body"});
        let mut ctx = create_context_with_payload(payload.clone());
        ctx.set("trigger.method", json!("POST"));
        ctx.set("trigger.headers", json!({"content-type": "application/json"}));
        ctx.set("trigger.query", json!({"key": "value"}));

        let result = run_trigger(NodeType::WebhookTrigger, &mut ctx).await;
        assert!(result.is_ok());

        let webhook_output = match result.unwrap() {
            NodeOutput::WebhookTrigger(inner) => inner,
            _ => panic!("Expected WebhookTriggerOutput"),
        };
        assert!(webhook_output.triggered_at > 0);
        assert_eq!(webhook_output.method, "POST");
        assert_eq!(webhook_output.body, payload);
        assert_eq!(
            webhook_output.headers.get("content-type").map(String::as_str),
            Some("application/json")
        );
        assert_eq!(
            webhook_output.query.get("key").map(String::as_str),
            Some("value")
        );
    }

    // With only a payload in the context, the webhook fields fall back to defaults.
    #[tokio::test]
    async fn test_webhook_trigger_defaults() {
        let mut ctx = create_context_with_payload(json!({"data": "test"}));

        let result = run_trigger(NodeType::WebhookTrigger, &mut ctx).await;
        assert!(result.is_ok());

        let webhook_output = match result.unwrap() {
            NodeOutput::WebhookTrigger(inner) => inner,
            _ => panic!("Expected WebhookTriggerOutput"),
        };
        assert_eq!(webhook_output.method, "POST");
        assert!(webhook_output.headers.is_empty());
        assert!(webhook_output.query.is_empty());
    }

    // A schedule trigger echoes the payload and stamps a positive timestamp.
    #[tokio::test]
    async fn test_schedule_trigger_execution() {
        let payload = json!({"scheduled": true});
        let mut ctx = create_context_with_payload(payload.clone());

        let result = run_trigger(NodeType::ScheduleTrigger, &mut ctx).await;
        assert!(result.is_ok());

        let schedule_output = match result.unwrap() {
            NodeOutput::ScheduleTrigger(inner) => inner,
            _ => panic!("Expected ScheduleOutput"),
        };
        assert!(schedule_output.triggered_at > 0);
        assert_eq!(schedule_output.payload, payload);
    }

    // Executing without a payload in the context is reported as an error.
    #[tokio::test]
    async fn test_trigger_missing_payload() {
        let mut ctx = ExecutionContext::new("test-exec".to_string());

        let result = run_trigger(NodeType::ManualTrigger, &mut ctx).await;
        assert!(result.is_err());

        let message = result.unwrap_err().to_string();
        assert!(message.contains("Trigger payload not found"));
    }

    // A non-trigger node type is rejected even when a payload is present.
    #[tokio::test]
    async fn test_trigger_with_non_trigger_node_type() {
        let mut ctx = create_context_with_payload(json!({}));

        let result = run_trigger(NodeType::Agent, &mut ctx).await;
        assert!(result.is_err());

        let message = result.unwrap_err().to_string();
        assert!(message.contains("non-trigger node type"));
    }
}