feat(postgres): Implement Postgres-backed 'advance' function

Transactionally updates a state machine with an incoming event. Note
that this does not yet interpret actions.
This commit is contained in:
Vincent Ambo 2018-09-26 17:28:45 +02:00
parent b1e00ff026
commit 40caa5ffa2
2 changed files with 73 additions and 14 deletions

View file

@ -5,4 +5,5 @@ use std::result;
pub type Result<T> = result::Result<T, Error>; pub type Result<T> = result::Result<T, Error>;
#[derive(Debug)]
pub enum Error { SomeError } pub enum Error { SomeError }

View file

@ -91,6 +91,9 @@ struct ActionT {
/// ID of the state machine belonging to this event. /// ID of the state machine belonging to this event.
fsm_id: Uuid, fsm_id: Uuid,
/// ID of the event that resulted in this action.
event_id: Uuid,
/// Serialised content of the action. /// Serialised content of the action.
action: Value, action: Value,
@ -158,7 +161,9 @@ pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<MachineId<S>> where
} }
/// Insert a single event into the database and return its UUID. /// Insert a single event into the database and return its UUID.
pub fn insert_event<C, S>(conn: &C, fsm_id: MachineId<S>, event: S::Event) -> Result<Uuid> fn insert_event<C, S>(conn: &C,
fsm_id: &MachineId<S>,
event: &S::Event) -> Result<Uuid>
where where
C: GenericConnection, C: GenericConnection,
S: FSM, S: FSM,
@ -177,35 +182,70 @@ where
} }
/// Insert a single action into the database and return its UUID. /// Insert a single action into the database and return its UUID.
pub fn insert_action<C, S>(conn: &C, fn insert_action<C, S>(conn: &C,
fsm_id: MachineId<S>, fsm_id: &MachineId<S>,
action: S::Action) -> Result<Uuid> where event_id: Uuid,
action: &S::Action) -> Result<Uuid> where
C: GenericConnection, C: GenericConnection,
S: FSM, S: FSM,
S::Action: Serialize { S::Action: Serialize {
let query = r#" let query = r#"
INSERT INTO actions (id, created, fsm, fsm_id, action, status) INSERT INTO actions (id, created, fsm, fsm_id, event_id, action, status)
VALUES ($1, NOW(), $2, $3, $4, $5) VALUES ($1, NOW(), $2, $3, $4, $5, $6)
"#; "#;
let id = Uuid::new_v4(); let id = Uuid::new_v4();
let fsm = S::FSM_NAME.to_string(); let fsm = S::FSM_NAME.to_string();
let action_value = serde_json::to_value(action).expect("TODO"); let action_value = serde_json::to_value(action).expect("TODO");
conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &action_value, conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &event_id,
&ActionStatus::Pending]).expect("TODO"); &action_value, &ActionStatus::Pending]).expect("TODO");
return Ok(id) return Ok(id)
} }
/// Retrieve the current state of a state machine from the database. /// Update the state of a specified machine.
pub fn get_machine<C, S>(conn: &C, id: MachineId<S>) -> Result<S> where fn update_state<C, S>(conn: &C,
fsm_id: &MachineId<S>,
state: &S) -> Result<()> where
C: GenericConnection,
S: FSM + Serialize {
let query = r#"
UPDATE machines SET state = $1 WHERE id = $2
"#;
let state_value = serde_json::to_value(state).expect("TODO");
let res_count = conn.execute(query, &[&state_value, &fsm_id.to_uuid()])
.expect("TODO");
if res_count != 1 {
// TODO: not found error!
unimplemented!()
} else {
Ok(())
}
}
/// Retrieve the current state of a state machine from the database,
/// optionally locking the machine state for the duration of some
/// enclosing transaction.
pub fn get_machine<C, S>(conn: &C,
id: &MachineId<S>,
for_update: bool) -> Result<S> where
C: GenericConnection, C: GenericConnection,
S: FSM + DeserializeOwned { S: FSM + DeserializeOwned {
let query = r#" let query = r#"
SELECT (id, created, fsm, state) FROM machines WHERE id = $1 SELECT (id, created, fsm, state) FROM machines WHERE id = $1
"#; "#;
let rows = conn.query(query, &[&id.to_uuid()]).expect("TODO"); // If the machine is being fetched in the context of a
// transaction, with the intention to update it, the relevant
// clause needs to be appended:
let query = match for_update {
false => query.to_string(),
true => format!("{} FOR UPDATE", query),
};
let rows = conn.query(&query, &[&id.to_uuid()]).expect("TODO");
let mut machines = rows.into_iter().map(|row| MachineT { let mut machines = rows.into_iter().map(|row| MachineT {
id: row.get(0), id: row.get(0),
created: row.get(1), created: row.get(1),
@ -233,9 +273,27 @@ pub fn get_machine<C, S>(conn: &C, id: MachineId<S>) -> Result<S> where
/// processing is finished as running actions may result in additional /// processing is finished as running actions may result in additional
/// transitions. /// transitions.
pub fn advance<C, S>(conn: &C, pub fn advance<C, S>(conn: &C,
id: MachineId<S>, id: &MachineId<S>,
event: S::Event) -> Result<S> where event: S::Event) -> Result<S> where
C: GenericConnection, C: GenericConnection,
S: FSM + DeserializeOwned { S: FSM + Serialize + DeserializeOwned,
unimplemented!() S::Event: Serialize,
S::Action: Serialize {
let tx = conn.transaction().expect("TODO");
let state = get_machine(&tx, id, true).expect("TODO");
// Advancing the FSM consumes the event, so it is persisted first:
let event_id = insert_event(&tx, id, &event).expect("TODO");
// Core advancing logic is run:
let (new_state, actions) = finito::advance(state, event);
// Resulting actions are persisted (TODO: and interpreted)
for action in actions {
insert_action(&tx, id, event_id, &action).expect("TODO");
}
// And finally the state is updated:
update_state(&tx, id, &new_state).expect("TODO");
Ok(new_state)
} }