From f8ae63c2da00d5031c0aaed7277659f16262f6f5 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 26 Sep 2018 11:29:48 +0200 Subject: [PATCH 01/30] chore: Initial commit From bc873b9666a64ee2a5df8a8981878c18d7155563 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 26 Sep 2018 11:33:52 +0200 Subject: [PATCH 02/30] chore: Add Rust-specific .gitignore file --- .gitignore | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..143b1ca01 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ + +/target/ +**/*.rs.bk +Cargo.lock From 6d11928efe83feb9d011cad43418199afa0df319 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 26 Sep 2018 11:34:08 +0200 Subject: [PATCH 03/30] docs: Add initial README --- README.md | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 000000000..5acd67d3b --- /dev/null +++ b/README.md @@ -0,0 +1,27 @@ +Finito +====== + +This is a Rust port of the Haskell state-machine library Finito. It is +slightly less featureful because it loses the ability to ensure that +side-effects are contained and because of a slight reduction in +expressivity, which makes it a bit more restrictive. + +However, it still implements the FSM model well enough. + +# Components + +Finito is split up into multiple independent components (note: not all +of these exist yet), separating functionality related to FSM +persistence from other things. + +* `finito`: Core abstraction implemented by Finito +* `finito-door`: Example implementation of a simple, lockable door +* `finito-postgres`: Persistent state-machines using Postgres + +**Note**: The `finito` core library does not contain any tests. Its +coverage is instead provided by the `finito-door` library, which +actually implements an example FSM. + +These are split out because the documentation for `finito-door` is +interesting regardless and because other Finito packages also need an +example implementation. From da66599696dce1378e6fcef6a5149ba60e5006a2 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 26 Sep 2018 11:34:37 +0200 Subject: [PATCH 04/30] feat(core): Check in Finito core library The implementation of this library is closely modeled after the core abstraction in the Haskell library. This does not at all concern itself with persistence, interpretation of effects and so on. --- Cargo.toml | 4 + finito-core/Cargo.toml | 6 ++ finito-core/src/lib.rs | 176 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 186 insertions(+) create mode 100644 Cargo.toml create mode 100644 finito-core/Cargo.toml create mode 100644 finito-core/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 000000000..bb5cb6b67 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,4 @@ +[workspace] +members = [ + "finito-core" +] diff --git a/finito-core/Cargo.toml b/finito-core/Cargo.toml new file mode 100644 index 000000000..00b00a9a7 --- /dev/null +++ b/finito-core/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "finito" +version = "0.1.0" +authors = ["Vincent Ambo "] + +[dependencies] diff --git a/finito-core/src/lib.rs b/finito-core/src/lib.rs new file mode 100644 index 000000000..2b3258e08 --- /dev/null +++ b/finito-core/src/lib.rs @@ -0,0 +1,176 @@ +//! # What & why? +//! +//! Most processes that occur in software applications can be modeled +//! as finite-state machines (FSMs), however the actual states, the +//! transitions between them and the model's interaction with the +//! external world is often implicit. +//! +//! Making the states of a process explicit using a simple language +//! that works for both software developers and other people who may +//! have opinions on processes makes it easier to synchronise thoughts, +//! extend software and keep a good level of control over what is going +//! on. +//! +//! This library aims to provide functionality for implementing +//! finite-state machines in a way that balances expressivity and +//! safety. +//! +//! Finito does not aim to prevent every possible incorrect +//! transition, but aims for somewhere "safe-enough" (please don't +//! lynch me) that is still easily understood. +//! +//! # Conceptual overview +//! +//! The core idea behind Finito can be expressed in a single line and +//! will potentially look familiar if you have used Erlang in a +//! previous life. The syntax used here is the type-signature notation +//! of Haskell. +//! +//! ```text +//! advance :: state -> event -> (state, [action]) +//! ``` +//! +//! In short, every FSM is made up of three distinct types: +//! +//! * a state type representing all possible states of the machine +//! +//! * an event type representing all possible events in the machine +//! +//! * an action type representing a description of all possible +//! side-effects of the machine +//! +//! Using the definition above we can now say that a transition in a +//! state-machine, involving these three types, takes an initial state +//! and an event to apply it to and returns a new state and a list of +//! actions to execute. +//! +//! With this definition most processes can already be modeled quite +//! well. Two additional functions are required to make it all work: +//! +//! ```text +//! -- | The ability to cause additional side-effects after entering +//! -- a new state. +//! > enter :: state -> [action] +//! ``` +//! +//! as well as +//! +//! ```text +//! -- | An interpreter for side-effects +//! act :: action -> m [event] +//! ``` +//! +//! **Note**: This library is based on an original Haskell library. In +//! Haskell, side-effects can be controlled via the type system which +//! is impossible in Rust. +//! +//! Some parts of Finito make assumptions about the programmer not +//! making certain kinds of mistakes, which are pointed out in the +//! documentation. Unfortunately those assumptions are not +//! automatically verifiable in Rust. +//! +//! == Example +//! +//! Please consult `finito-door` for an example representing a simple, +//! lockable door as a finite-state machine. This gives an overview +//! over Finito's primary features. +//! +//! If you happen to be the kind of person who likes to learn about +//! libraries by reading code, you should familiarise yourself with the +//! door as it shows up as the example in other finito-related +//! libraries, too. +//! +//! # Persistence, side-effects and mud +//! +//! These three things are inescapable in the fateful realm of +//! computers, but Finito separates them out into separate libraries +//! that you can drag in as you need them. +//! +//! Currently, those libraries include: +//! +//! * @finito@: Core components and classes of Finito +//! +//! * @finito-in-mem@: In-memory implementation of state machines +//! that do not need to live longer than an application using +//! standard library concurrency primitives. +//! +//! * @finito-postgres@: Postgres-backed, persistent implementation +//! of state machines that, well, do need to live longer. Uses +//! Postgres for concurrency synchronisation, so keep that in mind. +//! +//! Which should cover most use-cases. Okay, enough prose, lets dive +//! in. +//! +//! # Does Finito make you want to scream? +//! +//! Please reach out! I want to know why! + +use std::mem; + +/// Primary trait that needs to be implemented for every state type +/// representing the states of an FSM. +/// +/// This trait is used to implement transition logic and to "tie the +/// room together", with the room being our triplet of types. +pub trait FSM where Self: Sized { + /// The associated event type of an FSM represents all possible + /// events that can occur in the state-machine. + type Event; + + /// The associated action type of an FSM represents all possible + /// actions that can occur in the state-machine. + type Action; + + /// `handle` deals with any incoming events to cause state + /// transitions and emit actions. This function is the core logic + /// of any state machine. + /// + /// Implementations of this function **must not** cause any + /// side-effects to avoid breaking the guarantees of Finitos + /// conceptual model. + fn handle(self, event: Self::Event) -> (Self, Vec); + + /// `enter` is called when a new state is entered, allowing a + /// state to produce additional side-effects. + /// + /// This is useful for side-effects that event handlers do not + /// need to know about and for resting assured that a certain + /// action has been caused when a state is entered. + /// + /// FSM state types are expected to be enum (i.e. sum) types. A + /// state is considered "new" and enter calls are run if is of a + /// different enum variant. + fn enter(&self) -> Vec; + + /// `act` interprets and executes FSM actions. This is the only + /// part of an FSM in which side-effects are allowed. + fn act(Self::Action) -> Vec; +} + +/// This function is the primary function used to advance a state +/// machine. It takes care of both running the event handler as well +/// as possible state-enter calls and returning the result. +/// +/// Users of Finito should basically always use this function when +/// advancing state-machines manually, and never call FSM-trait +/// methods directly. +pub fn advance(state: S, event: S::Event) -> (S, Vec) { + // Determine the enum variant of the initial state (used to + // trigger enter calls). + let old_discriminant = mem::discriminant(&state); + + let (new_state, mut actions) = state.handle(event); + + // Compare the enum variant of the resulting state to the old one + // and run `enter` if they differ. + let new_discriminant = mem::discriminant(&new_state); + let mut enter_actions = if old_discriminant != new_discriminant { + new_state.enter() + } else { + vec![] + }; + + actions.append(&mut enter_actions); + + (new_state, actions) +} From 60824a06f1db981a07d738c71ba65c965a473838 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 26 Sep 2018 11:36:08 +0200 Subject: [PATCH 05/30] feat(door): Check in example door implementation Checks in my classic, lockable door example implemented in Finito. This does not yet contain the documentation of the door in the Haskell version of Finito. --- Cargo.toml | 3 +- finito-door/Cargo.toml | 7 ++ finito-door/src/lib.rs | 144 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 153 insertions(+), 1 deletion(-) create mode 100644 finito-door/Cargo.toml create mode 100644 finito-door/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index bb5cb6b67..5d62cea6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,4 +1,5 @@ [workspace] members = [ - "finito-core" + "finito-core", + "finito-door" ] diff --git a/finito-door/Cargo.toml b/finito-door/Cargo.toml new file mode 100644 index 000000000..3497a1c04 --- /dev/null +++ b/finito-door/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "finito-door" +version = "0.1.0" +authors = ["Vincent Ambo "] + +[dependencies.finito] +path = "../finito-core" diff --git a/finito-door/src/lib.rs b/finito-door/src/lib.rs new file mode 100644 index 000000000..a4aad8dcb --- /dev/null +++ b/finito-door/src/lib.rs @@ -0,0 +1,144 @@ +//! TODO: port the door docs + +extern crate finito; + +use finito::FSM; + +type Code = usize; +type Attempts = usize; + +#[derive(Debug, PartialEq)] +pub enum DoorState { + /// This state represents an open door. + Opened, + + /// This state represents a closed door. + Closed, + + /// This state represents a locked door on which a given code + /// is set. It also carries a number of remaining attempts + /// before the door is permanently disabled. + Locked { code: Code, attempts: Attempts }, + + /// This state represents a disabled door. The police will + /// need to unlock it manually! + Disabled, +} + +#[derive(Debug)] +pub enum DoorEvent { + Open, + Close, + Lock(Code), + Unlock(Code), +} + +#[derive(Debug, PartialEq)] +pub enum DoorAction { + NotifyIRC(String), + CallThePolice, +} + +impl FSM for DoorState { + type Event = DoorEvent; + type Action = DoorAction; + + fn handle(self, event: DoorEvent) -> (Self, Vec) { + match (self, event) { + (DoorState::Opened, DoorEvent::Close) => return (DoorState::Closed, vec![]), + + (DoorState::Closed, DoorEvent::Open) => return (DoorState::Opened, vec![]), + + (DoorState::Closed, DoorEvent::Lock(code)) => { + return (DoorState::Locked { code, attempts: 3 }, vec![]) + } + + (DoorState::Locked { code, attempts }, DoorEvent::Unlock(unlock_code)) => { + if code == unlock_code { + return (DoorState::Closed, vec![]); + } + + if attempts == 1 { + return (DoorState::Disabled, vec![DoorAction::CallThePolice]); + } + + return ( + DoorState::Locked { + code, + attempts: attempts - 1, + }, + vec![DoorAction::NotifyIRC("invalid code entered".into())], + ); + } + + (current, _) => (current, vec![]), + } + } + + fn enter(&self) -> Vec { + let msg = match self { + DoorState::Opened => "door was opened", + DoorState::Closed => "door was closed", + DoorState::Locked { .. } => "door was locked", + DoorState::Disabled => "door was disabled", + }; + + vec![DoorAction::NotifyIRC(msg.into())] + } + + fn act(action: DoorAction) -> Vec { + match action { + DoorAction::NotifyIRC(msg) => { + // TODO: write to file in example + println!("IRC: {}", msg); + vec![] + } + + DoorAction::CallThePolice => { + // TODO: call the police + println!("The police was called! For real!"); + vec![] + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use finito::advance; + + fn test_fsm(initial: S, events: Vec) -> (S, Vec) { + events.into_iter().fold((initial, vec![]), |(state, mut actions), event| { + let (new_state, mut new_actions) = advance(state, event); + actions.append(&mut new_actions); + (new_state, actions) + }) + } + + #[test] + fn test_door() { + let initial = DoorState::Opened; + let events = vec![ + DoorEvent::Close, + DoorEvent::Open, + DoorEvent::Close, + DoorEvent::Lock(1234), + DoorEvent::Unlock(1234), + DoorEvent::Lock(4567), + DoorEvent::Unlock(1234), + ]; + let (final_state, actions) = test_fsm(initial, events); + + assert_eq!(final_state, DoorState::Locked { code: 4567, attempts: 2 }); + assert_eq!(actions, vec![ + DoorAction::NotifyIRC("door was closed".into()), + DoorAction::NotifyIRC("door was opened".into()), + DoorAction::NotifyIRC("door was closed".into()), + DoorAction::NotifyIRC("door was locked".into()), + DoorAction::NotifyIRC("door was closed".into()), + DoorAction::NotifyIRC("door was locked".into()), + DoorAction::NotifyIRC("invalid code entered".into()), + ]); + } +} From c03e14758f53eeeecfe8067dd2eff9e9d32c1edd Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 26 Sep 2018 16:51:33 +0200 Subject: [PATCH 06/30] fix(core): Add missing 'FSM_NAME' associated constant This one got lost while moving from the prototype code to the proper library. --- finito-core/src/lib.rs | 5 +++++ finito-door/src/lib.rs | 1 + 2 files changed, 6 insertions(+) diff --git a/finito-core/src/lib.rs b/finito-core/src/lib.rs index 2b3258e08..a622134a1 100644 --- a/finito-core/src/lib.rs +++ b/finito-core/src/lib.rs @@ -113,6 +113,11 @@ use std::mem; /// This trait is used to implement transition logic and to "tie the /// room together", with the room being our triplet of types. pub trait FSM where Self: Sized { + /// A human-readable string uniquely describing what this FSM + /// models. This is used in log messages, database tables and + /// various other things throughout Finito. + const FSM_NAME: &'static str; + /// The associated event type of an FSM represents all possible /// events that can occur in the state-machine. type Event; diff --git a/finito-door/src/lib.rs b/finito-door/src/lib.rs index a4aad8dcb..cfbaa4abe 100644 --- a/finito-door/src/lib.rs +++ b/finito-door/src/lib.rs @@ -40,6 +40,7 @@ pub enum DoorAction { } impl FSM for DoorState { + const FSM_NAME: &'static str = "door"; type Event = DoorEvent; type Action = DoorAction; From b1e00ff0264fec4f3a5b87470980aebd94db81cf Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 26 Sep 2018 16:53:04 +0200 Subject: [PATCH 07/30] feat(postgres): Bootstrap Postgres persistence implementation Adds the initial finito-postgres crate with type definitions for the tables and initial functions to interact with persisted FSMs. This is far from feature complete at this commit. --- Cargo.toml | 3 +- finito-postgres/Cargo.toml | 21 +++ finito-postgres/src/error.rs | 8 ++ finito-postgres/src/lib.rs | 241 +++++++++++++++++++++++++++++++++++ 4 files changed, 272 insertions(+), 1 deletion(-) create mode 100644 finito-postgres/Cargo.toml create mode 100644 finito-postgres/src/error.rs create mode 100644 finito-postgres/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 5d62cea6e..310133abe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,6 @@ [workspace] members = [ "finito-core", - "finito-door" + "finito-door", + "finito-postgres" ] diff --git a/finito-postgres/Cargo.toml b/finito-postgres/Cargo.toml new file mode 100644 index 000000000..c9f8476db --- /dev/null +++ b/finito-postgres/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "finito-postgres" +version = "0.1.0" +authors = ["Vincent Ambo "] + +[dependencies] +chrono = "0.4" +serde = "1.0" +serde_json = "1.0" +postgres-derive = "0.3" + +[dependencies.postgres] +version = "0.15" +features = [ "with-uuid", "with-chrono", "with-serde_json" ] + +[dependencies.uuid] +version = "0.5" +features = [ "v4" ] + +[dependencies.finito] +path = "../finito-core" diff --git a/finito-postgres/src/error.rs b/finito-postgres/src/error.rs new file mode 100644 index 000000000..ccbf0c107 --- /dev/null +++ b/finito-postgres/src/error.rs @@ -0,0 +1,8 @@ +//! This module defines error types and conversions for issue that can +//! occur while dealing with persisted state machines. + +use std::result; + +pub type Result = result::Result; + +pub enum Error { SomeError } diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs new file mode 100644 index 000000000..aee8cf9ad --- /dev/null +++ b/finito-postgres/src/lib.rs @@ -0,0 +1,241 @@ +//! This module implements ... TODO when I can write again. + +#[macro_use] extern crate postgres; +#[macro_use] extern crate postgres_derive; + +extern crate chrono; +extern crate finito; +extern crate serde; +extern crate serde_json; +extern crate uuid; + +mod error; +pub use error::{Result, Error}; + +use chrono::prelude::{DateTime, Utc}; +use finito::FSM; +use postgres::GenericConnection; +use serde::Serialize; +use serde::de::DeserializeOwned; +use serde_json::Value; +use std::fmt; +use std::marker::PhantomData; +use uuid::Uuid; + +/// This struct represents rows in the database table in which +/// machines (i.e. the current state of a Finito state machine) are +/// persisted. +#[derive(Debug, ToSql, FromSql)] +struct MachineT { + /// ID of the persisted state machine. + id: Uuid, + + /// Time at which the FSM was first created. + created: DateTime, + + /// Name of the type of FSM represented by this state. + fsm: String, + + /// Current state of the FSM (TODO: Can the serialised FSM type be + /// used?) + state: Value, +} + +/// This struct represents rows in the database table in which events +/// are persisted. +#[derive(Debug, ToSql, FromSql)] +struct EventT { + /// ID of the persisted event. + id: Uuid, + + /// Timestamp at which the event was stored. + created: DateTime, + + /// Name of the type of FSM that this state belongs to. + fsm: String, + + /// ID of the state machine belonging to this event. + fsm_id: Uuid, + + /// Serialised content of the event. + event: Value, +} + +/// This enum represents the possible statuses an action can be in. +#[derive(Debug, ToSql, FromSql)] +enum ActionStatus { + /// The action was requested but has not run yet. + Pending, + + /// The action completed successfully. + Completed, + + /// The action failed to run. Information about the error will + /// have been persisted in Postgres. + Failed, +} + +/// This struct represents rows in the database table in which actions +/// are persisted. +#[derive(Debug, ToSql, FromSql)] +struct ActionT { + /// ID of the persisted event. + id: Uuid, + + /// Timestamp at which the event was stored. + created: DateTime, + + /// Name of the type of FSM that this state belongs to. + fsm: String, + + /// ID of the state machine belonging to this event. + fsm_id: Uuid, + + /// Serialised content of the action. + action: Value, + + /// Current status of the action. + status: ActionStatus, + + /// Serialised error representation, if an error occured during + /// processing. TODO: Use some actual error type. Maybe failure + /// has serialisation support? + error: Option, +} + +// The following functions implement the public interface of +// `finito-postgres`. + +/// This type is used as a type-safe wrapper around the ID of a state +/// machine. It carries information about the FSM type and is intended +/// to add a layer of checking to prevent IDs from being mixed up. +#[derive(Clone)] +pub struct MachineId { + uuid: Uuid, + phantom: PhantomData, +} + +/// Custom debug implementation to format machine IDs using the name +/// of the FSM and their UUID. +impl fmt::Debug for MachineId { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}:{}", S::FSM_NAME, self.uuid.hyphenated()) + } +} + +impl MachineId { + /// Convert a UUID into a strongly typed machine ID. + pub fn from_uuid(uuid: Uuid) -> Self { + MachineId { + uuid, + phantom: PhantomData, + } + } + + /// Return the UUID contained in a machine ID. + pub fn to_uuid(&self) -> Uuid { + self.uuid + } +} + +/// Insert a single state-machine into the database and return its +/// newly allocated, random UUID. +pub fn insert_machine(conn: &C, initial: S) -> Result> where + C: GenericConnection, + S: FSM + Serialize { + let query = r#" + INSERT INTO machines (id, created, fsm, state) + VALUES ($1, NOW(), $2, $3) + "#; + + let id = Uuid::new_v4(); + let fsm = S::FSM_NAME.to_string(); + let state = serde_json::to_value(initial).expect("TODO"); + + conn.execute(query, &[&id, &fsm, &state]).expect("TODO"); + + return Ok(MachineId::from_uuid(id)); +} + +/// Insert a single event into the database and return its UUID. +pub fn insert_event(conn: &C, fsm_id: MachineId, event: S::Event) -> Result +where + C: GenericConnection, + S: FSM, + S::Event: Serialize { + let query = r#" + INSERT INTO events (id, created, fsm, fsm_id, event) + VALUES ($1, NOW(), $2, $3, $4) + "#; + + let id = Uuid::new_v4(); + let fsm = S::FSM_NAME.to_string(); + let event_value = serde_json::to_value(event).expect("TODO"); + + conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &event_value]).expect("TODO"); + return Ok(id) +} + +/// Insert a single action into the database and return its UUID. +pub fn insert_action(conn: &C, + fsm_id: MachineId, + action: S::Action) -> Result where + C: GenericConnection, + S: FSM, + S::Action: Serialize { + let query = r#" + INSERT INTO actions (id, created, fsm, fsm_id, action, status) + VALUES ($1, NOW(), $2, $3, $4, $5) + "#; + + let id = Uuid::new_v4(); + let fsm = S::FSM_NAME.to_string(); + let action_value = serde_json::to_value(action).expect("TODO"); + + conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &action_value, + &ActionStatus::Pending]).expect("TODO"); + return Ok(id) +} + +/// Retrieve the current state of a state machine from the database. +pub fn get_machine(conn: &C, id: MachineId) -> Result where + C: GenericConnection, + S: FSM + DeserializeOwned { + let query = r#" + SELECT (id, created, fsm, state) FROM machines WHERE id = $1 + "#; + + let rows = conn.query(query, &[&id.to_uuid()]).expect("TODO"); + let mut machines = rows.into_iter().map(|row| MachineT { + id: row.get(0), + created: row.get(1), + fsm: row.get(2), + state: row.get(3), + }); + + if let Some(machine) = machines.next() { + Ok(serde_json::from_value(machine.state).expect("TODO")) + } else { + // TODO: return appropriate not found error + Err(Error::SomeError) + } +} + +/// Advance a persisted state machine by applying an event, and +/// storing the event as well as all resulting actions. +/// +/// This function holds a database-lock on the state's row while +/// advancing the machine. +/// +/// **Note**: This function returns the new state of the machine +/// immediately after applying the event, however this does not +/// necessarily equate to the state of the machine after all related +/// processing is finished as running actions may result in additional +/// transitions. +pub fn advance(conn: &C, + id: MachineId, + event: S::Event) -> Result where + C: GenericConnection, + S: FSM + DeserializeOwned { + unimplemented!() +} From 40caa5ffa23cdd482b7c97e74891fb41269e8076 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 26 Sep 2018 17:28:45 +0200 Subject: [PATCH 08/30] feat(postgres): Implement Postgres-backed 'advance' function Transactionally updates a state machine with an incoming event. Note that this does not yet interpret actions. --- finito-postgres/src/error.rs | 1 + finito-postgres/src/lib.rs | 86 ++++++++++++++++++++++++++++++------ 2 files changed, 73 insertions(+), 14 deletions(-) diff --git a/finito-postgres/src/error.rs b/finito-postgres/src/error.rs index ccbf0c107..0fb30a99d 100644 --- a/finito-postgres/src/error.rs +++ b/finito-postgres/src/error.rs @@ -5,4 +5,5 @@ use std::result; pub type Result = result::Result; +#[derive(Debug)] pub enum Error { SomeError } diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs index aee8cf9ad..152592405 100644 --- a/finito-postgres/src/lib.rs +++ b/finito-postgres/src/lib.rs @@ -91,6 +91,9 @@ struct ActionT { /// ID of the state machine belonging to this event. fsm_id: Uuid, + /// ID of the event that resulted in this action. + event_id: Uuid, + /// Serialised content of the action. action: Value, @@ -158,7 +161,9 @@ pub fn insert_machine(conn: &C, initial: S) -> Result> where } /// Insert a single event into the database and return its UUID. -pub fn insert_event(conn: &C, fsm_id: MachineId, event: S::Event) -> Result +fn insert_event(conn: &C, + fsm_id: &MachineId, + event: &S::Event) -> Result where C: GenericConnection, S: FSM, @@ -177,35 +182,70 @@ where } /// Insert a single action into the database and return its UUID. -pub fn insert_action(conn: &C, - fsm_id: MachineId, - action: S::Action) -> Result where +fn insert_action(conn: &C, + fsm_id: &MachineId, + event_id: Uuid, + action: &S::Action) -> Result where C: GenericConnection, S: FSM, S::Action: Serialize { let query = r#" - INSERT INTO actions (id, created, fsm, fsm_id, action, status) - VALUES ($1, NOW(), $2, $3, $4, $5) + INSERT INTO actions (id, created, fsm, fsm_id, event_id, action, status) + VALUES ($1, NOW(), $2, $3, $4, $5, $6) "#; let id = Uuid::new_v4(); let fsm = S::FSM_NAME.to_string(); let action_value = serde_json::to_value(action).expect("TODO"); - conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &action_value, - &ActionStatus::Pending]).expect("TODO"); + conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &event_id, + &action_value, &ActionStatus::Pending]).expect("TODO"); return Ok(id) } -/// Retrieve the current state of a state machine from the database. -pub fn get_machine(conn: &C, id: MachineId) -> Result where +/// Update the state of a specified machine. +fn update_state(conn: &C, + fsm_id: &MachineId, + state: &S) -> Result<()> where + C: GenericConnection, + S: FSM + Serialize { + let query = r#" + UPDATE machines SET state = $1 WHERE id = $2 + "#; + + let state_value = serde_json::to_value(state).expect("TODO"); + let res_count = conn.execute(query, &[&state_value, &fsm_id.to_uuid()]) + .expect("TODO"); + + if res_count != 1 { + // TODO: not found error! + unimplemented!() + } else { + Ok(()) + } +} + +/// Retrieve the current state of a state machine from the database, +/// optionally locking the machine state for the duration of some +/// enclosing transaction. +pub fn get_machine(conn: &C, + id: &MachineId, + for_update: bool) -> Result where C: GenericConnection, S: FSM + DeserializeOwned { let query = r#" SELECT (id, created, fsm, state) FROM machines WHERE id = $1 "#; - let rows = conn.query(query, &[&id.to_uuid()]).expect("TODO"); + // If the machine is being fetched in the context of a + // transaction, with the intention to update it, the relevant + // clause needs to be appended: + let query = match for_update { + false => query.to_string(), + true => format!("{} FOR UPDATE", query), + }; + + let rows = conn.query(&query, &[&id.to_uuid()]).expect("TODO"); let mut machines = rows.into_iter().map(|row| MachineT { id: row.get(0), created: row.get(1), @@ -233,9 +273,27 @@ pub fn get_machine(conn: &C, id: MachineId) -> Result where /// processing is finished as running actions may result in additional /// transitions. pub fn advance(conn: &C, - id: MachineId, + id: &MachineId, event: S::Event) -> Result where C: GenericConnection, - S: FSM + DeserializeOwned { - unimplemented!() + S: FSM + Serialize + DeserializeOwned, + S::Event: Serialize, + S::Action: Serialize { + let tx = conn.transaction().expect("TODO"); + let state = get_machine(&tx, id, true).expect("TODO"); + + // Advancing the FSM consumes the event, so it is persisted first: + let event_id = insert_event(&tx, id, &event).expect("TODO"); + + // Core advancing logic is run: + let (new_state, actions) = finito::advance(state, event); + + // Resulting actions are persisted (TODO: and interpreted) + for action in actions { + insert_action(&tx, id, event_id, &action).expect("TODO"); + } + + // And finally the state is updated: + update_state(&tx, id, &new_state).expect("TODO"); + Ok(new_state) } From 401486d124154e8fb33e319814648e4a10b30e94 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 26 Sep 2018 18:00:20 +0200 Subject: [PATCH 09/30] docs(door): Port over documentation from finito-hs --- finito-door/src/lib.rs | 180 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 172 insertions(+), 8 deletions(-) diff --git a/finito-door/src/lib.rs b/finito-door/src/lib.rs index cfbaa4abe..0137a6026 100644 --- a/finito-door/src/lib.rs +++ b/finito-door/src/lib.rs @@ -1,68 +1,210 @@ -//! TODO: port the door docs +//! # What & why? +//! +//! This module serves as a (hopefully simple) example of how to +//! implement finite-state machines using Finito. Note that the +//! concepts of Finito itself won't be explained in detail here, +//! consult its library documentation for that. +//! +//! Reading through this module should give you a rough idea of how to +//! work with Finito and get you up and running modeling things +//! *quickly*. +//! +//! Note: The generated documentation for this module will display the +//! various components of the door, but it will not inform you about +//! the actual transition logic and all that stuff. Read the source, +//! too! +//! +//! # The Door +//! +//! My favourite example when explaining these state-machines +//! conceptually has been to use a simple, lockable door. Our door has +//! a keypad next to it which can be used to lock the door by entering +//! a code, after which the same code must be entered to unlock it +//! again. +//! +//! The door can only be locked if it is closed. Oh, and it has a few +//! extra features: +//! +//! * whenever the door's state changes, an IRC channel receives a +//! message about that +//! +//! * the door calls the police if the code is intered incorrectly more +//! than a specified number of times (mhm, lets say, three) +//! +//! * if the police is called the door can not be interacted with +//! anymore (and honestly, for the sake of this example, we don't +//! care how its functionality is restored) +//! +//! ## The Door - Visualized +//! +//! Here's a rough attempt at drawing a state diagram in ASCII. The +//! bracketed words denote states, the arrows denote events: +//! +//! ```text +//! <--Open--- <--Unlock-- correct code? --Unlock--> +//! [Opened] [Closed] [Locked] [Disabled] +//! --Close--> ----Lock--> +//! ``` +//! +//! I'm so sorry for that drawing. +//! +//! ## The Door - Usage example +//! +//! An interaction session with our final door could look like this: +//! +//! ```rust,ignore +//! use finito_postgres::{insert_machine, advance}; +//! +//! let door = insert_machine(&conn, &DoorState::Opened)?; +//! +//! advance(&conn, &door, DoorEvent::Close)?; +//! advance(&conn, &door, DoorEvent::Lock(1337))?; +//! +//! format!("Door is now: {}", get_machine(&conn, &door)?); +//! ``` +//! +//! Here we have created, closed and then locked a door and inspected +//! its state. We will see that it is locked, has the locking code we +//! gave it and three remaining attempts to open it. +//! +//! Alright, enough foreplay, lets dive in! extern crate finito; use finito::FSM; +/// Type synonym to represent the code with which the door is locked. This +/// exists only for clarity in the signatures below and please do not email me +/// about the fact that an integer is not actually a good representation of +/// numerical digits. Thanks! type Code = usize; + +/// Type synonym to represent the remaining number of unlock attempts. type Attempts = usize; +/// This type represents the possible door states and the data that they carry. +/// We can infer this from the "diagram" in the documentation above. +/// +/// This type is the one for which `finito::FSM` will be implemented, making it +/// the wooden (?) heart of our door. #[derive(Debug, PartialEq)] pub enum DoorState { - /// This state represents an open door. + /// In `Opened` state, the door is wide open and anyone who fits through can + /// go through. Opened, - /// This state represents a closed door. + /// In `Closed` state, the door is shut but does not prevent anyone from + /// opening it. Closed, - /// This state represents a locked door on which a given code - /// is set. It also carries a number of remaining attempts - /// before the door is permanently disabled. + /// In `Locked` state, the door is locked and waiting for someone to enter + /// its locking code on the keypad. + /// + /// This state contains the code that the door is locked with, as well as + /// the remaining number of attempts before the door calls the police and + /// becomes unusable. Locked { code: Code, attempts: Attempts }, - /// This state represents a disabled door. The police will - /// need to unlock it manually! + /// This state represents a disabled door after the police has been called. + /// The police will need to unlock it manually! Disabled, } +/// This type represents the events that can occur in our door, i.e. the input +/// and interactions it receives. #[derive(Debug)] pub enum DoorEvent { + /// `Open` means someone is opening the door! Open, + + /// `Close` means, you guessed it, the exact opposite. Close, + + /// `Lock` means somebody has entered a locking code on the + /// keypad. Lock(Code), + + /// `Unlock` means someone has attempted to unlock the door. Unlock(Code), } +/// This type represents the possible actions, a.k.a. everything our door "does" +/// that does not just impact itself, a.k.a. side-effects. +/// +/// **Note**: This type by itself *is not* a collection of side-effects, it +/// merely describes the side-effects we want to occur (which are then +/// interpreted by the machinery later). #[derive(Debug, PartialEq)] pub enum DoorAction { + /// `NotifyIRC` is used to display some kind of message on the + /// aforementioned IRC channel that is, for some reason, very interested in + /// the state of the door. NotifyIRC(String), + + /// `CallThePolice` does what you think it does. + /// + /// **Note**: For safety reasons, causing this action is not recommended for + /// users inside the US! CallThePolice, } +/// This trait implementation turns our 'DoorState' into a type actually +/// representing a finite-state machine. To implement it, we need to do three +/// main things: +/// +/// * Define what our associated `Event` and `Action` type should be +/// +/// * Define the event-handling and state-entering logic (i.e. the meat of the +/// ... door) +/// +/// * Implement the interpretation of our actions, i.e. implement actual +/// side-effects impl FSM for DoorState { const FSM_NAME: &'static str = "door"; + + // As you might expect, our `Event` type is 'DoorEvent' and our `Action` + // type is 'DoorAction'. type Event = DoorEvent; type Action = DoorAction; + // The implementation of `handle` provides us with the actual transition + // logic of the door. + // + // The door is conceptually not that complicated so it is relatively short. fn handle(self, event: DoorEvent) -> (Self, Vec) { match (self, event) { + // An opened door can be closed: (DoorState::Opened, DoorEvent::Close) => return (DoorState::Closed, vec![]), + // A closed door can be opened: (DoorState::Closed, DoorEvent::Open) => return (DoorState::Opened, vec![]), + // A closed door can also be locked, in which case the locking code + // is stored with the next state and the unlock attempts default to + // three: (DoorState::Closed, DoorEvent::Lock(code)) => { return (DoorState::Locked { code, attempts: 3 }, vec![]) } + // A locked door receiving an `Unlock`-event can do several + // different things ... (DoorState::Locked { code, attempts }, DoorEvent::Unlock(unlock_code)) => { + // In the happy case, entry of a correct code leads to the door + // becoming unlocked (i.e. transitioning back to `Closed`). if code == unlock_code { return (DoorState::Closed, vec![]); } + // If the code wasn't correct and the fraudulent unlocker ran + // out of attempts (i.e. there was only one attempt remaining), + // it's time for some consequences. if attempts == 1 { return (DoorState::Disabled, vec![DoorAction::CallThePolice]); } + // If the code wasn't correct, but there are still some + // remaining attempts, the user doesn't have to face the police + // quite yet but IRC gets to laugh about it. return ( DoorState::Locked { code, @@ -72,10 +214,23 @@ impl FSM for DoorState { ); } + // This actually already concludes our event-handling logic. Our + // uncaring door does absolutely nothing if you attempt to do + // something with it that it doesn't support, so the last handler is + // a simple fallback. + // + // In a real-world state machine, especially one that receives + // events from external sources, you may want fallback handlers to + // actually do something. One example could be creating an action + // that logs information about unexpected events, alerts a + // monitoring service, or whatever else. (current, _) => (current, vec![]), } } + // The implementation of `enter` lets door states cause additional actions + // they are transitioned to. In the door example we use this only to notify + // IRC about what is going on. fn enter(&self) -> Vec { let msg = match self { DoorState::Opened => "door was opened", @@ -87,6 +242,15 @@ impl FSM for DoorState { vec![DoorAction::NotifyIRC(msg.into())] } + // The implementation of `act` lets us perform actual side-effects. + // + // Again, for the sake of educational simplicity, this does not deal with + // all potential (or in fact any) error cases that can occur during this toy + // implementation of actions. + // + // Additionally the `act` function can return new events. This is useful for + // a sort of "callback-like" pattern (cause an action to fetch some data, + // receive it as an event) but is not used in this example. fn act(action: DoorAction) -> Vec { match action { DoorAction::NotifyIRC(msg) => { From 965574913e835e746b8e71b62eb76898a6686ded Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 26 Sep 2018 18:02:55 +0200 Subject: [PATCH 10/30] docs(core): Fix rustdoc syntax in several places --- finito-core/src/lib.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/finito-core/src/lib.rs b/finito-core/src/lib.rs index a622134a1..0fdb2bd91 100644 --- a/finito-core/src/lib.rs +++ b/finito-core/src/lib.rs @@ -69,7 +69,7 @@ //! documentation. Unfortunately those assumptions are not //! automatically verifiable in Rust. //! -//! == Example +//! ## Example //! //! Please consult `finito-door` for an example representing a simple, //! lockable door as a finite-state machine. This gives an overview @@ -88,15 +88,16 @@ //! //! Currently, those libraries include: //! -//! * @finito@: Core components and classes of Finito +//! * `finito`: Core components and classes of Finito //! -//! * @finito-in-mem@: In-memory implementation of state machines +//! * `finito-in-mem`: In-memory implementation of state machines //! that do not need to live longer than an application using //! standard library concurrency primitives. //! -//! * @finito-postgres@: Postgres-backed, persistent implementation +//! * `finito-postgres`: Postgres-backed, persistent implementation //! of state machines that, well, do need to live longer. Uses -//! Postgres for concurrency synchronisation, so keep that in mind. +//! Postgres for concurrency synchronisation, so keep that in +//! mind. //! //! Which should cover most use-cases. Okay, enough prose, lets dive //! in. From fe97c712cc64582cdd33dda8845820e479f83a39 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 26 Sep 2018 18:04:28 +0200 Subject: [PATCH 11/30] docs: Add rustdoc header lines as expected by the format These are rendered in the rustdoc crate overview sidebar. --- finito-core/src/lib.rs | 2 ++ finito-door/src/lib.rs | 2 ++ finito-postgres/src/lib.rs | 2 ++ 3 files changed, 6 insertions(+) diff --git a/finito-core/src/lib.rs b/finito-core/src/lib.rs index 0fdb2bd91..2f27a2bdf 100644 --- a/finito-core/src/lib.rs +++ b/finito-core/src/lib.rs @@ -1,3 +1,5 @@ +//! Finito's core finite-state machine abstraction. +//! //! # What & why? //! //! Most processes that occur in software applications can be modeled diff --git a/finito-door/src/lib.rs b/finito-door/src/lib.rs index 0137a6026..296ce72b2 100644 --- a/finito-door/src/lib.rs +++ b/finito-door/src/lib.rs @@ -1,3 +1,5 @@ +//! Example implementation of a lockable door in Finito +//! //! # What & why? //! //! This module serves as a (hopefully simple) example of how to diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs index 152592405..b7a0c4540 100644 --- a/finito-postgres/src/lib.rs +++ b/finito-postgres/src/lib.rs @@ -1,3 +1,5 @@ +//! PostgreSQL-backed persistence for Finito state machines +//! //! This module implements ... TODO when I can write again. #[macro_use] extern crate postgres; From cbb58fa6c2d60a72a454ba3c130bbcb990488643 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 26 Sep 2018 18:19:10 +0200 Subject: [PATCH 12/30] feat(postgres): Add initial table schema for Finito tables --- .gitignore | 2 +- .../down.sql | 4 ++ .../up.sql | 37 +++++++++++++++++++ 3 files changed, 42 insertions(+), 1 deletion(-) create mode 100644 finito-postgres/migrations/2018-09-26-160621_bootstrap_finito_schema/down.sql create mode 100644 finito-postgres/migrations/2018-09-26-160621_bootstrap_finito_schema/up.sql diff --git a/.gitignore b/.gitignore index 143b1ca01..1aefdbbf6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ - +.envrc /target/ **/*.rs.bk Cargo.lock diff --git a/finito-postgres/migrations/2018-09-26-160621_bootstrap_finito_schema/down.sql b/finito-postgres/migrations/2018-09-26-160621_bootstrap_finito_schema/down.sql new file mode 100644 index 000000000..9b56f9d35 --- /dev/null +++ b/finito-postgres/migrations/2018-09-26-160621_bootstrap_finito_schema/down.sql @@ -0,0 +1,4 @@ +DROP TABLE actions; +DROP TYPE ActionStatus; +DROP TABLE events; +DROP TABLE machines; diff --git a/finito-postgres/migrations/2018-09-26-160621_bootstrap_finito_schema/up.sql b/finito-postgres/migrations/2018-09-26-160621_bootstrap_finito_schema/up.sql new file mode 100644 index 000000000..0de1a9e3c --- /dev/null +++ b/finito-postgres/migrations/2018-09-26-160621_bootstrap_finito_schema/up.sql @@ -0,0 +1,37 @@ +-- Creates the initial schema required by finito-postgres. + +CREATE TABLE machines ( + id UUID PRIMARY KEY, + created TIMESTAMPTZ NOT NULL DEFAULT NOW(), + fsm TEXT NOT NULL, + state JSONB NOT NULL +); + +CREATE TABLE events ( + id UUID PRIMARY KEY, + created TIMESTAMPTZ NOT NULL DEFAULT NOW(), + fsm TEXT NOT NULL, + fsm_id UUID NOT NULL REFERENCES machines(id), + event JSONB NOT NULL +); +CREATE INDEX idx_events_machines ON events(fsm_id); + +CREATE TYPE ActionStatus AS ENUM ( + 'Pending', + 'Completed', + 'Failed' +); + +CREATE TABLE actions ( + id UUID PRIMARY KEY, + created TIMESTAMPTZ NOT NULL DEFAULT NOW(), + fsm TEXT NOT NULL, + fsm_id UUID NOT NULL REFERENCES machines(id), + event_id UUID NOT NULL REFERENCES events(id), + content JSONB NOT NULL, + status ActionStatus NOT NULL, + error JSONB +); + +CREATE INDEX idx_actions_machines ON actions(fsm_id); +CREATE INDEX idx_actions_events ON actions(event_id); From 6e35c083bf95b3793c7b1800eae4357543fec363 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 26 Sep 2018 18:20:04 +0200 Subject: [PATCH 13/30] refactor(postgres): Minor changes to match actual table schema --- finito-postgres/src/lib.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs index b7a0c4540..6e1815a69 100644 --- a/finito-postgres/src/lib.rs +++ b/finito-postgres/src/lib.rs @@ -1,6 +1,8 @@ //! PostgreSQL-backed persistence for Finito state machines //! //! This module implements ... TODO when I can write again. +//! +//! TODO: events & actions should have `SERIAL` keys #[macro_use] extern crate postgres; #[macro_use] extern crate postgres_derive; @@ -97,6 +99,7 @@ struct ActionT { event_id: Uuid, /// Serialised content of the action. + #[postgres(name = "content")] // renamed because 'action' is a keyword in PG action: Value, /// Current status of the action. @@ -105,7 +108,7 @@ struct ActionT { /// Serialised error representation, if an error occured during /// processing. TODO: Use some actual error type. Maybe failure /// has serialisation support? - error: Option, + error: Option, } // The following functions implement the public interface of @@ -149,8 +152,8 @@ pub fn insert_machine(conn: &C, initial: S) -> Result> where C: GenericConnection, S: FSM + Serialize { let query = r#" - INSERT INTO machines (id, created, fsm, state) - VALUES ($1, NOW(), $2, $3) + INSERT INTO machines (id, fsm, state) + VALUES ($1, $2, $3) "#; let id = Uuid::new_v4(); @@ -171,8 +174,8 @@ where S: FSM, S::Event: Serialize { let query = r#" - INSERT INTO events (id, created, fsm, fsm_id, event) - VALUES ($1, NOW(), $2, $3, $4) + INSERT INTO events (id, fsm, fsm_id, event) + VALUES ($1, $2, $3, $4) "#; let id = Uuid::new_v4(); @@ -192,8 +195,8 @@ fn insert_action(conn: &C, S: FSM, S::Action: Serialize { let query = r#" - INSERT INTO actions (id, created, fsm, fsm_id, event_id, action, status) - VALUES ($1, NOW(), $2, $3, $4, $5, $6) + INSERT INTO actions (id, fsm, fsm_id, event_id, action, status) + VALUES ($1, $2, $3, $4, $5, $6) "#; let id = Uuid::new_v4(); From 6254d056204b41b346367f9f03a4407b6c2a1a1a Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 26 Sep 2018 18:43:24 +0200 Subject: [PATCH 14/30] feat(door): Add serde instances for door FSM types --- finito-door/Cargo.toml | 4 ++++ finito-door/src/lib.rs | 7 ++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/finito-door/Cargo.toml b/finito-door/Cargo.toml index 3497a1c04..ad1cf22d4 100644 --- a/finito-door/Cargo.toml +++ b/finito-door/Cargo.toml @@ -3,5 +3,9 @@ name = "finito-door" version = "0.1.0" authors = ["Vincent Ambo "] +[dependencies] +serde = "1.0" +serde_derive = "1.0" + [dependencies.finito] path = "../finito-core" diff --git a/finito-door/src/lib.rs b/finito-door/src/lib.rs index 296ce72b2..074556c76 100644 --- a/finito-door/src/lib.rs +++ b/finito-door/src/lib.rs @@ -71,6 +71,7 @@ //! //! Alright, enough foreplay, lets dive in! +#[macro_use] extern crate serde_derive; extern crate finito; use finito::FSM; @@ -89,7 +90,7 @@ type Attempts = usize; /// /// This type is the one for which `finito::FSM` will be implemented, making it /// the wooden (?) heart of our door. -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Serialize, Deserialize)] pub enum DoorState { /// In `Opened` state, the door is wide open and anyone who fits through can /// go through. @@ -114,7 +115,7 @@ pub enum DoorState { /// This type represents the events that can occur in our door, i.e. the input /// and interactions it receives. -#[derive(Debug)] +#[derive(Debug, PartialEq, Serialize, Deserialize)] pub enum DoorEvent { /// `Open` means someone is opening the door! Open, @@ -136,7 +137,7 @@ pub enum DoorEvent { /// **Note**: This type by itself *is not* a collection of side-effects, it /// merely describes the side-effects we want to occur (which are then /// interpreted by the machinery later). -#[derive(Debug, PartialEq)] +#[derive(Debug, PartialEq, Serialize, Deserialize)] pub enum DoorAction { /// `NotifyIRC` is used to display some kind of message on the /// aforementioned IRC channel that is, for some reason, very interested in From 7e5592f0d186dde6e3e10be51efce2bdfc7483d0 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 26 Sep 2018 18:43:53 +0200 Subject: [PATCH 15/30] fix(postgres): Minor fixes in Postgres queries and handling --- finito-postgres/Cargo.toml | 3 +++ finito-postgres/src/lib.rs | 9 ++++++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/finito-postgres/Cargo.toml b/finito-postgres/Cargo.toml index c9f8476db..f32a30d5b 100644 --- a/finito-postgres/Cargo.toml +++ b/finito-postgres/Cargo.toml @@ -19,3 +19,6 @@ features = [ "v4" ] [dependencies.finito] path = "../finito-core" + +[dev-dependencies.finito-door] +path = "../finito-door" diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs index 6e1815a69..80200f8a4 100644 --- a/finito-postgres/src/lib.rs +++ b/finito-postgres/src/lib.rs @@ -67,6 +67,7 @@ struct EventT { /// This enum represents the possible statuses an action can be in. #[derive(Debug, ToSql, FromSql)] +#[postgres(name = "actionstatus")] enum ActionStatus { /// The action was requested but has not run yet. Pending, @@ -195,7 +196,7 @@ fn insert_action(conn: &C, S: FSM, S::Action: Serialize { let query = r#" - INSERT INTO actions (id, fsm, fsm_id, event_id, action, status) + INSERT INTO actions (id, fsm, fsm_id, event_id, content, status) VALUES ($1, $2, $3, $4, $5, $6) "#; @@ -239,7 +240,7 @@ pub fn get_machine(conn: &C, C: GenericConnection, S: FSM + DeserializeOwned { let query = r#" - SELECT (id, created, fsm, state) FROM machines WHERE id = $1 + SELECT id, created, fsm, state FROM machines WHERE id = $1 "#; // If the machine is being fetched in the context of a @@ -252,7 +253,7 @@ pub fn get_machine(conn: &C, let rows = conn.query(&query, &[&id.to_uuid()]).expect("TODO"); let mut machines = rows.into_iter().map(|row| MachineT { - id: row.get(0), + id: id.to_uuid(), created: row.get(1), fsm: row.get(2), state: row.get(3), @@ -300,5 +301,7 @@ pub fn advance(conn: &C, // And finally the state is updated: update_state(&tx, id, &new_state).expect("TODO"); + tx.commit().expect("TODO"); + Ok(new_state) } From 3891ba84d5dbf335442c1c9e263823820f2a3327 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 26 Sep 2018 18:44:16 +0200 Subject: [PATCH 16/30] test(postgres): Add test for insert_machine and advance Adds a test for the two most important functions in Finito's PostgreSQL backend. These actually require a local Postgres database to be available when running. Currently the connection details are hardcoded in the test. --- finito-postgres/src/lib.rs | 3 +++ finito-postgres/src/tests.rs | 47 ++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) create mode 100644 finito-postgres/src/tests.rs diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs index 80200f8a4..5ed9693bc 100644 --- a/finito-postgres/src/lib.rs +++ b/finito-postgres/src/lib.rs @@ -13,6 +13,9 @@ extern crate serde; extern crate serde_json; extern crate uuid; +#[cfg(test)] mod tests; +#[cfg(test)] extern crate finito_door; + mod error; pub use error::{Result, Error}; diff --git a/finito-postgres/src/tests.rs b/finito-postgres/src/tests.rs new file mode 100644 index 000000000..55ada1329 --- /dev/null +++ b/finito-postgres/src/tests.rs @@ -0,0 +1,47 @@ +use super::*; +use finito; +use finito_door::*; +use postgres::{Connection, TlsMode}; + +// TODO: read config from environment +fn open_test_connection() -> Connection { + Connection::connect("postgres://finito:finito@localhost/finito", TlsMode::None) + .expect("Failed to connect to test database") +} + +#[test] +fn test_insert_machine() { + let conn = open_test_connection(); + let initial = DoorState::Opened; + let door = insert_machine(&conn, initial).expect("Failed to insert door"); + let result = get_machine(&conn, &door, false).expect("Failed to fetch door"); + + assert_eq!(result, DoorState::Opened, "Inserted door state should match"); +} + +#[test] +fn test_advance() { + let conn = open_test_connection(); + + let initial = DoorState::Opened; + let events = vec![ + DoorEvent::Close, + DoorEvent::Open, + DoorEvent::Close, + DoorEvent::Lock(1234), + DoorEvent::Unlock(1234), + DoorEvent::Lock(4567), + DoorEvent::Unlock(1234), + ]; + + let door = insert_machine(&conn, initial).expect("Failed to insert door"); + + for event in events { + advance(&conn, &door, event).expect("Failed to advance door FSM"); + } + + let result = get_machine(&conn, &door, false).expect("Failed to fetch door"); + let expected = DoorState::Locked { code: 4567, attempts: 2 }; + + assert_eq!(result, expected, "Advanced door state should match"); +} From 406a90e8d696835f4b68e35f60f585af710c49c8 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 26 Sep 2018 22:31:42 +0200 Subject: [PATCH 17/30] feat(postgres): Implement initial (synchronous) actoin execution Implements a simple model for executing actions which will run them in sequence, synchronously, after advancing an FSM and committing the initial transaction. Note that multiple things are not yet taken into account: * Error handling of actions (they can not currently fail) * Retrying of actions * Concurrency model I started out by implementing the concurrency model similarly to the green-threading method used in Hamingja (but using OS threads), but slowly noticed that it may not be the best way to do that. It needs a little bit of discussion. Either way for most actions this method is fast enough to work for implementing things on top of Finito's model. --- finito-postgres/src/lib.rs | 150 +++++++++++++++++++++++++++++++------ 1 file changed, 126 insertions(+), 24 deletions(-) diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs index 5ed9693bc..af8013142 100644 --- a/finito-postgres/src/lib.rs +++ b/finito-postgres/src/lib.rs @@ -22,6 +22,7 @@ pub use error::{Result, Error}; use chrono::prelude::{DateTime, Utc}; use finito::FSM; use postgres::GenericConnection; +use postgres::transaction::Transaction; use serde::Serialize; use serde::de::DeserializeOwned; use serde_json::Value; @@ -69,7 +70,7 @@ struct EventT { } /// This enum represents the possible statuses an action can be in. -#[derive(Debug, ToSql, FromSql)] +#[derive(Debug, PartialEq, ToSql, FromSql)] #[postgres(name = "actionstatus")] enum ActionStatus { /// The action was requested but has not run yet. @@ -234,6 +235,15 @@ fn update_state(conn: &C, } } +/// Conditionally alter SQL statement to append locking clause inside +/// of a transaction. +fn alter_for_update(alter: bool, query: &str) -> String { + match alter { + false => query.to_string(), + true => format!("{} FOR UPDATE", query), + } +} + /// Retrieve the current state of a state machine from the database, /// optionally locking the machine state for the duration of some /// enclosing transaction. @@ -242,34 +252,66 @@ pub fn get_machine(conn: &C, for_update: bool) -> Result where C: GenericConnection, S: FSM + DeserializeOwned { - let query = r#" - SELECT id, created, fsm, state FROM machines WHERE id = $1 - "#; - - // If the machine is being fetched in the context of a - // transaction, with the intention to update it, the relevant - // clause needs to be appended: - let query = match for_update { - false => query.to_string(), - true => format!("{} FOR UPDATE", query), - }; + let query = alter_for_update(for_update, r#" + SELECT state FROM machines WHERE id = $1 + "#); let rows = conn.query(&query, &[&id.to_uuid()]).expect("TODO"); - let mut machines = rows.into_iter().map(|row| MachineT { - id: id.to_uuid(), - created: row.get(1), - fsm: row.get(2), - state: row.get(3), - }); - if let Some(machine) = machines.next() { - Ok(serde_json::from_value(machine.state).expect("TODO")) + if let Some(row) = rows.into_iter().next() { + Ok(serde_json::from_value(row.get(0)).expect("TODO")) } else { // TODO: return appropriate not found error Err(Error::SomeError) } } +/// Retrieve an action from the database, optionally locking it for +/// the duration of some enclosing transaction. +fn get_action(conn: &C, id: Uuid) -> Result<(ActionStatus, S::Action)> where + C: GenericConnection, + S: FSM, + S::Action: DeserializeOwned { + let query = alter_for_update(true, r#" + SELECT status, content FROM actions + WHERE id = $1 AND fsm = $2 + "#); + + let rows = conn.query(&query, &[&id, &S::FSM_NAME]).expect("TODO"); + + if let Some(row) = rows.into_iter().next() { + let action = serde_json::from_value(row.get(1)).expect("TODO"); + Ok((row.get(0), action)) + } else { + // TODO: return appropriate not found error + Err(Error::SomeError) + } +} + +/// Update the status of an action after an attempt to run it. +fn update_action_status(conn: &C, + id: Uuid, + status: ActionStatus, + error: Option, + _fsm: PhantomData) -> Result<()> where + C: GenericConnection, + S: FSM { + let query = r#" + UPDATE actions SET status = $1, error = $2 + WHERE id = $3 AND fsm = $4 + "#; + + let result = conn.execute(&query, &[&status, &error, &id, &S::FSM_NAME]) + .expect("TODO"); + + if result != 1 { + // TODO: Fail in the most gruesome way! + unimplemented!() + } + + Ok(()) +} + /// Advance a persisted state machine by applying an event, and /// storing the event as well as all resulting actions. /// @@ -287,24 +329,84 @@ pub fn advance(conn: &C, C: GenericConnection, S: FSM + Serialize + DeserializeOwned, S::Event: Serialize, - S::Action: Serialize { + S::Action: Serialize + DeserializeOwned { let tx = conn.transaction().expect("TODO"); - let state = get_machine(&tx, id, true).expect("TODO"); + let state = get_machine(&tx, id, true)?; // Advancing the FSM consumes the event, so it is persisted first: - let event_id = insert_event(&tx, id, &event).expect("TODO"); + let event_id = insert_event(&tx, id, &event)?; // Core advancing logic is run: let (new_state, actions) = finito::advance(state, event); // Resulting actions are persisted (TODO: and interpreted) + let mut action_ids = vec![]; for action in actions { - insert_action(&tx, id, event_id, &action).expect("TODO"); + let action_id = insert_action(&tx, id, event_id, &action)?; + action_ids.push(action_id); } // And finally the state is updated: update_state(&tx, id, &new_state).expect("TODO"); tx.commit().expect("TODO"); + run_actions(conn, id, action_ids); + Ok(new_state) } + +/// Execute a single action in case it is pending or retryable. Holds +/// a lock on the action's database row while performing the action +/// and writes back the status afterwards. +/// +/// Should the execution of an action fail cleanly (i.e. without a +/// panic), the error will be persisted. Should it fail by panicking +/// (which developers should never do explicitly in action +/// interpreters) its status will not be changed. +fn run_action(tx: Transaction, id: Uuid, _fsm: PhantomData) + -> Result> where + S: FSM, + S::Action: DeserializeOwned { + let (status, action) = get_action::(&tx, id)?; + + let result = match status { + ActionStatus::Pending => { + let events = ::act(action); + update_action_status( + &tx, id, ActionStatus::Completed, None, PhantomData:: + )?; + + events + }, + + _ => { + // TODO: Currently only pending actions are run because + // retryable actions are not yet implemented. + vec![] + }, + }; + + tx.commit().expect("TODO"); + Ok(result) +} + +/// Execute several actions at the same time, each in a separate +/// thread. Note that actions returning further events, causing +/// further transitions, returning further actions and so on will +/// potentially cause multiple threads to get created. +fn run_actions(conn: &C, fsm_id: &MachineId, action_ids: Vec) where + C: GenericConnection, + S: FSM + Serialize + DeserializeOwned, + S::Event: Serialize, + S::Action: Serialize + DeserializeOwned { + for action_id in action_ids { + let tx = conn.transaction().expect("TODO"); + + // TODO: Determine which concurrency setup we actually want. + if let Ok(events) = run_action(tx, action_id, PhantomData::) { + for event in events { + advance(conn, fsm_id, event).expect("TODO"); + } + } + } +} From 37590ae0f61187f080fe5b09d037ad428b4b1db4 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 26 Sep 2018 23:05:50 +0200 Subject: [PATCH 18/30] feat(core): Add associated 'Error' type to FSM trait Adds an associated 'Error' type that can be returned by actions when an interpretation fails. --- finito-core/src/lib.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/finito-core/src/lib.rs b/finito-core/src/lib.rs index 2f27a2bdf..8770a239d 100644 --- a/finito-core/src/lib.rs +++ b/finito-core/src/lib.rs @@ -108,6 +108,7 @@ //! //! Please reach out! I want to know why! +use std::fmt::Debug; use std::mem; /// Primary trait that needs to be implemented for every state type @@ -129,6 +130,10 @@ pub trait FSM where Self: Sized { /// actions that can occur in the state-machine. type Action; + /// The associated error type of an FSM represents failures that + /// can occur during action processing. + type Error: Debug; + /// `handle` deals with any incoming events to cause state /// transitions and emit actions. This function is the core logic /// of any state machine. @@ -152,7 +157,7 @@ pub trait FSM where Self: Sized { /// `act` interprets and executes FSM actions. This is the only /// part of an FSM in which side-effects are allowed. - fn act(Self::Action) -> Vec; + fn act(Self::Action) -> Result, Self::Error>; } /// This function is the primary function used to advance a state From c4b94d8d2dc00c0742cf7af1c0fd2c1256f75078 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 26 Sep 2018 23:18:26 +0200 Subject: [PATCH 19/30] feat(door): Use failure::Error as associated error type Implements the associated error type for the FSM trait as failure::Error. This makes it possible to fail gracefully in all actions, for example in the updated definition of the `NotifyIRC` action which now appends to a file. --- finito-door/Cargo.toml | 1 + finito-door/src/lib.rs | 24 +++++++++++++++++++----- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/finito-door/Cargo.toml b/finito-door/Cargo.toml index ad1cf22d4..d632d1108 100644 --- a/finito-door/Cargo.toml +++ b/finito-door/Cargo.toml @@ -4,6 +4,7 @@ version = "0.1.0" authors = ["Vincent Ambo "] [dependencies] +failure = "0.1" serde = "1.0" serde_derive = "1.0" diff --git a/finito-door/src/lib.rs b/finito-door/src/lib.rs index 074556c76..e6a3099e9 100644 --- a/finito-door/src/lib.rs +++ b/finito-door/src/lib.rs @@ -72,6 +72,8 @@ //! Alright, enough foreplay, lets dive in! #[macro_use] extern crate serde_derive; + +extern crate failure; extern crate finito; use finito::FSM; @@ -170,6 +172,11 @@ impl FSM for DoorState { type Event = DoorEvent; type Action = DoorAction; + // For error handling, the door simply uses `failure` which provides a + // generic, chainable error type. In real-world implementations you may want + // to use a custom error type or similar. + type Error = failure::Error; + // The implementation of `handle` provides us with the actual transition // logic of the door. // @@ -254,18 +261,25 @@ impl FSM for DoorState { // Additionally the `act` function can return new events. This is useful for // a sort of "callback-like" pattern (cause an action to fetch some data, // receive it as an event) but is not used in this example. - fn act(action: DoorAction) -> Vec { + fn act(action: DoorAction) -> Result, failure::Error> { match action { DoorAction::NotifyIRC(msg) => { - // TODO: write to file in example - println!("IRC: {}", msg); - vec![] + use std::fs::OpenOptions; + use std::io::Write; + + let mut file = OpenOptions::new() + .append(true) + .create(true) + .open("/tmp/door-irc.log")?; + + write!(file, " {}\n", msg)?; + Ok(vec![]) } DoorAction::CallThePolice => { // TODO: call the police println!("The police was called! For real!"); - vec![] + Ok(vec![]) } } } From 45afa18846556c3486e6922108ed6e8fcf6ec125 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 26 Sep 2018 23:19:34 +0200 Subject: [PATCH 20/30] feat(postgres): Compatibility with new associated error type MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Changes the implementation of action execution to deal with the returned associated errors. The only sensible constraint on those errors that I could think of for now is `Debug`, meaning that errors are now persisted as debug messages. This is not as nice to work with for a future implementation of retryable actions as the equivalent in Haskell, but maybe an idea shows up underway. The main issue is that most of the common error types will not be implementing Serde traits, so serialization to/from the same error type is difficult. Adding an implementation constraint for JSON serialisation on error types (i.e. `S::Error: Serialize + Deserialize`) would probably cause headaches for users, especially if they are trying to use an out-of-the-box error type or an error type wrapping foreign errors. Det ska'kke være lett ... --- .../up.sql | 2 +- finito-postgres/src/lib.rs | 33 +++++++++++++------ finito-postgres/src/tests.rs | 2 +- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/finito-postgres/migrations/2018-09-26-160621_bootstrap_finito_schema/up.sql b/finito-postgres/migrations/2018-09-26-160621_bootstrap_finito_schema/up.sql index 0de1a9e3c..18ace393b 100644 --- a/finito-postgres/migrations/2018-09-26-160621_bootstrap_finito_schema/up.sql +++ b/finito-postgres/migrations/2018-09-26-160621_bootstrap_finito_schema/up.sql @@ -30,7 +30,7 @@ CREATE TABLE actions ( event_id UUID NOT NULL REFERENCES events(id), content JSONB NOT NULL, status ActionStatus NOT NULL, - error JSONB + error TEXT ); CREATE INDEX idx_actions_machines ON actions(fsm_id); diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs index af8013142..ffb2532e5 100644 --- a/finito-postgres/src/lib.rs +++ b/finito-postgres/src/lib.rs @@ -110,10 +110,9 @@ struct ActionT { /// Current status of the action. status: ActionStatus, - /// Serialised error representation, if an error occured during - /// processing. TODO: Use some actual error type. Maybe failure - /// has serialisation support? - error: Option, + /// Detailed (i.e. Debug-trait formatted) error message, if an + /// error occured during action processing. + error: Option, } // The following functions implement the public interface of @@ -292,7 +291,7 @@ fn get_action(conn: &C, id: Uuid) -> Result<(ActionStatus, S::Action)> whe fn update_action_status(conn: &C, id: Uuid, status: ActionStatus, - error: Option, + error: Option, _fsm: PhantomData) -> Result<()> where C: GenericConnection, S: FSM { @@ -371,12 +370,26 @@ fn run_action(tx: Transaction, id: Uuid, _fsm: PhantomData) let result = match status { ActionStatus::Pending => { - let events = ::act(action); - update_action_status( - &tx, id, ActionStatus::Completed, None, PhantomData:: - )?; + match ::act(action) { + // If the action succeeded, update its status to + // completed and return the created events. + Ok(events) => { + update_action_status( + &tx, id, ActionStatus::Completed, None, PhantomData:: + )?; + events + }, - events + // If the action failed, persist the debug message and + // return nothing. + Err(err) => { + let msg = Some(format!("{:?}", err)); + update_action_status( + &tx, id, ActionStatus::Failed, msg, PhantomData:: + )?; + vec![] + }, + } }, _ => { diff --git a/finito-postgres/src/tests.rs b/finito-postgres/src/tests.rs index 55ada1329..b1b5821be 100644 --- a/finito-postgres/src/tests.rs +++ b/finito-postgres/src/tests.rs @@ -1,5 +1,5 @@ use super::*; -use finito; + use finito_door::*; use postgres::{Connection, TlsMode}; From 6ff56f860b9798d56d7e8eec7e019594375f5e96 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Wed, 24 Oct 2018 08:50:53 +0200 Subject: [PATCH 21/30] feat(core): Add a trait representing backend implementations First version of a trait to abstract over backend implementations. Currently the different backends still have a bit of specific behaviour, but it should be possible to boil this down to a single trait. The primary question that is still open is how to best deal with the interpretation of actions, as it is sort of up to the backend to decide how to do that. --- finito-core/src/lib.rs | 47 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/finito-core/src/lib.rs b/finito-core/src/lib.rs index 8770a239d..0ce17cc28 100644 --- a/finito-core/src/lib.rs +++ b/finito-core/src/lib.rs @@ -134,6 +134,11 @@ pub trait FSM where Self: Sized { /// can occur during action processing. type Error: Debug; + /// The associated state type of an FSM describes the state that + /// is made available to the implementation of action + /// interpretations. + type State; + /// `handle` deals with any incoming events to cause state /// transitions and emit actions. This function is the core logic /// of any state machine. @@ -157,7 +162,7 @@ pub trait FSM where Self: Sized { /// `act` interprets and executes FSM actions. This is the only /// part of an FSM in which side-effects are allowed. - fn act(Self::Action) -> Result, Self::Error>; + fn act(Self::Action, Self::State) -> Result, Self::Error>; } /// This function is the primary function used to advance a state @@ -187,3 +192,43 @@ pub fn advance(state: S, event: S::Event) -> (S, Vec) { (new_state, actions) } + +/// This trait is implemented by Finito backends. Backends are +/// expected to be able to keep track of the current state of an FSM +/// and retrieve it / apply updates transactionally. +/// +/// See the `finito-postgres` and `finito-in-mem` crates for example +/// implementations of this trait. +pub trait FSMBackend { + /// Custom state type that is made available to action handlers by + /// the backend. + /// + /// TODO: Something something `Into for State`. + type State; + + /// Key type used to identify individual state machines in this + /// backend. + /// + /// TODO: Should be parameterised over FSM type after rustc + /// #44265. + type Key; + + /// Error type for all potential failures that can occur when + /// interacting with this backend. + type Error: Debug; + + /// Insert a new state-machine into the backend's storage and + /// return its newly allocated key. + fn insert_machine(&self, initial: S) -> Result; + + /// Retrieve the current state of an FSM by its key. + fn get_machine(&self, key: Self::Key) -> Result; + + /// Advance a state machine by applying an event and persisting it + /// as well as any resulting actions. + /// + /// **Note**: Whether actions are automatically executed depends + /// on the backend used. Please consult the backend's + /// documentation for details. + fn advance(&self, key: Self::Key, event: S::Event) -> Result; +} From 8c5ab60ac3a6af2d6bc8da904a55fbe84baf3c25 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Mon, 19 Nov 2018 13:51:14 +0100 Subject: [PATCH 22/30] chore: Fix email address in Cargo files --- finito-core/Cargo.toml | 2 +- finito-door/Cargo.toml | 2 +- finito-postgres/Cargo.toml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/finito-core/Cargo.toml b/finito-core/Cargo.toml index 00b00a9a7..c3a377dd2 100644 --- a/finito-core/Cargo.toml +++ b/finito-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "finito" version = "0.1.0" -authors = ["Vincent Ambo "] +authors = ["Vincent Ambo "] [dependencies] diff --git a/finito-door/Cargo.toml b/finito-door/Cargo.toml index d632d1108..32c0a5a7c 100644 --- a/finito-door/Cargo.toml +++ b/finito-door/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "finito-door" version = "0.1.0" -authors = ["Vincent Ambo "] +authors = ["Vincent Ambo "] [dependencies] failure = "0.1" diff --git a/finito-postgres/Cargo.toml b/finito-postgres/Cargo.toml index f32a30d5b..12bcef8c2 100644 --- a/finito-postgres/Cargo.toml +++ b/finito-postgres/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "finito-postgres" version = "0.1.0" -authors = ["Vincent Ambo "] +authors = ["Vincent Ambo "] [dependencies] chrono = "0.4" From 536793dbbb6eba468cf7bf98d1798a28d196cd6f Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Thu, 22 Nov 2018 16:59:42 +0100 Subject: [PATCH 23/30] fix(door): Update trait impl with `State` type The door does not actually require any state, so it just uses an empty tuple. --- finito-door/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/finito-door/src/lib.rs b/finito-door/src/lib.rs index e6a3099e9..2066ed81e 100644 --- a/finito-door/src/lib.rs +++ b/finito-door/src/lib.rs @@ -171,6 +171,7 @@ impl FSM for DoorState { // type is 'DoorAction'. type Event = DoorEvent; type Action = DoorAction; + type State = (); // For error handling, the door simply uses `failure` which provides a // generic, chainable error type. In real-world implementations you may want @@ -261,7 +262,7 @@ impl FSM for DoorState { // Additionally the `act` function can return new events. This is useful for // a sort of "callback-like" pattern (cause an action to fetch some data, // receive it as an event) but is not used in this example. - fn act(action: DoorAction) -> Result, failure::Error> { + fn act(action: DoorAction, _state: ()) -> Result, failure::Error> { match action { DoorAction::NotifyIRC(msg) => { use std::fs::OpenOptions; From 1a90856ba45a26dccfd9e68d3a9b036d7ceeff82 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Thu, 13 Dec 2018 12:08:57 +0100 Subject: [PATCH 24/30] feat(core): Add serde trait bounds to FSM types These are required by multiple backend implementations, and any attempt at making them optional will result in less appreciable APIs. --- finito-core/Cargo.toml | 1 + finito-core/src/lib.rs | 29 +++++++++++++++++++---------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/finito-core/Cargo.toml b/finito-core/Cargo.toml index c3a377dd2..1d7bdb8b0 100644 --- a/finito-core/Cargo.toml +++ b/finito-core/Cargo.toml @@ -4,3 +4,4 @@ version = "0.1.0" authors = ["Vincent Ambo "] [dependencies] +serde = "1.0" diff --git a/finito-core/src/lib.rs b/finito-core/src/lib.rs index 0ce17cc28..0dda30ae9 100644 --- a/finito-core/src/lib.rs +++ b/finito-core/src/lib.rs @@ -108,6 +108,10 @@ //! //! Please reach out! I want to know why! +extern crate serde; + +use serde::Serialize; +use serde::de::DeserializeOwned; use std::fmt::Debug; use std::mem; @@ -199,13 +203,12 @@ pub fn advance(state: S, event: S::Event) -> (S, Vec) { /// /// See the `finito-postgres` and `finito-in-mem` crates for example /// implementations of this trait. -pub trait FSMBackend { - /// Custom state type that is made available to action handlers by - /// the backend. - /// - /// TODO: Something something `Into for State`. - type State; - +/// +/// Backends must be parameterised over an additional (user-supplied) +/// state type which can be used to track application state that must +/// be made available to action handlers, for example to pass along +/// database connections. +pub trait FSMBackend { /// Key type used to identify individual state machines in this /// backend. /// @@ -219,10 +222,12 @@ pub trait FSMBackend { /// Insert a new state-machine into the backend's storage and /// return its newly allocated key. - fn insert_machine(&self, initial: S) -> Result; + fn insert_machine(&self, initial: F) -> Result + where F: FSM + Serialize + DeserializeOwned; /// Retrieve the current state of an FSM by its key. - fn get_machine(&self, key: Self::Key) -> Result; + fn get_machine(&self, key: Self::Key) -> Result + where F: FSM + Serialize + DeserializeOwned; /// Advance a state machine by applying an event and persisting it /// as well as any resulting actions. @@ -230,5 +235,9 @@ pub trait FSMBackend { /// **Note**: Whether actions are automatically executed depends /// on the backend used. Please consult the backend's /// documentation for details. - fn advance(&self, key: Self::Key, event: S::Event) -> Result; + fn advance(&self, key: Self::Key, event: F::Event) -> Result + where F: FSM + Serialize + DeserializeOwned, + F::State: From, + F::Event: Serialize + DeserializeOwned, + F::Action: Serialize + DeserializeOwned; } From 183ee2accc06680be470911aa0d1f6744e3e9ba7 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Thu, 13 Dec 2018 13:39:16 +0100 Subject: [PATCH 25/30] fix(core): Ensure FSM state can be created from backend state ref The action interpreter can not own the backend state, hence it must be possible to create the required state from a reference to the backend's state. --- finito-core/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/finito-core/src/lib.rs b/finito-core/src/lib.rs index 0dda30ae9..517bfad2b 100644 --- a/finito-core/src/lib.rs +++ b/finito-core/src/lib.rs @@ -166,7 +166,7 @@ pub trait FSM where Self: Sized { /// `act` interprets and executes FSM actions. This is the only /// part of an FSM in which side-effects are allowed. - fn act(Self::Action, Self::State) -> Result, Self::Error>; + fn act(Self::Action, &Self::State) -> Result, Self::Error>; } /// This function is the primary function used to advance a state @@ -208,7 +208,7 @@ pub fn advance(state: S, event: S::Event) -> (S, Vec) { /// state type which can be used to track application state that must /// be made available to action handlers, for example to pass along /// database connections. -pub trait FSMBackend { +pub trait FSMBackend { /// Key type used to identify individual state machines in this /// backend. /// @@ -235,9 +235,9 @@ pub trait FSMBackend { /// **Note**: Whether actions are automatically executed depends /// on the backend used. Please consult the backend's /// documentation for details. - fn advance(&self, key: Self::Key, event: F::Event) -> Result + fn advance<'a, F: FSM>(&'a self, key: Self::Key, event: F::Event) -> Result where F: FSM + Serialize + DeserializeOwned, - F::State: From, + F::State: From<&'a S>, F::Event: Serialize + DeserializeOwned, F::Action: Serialize + DeserializeOwned; } From 43f71ae82fa6d68abb0486601d9477b82fb42628 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Thu, 13 Dec 2018 13:40:54 +0100 Subject: [PATCH 26/30] refactor(postgres): Implement FSMBackend trait for FinitoPostgres Implements the new backend trait for the FinitoPostgres type which now represents instances of the Postgres backend. This refactoring is not yet fully complete, as some restructuring of the code is in order. --- finito-postgres/src/lib.rs | 236 ++++++++++++++++++------------------- 1 file changed, 112 insertions(+), 124 deletions(-) diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs index ffb2532e5..46309a04c 100644 --- a/finito-postgres/src/lib.rs +++ b/finito-postgres/src/lib.rs @@ -20,35 +20,15 @@ mod error; pub use error::{Result, Error}; use chrono::prelude::{DateTime, Utc}; -use finito::FSM; -use postgres::GenericConnection; +use finito::{FSM, FSMBackend}; +use postgres::{Connection, GenericConnection}; use postgres::transaction::Transaction; use serde::Serialize; use serde::de::DeserializeOwned; use serde_json::Value; -use std::fmt; use std::marker::PhantomData; use uuid::Uuid; -/// This struct represents rows in the database table in which -/// machines (i.e. the current state of a Finito state machine) are -/// persisted. -#[derive(Debug, ToSql, FromSql)] -struct MachineT { - /// ID of the persisted state machine. - id: Uuid, - - /// Time at which the FSM was first created. - created: DateTime, - - /// Name of the type of FSM represented by this state. - fsm: String, - - /// Current state of the FSM (TODO: Can the serialised FSM type be - /// used?) - state: Value, -} - /// This struct represents rows in the database table in which events /// are persisted. #[derive(Debug, ToSql, FromSql)] @@ -118,41 +98,110 @@ struct ActionT { // The following functions implement the public interface of // `finito-postgres`. -/// This type is used as a type-safe wrapper around the ID of a state -/// machine. It carries information about the FSM type and is intended -/// to add a layer of checking to prevent IDs from being mixed up. -#[derive(Clone)] -pub struct MachineId { - uuid: Uuid, - phantom: PhantomData, +/// TODO: Write docs for this type, brain does not want to do it right +/// now. +pub struct FinitoPostgres { + state: S, + // TODO: Use connection pool? + conn: Connection, } -/// Custom debug implementation to format machine IDs using the name -/// of the FSM and their UUID. -impl fmt::Debug for MachineId { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}:{}", S::FSM_NAME, self.uuid.hyphenated()) +impl FSMBackend for FinitoPostgres { + type Key = Uuid; + type Error = Error; + + fn insert_machine(&self, initial: S) -> Result { + let query = r#" + INSERT INTO machines (id, fsm, state) + VALUES ($1, $2, $3) + "#; + + let id = Uuid::new_v4(); + let fsm = S::FSM_NAME.to_string(); + let state = serde_json::to_value(initial).expect("TODO"); + + self.conn.execute(query, &[&id, &fsm, &state]).expect("TODO"); + + return Ok(id); + + } + + fn get_machine(&self, key: Uuid) -> Result { + get_machine_internal(&self.conn, key, false) + } + + /// Advance a persisted state machine by applying an event, and + /// storing the event as well as all resulting actions. + /// + /// This function holds a database-lock on the state's row while + /// advancing the machine. + /// + /// **Note**: This function returns the new state of the machine + /// immediately after applying the event, however this does not + /// necessarily equate to the state of the machine after all related + /// processing is finished as running actions may result in additional + /// transitions. + fn advance<'a, S>(&'a self, key: Uuid, event: S::Event) -> Result + where S: FSM + Serialize + DeserializeOwned, + S::State: From<&'a State>, + S::Event: Serialize + DeserializeOwned, + S::Action: Serialize + DeserializeOwned { + let tx = self.conn.transaction().expect("TODO"); + let state = get_machine_internal(&tx, key, true)?; + + // Advancing the FSM consumes the event, so it is persisted first: + let event_id = insert_event::<_, S>(&tx, key, &event)?; + + // Core advancing logic is run: + let (new_state, actions) = finito::advance(state, event); + + // Resulting actions are persisted (TODO: and interpreted) + let mut action_ids = vec![]; + for action in actions { + let action_id = insert_action::<_, S>(&tx, key, event_id, &action)?; + action_ids.push(action_id); + } + + // And finally the state is updated: + update_state(&tx, key, &new_state).expect("TODO"); + tx.commit().expect("TODO"); + + self.run_actions::(key, action_ids); + + Ok(new_state) } } -impl MachineId { - /// Convert a UUID into a strongly typed machine ID. - pub fn from_uuid(uuid: Uuid) -> Self { - MachineId { - uuid, - phantom: PhantomData, +impl FinitoPostgres { + /// Execute several actions at the same time, each in a separate + /// thread. Note that actions returning further events, causing + /// further transitions, returning further actions and so on will + /// potentially cause multiple threads to get created. + fn run_actions<'a, S>(&'a self, fsm_id: Uuid, action_ids: Vec) where + S: FSM + Serialize + DeserializeOwned, + S::Event: Serialize + DeserializeOwned, + S::Action: Serialize + DeserializeOwned, + S::State: From<&'a State> { + let state: S::State = (&self.state).into(); + + for action_id in action_ids { + let tx = self.conn.transaction().expect("TODO"); + + // TODO: Determine which concurrency setup we actually want. + if let Ok(events) = run_action(tx, action_id, &state, PhantomData::) { + for event in events { + self.advance::(fsm_id, event).expect("TODO"); + } + } } } - - /// Return the UUID contained in a machine ID. - pub fn to_uuid(&self) -> Uuid { - self.uuid - } } + + /// Insert a single state-machine into the database and return its /// newly allocated, random UUID. -pub fn insert_machine(conn: &C, initial: S) -> Result> where +pub fn insert_machine(conn: &C, initial: S) -> Result where C: GenericConnection, S: FSM + Serialize { let query = r#" @@ -166,12 +215,12 @@ pub fn insert_machine(conn: &C, initial: S) -> Result> where conn.execute(query, &[&id, &fsm, &state]).expect("TODO"); - return Ok(MachineId::from_uuid(id)); + return Ok(id); } /// Insert a single event into the database and return its UUID. fn insert_event(conn: &C, - fsm_id: &MachineId, + fsm_id: Uuid, event: &S::Event) -> Result where C: GenericConnection, @@ -186,13 +235,13 @@ where let fsm = S::FSM_NAME.to_string(); let event_value = serde_json::to_value(event).expect("TODO"); - conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &event_value]).expect("TODO"); + conn.execute(query, &[&id, &fsm, &fsm_id, &event_value]).expect("TODO"); return Ok(id) } /// Insert a single action into the database and return its UUID. fn insert_action(conn: &C, - fsm_id: &MachineId, + fsm_id: Uuid, event_id: Uuid, action: &S::Action) -> Result where C: GenericConnection, @@ -207,14 +256,17 @@ fn insert_action(conn: &C, let fsm = S::FSM_NAME.to_string(); let action_value = serde_json::to_value(action).expect("TODO"); - conn.execute(query, &[&id, &fsm, &fsm_id.to_uuid(), &event_id, - &action_value, &ActionStatus::Pending]).expect("TODO"); + conn.execute( + query, + &[&id, &fsm, &fsm_id, &event_id, &action_value, &ActionStatus::Pending] + ).expect("TODO"); + return Ok(id) } /// Update the state of a specified machine. fn update_state(conn: &C, - fsm_id: &MachineId, + fsm_id: Uuid, state: &S) -> Result<()> where C: GenericConnection, S: FSM + Serialize { @@ -223,7 +275,7 @@ fn update_state(conn: &C, "#; let state_value = serde_json::to_value(state).expect("TODO"); - let res_count = conn.execute(query, &[&state_value, &fsm_id.to_uuid()]) + let res_count = conn.execute(query, &[&state_value, &fsm_id]) .expect("TODO"); if res_count != 1 { @@ -246,16 +298,16 @@ fn alter_for_update(alter: bool, query: &str) -> String { /// Retrieve the current state of a state machine from the database, /// optionally locking the machine state for the duration of some /// enclosing transaction. -pub fn get_machine(conn: &C, - id: &MachineId, - for_update: bool) -> Result where +fn get_machine_internal(conn: &C, + id: Uuid, + for_update: bool) -> Result where C: GenericConnection, S: FSM + DeserializeOwned { let query = alter_for_update(for_update, r#" SELECT state FROM machines WHERE id = $1 "#); - let rows = conn.query(&query, &[&id.to_uuid()]).expect("TODO"); + let rows = conn.query(&query, &[&id]).expect("TODO"); if let Some(row) = rows.into_iter().next() { Ok(serde_json::from_value(row.get(0)).expect("TODO")) @@ -311,49 +363,6 @@ fn update_action_status(conn: &C, Ok(()) } -/// Advance a persisted state machine by applying an event, and -/// storing the event as well as all resulting actions. -/// -/// This function holds a database-lock on the state's row while -/// advancing the machine. -/// -/// **Note**: This function returns the new state of the machine -/// immediately after applying the event, however this does not -/// necessarily equate to the state of the machine after all related -/// processing is finished as running actions may result in additional -/// transitions. -pub fn advance(conn: &C, - id: &MachineId, - event: S::Event) -> Result where - C: GenericConnection, - S: FSM + Serialize + DeserializeOwned, - S::Event: Serialize, - S::Action: Serialize + DeserializeOwned { - let tx = conn.transaction().expect("TODO"); - let state = get_machine(&tx, id, true)?; - - // Advancing the FSM consumes the event, so it is persisted first: - let event_id = insert_event(&tx, id, &event)?; - - // Core advancing logic is run: - let (new_state, actions) = finito::advance(state, event); - - // Resulting actions are persisted (TODO: and interpreted) - let mut action_ids = vec![]; - for action in actions { - let action_id = insert_action(&tx, id, event_id, &action)?; - action_ids.push(action_id); - } - - // And finally the state is updated: - update_state(&tx, id, &new_state).expect("TODO"); - tx.commit().expect("TODO"); - - run_actions(conn, id, action_ids); - - Ok(new_state) -} - /// Execute a single action in case it is pending or retryable. Holds /// a lock on the action's database row while performing the action /// and writes back the status afterwards. @@ -362,7 +371,7 @@ pub fn advance(conn: &C, /// panic), the error will be persisted. Should it fail by panicking /// (which developers should never do explicitly in action /// interpreters) its status will not be changed. -fn run_action(tx: Transaction, id: Uuid, _fsm: PhantomData) +fn run_action(tx: Transaction, id: Uuid, state: &S::State, _fsm: PhantomData) -> Result> where S: FSM, S::Action: DeserializeOwned { @@ -370,7 +379,7 @@ fn run_action(tx: Transaction, id: Uuid, _fsm: PhantomData) let result = match status { ActionStatus::Pending => { - match ::act(action) { + match S::act(action, state) { // If the action succeeded, update its status to // completed and return the created events. Ok(events) => { @@ -402,24 +411,3 @@ fn run_action(tx: Transaction, id: Uuid, _fsm: PhantomData) tx.commit().expect("TODO"); Ok(result) } - -/// Execute several actions at the same time, each in a separate -/// thread. Note that actions returning further events, causing -/// further transitions, returning further actions and so on will -/// potentially cause multiple threads to get created. -fn run_actions(conn: &C, fsm_id: &MachineId, action_ids: Vec) where - C: GenericConnection, - S: FSM + Serialize + DeserializeOwned, - S::Event: Serialize, - S::Action: Serialize + DeserializeOwned { - for action_id in action_ids { - let tx = conn.transaction().expect("TODO"); - - // TODO: Determine which concurrency setup we actually want. - if let Ok(events) = run_action(tx, action_id, PhantomData::) { - for event in events { - advance(conn, fsm_id, event).expect("TODO"); - } - } - } -} From 68060fea13cdc26568b1a51e1bf4326885c23d63 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Thu, 13 Dec 2018 14:40:59 +0100 Subject: [PATCH 27/30] feat(postgres): Introduce chained error variants Introduces error variants for external crate errors and internal errors. Additional context can be provided at sites where errors occur using a simple `.context()` call. --- finito-postgres/src/error.rs | 62 +++++++++++++++++++++++++++++++++++- finito-postgres/src/lib.rs | 62 +++++++++++++++++------------------- 2 files changed, 91 insertions(+), 33 deletions(-) diff --git a/finito-postgres/src/error.rs b/finito-postgres/src/error.rs index 0fb30a99d..aacc219f0 100644 --- a/finito-postgres/src/error.rs +++ b/finito-postgres/src/error.rs @@ -2,8 +2,68 @@ //! occur while dealing with persisted state machines. use std::result; +use std::fmt::Display; +use uuid::Uuid; + +// errors to chain: +use serde_json::Error as JsonError; +use postgres::Error as PgError; pub type Result = result::Result; #[derive(Debug)] -pub enum Error { SomeError } +pub struct Error { + pub kind: ErrorKind, + pub context: Option, +} + +#[derive(Debug)] +pub enum ErrorKind { + /// Errors occuring during JSON serialization of FSM types. + Serialization(String), + + /// Errors occuring during communication with the database. + Database(String), + + /// State machine could not be found. + FSMNotFound(Uuid), + + /// Action could not be found. + ActionNotFound(Uuid), +} + +impl > From for Error { + fn from(err: E) -> Error { + Error { + kind: err.into(), + context: None, + } + } +} + +impl From for ErrorKind { + fn from(err: JsonError) -> ErrorKind { + ErrorKind::Serialization(err.to_string()) + } +} + +impl From for ErrorKind { + fn from(err: PgError) -> ErrorKind { + ErrorKind::Database(err.to_string()) + } +} + +/// Helper trait that makes it possible to supply contextual +/// information with an error. +pub trait ResultExt { + fn context(self, ctx: C) -> Result; +} + +impl > ResultExt for result::Result { + fn context(self, ctx: C) -> Result { + self.map_err(|err| Error { + context: Some(format!("{}", ctx)), + .. err.into() + }) + } +} diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs index 46309a04c..844e8f79f 100644 --- a/finito-postgres/src/lib.rs +++ b/finito-postgres/src/lib.rs @@ -17,8 +17,9 @@ extern crate uuid; #[cfg(test)] extern crate finito_door; mod error; -pub use error::{Result, Error}; +pub use error::{Result, Error, ErrorKind}; +use error::ResultExt; use chrono::prelude::{DateTime, Utc}; use finito::{FSM, FSMBackend}; use postgres::{Connection, GenericConnection}; @@ -118,9 +119,9 @@ impl FSMBackend for FinitoPostgres { let id = Uuid::new_v4(); let fsm = S::FSM_NAME.to_string(); - let state = serde_json::to_value(initial).expect("TODO"); + let state = serde_json::to_value(initial).context("failed to serialise FSM")?; - self.conn.execute(query, &[&id, &fsm, &state]).expect("TODO"); + self.conn.execute(query, &[&id, &fsm, &state]).context("failed to insert FSM")?; return Ok(id); @@ -146,7 +147,7 @@ impl FSMBackend for FinitoPostgres { S::State: From<&'a State>, S::Event: Serialize + DeserializeOwned, S::Action: Serialize + DeserializeOwned { - let tx = self.conn.transaction().expect("TODO"); + let tx = self.conn.transaction().context("could not begin transaction")?; let state = get_machine_internal(&tx, key, true)?; // Advancing the FSM consumes the event, so it is persisted first: @@ -163,8 +164,8 @@ impl FSMBackend for FinitoPostgres { } // And finally the state is updated: - update_state(&tx, key, &new_state).expect("TODO"); - tx.commit().expect("TODO"); + update_state(&tx, key, &new_state)?; + tx.commit().context("could not commit transaction")?; self.run_actions::(key, action_ids); @@ -211,9 +212,9 @@ pub fn insert_machine(conn: &C, initial: S) -> Result where let id = Uuid::new_v4(); let fsm = S::FSM_NAME.to_string(); - let state = serde_json::to_value(initial).expect("TODO"); + let state = serde_json::to_value(initial).context("failed to serialize FSM")?; - conn.execute(query, &[&id, &fsm, &state]).expect("TODO"); + conn.execute(query, &[&id, &fsm, &state])?; return Ok(id); } @@ -233,9 +234,10 @@ where let id = Uuid::new_v4(); let fsm = S::FSM_NAME.to_string(); - let event_value = serde_json::to_value(event).expect("TODO"); + let event_value = serde_json::to_value(event) + .context("failed to serialize event")?; - conn.execute(query, &[&id, &fsm, &fsm_id, &event_value]).expect("TODO"); + conn.execute(query, &[&id, &fsm, &fsm_id, &event_value])?; return Ok(id) } @@ -254,12 +256,13 @@ fn insert_action(conn: &C, let id = Uuid::new_v4(); let fsm = S::FSM_NAME.to_string(); - let action_value = serde_json::to_value(action).expect("TODO"); + let action_value = serde_json::to_value(action) + .context("failed to serialize action")?; conn.execute( query, &[&id, &fsm, &fsm_id, &event_id, &action_value, &ActionStatus::Pending] - ).expect("TODO"); + )?; return Ok(id) } @@ -274,13 +277,11 @@ fn update_state(conn: &C, UPDATE machines SET state = $1 WHERE id = $2 "#; - let state_value = serde_json::to_value(state).expect("TODO"); - let res_count = conn.execute(query, &[&state_value, &fsm_id]) - .expect("TODO"); + let state_value = serde_json::to_value(state).context("failed to serialize FSM")?; + let res_count = conn.execute(query, &[&state_value, &fsm_id])?; if res_count != 1 { - // TODO: not found error! - unimplemented!() + Err(ErrorKind::FSMNotFound(fsm_id).into()) } else { Ok(()) } @@ -307,13 +308,12 @@ fn get_machine_internal(conn: &C, SELECT state FROM machines WHERE id = $1 "#); - let rows = conn.query(&query, &[&id]).expect("TODO"); + let rows = conn.query(&query, &[&id]).context("failed to retrieve FSM")?; if let Some(row) = rows.into_iter().next() { - Ok(serde_json::from_value(row.get(0)).expect("TODO")) + Ok(serde_json::from_value(row.get(0)).context("failed to deserialize FSM")?) } else { - // TODO: return appropriate not found error - Err(Error::SomeError) + Err(ErrorKind::FSMNotFound(id).into()) } } @@ -328,14 +328,14 @@ fn get_action(conn: &C, id: Uuid) -> Result<(ActionStatus, S::Action)> whe WHERE id = $1 AND fsm = $2 "#); - let rows = conn.query(&query, &[&id, &S::FSM_NAME]).expect("TODO"); + let rows = conn.query(&query, &[&id, &S::FSM_NAME])?; if let Some(row) = rows.into_iter().next() { - let action = serde_json::from_value(row.get(1)).expect("TODO"); + let action = serde_json::from_value(row.get(1)) + .context("failed to deserialize FSM action")?; Ok((row.get(0), action)) } else { - // TODO: return appropriate not found error - Err(Error::SomeError) + Err(ErrorKind::ActionNotFound(id).into()) } } @@ -352,15 +352,13 @@ fn update_action_status(conn: &C, WHERE id = $3 AND fsm = $4 "#; - let result = conn.execute(&query, &[&status, &error, &id, &S::FSM_NAME]) - .expect("TODO"); + let result = conn.execute(&query, &[&status, &error, &id, &S::FSM_NAME])?; if result != 1 { - // TODO: Fail in the most gruesome way! - unimplemented!() + Err(ErrorKind::ActionNotFound(id).into()) + } else { + Ok(()) } - - Ok(()) } /// Execute a single action in case it is pending or retryable. Holds @@ -408,6 +406,6 @@ fn run_action(tx: Transaction, id: Uuid, state: &S::State, _fsm: PhantomData< }, }; - tx.commit().expect("TODO"); + tx.commit().context("failed to commit transaction")?; Ok(result) } From e801b5853c1d06d1dc5cb43eadba0069b7e6fa85 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Thu, 13 Dec 2018 15:01:05 +0100 Subject: [PATCH 28/30] feat(postgres): Add human-readable Display implementation for errors --- finito-postgres/src/error.rs | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/finito-postgres/src/error.rs b/finito-postgres/src/error.rs index aacc219f0..0bf7f4018 100644 --- a/finito-postgres/src/error.rs +++ b/finito-postgres/src/error.rs @@ -2,8 +2,9 @@ //! occur while dealing with persisted state machines. use std::result; -use std::fmt::Display; +use std::fmt; use uuid::Uuid; +use std::error::Error as StdError; // errors to chain: use serde_json::Error as JsonError; @@ -32,6 +33,32 @@ pub enum ErrorKind { ActionNotFound(Uuid), } +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + use ErrorKind::*; + let msg = match &self.kind { + Serialization(err) => + format!("JSON serialization error: {}", err), + + Database(err) => + format!("PostgreSQL error: {}", err), + + FSMNotFound(id) => + format!("FSM with ID {} not found", id), + + ActionNotFound(id) => + format!("Action with ID {} not found", id), + }; + + match &self.context { + None => write!(f, "{}", msg), + Some(ctx) => write!(f, "{}: {}", ctx, msg), + } + } +} + +impl StdError for Error {} + impl > From for Error { fn from(err: E) -> Error { Error { @@ -56,11 +83,11 @@ impl From for ErrorKind { /// Helper trait that makes it possible to supply contextual /// information with an error. pub trait ResultExt { - fn context(self, ctx: C) -> Result; + fn context(self, ctx: C) -> Result; } impl > ResultExt for result::Result { - fn context(self, ctx: C) -> Result { + fn context(self, ctx: C) -> Result { self.map_err(|err| Error { context: Some(format!("{}", ctx)), .. err.into() From 42d56713bd6bbaf213cac37ff85fb1c64188fec5 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Thu, 13 Dec 2018 15:08:26 +0100 Subject: [PATCH 29/30] fix(door): Ensure compatibility with updated FSM trait --- finito-door/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/finito-door/src/lib.rs b/finito-door/src/lib.rs index 2066ed81e..68542c0bc 100644 --- a/finito-door/src/lib.rs +++ b/finito-door/src/lib.rs @@ -262,7 +262,7 @@ impl FSM for DoorState { // Additionally the `act` function can return new events. This is useful for // a sort of "callback-like" pattern (cause an action to fetch some data, // receive it as an event) but is not used in this example. - fn act(action: DoorAction, _state: ()) -> Result, failure::Error> { + fn act(action: DoorAction, _state: &()) -> Result, failure::Error> { match action { DoorAction::NotifyIRC(msg) => { use std::fs::OpenOptions; From b7481172252d6f00546e94534b05d011b4105843 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Thu, 13 Dec 2018 16:53:45 +0100 Subject: [PATCH 30/30] feat(postgres): Introduce database connection pool Adds an r2d2 database connection pool to the backend type. This makes it possible for separate FSMs to run at the same time through the same backend, by retrieving separate connections. --- finito-postgres/Cargo.toml | 3 ++- finito-postgres/src/error.rs | 15 ++++++++++++++- finito-postgres/src/lib.rs | 36 ++++++++++++++++++++++++++++-------- 3 files changed, 44 insertions(+), 10 deletions(-) diff --git a/finito-postgres/Cargo.toml b/finito-postgres/Cargo.toml index 12bcef8c2..dd8d1d000 100644 --- a/finito-postgres/Cargo.toml +++ b/finito-postgres/Cargo.toml @@ -5,9 +5,10 @@ authors = ["Vincent Ambo "] [dependencies] chrono = "0.4" +postgres-derive = "0.3" serde = "1.0" serde_json = "1.0" -postgres-derive = "0.3" +r2d2_postgres = "0.14" [dependencies.postgres] version = "0.15" diff --git a/finito-postgres/src/error.rs b/finito-postgres/src/error.rs index 0bf7f4018..e130d1836 100644 --- a/finito-postgres/src/error.rs +++ b/finito-postgres/src/error.rs @@ -7,8 +7,9 @@ use uuid::Uuid; use std::error::Error as StdError; // errors to chain: -use serde_json::Error as JsonError; use postgres::Error as PgError; +use r2d2_postgres::r2d2::Error as PoolError; +use serde_json::Error as JsonError; pub type Result = result::Result; @@ -26,6 +27,9 @@ pub enum ErrorKind { /// Errors occuring during communication with the database. Database(String), + /// Errors with the database connection pool. + DBPool(String), + /// State machine could not be found. FSMNotFound(Uuid), @@ -43,6 +47,9 @@ impl fmt::Display for Error { Database(err) => format!("PostgreSQL error: {}", err), + DBPool(err) => + format!("Database connection pool error: {}", err), + FSMNotFound(id) => format!("FSM with ID {} not found", id), @@ -80,6 +87,12 @@ impl From for ErrorKind { } } +impl From for ErrorKind { + fn from(err: PoolError) -> ErrorKind { + ErrorKind::DBPool(err.to_string()) + } +} + /// Helper trait that makes it possible to supply contextual /// information with an error. pub trait ResultExt { diff --git a/finito-postgres/src/lib.rs b/finito-postgres/src/lib.rs index 844e8f79f..eea6405c6 100644 --- a/finito-postgres/src/lib.rs +++ b/finito-postgres/src/lib.rs @@ -9,6 +9,7 @@ extern crate chrono; extern crate finito; +extern crate r2d2_postgres; extern crate serde; extern crate serde_json; extern crate uuid; @@ -19,16 +20,20 @@ extern crate uuid; mod error; pub use error::{Result, Error, ErrorKind}; -use error::ResultExt; use chrono::prelude::{DateTime, Utc}; +use error::ResultExt; use finito::{FSM, FSMBackend}; -use postgres::{Connection, GenericConnection}; use postgres::transaction::Transaction; +use postgres::GenericConnection; use serde::Serialize; use serde::de::DeserializeOwned; use serde_json::Value; use std::marker::PhantomData; use uuid::Uuid; +use r2d2_postgres::{r2d2, PostgresConnectionManager}; + +type DBPool = r2d2::Pool; +type DBConn = r2d2::PooledConnection; /// This struct represents rows in the database table in which events /// are persisted. @@ -103,8 +108,16 @@ struct ActionT { /// now. pub struct FinitoPostgres { state: S, - // TODO: Use connection pool? - conn: Connection, + + db_pool: DBPool, +} + +impl FinitoPostgres { + pub fn new(state: S, db_pool: DBPool, pool_size: usize) -> Self { + FinitoPostgres { + state, db_pool, + } + } } impl FSMBackend for FinitoPostgres { @@ -121,14 +134,14 @@ impl FSMBackend for FinitoPostgres { let fsm = S::FSM_NAME.to_string(); let state = serde_json::to_value(initial).context("failed to serialise FSM")?; - self.conn.execute(query, &[&id, &fsm, &state]).context("failed to insert FSM")?; + self.conn()?.execute(query, &[&id, &fsm, &state]).context("failed to insert FSM")?; return Ok(id); } fn get_machine(&self, key: Uuid) -> Result { - get_machine_internal(&self.conn, key, false) + get_machine_internal(&*self.conn()?, key, false) } /// Advance a persisted state machine by applying an event, and @@ -147,7 +160,8 @@ impl FSMBackend for FinitoPostgres { S::State: From<&'a State>, S::Event: Serialize + DeserializeOwned, S::Action: Serialize + DeserializeOwned { - let tx = self.conn.transaction().context("could not begin transaction")?; + let conn = self.conn()?; + let tx = conn.transaction().context("could not begin transaction")?; let state = get_machine_internal(&tx, key, true)?; // Advancing the FSM consumes the event, so it is persisted first: @@ -184,9 +198,10 @@ impl FinitoPostgres { S::Action: Serialize + DeserializeOwned, S::State: From<&'a State> { let state: S::State = (&self.state).into(); + let conn = self.conn().expect("TODO"); for action_id in action_ids { - let tx = self.conn.transaction().expect("TODO"); + let tx = conn.transaction().expect("TODO"); // TODO: Determine which concurrency setup we actually want. if let Ok(events) = run_action(tx, action_id, &state, PhantomData::) { @@ -196,6 +211,11 @@ impl FinitoPostgres { } } } + + /// Retrieve a single connection from the database connection pool. + fn conn(&self) -> Result { + self.db_pool.get().context("failed to retrieve connection from pool") + } }