1use crate::python::PythonManager;
2use crate::storage::{SecretStorage, Storage};
3use once_cell::sync::Lazy;
4use regex::Regex;
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use std::collections::HashMap;
8use std::sync::Arc;
9use ts_rs::TS;
10
11pub mod namespace {
14 pub mod trigger {
16 pub const PAYLOAD: &str = "trigger.payload";
18 }
19
20 pub fn node(id: &str) -> String {
22 format!("node.{}", id)
23 }
24
25 pub fn var(name: &str) -> String {
27 format!("var.{}", name)
28 }
29
30 pub fn config(name: &str) -> String {
32 format!("config.{}", name)
33 }
34}
35
36static INTERPOLATION_REGEX: Lazy<Regex> =
42 Lazy::new(|| Regex::new(r"\{\{([^}]+)\}\}").expect("Invalid regex"));
43
44#[derive(Debug, Clone, Serialize, Deserialize, TS)]
45#[ts(export)]
46pub struct ExecutionContext {
47 pub workflow_id: String,
48 pub execution_id: String,
49 #[ts(type = "Record<string, any>")]
55 pub data: HashMap<String, Value>,
56 #[serde(skip)]
57 #[ts(skip)]
58 pub secret_storage: Option<Arc<SecretStorage>>,
59 #[serde(skip)]
60 #[ts(skip)]
61 pub python_manager: Option<Arc<PythonManager>>,
62}
63
64impl ExecutionContext {
65 pub fn new(workflow_id: String) -> Self {
66 Self {
67 workflow_id,
68 execution_id: uuid::Uuid::new_v4().to_string(),
69 data: HashMap::new(),
70 secret_storage: None,
71 python_manager: None,
72 }
73 }
74
75 pub fn with_execution_id(workflow_id: String, execution_id: String) -> Self {
77 Self {
78 workflow_id,
79 execution_id,
80 data: HashMap::new(),
81 secret_storage: None,
82 python_manager: None,
83 }
84 }
85
86 pub fn with_secret_storage(mut self, storage: Arc<SecretStorage>) -> Self {
87 self.secret_storage = Some(storage);
88 self
89 }
90
91 pub fn with_python_manager(mut self, manager: Arc<PythonManager>) -> Self {
92 self.python_manager = Some(manager);
93 self
94 }
95
96 pub fn ensure_secret_storage(&mut self, storage: &Storage) {
97 if self.secret_storage.is_none() {
98 self.secret_storage = Some(Arc::new(storage.secrets.clone()));
99 }
100 }
101
102 pub fn set(&mut self, key: &str, value: Value) {
104 self.data.insert(key.to_string(), value);
105 }
106
107 pub fn get(&self, key: &str) -> Option<&Value> {
109 self.data.get(key)
110 }
111
112 pub fn set_var(&mut self, name: &str, value: Value) {
114 self.set(&namespace::var(name), value);
115 }
116
117 pub fn get_var(&self, name: &str) -> Option<&Value> {
119 self.get(&namespace::var(name))
120 }
121
122 pub fn set_node(&mut self, id: &str, output: Value) {
124 self.set(&namespace::node(id), output);
125 }
126
127 pub fn get_node(&self, id: &str) -> Option<&Value> {
129 self.get(&namespace::node(id))
130 }
131
132 pub fn interpolate_value(&self, value: &Value) -> Value {
133 match value {
134 Value::String(s) => {
135 let mut result = s.clone();
136
137 for cap in INTERPOLATION_REGEX.captures_iter(s) {
138 if let Some(var_path) = cap.get(1) {
139 let path = var_path.as_str();
140 if let Some(replacement) = self.resolve_path(path) {
141 let replacement_str = match &replacement {
144 Value::String(s) => s.clone(),
145 _ => replacement.to_string(),
146 };
147 result = result.replace(&cap[0], &replacement_str);
148 }
149 }
150 }
151
152 Value::String(result)
153 }
154 Value::Object(map) => {
155 let mut new_map = serde_json::Map::new();
156 for (k, v) in map {
157 new_map.insert(k.clone(), self.interpolate_value(v));
158 }
159 Value::Object(new_map)
160 }
161 Value::Array(arr) => {
162 Value::Array(arr.iter().map(|v| self.interpolate_value(v)).collect())
163 }
164 _ => value.clone(),
165 }
166 }
167
168 fn resolve_path(&self, path: &str) -> Option<Value> {
169 let parts: Vec<&str> = path.split('.').collect();
170 if parts.is_empty() {
171 return None;
172 }
173
174 if parts.len() >= 2 {
183 let two_level_key = format!("{}.{}", parts[0], parts[1]);
184 if let Some(root) = self.data.get(&two_level_key) {
185 if parts.len() > 2 {
187 return Self::navigate_nested(root, &parts[2..]);
188 } else {
189 return Some(root.clone());
190 }
191 }
192 }
193
194 self.data.get(path).cloned()
196 }
197
198 fn navigate_nested(mut current: &Value, parts: &[&str]) -> Option<Value> {
200 for part in parts {
201 current = current.as_object()?.get(*part)?;
202 }
203 Some(current.clone())
204 }
205}
206
207#[cfg(test)]
208mod tests {
209 use super::*;
210 use serde_json::json;
211
212 #[test]
213 fn test_new_context() {
214 let ctx = ExecutionContext::new("wf-001".to_string());
215 assert_eq!(ctx.workflow_id, "wf-001");
216 assert!(!ctx.execution_id.is_empty());
217 assert!(ctx.data.is_empty());
218 }
219
220 #[test]
221 fn test_with_execution_id() {
222 let ctx = ExecutionContext::with_execution_id("wf-001".to_string(), "exec-001".to_string());
223 assert_eq!(ctx.workflow_id, "wf-001");
224 assert_eq!(ctx.execution_id, "exec-001");
225 }
226
227 #[test]
228 fn test_set_get_direct() {
229 let mut ctx = ExecutionContext::new("wf-001".to_string());
230 ctx.set("custom.key", json!({"value": 42}));
231
232 let result = ctx.get("custom.key");
233 assert!(result.is_some());
234 assert_eq!(result.unwrap()["value"], 42);
235 }
236
237 #[test]
238 fn test_set_get_var() {
239 let mut ctx = ExecutionContext::new("wf-001".to_string());
240 ctx.set_var("counter", json!(10));
241
242 let result = ctx.get_var("counter");
243 assert_eq!(result, Some(&json!(10)));
244
245 assert_eq!(ctx.get("var.counter"), Some(&json!(10)));
247 }
248
249 #[test]
250 fn test_set_get_node() {
251 let mut ctx = ExecutionContext::new("wf-001".to_string());
252 ctx.set_node("http1", json!({
253 "status": 200,
254 "body": {"message": "success"}
255 }));
256
257 let result = ctx.get_node("http1");
258 assert!(result.is_some());
259 assert_eq!(result.unwrap()["status"], 200);
260
261 assert_eq!(ctx.get("node.http1").unwrap()["status"], 200);
263 }
264
265 #[test]
266 fn test_interpolate_simple_variable() {
267 let mut ctx = ExecutionContext::new("wf-001".to_string());
268 ctx.set_var("name", json!("Alice"));
269
270 let input = json!("Hello {{var.name}}!");
271 let result = ctx.interpolate_value(&input);
272
273 assert_eq!(result, json!("Hello Alice!"));
274 }
275
276 #[test]
277 fn test_interpolate_node_output() {
278 let mut ctx = ExecutionContext::new("wf-001".to_string());
279 ctx.set_node("http1", json!({"status": 200}));
280
281 let input = json!("Status: {{node.http1.status}}");
282 let result = ctx.interpolate_value(&input);
283
284 assert_eq!(result, json!("Status: 200"));
285 }
286
287 #[test]
288 fn test_interpolate_trigger_payload() {
289 let mut ctx = ExecutionContext::new("wf-001".to_string());
290 ctx.set(namespace::trigger::PAYLOAD, json!({
291 "user": "bob",
292 "action": "login"
293 }));
294
295 let input = json!("User {{trigger.payload.user}} performed {{trigger.payload.action}}");
296 let result = ctx.interpolate_value(&input);
297
298 assert_eq!(result, json!("User bob performed login"));
299 }
300
301 #[test]
302 fn test_interpolate_multiple_variables() {
303 let mut ctx = ExecutionContext::new("wf-001".to_string());
304 ctx.set_var("first", json!("John"));
305 ctx.set_var("last", json!("Doe"));
306
307 let input = json!("Name: {{var.first}} {{var.last}}");
308 let result = ctx.interpolate_value(&input);
309
310 assert_eq!(result, json!("Name: John Doe"));
311 }
312
313 #[test]
314 fn test_interpolate_nested_object() {
315 let mut ctx = ExecutionContext::new("wf-001".to_string());
316 ctx.set_var("url", json!("https://api.example.com"));
317
318 let input = json!({
319 "endpoint": "{{var.url}}/users",
320 "method": "GET"
321 });
322 let result = ctx.interpolate_value(&input);
323
324 assert_eq!(result["endpoint"], "https://api.example.com/users");
325 assert_eq!(result["method"], "GET");
326 }
327
328 #[test]
329 fn test_interpolate_array() {
330 let mut ctx = ExecutionContext::new("wf-001".to_string());
331 ctx.set_var("item", json!("apple"));
332
333 let input = json!(["{{var.item}}", "banana", "{{var.item}}"]);
334 let result = ctx.interpolate_value(&input);
335
336 assert_eq!(result[0], "apple");
337 assert_eq!(result[1], "banana");
338 assert_eq!(result[2], "apple");
339 }
340
341 #[test]
342 fn test_interpolate_deeply_nested() {
343 let mut ctx = ExecutionContext::new("wf-001".to_string());
344 ctx.set_node("api", json!({
345 "response": {
346 "data": {
347 "items": [{"id": 1}, {"id": 2}]
348 }
349 }
350 }));
351
352 let input = json!({
353 "result": {
354 "nested": {
355 "value": "ID: {{node.api.response}}"
356 }
357 }
358 });
359 let result = ctx.interpolate_value(&input);
360
361 let nested = &result["result"]["nested"]["value"];
363 assert!(nested.as_str().unwrap().contains("data"));
364 }
365
366 #[test]
367 fn test_interpolate_non_existent_variable() {
368 let ctx = ExecutionContext::new("wf-001".to_string());
369
370 let input = json!("Value: {{var.nonexistent}}");
371 let result = ctx.interpolate_value(&input);
372
373 assert_eq!(result, json!("Value: {{var.nonexistent}}"));
375 }
376
377 #[test]
378 fn test_interpolate_non_string_values() {
379 let ctx = ExecutionContext::new("wf-001".to_string());
380
381 let input = json!({
382 "number": 42,
383 "boolean": true,
384 "null": null
385 });
386 let result = ctx.interpolate_value(&input);
387
388 assert_eq!(result["number"], 42);
389 assert_eq!(result["boolean"], true);
390 assert_eq!(result["null"], json!(null));
391 }
392
393 #[test]
394 fn test_resolve_path_two_level_key() {
395 let mut ctx = ExecutionContext::new("wf-001".to_string());
396 ctx.set("node.http1", json!({"status": 200}));
397
398 let result = ctx.resolve_path("node.http1");
399 assert_eq!(result, Some(json!({"status": 200})));
400 }
401
402 #[test]
403 fn test_resolve_path_with_nested_navigation() {
404 let mut ctx = ExecutionContext::new("wf-001".to_string());
405 ctx.set("node.http1", json!({
406 "response": {
407 "data": {
408 "user": {"name": "Alice"}
409 }
410 }
411 }));
412
413 let result = ctx.resolve_path("node.http1.response.data.user.name");
414 assert_eq!(result, Some(json!("Alice")));
415 }
416
417 #[test]
418 fn test_navigate_nested_valid_path() {
419 let value = json!({
420 "level1": {
421 "level2": {
422 "level3": "deep_value"
423 }
424 }
425 });
426
427 let result = ExecutionContext::navigate_nested(&value, &["level1", "level2", "level3"]);
428 assert_eq!(result, Some(json!("deep_value")));
429 }
430
431 #[test]
432 fn test_navigate_nested_invalid_path() {
433 let value = json!({"a": {"b": "value"}});
434
435 let result = ExecutionContext::navigate_nested(&value, &["a", "nonexistent", "c"]);
436 assert_eq!(result, None);
437 }
438
439 #[test]
440 fn test_interpolation_regex_performance() {
441 let mut ctx = ExecutionContext::new("wf-001".to_string());
442 ctx.set_var("x", json!(1));
443
444 for _ in 0..100 {
446 let input = json!("Value: {{var.x}}");
447 let result = ctx.interpolate_value(&input);
448 assert_eq!(result, json!("Value: 1"));
449 }
450 }
451
452 #[test]
453 fn test_namespace_constants() {
454 assert_eq!(namespace::trigger::PAYLOAD, "trigger.payload");
455 assert_eq!(namespace::node("test"), "node.test");
456 assert_eq!(namespace::var("counter"), "var.counter");
457 assert_eq!(namespace::config("api_key"), "config.api_key");
458 }
459
460 #[test]
461 fn test_complex_workflow_context() {
462 let mut ctx = ExecutionContext::new("wf-001".to_string());
463
464 ctx.set(namespace::trigger::PAYLOAD, json!({
466 "webhook": {
467 "body": {"user_id": 123}
468 }
469 }));
470
471 ctx.set_node("fetch_user", json!({
472 "status": 200,
473 "user": {"name": "Bob", "email": "bob@example.com"}
474 }));
475
476 ctx.set_var("notification_template", json!("Hello {{var.username}}!"));
477 ctx.set_var("username", json!("Bob"));
478
479 let template = json!({
481 "to": "{{node.fetch_user.user.email}}",
482 "subject": "Welcome",
483 "body": "User ID: {{trigger.payload.webhook.body.user_id}}, Name: {{node.fetch_user.user.name}}"
484 });
485
486 let result = ctx.interpolate_value(&template);
487 assert_eq!(result["to"], "bob@example.com");
488 assert_eq!(result["body"], "User ID: 123, Name: Bob");
489 }
490}