refactor(postgres): Implement FSMBackend trait for FinitoPostgres

Implements the new backend trait for the FinitoPostgres type which
now represents instances of the Postgres backend.

This refactoring is not yet fully complete, as some restructuring of
the code is in order.
This commit is contained in:
Vincent Ambo 2018-12-13 13:40:54 +01:00
parent 183ee2accc
commit 43f71ae82f

View file

@ -20,35 +20,15 @@ mod error;
pub use error::{Result, Error};
use chrono::prelude::{DateTime, Utc};
use finito::FSM;
use postgres::GenericConnection;
use finito::{FSM, FSMBackend};
use postgres::{Connection, GenericConnection};
use postgres::transaction::Transaction;
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde_json::Value;
use std::fmt;
use std::marker::PhantomData;
use uuid::Uuid;
/// This struct represents rows in the database table in which
/// machines (i.e. the current state of a Finito state machine) are
/// persisted.
#[derive(Debug, ToSql, FromSql)]
struct MachineT {
/// ID of the persisted state machine.
id: Uuid,
/// Time at which the FSM was first created.
created: DateTime<Utc>,
/// Name of the type of FSM represented by this state.
fsm: String,
/// Current state of the FSM (TODO: Can the serialised FSM type be
/// used?)
state: Value,
}
/// This struct represents rows in the database table in which events
/// are persisted.
#[derive(Debug, ToSql, FromSql)]
@ -118,41 +98,110 @@ struct ActionT {
// The following functions implement the public interface of
// `finito-postgres`.
/// This type is used as a type-safe wrapper around the ID of a state
/// machine. It carries information about the FSM type and is intended
/// to add a layer of checking to prevent IDs from being mixed up.
#[derive(Clone)]
pub struct MachineId<S: FSM> {
uuid: Uuid,
phantom: PhantomData<S>,
/// TODO: Write docs for this type, brain does not want to do it right
/// now.
pub struct FinitoPostgres<S> {
state: S,
// TODO: Use connection pool?
conn: Connection,
}
/// Custom debug implementation to format machine IDs using the name
/// of the FSM and their UUID.
impl <S: FSM> fmt::Debug for MachineId<S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}:{}", S::FSM_NAME, self.uuid.hyphenated())
impl <State: 'static> FSMBackend<State> for FinitoPostgres<State> {
type Key = Uuid;
type Error = Error;
fn insert_machine<S: FSM + Serialize>(&self, initial: S) -> Result<Uuid> {
let query = r#"
INSERT INTO machines (id, fsm, state)
VALUES ($1, $2, $3)
"#;
let id = Uuid::new_v4();
let fsm = S::FSM_NAME.to_string();
let state = serde_json::to_value(initial).expect("TODO");
self.conn.execute(query, &[&id, &fsm, &state]).expect("TODO");
return Ok(id);
}
fn get_machine<S: FSM + DeserializeOwned>(&self, key: Uuid) -> Result<S> {
get_machine_internal(&self.conn, key, false)
}
/// Advance a persisted state machine by applying an event, and
/// storing the event as well as all resulting actions.
///
/// This function holds a database-lock on the state's row while
/// advancing the machine.
///
/// **Note**: This function returns the new state of the machine
/// immediately after applying the event, however this does not
/// necessarily equate to the state of the machine after all related
/// processing is finished as running actions may result in additional
/// transitions.
fn advance<'a, S>(&'a self, key: Uuid, event: S::Event) -> Result<S>
where S: FSM + Serialize + DeserializeOwned,
S::State: From<&'a State>,
S::Event: Serialize + DeserializeOwned,
S::Action: Serialize + DeserializeOwned {
let tx = self.conn.transaction().expect("TODO");
let state = get_machine_internal(&tx, key, true)?;
// Advancing the FSM consumes the event, so it is persisted first:
let event_id = insert_event::<_, S>(&tx, key, &event)?;
// Core advancing logic is run:
let (new_state, actions) = finito::advance(state, event);
// Resulting actions are persisted (TODO: and interpreted)
let mut action_ids = vec![];
for action in actions {
let action_id = insert_action::<_, S>(&tx, key, event_id, &action)?;
action_ids.push(action_id);
}
// And finally the state is updated:
update_state(&tx, key, &new_state).expect("TODO");
tx.commit().expect("TODO");
self.run_actions::<S>(key, action_ids);
Ok(new_state)
}
}
impl <S: FSM> MachineId<S> {
/// Convert a UUID into a strongly typed machine ID.
pub fn from_uuid(uuid: Uuid) -> Self {
MachineId {
uuid,
phantom: PhantomData,
impl <State: 'static> FinitoPostgres<State> {
/// Execute several actions at the same time, each in a separate
/// thread. Note that actions returning further events, causing
/// further transitions, returning further actions and so on will
/// potentially cause multiple threads to get created.
fn run_actions<'a, S>(&'a self, fsm_id: Uuid, action_ids: Vec<Uuid>) where
S: FSM + Serialize + DeserializeOwned,
S::Event: Serialize + DeserializeOwned,
S::Action: Serialize + DeserializeOwned,
S::State: From<&'a State> {
let state: S::State = (&self.state).into();
for action_id in action_ids {
let tx = self.conn.transaction().expect("TODO");
// TODO: Determine which concurrency setup we actually want.
if let Ok(events) = run_action(tx, action_id, &state, PhantomData::<S>) {
for event in events {
self.advance::<S>(fsm_id, event).expect("TODO");
}
}
}
}
/// Return the UUID contained in a machine ID.
pub fn to_uuid(&self) -> Uuid {
self.uuid
}
}
/// Insert a single state-machine into the database and return its
/// newly allocated, random UUID.
pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<MachineId<S>> where
pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<Uuid> where
C: GenericConnection,
S: FSM + Serialize {
let query = r#"
@ -166,12 +215,12 @@ pub fn insert_machine<C, S>(conn: &C, initial: S) -> Result<MachineId<S>> where
conn.execute(query, &[&id, &fsm, &state]).expect("TODO");
return Ok(MachineId::from_uuid(id));
return Ok(id);
}
/// Insert a single event into the database and return its UUID.
fn insert_event<C, S>(conn: &C,
fsm_id: &MachineId<S>,
fsm_id: Uuid,
event: &S::Event) -> Result<Uuid>
where
C: GenericConnection,
@ -186,13 +235,13 @@ where
let fsm = S::FSM_NAME.to_string();
let event_value = serde_json::to_value(event).expect("TODO");
conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &event_value]).expect("TODO");
conn.execute(query, &[&id, &fsm, &fsm_id, &event_value]).expect("TODO");
return Ok(id)
}
/// Insert a single action into the database and return its UUID.
fn insert_action<C, S>(conn: &C,
fsm_id: &MachineId<S>,
fsm_id: Uuid,
event_id: Uuid,
action: &S::Action) -> Result<Uuid> where
C: GenericConnection,
@ -207,14 +256,17 @@ fn insert_action<C, S>(conn: &C,
let fsm = S::FSM_NAME.to_string();
let action_value = serde_json::to_value(action).expect("TODO");
conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &event_id,
&action_value, &ActionStatus::Pending]).expect("TODO");
conn.execute(
query,
&[&id, &fsm, &fsm_id, &event_id, &action_value, &ActionStatus::Pending]
).expect("TODO");
return Ok(id)
}
/// Update the state of a specified machine.
fn update_state<C, S>(conn: &C,
fsm_id: &MachineId<S>,
fsm_id: Uuid,
state: &S) -> Result<()> where
C: GenericConnection,
S: FSM + Serialize {
@ -223,7 +275,7 @@ fn update_state<C, S>(conn: &C,
"#;
let state_value = serde_json::to_value(state).expect("TODO");
let res_count = conn.execute(query, &[&state_value, &fsm_id.to_uuid()])
let res_count = conn.execute(query, &[&state_value, &fsm_id])
.expect("TODO");
if res_count != 1 {
@ -246,16 +298,16 @@ fn alter_for_update(alter: bool, query: &str) -> String {
/// Retrieve the current state of a state machine from the database,
/// optionally locking the machine state for the duration of some
/// enclosing transaction.
pub fn get_machine<C, S>(conn: &C,
id: &MachineId<S>,
for_update: bool) -> Result<S> where
fn get_machine_internal<C, S>(conn: &C,
id: Uuid,
for_update: bool) -> Result<S> where
C: GenericConnection,
S: FSM + DeserializeOwned {
let query = alter_for_update(for_update, r#"
SELECT state FROM machines WHERE id = $1
"#);
let rows = conn.query(&query, &[&id.to_uuid()]).expect("TODO");
let rows = conn.query(&query, &[&id]).expect("TODO");
if let Some(row) = rows.into_iter().next() {
Ok(serde_json::from_value(row.get(0)).expect("TODO"))
@ -311,49 +363,6 @@ fn update_action_status<C, S>(conn: &C,
Ok(())
}
/// Advance a persisted state machine by applying an event, and
/// storing the event as well as all resulting actions.
///
/// This function holds a database-lock on the state's row while
/// advancing the machine.
///
/// **Note**: This function returns the new state of the machine
/// immediately after applying the event, however this does not
/// necessarily equate to the state of the machine after all related
/// processing is finished as running actions may result in additional
/// transitions.
pub fn advance<C, S>(conn: &C,
id: &MachineId<S>,
event: S::Event) -> Result<S> where
C: GenericConnection,
S: FSM + Serialize + DeserializeOwned,
S::Event: Serialize,
S::Action: Serialize + DeserializeOwned {
let tx = conn.transaction().expect("TODO");
let state = get_machine(&tx, id, true)?;
// Advancing the FSM consumes the event, so it is persisted first:
let event_id = insert_event(&tx, id, &event)?;
// Core advancing logic is run:
let (new_state, actions) = finito::advance(state, event);
// Resulting actions are persisted (TODO: and interpreted)
let mut action_ids = vec![];
for action in actions {
let action_id = insert_action(&tx, id, event_id, &action)?;
action_ids.push(action_id);
}
// And finally the state is updated:
update_state(&tx, id, &new_state).expect("TODO");
tx.commit().expect("TODO");
run_actions(conn, id, action_ids);
Ok(new_state)
}
/// Execute a single action in case it is pending or retryable. Holds
/// a lock on the action's database row while performing the action
/// and writes back the status afterwards.
@ -362,7 +371,7 @@ pub fn advance<C, S>(conn: &C,
/// panic), the error will be persisted. Should it fail by panicking
/// (which developers should never do explicitly in action
/// interpreters) its status will not be changed.
fn run_action<S>(tx: Transaction, id: Uuid, _fsm: PhantomData<S>)
fn run_action<S>(tx: Transaction, id: Uuid, state: &S::State, _fsm: PhantomData<S>)
-> Result<Vec<S::Event>> where
S: FSM,
S::Action: DeserializeOwned {
@ -370,7 +379,7 @@ fn run_action<S>(tx: Transaction, id: Uuid, _fsm: PhantomData<S>)
let result = match status {
ActionStatus::Pending => {
match <S as FSM>::act(action) {
match S::act(action, state) {
// If the action succeeded, update its status to
// completed and return the created events.
Ok(events) => {
@ -402,24 +411,3 @@ fn run_action<S>(tx: Transaction, id: Uuid, _fsm: PhantomData<S>)
tx.commit().expect("TODO");
Ok(result)
}
/// Execute several actions at the same time, each in a separate
/// thread. Note that actions returning further events, causing
/// further transitions, returning further actions and so on will
/// potentially cause multiple threads to get created.
fn run_actions<C, S>(conn: &C, fsm_id: &MachineId<S>, action_ids: Vec<Uuid>) where
C: GenericConnection,
S: FSM + Serialize + DeserializeOwned,
S::Event: Serialize,
S::Action: Serialize + DeserializeOwned {
for action_id in action_ids {
let tx = conn.transaction().expect("TODO");
// TODO: Determine which concurrency setup we actually want.
if let Ok(events) = run_action(tx, action_id, PhantomData::<S>) {
for event in events {
advance(conn, fsm_id, event).expect("TODO");
}
}
}
}