restflow_core/
python.rs

1use anyhow::{Result, anyhow};
2use once_cell::sync::OnceCell;
3use serde::{Deserialize, Serialize};
4use serde_json::Value;
5use sha2::{Digest, Sha256};
6use std::collections::HashMap;
7use std::env;
8use std::path::{Path, PathBuf};
9use std::process::Stdio;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::fs;
13use tokio::io::AsyncWriteExt;
14use tokio::process::Command;
15use tracing::info;
16
/// Summary of one Python script template, as produced by `PythonManager::list_templates`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TemplateInfo {
    /// Template identifier: the template file name without its `.py` extension.
    pub id: String,
    /// Value of the `name` key in the template's metadata block (empty when absent).
    pub name: String,
    /// Value of the `description` key in the template's metadata block (empty when absent).
    pub description: String,
    /// Entries of the `dependencies` array found in the template's `# /// script` block.
    pub dependencies: Vec<String>,
}
24
/// Owns the paths of a project-local `uv` binary and Python workspace, and runs
/// Python scripts through `uv run`.
#[derive(Debug)]
pub struct PythonManager {
    /// Location of the uv executable.
    uv_binary: PathBuf,
    /// Working directory for uv commands; `pyproject.toml` is looked up here.
    python_dir: PathBuf,
    /// Virtual environment directory; its absence triggers environment setup.
    venv_dir: PathBuf,
    /// Directory holding saved `<name>.py` scripts.
    scripts_dir: PathBuf,
    /// Directory scanned for `.py` template files.
    templates_dir: PathBuf,
    /// Set once `ensure_initialized` has completed, so later calls can skip the
    /// directory checks, the uv download and the environment setup.
    /// NOTE(review): `new` calls `ensure_initialized` before returning, so
    /// initialization is currently eager for managers built through `new`.
    initialized: OnceCell<()>,
}
35
36impl PythonManager {
37    pub async fn new() -> Result<Arc<Self>> {
38        let current_dir = env::current_dir()?;
39        let uv_binary = current_dir
40            .join(".uv")
41            .join(if cfg!(windows) { "uv.exe" } else { "uv" });
42        let python_dir = current_dir.join("python");
43        let venv_dir = python_dir.join(".venv");
44        let scripts_dir = python_dir.join("scripts");
45        let templates_dir = scripts_dir.join("templates");
46
47        let manager = Arc::new(Self {
48            uv_binary,
49            python_dir,
50            venv_dir,
51            scripts_dir,
52            templates_dir,
53            initialized: OnceCell::new(),
54        });
55
56        manager.ensure_initialized().await?;
57        Ok(manager)
58    }
59
60    async fn ensure_initialized(&self) -> Result<()> {
61        if self.initialized.get().is_some() {
62            return Ok(());
63        }
64
65        fs::create_dir_all(&self.python_dir).await?;
66        fs::create_dir_all(&self.scripts_dir).await?;
67        fs::create_dir_all(&self.templates_dir).await?;
68        fs::create_dir_all(self.uv_binary.parent().unwrap()).await?;
69
70        if !self.uv_binary.exists() {
71            self.download_uv().await?;
72        }
73
74        if !self.venv_dir.exists() || !self.python_dir.join("pyproject.toml").exists() {
75            self.setup_environment().await?;
76        }
77
78        self.initialized
79            .set(())
80            .map_err(|_| anyhow!("Failed to mark as initialized"))?;
81        Ok(())
82    }
83
84    /// Downloads uv binary and verifies SHA256 checksum to prevent supply chain attacks.
85    /// Uses a fixed version instead of 'latest' for reproducibility and security.
86    async fn download_uv(&self) -> Result<()> {
87        info!("Downloading uv package manager");
88
89        // Use fixed version for security
90        const UV_VERSION: &str = "0.8.15";
91
92        let filename = match (env::consts::OS, env::consts::ARCH) {
93            ("macos", "aarch64") => "uv-aarch64-apple-darwin.tar.gz",
94            ("macos", "x86_64") => "uv-x86_64-apple-darwin.tar.gz",
95            ("windows", _) => "uv-x86_64-pc-windows-msvc.zip",
96            ("linux", "x86_64") => "uv-x86_64-unknown-linux-gnu.tar.gz",
97            ("linux", "aarch64") => "uv-aarch64-unknown-linux-gnu.tar.gz",
98            _ => return Err(anyhow!("Unsupported platform")),
99        };
100
101        // Generate URLs for both file and checksum
102        let base_url = format!(
103            "https://github.com/astral-sh/uv/releases/download/{}",
104            UV_VERSION
105        );
106        let file_url = format!("{}/{}", base_url, filename);
107        let checksum_url = format!("{}/{}.sha256", base_url, filename);
108
109        info!(url = %file_url, "Downloading uv binary");
110
111        // Download the file
112        let response = reqwest::get(&file_url).await?;
113        if !response.status().is_success() {
114            return Err(anyhow!("Failed to download uv: HTTP {}", response.status()));
115        }
116        let bytes = response.bytes().await?;
117
118        // Download the checksum
119        info!("Downloading checksum for verification");
120        let checksum_response = reqwest::get(&checksum_url).await?;
121        if !checksum_response.status().is_success() {
122            return Err(anyhow!(
123                "Failed to download checksum: HTTP {}",
124                checksum_response.status()
125            ));
126        }
127        let checksum_text = checksum_response.text().await?;
128
129        // Parse the expected checksum (format: "HASH  filename" or "HASH *filename")
130        let expected_checksum = checksum_text
131            .split_whitespace()
132            .next()
133            .ok_or_else(|| anyhow!("Invalid checksum format"))?
134            .to_lowercase();
135
136        // Calculate actual checksum
137        let mut hasher = Sha256::new();
138        hasher.update(&bytes);
139        let actual_checksum = format!("{:x}", hasher.finalize());
140
141        // Verify checksum
142        if actual_checksum != expected_checksum {
143            return Err(anyhow!(
144                "Checksum verification failed!\nExpected: {}\nActual: {}",
145                expected_checksum,
146                actual_checksum
147            ));
148        }
149
150        info!("Checksum verified successfully");
151
152        let temp_dir = tempfile::tempdir()?;
153        let archive_path = temp_dir.path().join(if filename.ends_with(".zip") {
154            "uv.zip"
155        } else {
156            "uv.tar.gz"
157        });
158
159        fs::write(&archive_path, &bytes).await?;
160
161        // Extract the binary
162        if archive_path.extension().and_then(|s| s.to_str()) == Some("zip") {
163            self.extract_zip(&archive_path).await?;
164        } else {
165            self.extract_tar_gz(&archive_path).await?;
166        }
167
168        #[cfg(unix)]
169        {
170            use std::os::unix::fs::PermissionsExt;
171            let mut perms = fs::metadata(&self.uv_binary).await?.permissions();
172            perms.set_mode(0o755); // Make binary executable (rwxr-xr-x)
173            fs::set_permissions(&self.uv_binary, perms).await?;
174        }
175
176        info!("uv installed successfully");
177        Ok(())
178    }
179
180    /// Extract tar.gz archive
181    async fn extract_tar_gz(&self, archive_path: &Path) -> Result<()> {
182        use flate2::read::GzDecoder;
183        use std::fs::File;
184        use tar::Archive;
185
186        let file = File::open(archive_path)?;
187        let gz = GzDecoder::new(file);
188        let mut archive = Archive::new(gz);
189
190        for entry in archive.entries()? {
191            let mut entry = entry?;
192            let path = entry.path()?;
193            if path.file_name() == Some(std::ffi::OsStr::new("uv")) {
194                entry.unpack(&self.uv_binary)?;
195                return Ok(());
196            }
197        }
198
199        Err(anyhow!("uv binary not found in archive"))
200    }
201
202    /// Extract zip archive (Windows)
203    async fn extract_zip(&self, archive_path: &Path) -> Result<()> {
204        use std::fs::File;
205        use std::io::copy;
206        use zip::ZipArchive;
207
208        let file = File::open(archive_path)?;
209        let mut archive = ZipArchive::new(file)?;
210
211        for i in 0..archive.len() {
212            let mut file = archive.by_index(i)?;
213            if file.name().ends_with("uv.exe") {
214                let mut out_file = File::create(&self.uv_binary)?;
215                copy(&mut file, &mut out_file)?;
216                return Ok(());
217            }
218        }
219
220        Err(anyhow!("uv.exe not found in archive"))
221    }
222
223    async fn setup_environment(&self) -> Result<()> {
224        info!("Setting up Python environment");
225
226        Command::new(&self.uv_binary)
227            .args(["python", "install", "3.12"])
228            .output()
229            .await?;
230
231        if !self.venv_dir.exists() {
232            Command::new(&self.uv_binary)
233                .current_dir(&self.python_dir)
234                .args(["venv"])
235                .output()
236                .await?;
237        }
238
239        // Sync dependencies (requires pyproject.toml to exist)
240        if self.python_dir.join("pyproject.toml").exists() {
241            Command::new(&self.uv_binary)
242                .current_dir(&self.python_dir)
243                .args(["sync"])
244                .output()
245                .await?;
246        }
247
248        info!("Python environment ready");
249        Ok(())
250    }
251
252    /// Executes a Python script in an isolated subprocess with JSON I/O.
253    /// Scripts receive input via stdin and must output valid JSON to stdout.
254    pub async fn execute_script(&self, script_name: &str, input: Value) -> Result<Value> {
255        self.ensure_initialized().await?;
256
257        // Basic validation for internal use
258        if script_name.is_empty() || script_name.contains(['/', '\\', '.']) {
259            return Err(anyhow!("Invalid script name"));
260        }
261
262        let script_path = self.scripts_dir.join(format!("{}.py", script_name));
263        if !script_path.exists() {
264            return Err(anyhow!("Script not found: {}", script_name));
265        }
266
267        // Use uv run for execution
268        let mut cmd = Command::new(&self.uv_binary);
269        cmd.current_dir(&self.python_dir)
270            .args(["run", "python", script_path.to_str().unwrap()])
271            .stdin(Stdio::piped())
272            .stdout(Stdio::piped())
273            .stderr(Stdio::piped())
274            .kill_on_drop(true);
275
276        let mut child = cmd.spawn()?;
277
278        // Pass input via stdin
279        if let Some(mut stdin) = child.stdin.take() {
280            let input_json = serde_json::to_string(&input)?;
281            stdin.write_all(input_json.as_bytes()).await?;
282            stdin.flush().await?;
283            drop(stdin);
284        }
285
286        // 30 second timeout prevents hanging scripts from blocking workflow execution
287        let output = tokio::time::timeout(Duration::from_secs(30), child.wait_with_output())
288            .await
289            .map_err(|_| anyhow!("Script execution timeout"))??;
290
291        if !output.status.success() {
292            let stderr = String::from_utf8_lossy(&output.stderr);
293            return Err(anyhow!("Script failed: {}", stderr));
294        }
295
296        let output_str = String::from_utf8(output.stdout)?;
297        serde_json::from_str(&output_str).map_err(|e| anyhow!("Failed to parse output: {}", e))
298    }
299
300    pub async fn save_script(&self, name: &str, content: &str) -> Result<PathBuf> {
301        self.ensure_initialized().await?;
302
303        if name.is_empty() || name.contains(['/', '\\', '.']) {
304            return Err(anyhow!("Invalid script name"));
305        }
306
307        let script_path = self.scripts_dir.join(format!("{}.py", name));
308        fs::write(&script_path, content).await?;
309        Ok(script_path)
310    }
311
312    pub async fn list_scripts(&self) -> Result<Vec<String>> {
313        self.ensure_initialized().await?;
314
315        let mut scripts = Vec::new();
316        if self.scripts_dir.exists() {
317            let mut entries = fs::read_dir(&self.scripts_dir).await?;
318            while let Some(entry) = entries.next_entry().await? {
319                let path = entry.path();
320                if path.extension().and_then(|s| s.to_str()) == Some("py")
321                    && let Some(name) = path.file_stem().and_then(|s| s.to_str())
322                {
323                    scripts.push(name.to_string());
324                }
325            }
326        }
327        Ok(scripts)
328    }
329
330    pub fn is_ready(&self) -> bool {
331        self.initialized.get().is_some()
332    }
333
334    /// Execute inline Python code with PEP 723 dependencies using uv run
335    pub async fn execute_inline_code(
336        &self,
337        code: &str,
338        input: Value,
339        env_vars: HashMap<String, String>,
340    ) -> Result<Value> {
341        self.ensure_initialized().await?;
342
343        // Create temporary script file
344        let temp_dir = tempfile::tempdir()?;
345        let script_path = temp_dir.path().join("script.py");
346        fs::write(&script_path, code).await?;
347
348        // Use uv run --no-project to execute (automatically handles PEP 723 dependencies)
349        // --refresh ensures dependency changes are detected and installed
350        let mut cmd = Command::new(&self.uv_binary);
351        cmd.args([
352            "run",
353            "--no-project",
354            "--refresh",
355            script_path.to_str().unwrap(),
356        ])
357        .stdin(Stdio::piped())
358        .stdout(Stdio::piped())
359        .stderr(Stdio::piped())
360        .kill_on_drop(true);
361
362        // Inject environment variables (e.g., API keys from Secret Manager)
363        for (key, value) in env_vars {
364            cmd.env(key, value);
365        }
366
367        let mut child = cmd.spawn()?;
368
369        // Pass input via stdin
370        if let Some(mut stdin) = child.stdin.take() {
371            let input_json = serde_json::to_string(&input)?;
372            stdin.write_all(input_json.as_bytes()).await?;
373            stdin.flush().await?;
374            drop(stdin);
375        }
376
377        // 30 second timeout for inline code execution
378        let output = tokio::time::timeout(Duration::from_secs(30), child.wait_with_output())
379            .await
380            .map_err(|_| anyhow!("Script execution timeout"))??;
381
382        if !output.status.success() {
383            let stderr = String::from_utf8_lossy(&output.stderr);
384            return Err(anyhow!("Script execution failed: {}", stderr));
385        }
386
387        let output_str = String::from_utf8(output.stdout)?;
388        serde_json::from_str(&output_str)
389            .map_err(|e| anyhow!("Failed to parse output as JSON: {}", e))
390    }
391
392    /// List all available Python script templates by scanning .py files
393    pub async fn list_templates(&self) -> Result<Vec<TemplateInfo>> {
394        self.ensure_initialized().await?;
395
396        let mut templates = Vec::new();
397
398        if !self.templates_dir.exists() {
399            return Ok(templates);
400        }
401
402        let mut entries = fs::read_dir(&self.templates_dir).await?;
403        while let Some(entry) = entries.next_entry().await? {
404            let path = entry.path();
405
406            // Only process .py files
407            if path.extension().and_then(|s| s.to_str()) != Some("py") {
408                continue;
409            }
410
411            let file_stem = path.file_stem().and_then(|s| s.to_str());
412            if file_stem.is_none() {
413                continue;
414            }
415
416            let content = fs::read_to_string(&path).await?;
417
418            // Parse metadata block: # /// metadata ... # ///
419            let metadata = Self::parse_metadata_block(&content)?;
420            let dependencies = Self::parse_dependencies_block(&content)?;
421
422            templates.push(TemplateInfo {
423                id: file_stem.unwrap().to_string(),
424                name: metadata.get("name").cloned().unwrap_or_default(),
425                description: metadata.get("description").cloned().unwrap_or_default(),
426                dependencies,
427            });
428        }
429
430        // Sort alphabetically by id (filename)
431        templates.sort_by(|a, b| a.id.cmp(&b.id));
432
433        Ok(templates)
434    }
435
436    /// Parse metadata block from Python file
437    fn parse_metadata_block(content: &str) -> Result<HashMap<String, String>> {
438        // Match the entire metadata block: # /// metadata ... # ///
439        let re = regex::Regex::new(r"(?s)# /// metadata\s*\n(.*?)# ///")
440            .map_err(|e| anyhow!("Invalid metadata regex: {}", e))?;
441
442        if let Some(caps) = re.captures(content) {
443            let block_content = caps.get(1).unwrap().as_str();
444
445            // Remove '# ' prefix from each line and join
446            let json_str: String = block_content
447                .lines()
448                .map(|line| line.trim_start_matches('#').trim())
449                .collect::<Vec<_>>()
450                .join("\n");
451
452            let metadata: HashMap<String, Value> = serde_json::from_str(&json_str)?;
453
454            // Convert to HashMap<String, String>
455            let mut result = HashMap::new();
456            for (key, value) in metadata {
457                if let Some(s) = value.as_str() {
458                    result.insert(key, s.to_string());
459                }
460            }
461            Ok(result)
462        } else {
463            Ok(HashMap::new())
464        }
465    }
466
467    /// Parse dependencies block from Python file (PEP 723 format)
468    fn parse_dependencies_block(content: &str) -> Result<Vec<String>> {
469        // Match the entire script block: # /// script ... # ///
470        let re = regex::Regex::new(r"(?s)# /// script\s*\n(.*?)# ///")
471            .map_err(|e| anyhow!("Invalid dependencies regex: {}", e))?;
472
473        if let Some(caps) = re.captures(content) {
474            let block_content = caps.get(1).unwrap().as_str();
475
476            // Remove '# ' prefix from each line
477            let clean_content: String = block_content
478                .lines()
479                .map(|line| line.trim_start_matches('#').trim())
480                .collect::<Vec<_>>()
481                .join("\n");
482
483            // Extract dependencies array using regex ((?s) enables DOTALL mode to match newlines)
484            let dep_re = regex::Regex::new(r"(?s)dependencies\s*=\s*\[(.*?)\]")
485                .map_err(|e| anyhow!("Invalid dependency array regex: {}", e))?;
486
487            if let Some(dep_caps) = dep_re.captures(&clean_content) {
488                let deps_str = dep_caps.get(1).unwrap().as_str();
489
490                // Parse each dependency (quoted strings)
491                let deps: Vec<String> = deps_str
492                    .split(',')
493                    .filter_map(|dep| {
494                        let trimmed = dep.trim().trim_matches('"').trim();
495                        if trimmed.is_empty() {
496                            None
497                        } else {
498                            Some(trimmed.to_string())
499                        }
500                    })
501                    .collect();
502
503                return Ok(deps);
504            }
505        }
506
507        Ok(Vec::new())
508    }
509
510    /// Get a specific template by ID
511    pub async fn get_template(&self, template_id: &str) -> Result<HashMap<String, String>> {
512        self.ensure_initialized().await?;
513
514        // Validate template ID (alphanumeric, underscore, and hyphen only)
515        if !template_id
516            .chars()
517            .all(|c| c.is_alphanumeric() || c == '_' || c == '-')
518        {
519            return Err(anyhow!("Invalid template ID"));
520        }
521
522        // Build template file path
523        let template_path = self.templates_dir.join(format!("{}.py", template_id));
524        if !template_path.exists() {
525            return Err(anyhow!("Template not found: {}", template_id));
526        }
527
528        // Read file content
529        let content = fs::read_to_string(&template_path).await?;
530
531        // Parse metadata and dependencies
532        let metadata = Self::parse_metadata_block(&content)?;
533        let dependencies = Self::parse_dependencies_block(&content)?;
534
535        // Remove metadata blocks from content (user only sees actual code)
536        let clean_content = Self::strip_metadata_blocks(&content);
537
538        // Return template info
539        let mut result = HashMap::new();
540        result.insert("id".to_string(), template_id.to_string());
541        result.insert(
542            "name".to_string(),
543            metadata.get("name").cloned().unwrap_or_default(),
544        );
545        result.insert(
546            "description".to_string(),
547            metadata.get("description").cloned().unwrap_or_default(),
548        );
549        result.insert("content".to_string(), clean_content);
550        result.insert(
551            "dependencies".to_string(),
552            serde_json::to_string(&dependencies)?,
553        );
554
555        Ok(result)
556    }
557
558    /// Remove metadata and script blocks from Python code
559    fn strip_metadata_blocks(content: &str) -> String {
560        let mut result = content.to_string();
561
562        // Remove # /// script ... # ///
563        if let Ok(re) = regex::Regex::new(r"(?s)# /// script\s*\n.*?# ///\s*\n+") {
564            result = re.replace_all(&result, "").to_string();
565        }
566
567        // Remove # /// metadata ... # ///
568        if let Ok(re) = regex::Regex::new(r"(?s)# /// metadata\s*\n.*?# ///\s*\n+") {
569            result = re.replace_all(&result, "").to_string();
570        }
571
572        result.trim_start().to_string()
573    }
574
575    /// Create a mock PythonManager for testing without initialization.
576    /// This avoids downloading uv binary or creating filesystem structures.
577    #[cfg(test)]
578    pub fn new_mock() -> Arc<Self> {
579        Arc::new(Self {
580            uv_binary: PathBuf::from("/tmp/test-uv"),
581            python_dir: PathBuf::from("/tmp/python"),
582            venv_dir: PathBuf::from("/tmp/.venv"),
583            scripts_dir: PathBuf::from("/tmp/scripts"),
584            templates_dir: PathBuf::from("/tmp/templates"),
585            initialized: OnceCell::new(),
586        })
587    }
588}