restflow_core/models/
task.rs

1use crate::engine::context::ExecutionContext;
2use crate::models::{Node, NodeInput, NodeOutput, Workflow};
3use crate::storage::Storage;
4use anyhow::Result;
5use once_cell::sync::OnceCell;
6use serde::{Deserialize, Serialize};
7use std::sync::Arc;
8use ts_rs::TS;
9use uuid::Uuid;
10
11#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
12#[ts(export)]
13pub enum TaskStatus {
14    Pending,
15    Running,
16    Completed,
17    Failed,
18}
19
20/// Unified task structure that replaces both TaskRecord and WorkflowTask
21#[derive(Debug, Clone, Serialize, Deserialize, TS)]
22#[ts(export)]
23pub struct Task {
24    // Core fields (always persisted)
25    pub id: String,
26    pub execution_id: String,
27    pub workflow_id: String,
28    pub node_id: String,
29    pub status: TaskStatus,
30    pub created_at: i64,
31    pub started_at: Option<i64>,
32    pub completed_at: Option<i64>,
33    pub input: NodeInput,
34    pub output: Option<NodeOutput>,
35    pub error: Option<String>,
36
37    // Execution context (serialized and stored)
38    pub context: ExecutionContext,
39
40    // Runtime data (lazy-loaded, not serialized)
41    #[serde(skip)]
42    #[ts(skip)]
43    node: OnceCell<Node>,
44    #[serde(skip)]
45    #[ts(skip)]
46    workflow: OnceCell<Arc<Workflow>>,
47}
48
49impl Task {
50    /// Create a new task
51    pub fn new(
52        execution_id: String,
53        workflow_id: String,
54        node_id: String,
55        input: NodeInput,
56        context: ExecutionContext,
57    ) -> Self {
58        // Use nanosecond precision to avoid collision in high-concurrency scenarios
59        // Note: Nanosecond precision provides ~10^9 unique values per second, making collision
60        // probability negligible in practice. If absolute uniqueness is required in the future,
61        // consider using (timestamp_nanos, uuid) composite key for the pending queue.
62        let created_at = chrono::Utc::now().timestamp_nanos_opt().unwrap_or_else(|| {
63            // Fallback for year > 2262 (extremely unlikely)
64            chrono::Utc::now().timestamp_millis() * 1_000_000
65        });
66
67        Self {
68            id: Uuid::new_v4().to_string(),
69            execution_id,
70            workflow_id,
71            node_id,
72            status: TaskStatus::Pending,
73            created_at,
74            started_at: None,
75            completed_at: None,
76            input,
77            output: None,
78            error: None,
79            context,
80            node: OnceCell::new(),
81            workflow: OnceCell::new(),
82        }
83    }
84
85    /// Get the node for this task (lazy-loaded)
86    pub fn get_node(&self, storage: &Storage) -> Result<&Node> {
87        self.node.get_or_try_init(|| {
88            let workflow = self.get_workflow(storage)?;
89            workflow
90                .nodes
91                .iter()
92                .find(|n| n.id == self.node_id)
93                .cloned()
94                .ok_or_else(|| anyhow::anyhow!("Node {} not found in workflow", self.node_id))
95        })
96    }
97
98    /// Get the workflow for this task (lazy-loaded and shared)
99    pub fn get_workflow(&self, storage: &Storage) -> Result<Arc<Workflow>> {
100        self.workflow
101            .get_or_try_init(|| Ok(Arc::new(storage.workflows.get_workflow(&self.workflow_id)?)))
102            .cloned()
103    }
104
105    /// Pre-populate the workflow Arc to avoid lazy loading from storage
106    /// This is useful when creating tasks from a workflow that's already in memory
107    pub fn set_workflow(&self, workflow: Arc<Workflow>) -> Result<(), Arc<Workflow>> {
108        self.workflow.set(workflow)
109    }
110
111    /// Mark task as running
112    pub fn start(&mut self) {
113        self.status = TaskStatus::Running;
114        self.started_at = Some(chrono::Utc::now().timestamp_millis());
115    }
116
117    /// Mark task as completed
118    pub fn complete(&mut self, output: NodeOutput) {
119        self.status = TaskStatus::Completed;
120        self.completed_at = Some(chrono::Utc::now().timestamp_millis());
121        self.output = Some(output);
122    }
123
124    /// Mark task as failed
125    pub fn fail(&mut self, error: String) {
126        self.status = TaskStatus::Failed;
127        self.completed_at = Some(chrono::Utc::now().timestamp_millis());
128        self.error = Some(error);
129    }
130
131    /// Create a task for a single node execution (no workflow context)
132    pub fn for_single_node(node: Node, input: NodeInput) -> Self {
133        let execution_id = Uuid::new_v4().to_string();
134        let workflow_id = format!("single-node-{}", node.id);
135        let context = ExecutionContext::new(execution_id.clone());
136
137        let task = Self::new(execution_id, workflow_id, node.id.clone(), input, context);
138
139        // Pre-populate the node since we already have it
140        let _ = task.node.set(node);
141        task
142    }
143
144    /// Get priority for queue ordering (lower timestamp = higher priority)
145    pub fn priority(&self) -> u64 {
146        self.created_at as u64
147    }
148}