diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs index 5ed9693bc..af8013142 100644 --- a/finito-postgres/src/lib.rs +++ b/finito-postgres/src/lib.rs @@ -22,6 +22,7 @@ pub use error::{Result, Error}; use chrono::prelude::{DateTime, Utc}; use finito::FSM; use postgres::GenericConnection; +use postgres::transaction::Transaction; use serde::Serialize; use serde::de::DeserializeOwned; use serde_json::Value; @@ -69,7 +70,7 @@ struct EventT { } /// This enum represents the possible statuses an action can be in. -#[derive(Debug, ToSql, FromSql)] +#[derive(Debug, PartialEq, ToSql, FromSql)] #[postgres(name = "actionstatus")] enum ActionStatus { /// The action was requested but has not run yet. @@ -234,6 +235,15 @@ fn update_state(conn: &C, } } +/// Conditionally alter SQL statement to append locking clause inside +/// of a transaction. +fn alter_for_update(alter: bool, query: &str) -> String { + match alter { + false => query.to_string(), + true => format!("{} FOR UPDATE", query), + } +} + /// Retrieve the current state of a state machine from the database, /// optionally locking the machine state for the duration of some /// enclosing transaction. @@ -242,34 +252,66 @@ pub fn get_machine(conn: &C, for_update: bool) -> Result where C: GenericConnection, S: FSM + DeserializeOwned { - let query = r#" - SELECT id, created, fsm, state FROM machines WHERE id = $1 - "#; - - // If the machine is being fetched in the context of a - // transaction, with the intention to update it, the relevant - // clause needs to be appended: - let query = match for_update { - false => query.to_string(), - true => format!("{} FOR UPDATE", query), - }; + let query = alter_for_update(for_update, r#" + SELECT state FROM machines WHERE id = $1 + "#); let rows = conn.query(&query, &[&id.to_uuid()]).expect("TODO"); - let mut machines = rows.into_iter().map(|row| MachineT { - id: id.to_uuid(), - created: row.get(1), - fsm: row.get(2), - state: row.get(3), - }); - if let Some(machine) = machines.next() { - Ok(serde_json::from_value(machine.state).expect("TODO")) + if let Some(row) = rows.into_iter().next() { + Ok(serde_json::from_value(row.get(0)).expect("TODO")) } else { // TODO: return appropriate not found error Err(Error::SomeError) } } +/// Retrieve an action from the database, optionally locking it for +/// the duration of some enclosing transaction. +fn get_action(conn: &C, id: Uuid) -> Result<(ActionStatus, S::Action)> where + C: GenericConnection, + S: FSM, + S::Action: DeserializeOwned { + let query = alter_for_update(true, r#" + SELECT status, content FROM actions + WHERE id = $1 AND fsm = $2 + "#); + + let rows = conn.query(&query, &[&id, &S::FSM_NAME]).expect("TODO"); + + if let Some(row) = rows.into_iter().next() { + let action = serde_json::from_value(row.get(1)).expect("TODO"); + Ok((row.get(0), action)) + } else { + // TODO: return appropriate not found error + Err(Error::SomeError) + } +} + +/// Update the status of an action after an attempt to run it. +fn update_action_status(conn: &C, + id: Uuid, + status: ActionStatus, + error: Option, + _fsm: PhantomData) -> Result<()> where + C: GenericConnection, + S: FSM { + let query = r#" + UPDATE actions SET status = $1, error = $2 + WHERE id = $3 AND fsm = $4 + "#; + + let result = conn.execute(&query, &[&status, &error, &id, &S::FSM_NAME]) + .expect("TODO"); + + if result != 1 { + // TODO: Fail in the most gruesome way! + unimplemented!() + } + + Ok(()) +} + /// Advance a persisted state machine by applying an event, and /// storing the event as well as all resulting actions. /// @@ -287,24 +329,84 @@ pub fn advance(conn: &C, C: GenericConnection, S: FSM + Serialize + DeserializeOwned, S::Event: Serialize, - S::Action: Serialize { + S::Action: Serialize + DeserializeOwned { let tx = conn.transaction().expect("TODO"); - let state = get_machine(&tx, id, true).expect("TODO"); + let state = get_machine(&tx, id, true)?; // Advancing the FSM consumes the event, so it is persisted first: - let event_id = insert_event(&tx, id, &event).expect("TODO"); + let event_id = insert_event(&tx, id, &event)?; // Core advancing logic is run: let (new_state, actions) = finito::advance(state, event); // Resulting actions are persisted (TODO: and interpreted) + let mut action_ids = vec![]; for action in actions { - insert_action(&tx, id, event_id, &action).expect("TODO"); + let action_id = insert_action(&tx, id, event_id, &action)?; + action_ids.push(action_id); } // And finally the state is updated: update_state(&tx, id, &new_state).expect("TODO"); tx.commit().expect("TODO"); + run_actions(conn, id, action_ids); + Ok(new_state) } + +/// Execute a single action in case it is pending or retryable. Holds +/// a lock on the action's database row while performing the action +/// and writes back the status afterwards. +/// +/// Should the execution of an action fail cleanly (i.e. without a +/// panic), the error will be persisted. Should it fail by panicking +/// (which developers should never do explicitly in action +/// interpreters) its status will not be changed. +fn run_action(tx: Transaction, id: Uuid, _fsm: PhantomData) + -> Result> where + S: FSM, + S::Action: DeserializeOwned { + let (status, action) = get_action::(&tx, id)?; + + let result = match status { + ActionStatus::Pending => { + let events = ::act(action); + update_action_status( + &tx, id, ActionStatus::Completed, None, PhantomData:: + )?; + + events + }, + + _ => { + // TODO: Currently only pending actions are run because + // retryable actions are not yet implemented. + vec![] + }, + }; + + tx.commit().expect("TODO"); + Ok(result) +} + +/// Execute several actions at the same time, each in a separate +/// thread. Note that actions returning further events, causing +/// further transitions, returning further actions and so on will +/// potentially cause multiple threads to get created. +fn run_actions(conn: &C, fsm_id: &MachineId, action_ids: Vec) where + C: GenericConnection, + S: FSM + Serialize + DeserializeOwned, + S::Event: Serialize, + S::Action: Serialize + DeserializeOwned { + for action_id in action_ids { + let tx = conn.transaction().expect("TODO"); + + // TODO: Determine which concurrency setup we actually want. + if let Ok(events) = run_action(tx, action_id, PhantomData::) { + for event in events { + advance(conn, fsm_id, event).expect("TODO"); + } + } + } +}