restflow_core/engine/
context.rs

1use crate::python::PythonManager;
2use crate::storage::{SecretStorage, Storage};
3use once_cell::sync::Lazy;
4use regex::Regex;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use std::collections::HashMap;
8use std::sync::Arc;
9use ts_rs::TS;
10
/// Key constants and key builders for the namespaced entries of `ExecutionContext::data`.
/// Going through these helpers keeps callers from hand-writing key strings.
pub mod namespace {
    /// Keys that live under the `trigger.` namespace.
    pub mod trigger {
        /// Key for the trigger input data (webhook payload, manual input, schedule time).
        pub const PAYLOAD: &str = "trigger.payload";
    }

    /// Joins a namespace prefix and an entry name as `{prefix}.{name}`.
    fn prefixed(prefix: &str, name: &str) -> String {
        let mut key = String::with_capacity(prefix.len() + 1 + name.len());
        key.push_str(prefix);
        key.push('.');
        key.push_str(name);
        key
    }

    /// Returns the key for a node's output: `node.{id}`.
    pub fn node(id: &str) -> String {
        prefixed("node", id)
    }

    /// Returns the key for a user variable: `var.{name}`.
    pub fn var(name: &str) -> String {
        prefixed("var", name)
    }

    /// Returns the key for a configuration entry: `config.{name}`.
    pub fn config(name: &str) -> String {
        prefixed("config", name)
    }
}
35
36// Compile regex once at first use, then reuse for performance
37// Pattern: {{variable_name}} or {{node_id.field.subfield}}
38// \{\{  - Match literal {{
39// ([^}]+) - Capture group: one or more non-} characters (the variable path)
40// \}\} - Match literal }}
41static INTERPOLATION_REGEX: Lazy<Regex> =
42    Lazy::new(|| Regex::new(r"\{\{([^}]+)\}\}").expect("Invalid regex"));
43
44#[derive(Debug, Clone, Serialize, Deserialize, TS)]
45#[ts(export)]
46pub struct ExecutionContext {
47    pub workflow_id: String,
48    pub execution_id: String,
49    /// Unified data storage with namespace prefixes:
50    /// - trigger.* : Trigger outputs
51    /// - node.*    : Node outputs
52    /// - var.*     : User variables
53    /// - config.*  : Global configuration
54    #[ts(type = "Record<string, any>")]
55    pub data: HashMap<String, Value>,
56    #[serde(skip)]
57    #[ts(skip)]
58    pub secret_storage: Option<Arc<SecretStorage>>,
59    #[serde(skip)]
60    #[ts(skip)]
61    pub python_manager: Option<Arc<PythonManager>>,
62}
63
64impl ExecutionContext {
65    pub fn new(workflow_id: String) -> Self {
66        Self {
67            workflow_id,
68            execution_id: uuid::Uuid::new_v4().to_string(),
69            data: HashMap::new(),
70            secret_storage: None,
71            python_manager: None,
72        }
73    }
74
75    /// Create execution context with specific execution_id (for test executions)
76    pub fn with_execution_id(workflow_id: String, execution_id: String) -> Self {
77        Self {
78            workflow_id,
79            execution_id,
80            data: HashMap::new(),
81            secret_storage: None,
82            python_manager: None,
83        }
84    }
85
86    pub fn with_secret_storage(mut self, storage: Arc<SecretStorage>) -> Self {
87        self.secret_storage = Some(storage);
88        self
89    }
90
91    pub fn with_python_manager(mut self, manager: Arc<PythonManager>) -> Self {
92        self.python_manager = Some(manager);
93        self
94    }
95
96    pub fn ensure_secret_storage(&mut self, storage: &Storage) {
97        if self.secret_storage.is_none() {
98            self.secret_storage = Some(Arc::new(storage.secrets.clone()));
99        }
100    }
101
102    /// Set data directly (recommended, key must include namespace)
103    pub fn set(&mut self, key: &str, value: Value) {
104        self.data.insert(key.to_string(), value);
105    }
106
107    /// Get data directly (recommended, key must include namespace)
108    pub fn get(&self, key: &str) -> Option<&Value> {
109        self.data.get(key)
110    }
111
112    /// Set variable (convenience method, automatically adds var. prefix)
113    pub fn set_var(&mut self, name: &str, value: Value) {
114        self.set(&namespace::var(name), value);
115    }
116
117    /// Get variable (convenience method, automatically adds var. prefix)
118    pub fn get_var(&self, name: &str) -> Option<&Value> {
119        self.get(&namespace::var(name))
120    }
121
122    /// Set node output (convenience method, automatically adds node. prefix)
123    pub fn set_node(&mut self, id: &str, output: Value) {
124        self.set(&namespace::node(id), output);
125    }
126
127    /// Get node output (convenience method, automatically adds node. prefix)
128    pub fn get_node(&self, id: &str) -> Option<&Value> {
129        self.get(&namespace::node(id))
130    }
131
132    pub fn interpolate_value(&self, value: &Value) -> Value {
133        match value {
134            Value::String(s) => {
135                let mut result = s.clone();
136
137                for cap in INTERPOLATION_REGEX.captures_iter(s) {
138                    if let Some(var_path) = cap.get(1) {
139                        let path = var_path.as_str();
140                        if let Some(replacement) = self.resolve_path(path) {
141                            // For string values, extract the string content without quotes
142                            // For other types (numbers, booleans, objects), use to_string()
143                            let replacement_str = match &replacement {
144                                Value::String(s) => s.clone(),
145                                _ => replacement.to_string(),
146                            };
147                            result = result.replace(&cap[0], &replacement_str);
148                        }
149                    }
150                }
151
152                Value::String(result)
153            }
154            Value::Object(map) => {
155                let mut new_map = serde_json::Map::new();
156                for (k, v) in map {
157                    new_map.insert(k.clone(), self.interpolate_value(v));
158                }
159                Value::Object(new_map)
160            }
161            Value::Array(arr) => {
162                Value::Array(arr.iter().map(|v| self.interpolate_value(v)).collect())
163            }
164            _ => value.clone(),
165        }
166    }
167
168    fn resolve_path(&self, path: &str) -> Option<Value> {
169        let parts: Vec<&str> = path.split('.').collect();
170        if parts.is_empty() {
171            return None;
172        }
173
174        // New logic: explicit namespaces
175        // Supported formats:
176        // - trigger.payload.body → data["trigger.payload"], then navigate to .body
177        // - node.http1.status → data["node.http1"], then navigate to .status
178        // - var.counter → data["var.counter"]
179        // - config.api_key → data["config.api_key"]
180
181        // Try two-level key (namespace.name)
182        if parts.len() >= 2 {
183            let two_level_key = format!("{}.{}", parts[0], parts[1]);
184            if let Some(root) = self.data.get(&two_level_key) {
185                // If there's a deeper path, continue navigation
186                if parts.len() > 2 {
187                    return Self::navigate_nested(root, &parts[2..]);
188                } else {
189                    return Some(root.clone());
190                }
191            }
192        }
193
194        // Try single-level key (direct lookup of full path)
195        self.data.get(path).cloned()
196    }
197
198    /// Navigate nested fields in Value object
199    fn navigate_nested(mut current: &Value, parts: &[&str]) -> Option<Value> {
200        for part in parts {
201            current = current.as_object()?.get(*part)?;
202        }
203        Some(current.clone())
204    }
205}
206
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds the empty context every test starts from.
    fn fresh_ctx() -> ExecutionContext {
        ExecutionContext::new("wf-001".to_string())
    }

    // A new context carries the workflow id, a generated execution id and no data.
    #[test]
    fn test_new_context() {
        let context = fresh_ctx();
        assert_eq!(context.workflow_id, "wf-001");
        assert!(!context.execution_id.is_empty());
        assert!(context.data.is_empty());
    }

    // A caller-supplied execution id is stored as given.
    #[test]
    fn test_with_execution_id() {
        let context =
            ExecutionContext::with_execution_id("wf-001".to_string(), "exec-001".to_string());
        assert_eq!(context.workflow_id, "wf-001");
        assert_eq!(context.execution_id, "exec-001");
    }

    // `set` / `get` use the key verbatim.
    #[test]
    fn test_set_get_direct() {
        let mut context = fresh_ctx();
        context.set("custom.key", json!({"value": 42}));

        let stored = context.get("custom.key").expect("custom.key should be present");
        assert_eq!(stored["value"], 42);
    }

    // Variables round-trip and are stored under the `var.` prefix.
    #[test]
    fn test_set_get_var() {
        let mut context = fresh_ctx();
        context.set_var("counter", json!(10));

        assert_eq!(context.get_var("counter"), Some(&json!(10)));
        assert_eq!(context.get("var.counter"), Some(&json!(10)));
    }

    // Node outputs round-trip and are stored under the `node.` prefix.
    #[test]
    fn test_set_get_node() {
        let mut context = fresh_ctx();
        let output = json!({
            "status": 200,
            "body": {"message": "success"}
        });
        context.set_node("http1", output);

        let via_helper = context.get_node("http1").expect("node output should be present");
        assert_eq!(via_helper["status"], 200);

        let via_key = context.get("node.http1").unwrap();
        assert_eq!(via_key["status"], 200);
    }

    // A string variable is inserted without quotes.
    #[test]
    fn test_interpolate_simple_variable() {
        let mut context = fresh_ctx();
        context.set_var("name", json!("Alice"));

        let rendered = context.interpolate_value(&json!("Hello {{var.name}}!"));

        assert_eq!(rendered, json!("Hello Alice!"));
    }

    // A field of a node output can be referenced by path.
    #[test]
    fn test_interpolate_node_output() {
        let mut context = fresh_ctx();
        context.set_node("http1", json!({"status": 200}));

        let rendered = context.interpolate_value(&json!("Status: {{node.http1.status}}"));

        assert_eq!(rendered, json!("Status: 200"));
    }

    // Fields of the trigger payload can be referenced by path.
    #[test]
    fn test_interpolate_trigger_payload() {
        let mut context = fresh_ctx();
        let payload = json!({
            "user": "bob",
            "action": "login"
        });
        context.set(namespace::trigger::PAYLOAD, payload);

        let template = json!("User {{trigger.payload.user}} performed {{trigger.payload.action}}");
        let rendered = context.interpolate_value(&template);

        assert_eq!(rendered, json!("User bob performed login"));
    }

    // Several placeholders in one string are all expanded.
    #[test]
    fn test_interpolate_multiple_variables() {
        let mut context = fresh_ctx();
        for (name, value) in [("first", "John"), ("last", "Doe")] {
            context.set_var(name, json!(value));
        }

        let rendered = context.interpolate_value(&json!("Name: {{var.first}} {{var.last}}"));

        assert_eq!(rendered, json!("Name: John Doe"));
    }

    // String values inside an object are interpolated; others pass through.
    #[test]
    fn test_interpolate_nested_object() {
        let mut context = fresh_ctx();
        context.set_var("url", json!("https://api.example.com"));

        let template = json!({
            "endpoint": "{{var.url}}/users",
            "method": "GET"
        });
        let rendered = context.interpolate_value(&template);

        assert_eq!(rendered["endpoint"], "https://api.example.com/users");
        assert_eq!(rendered["method"], "GET");
    }

    // Every element of an array is interpolated.
    #[test]
    fn test_interpolate_array() {
        let mut context = fresh_ctx();
        context.set_var("item", json!("apple"));

        let template = json!(["{{var.item}}", "banana", "{{var.item}}"]);
        let rendered = context.interpolate_value(&template);

        assert_eq!(rendered[0], "apple");
        assert_eq!(rendered[1], "banana");
        assert_eq!(rendered[2], "apple");
    }

    // A non-string replacement is inserted as its JSON string representation.
    #[test]
    fn test_interpolate_deeply_nested() {
        let mut context = fresh_ctx();
        let api_output = json!({
            "response": {
                "data": {
                    "items": [{"id": 1}, {"id": 2}]
                }
            }
        });
        context.set_node("api", api_output);

        let template = json!({
            "result": {
                "nested": {
                    "value": "ID: {{node.api.response}}"
                }
            }
        });
        let rendered = context.interpolate_value(&template);

        let leaf = rendered["result"]["nested"]["value"].as_str().unwrap();
        assert!(leaf.contains("data"));
    }

    // An unresolved placeholder is left unchanged.
    #[test]
    fn test_interpolate_non_existent_variable() {
        let context = fresh_ctx();

        let rendered = context.interpolate_value(&json!("Value: {{var.nonexistent}}"));

        assert_eq!(rendered, json!("Value: {{var.nonexistent}}"));
    }

    // Numbers, booleans and null are returned untouched.
    #[test]
    fn test_interpolate_non_string_values() {
        let context = fresh_ctx();

        let template = json!({
            "number": 42,
            "boolean": true,
            "null": null
        });
        let rendered = context.interpolate_value(&template);

        assert_eq!(rendered["number"], 42);
        assert_eq!(rendered["boolean"], true);
        assert_eq!(rendered["null"], json!(null));
    }

    // A two-segment path returns the whole stored value.
    #[test]
    fn test_resolve_path_two_level_key() {
        let mut context = fresh_ctx();
        context.set("node.http1", json!({"status": 200}));

        assert_eq!(context.resolve_path("node.http1"), Some(json!({"status": 200})));
    }

    // Segments after the two-level key navigate into the stored value.
    #[test]
    fn test_resolve_path_with_nested_navigation() {
        let mut context = fresh_ctx();
        let stored = json!({
            "response": {
                "data": {
                    "user": {"name": "Alice"}
                }
            }
        });
        context.set("node.http1", stored);

        let resolved = context.resolve_path("node.http1.response.data.user.name");
        assert_eq!(resolved, Some(json!("Alice")));
    }

    // Navigation follows each field in turn.
    #[test]
    fn test_navigate_nested_valid_path() {
        let tree = json!({
            "level1": {
                "level2": {
                    "level3": "deep_value"
                }
            }
        });

        let found = ExecutionContext::navigate_nested(&tree, &["level1", "level2", "level3"]);
        assert_eq!(found, Some(json!("deep_value")));
    }

    // A missing field anywhere along the path yields `None`.
    #[test]
    fn test_navigate_nested_invalid_path() {
        let tree = json!({"a": {"b": "value"}});

        let found = ExecutionContext::navigate_nested(&tree, &["a", "nonexistent", "c"]);
        assert_eq!(found, None);
    }

    // Repeated interpolation reuses the lazily initialized INTERPOLATION_REGEX.
    #[test]
    fn test_interpolation_regex_performance() {
        let mut context = fresh_ctx();
        context.set_var("x", json!(1));

        (0..100).for_each(|_| {
            let rendered = context.interpolate_value(&json!("Value: {{var.x}}"));
            assert_eq!(rendered, json!("Value: 1"));
        });
    }

    // The namespace helpers produce the documented key shapes.
    #[test]
    fn test_namespace_constants() {
        assert_eq!(namespace::trigger::PAYLOAD, "trigger.payload");
        assert_eq!(namespace::node("test"), "node.test");
        assert_eq!(namespace::var("counter"), "var.counter");
        assert_eq!(namespace::config("api_key"), "config.api_key");
    }

    // One template can draw on the trigger payload and node outputs together.
    #[test]
    fn test_complex_workflow_context() {
        let mut context = fresh_ctx();

        let payload = json!({
            "webhook": {
                "body": {"user_id": 123}
            }
        });
        context.set(namespace::trigger::PAYLOAD, payload);

        let fetched = json!({
            "status": 200,
            "user": {"name": "Bob", "email": "bob@example.com"}
        });
        context.set_node("fetch_user", fetched);

        context.set_var("notification_template", json!("Hello {{var.username}}!"));
        context.set_var("username", json!("Bob"));

        let template = json!({
            "to": "{{node.fetch_user.user.email}}",
            "subject": "Welcome",
            "body": "User ID: {{trigger.payload.webhook.body.user_id}}, Name: {{node.fetch_user.user.name}}"
        });

        let rendered = context.interpolate_value(&template);
        assert_eq!(rendered["to"], "bob@example.com");
        assert_eq!(rendered["body"], "User ID: 123, Name: Bob");
    }
}