feat(postgres): Implement initial (synchronous) actoin execution
Implements a simple model for executing actions which will run them in sequence, synchronously, after advancing an FSM and committing the initial transaction. Note that multiple things are not yet taken into account: * Error handling of actions (they can not currently fail) * Retrying of actions * Concurrency model I started out by implementing the concurrency model similarly to the green-threading method used in Hamingja (but using OS threads), but slowly noticed that it may not be the best way to do that. It needs a little bit of discussion. Either way for most actions this method is fast enough to work for implementing things on top of Finito's model.
This commit is contained in:
parent
3891ba84d5
commit
406a90e8d6
1 changed files with 126 additions and 24 deletions
|
@ -22,6 +22,7 @@ pub use error::{Result, Error};
|
||||||
use chrono::prelude::{DateTime, Utc};
|
use chrono::prelude::{DateTime, Utc};
|
||||||
use finito::FSM;
|
use finito::FSM;
|
||||||
use postgres::GenericConnection;
|
use postgres::GenericConnection;
|
||||||
|
use postgres::transaction::Transaction;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use serde::de::DeserializeOwned;
|
use serde::de::DeserializeOwned;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
@ -69,7 +70,7 @@ struct EventT {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This enum represents the possible statuses an action can be in.
|
/// This enum represents the possible statuses an action can be in.
|
||||||
#[derive(Debug, ToSql, FromSql)]
|
#[derive(Debug, PartialEq, ToSql, FromSql)]
|
||||||
#[postgres(name = "actionstatus")]
|
#[postgres(name = "actionstatus")]
|
||||||
enum ActionStatus {
|
enum ActionStatus {
|
||||||
/// The action was requested but has not run yet.
|
/// The action was requested but has not run yet.
|
||||||
|
@ -234,6 +235,15 @@ fn update_state<C, S>(conn: &C,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Conditionally alter SQL statement to append locking clause inside
|
||||||
|
/// of a transaction.
|
||||||
|
fn alter_for_update(alter: bool, query: &str) -> String {
|
||||||
|
match alter {
|
||||||
|
false => query.to_string(),
|
||||||
|
true => format!("{} FOR UPDATE", query),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Retrieve the current state of a state machine from the database,
|
/// Retrieve the current state of a state machine from the database,
|
||||||
/// optionally locking the machine state for the duration of some
|
/// optionally locking the machine state for the duration of some
|
||||||
/// enclosing transaction.
|
/// enclosing transaction.
|
||||||
|
@ -242,34 +252,66 @@ pub fn get_machine<C, S>(conn: &C,
|
||||||
for_update: bool) -> Result<S> where
|
for_update: bool) -> Result<S> where
|
||||||
C: GenericConnection,
|
C: GenericConnection,
|
||||||
S: FSM + DeserializeOwned {
|
S: FSM + DeserializeOwned {
|
||||||
let query = r#"
|
let query = alter_for_update(for_update, r#"
|
||||||
SELECT id, created, fsm, state FROM machines WHERE id = $1
|
SELECT state FROM machines WHERE id = $1
|
||||||
"#;
|
"#);
|
||||||
|
|
||||||
// If the machine is being fetched in the context of a
|
|
||||||
// transaction, with the intention to update it, the relevant
|
|
||||||
// clause needs to be appended:
|
|
||||||
let query = match for_update {
|
|
||||||
false => query.to_string(),
|
|
||||||
true => format!("{} FOR UPDATE", query),
|
|
||||||
};
|
|
||||||
|
|
||||||
let rows = conn.query(&query, &[&id.to_uuid()]).expect("TODO");
|
let rows = conn.query(&query, &[&id.to_uuid()]).expect("TODO");
|
||||||
let mut machines = rows.into_iter().map(|row| MachineT {
|
|
||||||
id: id.to_uuid(),
|
|
||||||
created: row.get(1),
|
|
||||||
fsm: row.get(2),
|
|
||||||
state: row.get(3),
|
|
||||||
});
|
|
||||||
|
|
||||||
if let Some(machine) = machines.next() {
|
if let Some(row) = rows.into_iter().next() {
|
||||||
Ok(serde_json::from_value(machine.state).expect("TODO"))
|
Ok(serde_json::from_value(row.get(0)).expect("TODO"))
|
||||||
} else {
|
} else {
|
||||||
// TODO: return appropriate not found error
|
// TODO: return appropriate not found error
|
||||||
Err(Error::SomeError)
|
Err(Error::SomeError)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Retrieve an action from the database, optionally locking it for
|
||||||
|
/// the duration of some enclosing transaction.
|
||||||
|
fn get_action<C, S>(conn: &C, id: Uuid) -> Result<(ActionStatus, S::Action)> where
|
||||||
|
C: GenericConnection,
|
||||||
|
S: FSM,
|
||||||
|
S::Action: DeserializeOwned {
|
||||||
|
let query = alter_for_update(true, r#"
|
||||||
|
SELECT status, content FROM actions
|
||||||
|
WHERE id = $1 AND fsm = $2
|
||||||
|
"#);
|
||||||
|
|
||||||
|
let rows = conn.query(&query, &[&id, &S::FSM_NAME]).expect("TODO");
|
||||||
|
|
||||||
|
if let Some(row) = rows.into_iter().next() {
|
||||||
|
let action = serde_json::from_value(row.get(1)).expect("TODO");
|
||||||
|
Ok((row.get(0), action))
|
||||||
|
} else {
|
||||||
|
// TODO: return appropriate not found error
|
||||||
|
Err(Error::SomeError)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Update the status of an action after an attempt to run it.
|
||||||
|
fn update_action_status<C, S>(conn: &C,
|
||||||
|
id: Uuid,
|
||||||
|
status: ActionStatus,
|
||||||
|
error: Option<Value>,
|
||||||
|
_fsm: PhantomData<S>) -> Result<()> where
|
||||||
|
C: GenericConnection,
|
||||||
|
S: FSM {
|
||||||
|
let query = r#"
|
||||||
|
UPDATE actions SET status = $1, error = $2
|
||||||
|
WHERE id = $3 AND fsm = $4
|
||||||
|
"#;
|
||||||
|
|
||||||
|
let result = conn.execute(&query, &[&status, &error, &id, &S::FSM_NAME])
|
||||||
|
.expect("TODO");
|
||||||
|
|
||||||
|
if result != 1 {
|
||||||
|
// TODO: Fail in the most gruesome way!
|
||||||
|
unimplemented!()
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Advance a persisted state machine by applying an event, and
|
/// Advance a persisted state machine by applying an event, and
|
||||||
/// storing the event as well as all resulting actions.
|
/// storing the event as well as all resulting actions.
|
||||||
///
|
///
|
||||||
|
@ -287,24 +329,84 @@ pub fn advance<C, S>(conn: &C,
|
||||||
C: GenericConnection,
|
C: GenericConnection,
|
||||||
S: FSM + Serialize + DeserializeOwned,
|
S: FSM + Serialize + DeserializeOwned,
|
||||||
S::Event: Serialize,
|
S::Event: Serialize,
|
||||||
S::Action: Serialize {
|
S::Action: Serialize + DeserializeOwned {
|
||||||
let tx = conn.transaction().expect("TODO");
|
let tx = conn.transaction().expect("TODO");
|
||||||
let state = get_machine(&tx, id, true).expect("TODO");
|
let state = get_machine(&tx, id, true)?;
|
||||||
|
|
||||||
// Advancing the FSM consumes the event, so it is persisted first:
|
// Advancing the FSM consumes the event, so it is persisted first:
|
||||||
let event_id = insert_event(&tx, id, &event).expect("TODO");
|
let event_id = insert_event(&tx, id, &event)?;
|
||||||
|
|
||||||
// Core advancing logic is run:
|
// Core advancing logic is run:
|
||||||
let (new_state, actions) = finito::advance(state, event);
|
let (new_state, actions) = finito::advance(state, event);
|
||||||
|
|
||||||
// Resulting actions are persisted (TODO: and interpreted)
|
// Resulting actions are persisted (TODO: and interpreted)
|
||||||
|
let mut action_ids = vec![];
|
||||||
for action in actions {
|
for action in actions {
|
||||||
insert_action(&tx, id, event_id, &action).expect("TODO");
|
let action_id = insert_action(&tx, id, event_id, &action)?;
|
||||||
|
action_ids.push(action_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
// And finally the state is updated:
|
// And finally the state is updated:
|
||||||
update_state(&tx, id, &new_state).expect("TODO");
|
update_state(&tx, id, &new_state).expect("TODO");
|
||||||
tx.commit().expect("TODO");
|
tx.commit().expect("TODO");
|
||||||
|
|
||||||
|
run_actions(conn, id, action_ids);
|
||||||
|
|
||||||
Ok(new_state)
|
Ok(new_state)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Execute a single action in case it is pending or retryable. Holds
|
||||||
|
/// a lock on the action's database row while performing the action
|
||||||
|
/// and writes back the status afterwards.
|
||||||
|
///
|
||||||
|
/// Should the execution of an action fail cleanly (i.e. without a
|
||||||
|
/// panic), the error will be persisted. Should it fail by panicking
|
||||||
|
/// (which developers should never do explicitly in action
|
||||||
|
/// interpreters) its status will not be changed.
|
||||||
|
fn run_action<S>(tx: Transaction, id: Uuid, _fsm: PhantomData<S>)
|
||||||
|
-> Result<Vec<S::Event>> where
|
||||||
|
S: FSM,
|
||||||
|
S::Action: DeserializeOwned {
|
||||||
|
let (status, action) = get_action::<Transaction, S>(&tx, id)?;
|
||||||
|
|
||||||
|
let result = match status {
|
||||||
|
ActionStatus::Pending => {
|
||||||
|
let events = <S as FSM>::act(action);
|
||||||
|
update_action_status(
|
||||||
|
&tx, id, ActionStatus::Completed, None, PhantomData::<S>
|
||||||
|
)?;
|
||||||
|
|
||||||
|
events
|
||||||
|
},
|
||||||
|
|
||||||
|
_ => {
|
||||||
|
// TODO: Currently only pending actions are run because
|
||||||
|
// retryable actions are not yet implemented.
|
||||||
|
vec![]
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
tx.commit().expect("TODO");
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Execute several actions at the same time, each in a separate
|
||||||
|
/// thread. Note that actions returning further events, causing
|
||||||
|
/// further transitions, returning further actions and so on will
|
||||||
|
/// potentially cause multiple threads to get created.
|
||||||
|
fn run_actions<C, S>(conn: &C, fsm_id: &MachineId<S>, action_ids: Vec<Uuid>) where
|
||||||
|
C: GenericConnection,
|
||||||
|
S: FSM + Serialize + DeserializeOwned,
|
||||||
|
S::Event: Serialize,
|
||||||
|
S::Action: Serialize + DeserializeOwned {
|
||||||
|
for action_id in action_ids {
|
||||||
|
let tx = conn.transaction().expect("TODO");
|
||||||
|
|
||||||
|
// TODO: Determine which concurrency setup we actually want.
|
||||||
|
if let Ok(events) = run_action(tx, action_id, PhantomData::<S>) {
|
||||||
|
for event in events {
|
||||||
|
advance(conn, fsm_id, event).expect("TODO");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue