restflow_core/models/
task.rs1use crate::engine::context::ExecutionContext;
2use crate::models::{Node, NodeInput, NodeOutput, Workflow};
3use crate::storage::Storage;
4use anyhow::Result;
5use once_cell::sync::OnceCell;
6use serde::{Deserialize, Serialize};
7use std::sync::Arc;
8use ts_rs::TS;
9use uuid::Uuid;
10
11#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)]
12#[ts(export)]
13pub enum TaskStatus {
14 Pending,
15 Running,
16 Completed,
17 Failed,
18}
19
20#[derive(Debug, Clone, Serialize, Deserialize, TS)]
22#[ts(export)]
23pub struct Task {
24 pub id: String,
26 pub execution_id: String,
27 pub workflow_id: String,
28 pub node_id: String,
29 pub status: TaskStatus,
30 pub created_at: i64,
31 pub started_at: Option<i64>,
32 pub completed_at: Option<i64>,
33 pub input: NodeInput,
34 pub output: Option<NodeOutput>,
35 pub error: Option<String>,
36
37 pub context: ExecutionContext,
39
40 #[serde(skip)]
42 #[ts(skip)]
43 node: OnceCell<Node>,
44 #[serde(skip)]
45 #[ts(skip)]
46 workflow: OnceCell<Arc<Workflow>>,
47}
48
49impl Task {
50 pub fn new(
52 execution_id: String,
53 workflow_id: String,
54 node_id: String,
55 input: NodeInput,
56 context: ExecutionContext,
57 ) -> Self {
58 let created_at = chrono::Utc::now().timestamp_nanos_opt().unwrap_or_else(|| {
63 chrono::Utc::now().timestamp_millis() * 1_000_000
65 });
66
67 Self {
68 id: Uuid::new_v4().to_string(),
69 execution_id,
70 workflow_id,
71 node_id,
72 status: TaskStatus::Pending,
73 created_at,
74 started_at: None,
75 completed_at: None,
76 input,
77 output: None,
78 error: None,
79 context,
80 node: OnceCell::new(),
81 workflow: OnceCell::new(),
82 }
83 }
84
85 pub fn get_node(&self, storage: &Storage) -> Result<&Node> {
87 self.node.get_or_try_init(|| {
88 let workflow = self.get_workflow(storage)?;
89 workflow
90 .nodes
91 .iter()
92 .find(|n| n.id == self.node_id)
93 .cloned()
94 .ok_or_else(|| anyhow::anyhow!("Node {} not found in workflow", self.node_id))
95 })
96 }
97
98 pub fn get_workflow(&self, storage: &Storage) -> Result<Arc<Workflow>> {
100 self.workflow
101 .get_or_try_init(|| Ok(Arc::new(storage.workflows.get_workflow(&self.workflow_id)?)))
102 .cloned()
103 }
104
105 pub fn set_workflow(&self, workflow: Arc<Workflow>) -> Result<(), Arc<Workflow>> {
108 self.workflow.set(workflow)
109 }
110
111 pub fn start(&mut self) {
113 self.status = TaskStatus::Running;
114 self.started_at = Some(chrono::Utc::now().timestamp_millis());
115 }
116
117 pub fn complete(&mut self, output: NodeOutput) {
119 self.status = TaskStatus::Completed;
120 self.completed_at = Some(chrono::Utc::now().timestamp_millis());
121 self.output = Some(output);
122 }
123
124 pub fn fail(&mut self, error: String) {
126 self.status = TaskStatus::Failed;
127 self.completed_at = Some(chrono::Utc::now().timestamp_millis());
128 self.error = Some(error);
129 }
130
131 pub fn for_single_node(node: Node, input: NodeInput) -> Self {
133 let execution_id = Uuid::new_v4().to_string();
134 let workflow_id = format!("single-node-{}", node.id);
135 let context = ExecutionContext::new(execution_id.clone());
136
137 let task = Self::new(execution_id, workflow_id, node.id.clone(), input, context);
138
139 let _ = task.node.set(node);
141 task
142 }
143
144 pub fn priority(&self) -> u64 {
146 self.created_at as u64
147 }
148}