diff --git a/finito-postgres/src/error.rs b/finito-postgres/src/error.rs index ccbf0c107..0fb30a99d 100644 --- a/finito-postgres/src/error.rs +++ b/finito-postgres/src/error.rs @@ -5,4 +5,5 @@ use std::result; pub type Result = result::Result; +#[derive(Debug)] pub enum Error { SomeError } diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs index aee8cf9ad..152592405 100644 --- a/finito-postgres/src/lib.rs +++ b/finito-postgres/src/lib.rs @@ -91,6 +91,9 @@ struct ActionT { /// ID of the state machine belonging to this event. fsm_id: Uuid, + /// ID of the event that resulted in this action. + event_id: Uuid, + /// Serialised content of the action. action: Value, @@ -158,7 +161,9 @@ pub fn insert_machine(conn: &C, initial: S) -> Result> where } /// Insert a single event into the database and return its UUID. -pub fn insert_event(conn: &C, fsm_id: MachineId, event: S::Event) -> Result +fn insert_event(conn: &C, + fsm_id: &MachineId, + event: &S::Event) -> Result where C: GenericConnection, S: FSM, @@ -177,35 +182,70 @@ where } /// Insert a single action into the database and return its UUID. -pub fn insert_action(conn: &C, - fsm_id: MachineId, - action: S::Action) -> Result where +fn insert_action(conn: &C, + fsm_id: &MachineId, + event_id: Uuid, + action: &S::Action) -> Result where C: GenericConnection, S: FSM, S::Action: Serialize { let query = r#" - INSERT INTO actions (id, created, fsm, fsm_id, action, status) - VALUES ($1, NOW(), $2, $3, $4, $5) + INSERT INTO actions (id, created, fsm, fsm_id, event_id, action, status) + VALUES ($1, NOW(), $2, $3, $4, $5, $6) "#; let id = Uuid::new_v4(); let fsm = S::FSM_NAME.to_string(); let action_value = serde_json::to_value(action).expect("TODO"); - conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &action_value, - &ActionStatus::Pending]).expect("TODO"); + conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &event_id, + &action_value, &ActionStatus::Pending]).expect("TODO"); return Ok(id) } -/// Retrieve the current state of a state machine from the database. -pub fn get_machine(conn: &C, id: MachineId) -> Result where +/// Update the state of a specified machine. +fn update_state(conn: &C, + fsm_id: &MachineId, + state: &S) -> Result<()> where + C: GenericConnection, + S: FSM + Serialize { + let query = r#" + UPDATE machines SET state = $1 WHERE id = $2 + "#; + + let state_value = serde_json::to_value(state).expect("TODO"); + let res_count = conn.execute(query, &[&state_value, &fsm_id.to_uuid()]) + .expect("TODO"); + + if res_count != 1 { + // TODO: not found error! + unimplemented!() + } else { + Ok(()) + } +} + +/// Retrieve the current state of a state machine from the database, +/// optionally locking the machine state for the duration of some +/// enclosing transaction. +pub fn get_machine(conn: &C, + id: &MachineId, + for_update: bool) -> Result where C: GenericConnection, S: FSM + DeserializeOwned { let query = r#" SELECT (id, created, fsm, state) FROM machines WHERE id = $1 "#; - let rows = conn.query(query, &[&id.to_uuid()]).expect("TODO"); + // If the machine is being fetched in the context of a + // transaction, with the intention to update it, the relevant + // clause needs to be appended: + let query = match for_update { + false => query.to_string(), + true => format!("{} FOR UPDATE", query), + }; + + let rows = conn.query(&query, &[&id.to_uuid()]).expect("TODO"); let mut machines = rows.into_iter().map(|row| MachineT { id: row.get(0), created: row.get(1), @@ -233,9 +273,27 @@ pub fn get_machine(conn: &C, id: MachineId) -> Result where /// processing is finished as running actions may result in additional /// transitions. pub fn advance(conn: &C, - id: MachineId, + id: &MachineId, event: S::Event) -> Result where C: GenericConnection, - S: FSM + DeserializeOwned { - unimplemented!() + S: FSM + Serialize + DeserializeOwned, + S::Event: Serialize, + S::Action: Serialize { + let tx = conn.transaction().expect("TODO"); + let state = get_machine(&tx, id, true).expect("TODO"); + + // Advancing the FSM consumes the event, so it is persisted first: + let event_id = insert_event(&tx, id, &event).expect("TODO"); + + // Core advancing logic is run: + let (new_state, actions) = finito::advance(state, event); + + // Resulting actions are persisted (TODO: and interpreted) + for action in actions { + insert_action(&tx, id, event_id, &action).expect("TODO"); + } + + // And finally the state is updated: + update_state(&tx, id, &new_state).expect("TODO"); + Ok(new_state) }