From 2957c16a533e4021024d2f0c262fca5a520e19bf Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Sun, 15 Oct 2017 22:05:56 +0200 Subject: [PATCH 01/23] chore: Initial commit From a89b7255ee88e5c761b11c34bffc64a30f5a3c3e Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Sun, 15 Oct 2017 22:06:22 +0200 Subject: [PATCH 02/23] chore: Add project scaffolding --- .gitignore | 4 ++++ CODE_OF_CONDUCT.md | 20 ++++++++++++++++++++ Cargo.toml | 6 ++++++ LICENSE | 21 +++++++++++++++++++++ src/lib.rs | 7 +++++++ 5 files changed, 58 insertions(+) create mode 100644 .gitignore create mode 100644 CODE_OF_CONDUCT.md create mode 100644 Cargo.toml create mode 100644 LICENSE create mode 100644 src/lib.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..05d50257f --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +/target/ +**/*.rs.bk +Cargo.lock +.idea/ diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md new file mode 100644 index 000000000..c4013ac13 --- /dev/null +++ b/CODE_OF_CONDUCT.md @@ -0,0 +1,20 @@ +A SERMON ON ETHICS AND LOVE +=========================== + +One day Mal-2 asked the messenger spirit Saint Gulik to approach the Goddess and request Her presence for some desperate advice. Shortly afterwards the radio came on by itself, and an ethereal female Voice said **YES?** + +"O! Eris! Blessed Mother of Man! Queen of Chaos! Daughter of Discord! Concubine of Confusion! O! Exquisite Lady, I beseech You to lift a heavy burden from my heart!" + +**WHAT BOTHERS YOU, MAL? YOU DON'T SOUND WELL.** + +"I am filled with fear and tormented with terrible visions of pain. Everywhere people are hurting one another, the planet is rampant with injustices, whole societies plunder groups of their own people, mothers imprison sons, children perish while brothers war. O, woe." + +**WHAT IS THE MATTER WITH THAT, IF IT IS WHAT YOU WANT TO DO?** + +"But nobody Wants it! Everybody hates it." + +**OH. WELL, THEN *STOP*.** + +At which moment She turned herself into an aspirin commercial and left The Polyfather stranded alone with his species. + +SINISTER DEXTER HAS A BROKEN SPIROMETER. diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 000000000..c9ca1bdaa --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "posix_mq" +version = "0.1.0" +authors = ["Vincent Ambo "] + +[dependencies] diff --git a/LICENSE b/LICENSE new file mode 100644 index 000000000..b1b8e03c8 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2017 Vincent Ambo + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 000000000..31e1bb209 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,7 @@ +#[cfg(test)] +mod tests { + #[test] + fn it_works() { + assert_eq!(2 + 2, 4); + } +} From 1f1a74108e74a50c39eeac37bb3a91cb49c1d35d Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Sun, 15 Oct 2017 23:22:18 +0200 Subject: [PATCH 03/23] feat(error): Add error-mapping from C calls Implements an error enum with mappings from the low-level C calls and appropriate error descriptions. --- Cargo.toml | 2 ++ src/error.rs | 97 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+) create mode 100644 src/error.rs diff --git a/Cargo.toml b/Cargo.toml index c9ca1bdaa..ab5d30215 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,3 +4,5 @@ version = "0.1.0" authors = ["Vincent Ambo "] [dependencies] +nix = "0.9" +libc = "0.2" diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 000000000..b288723e5 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,97 @@ +use nix; +use std::error; +use std::fmt; + +/// This module implements a simple error type to match the errors that can be thrown from the C +/// functions as well as some extra errors resulting from internal validations. +/// +/// As this crate exposes an opinionated API to the POSIX queues certain errors have been +/// ignored: +/// +/// * ETIMEDOUT: The low-level timed functions are not exported and this error can not occur. +/// * EAGAIN: Non-blocking queue calls are not supported. +/// * EINVAL: Same reason as ETIMEDOUT +/// * EMSGSIZE: The message size is immutable after queue creation and this crate checks it. +/// * ENAMETOOLONG: This crate performs name validation +/// +/// If an unexpected error is encountered it will be wrapped appropriately and should be reported +/// as a bug on https://github.com/aprilabank/posix_mq.rs + +#[derive(Debug)] +pub enum Error { + // These errors match what is described in the man pages (from mq_overview(7) onwards). + PermissionDenied(), + InvalidQueueDescriptor(), + QueueCallInterrupted(), + QueueAlreadyExists(), + QueueNotFound(), + InsufficientMemory(), + InsufficientSpace(), + + // These two are (hopefully) unlikely in modern systems + ProcessFileDescriptorLimitReached(), + SystemFileDescriptorLimitReached(), + + // If an unhandled / unknown / unexpected error occurs this error will be used. + // In those cases bug reports would be welcome! + UnknownForeignError(nix::Errno), + + // Some other unexpected / unknown error occured. This is probably an error from + // the nix crate. Bug reports also welcome for this! + UnknownInternalError(Option), +} + +impl error::Error for Error { + fn description(&self) -> &str { + use Error::*; + match *self { + PermissionDenied() => "permission to the specified queue was denied", + InvalidQueueDescriptor() => "the internal queue descriptor was invalid", + QueueCallInterrupted() => "queue method interrupted by signal", + QueueAlreadyExists() => "the specified queue already exists", + QueueNotFound() => "the specified queue could not be found", + InsufficientMemory() => "insufficient memory to call queue method", + InsufficientSpace() => "insufficient space to call queue method", + ProcessFileDescriptorLimitReached() => "max. number of process file descriptors reached", + SystemFileDescriptorLimitReached() => "max. number of system file descriptors reached", + UnknownForeignError(_) => "unknown foreign error occured: please report a bug!", + UnknownInternalError(_) => "unknown internal error occured: please report a bug!", + } + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + // Explicitly import this to gain access to Error::description() + use std::error::Error; + f.write_str(self.description()) + } +} + +/// This from implementation is used to translate errors from the lower-level +/// C-calls into sensible Rust errors. +impl From for Error { + fn from(e: nix::Error) -> Self { + match e { + nix::Error::Sys(e) => match_errno(e), + _ => Error::UnknownInternalError(Some(e)), + } + } +} + +fn match_errno(err: nix::Errno) -> Error { + use nix::errno::*; + + match err { + EACCES => Error::PermissionDenied(), + EBADF => Error::InvalidQueueDescriptor(), + EINTR => Error::QueueCallInterrupted(), + EEXIST => Error::QueueAlreadyExists(), + EMFILE => Error::ProcessFileDescriptorLimitReached(), + ENFILE => Error::SystemFileDescriptorLimitReached(), + ENOENT => Error::QueueNotFound(), + ENOMEM => Error::InsufficientMemory(), + ENOSPC => Error::InsufficientSpace(), + _ => Error::UnknownForeignError(err), + } +} From 7dc6144e3fe53d611a8a83ad78fed1dc37c785e6 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Mon, 16 Oct 2017 01:08:08 +0200 Subject: [PATCH 04/23] feat: Implement high-level POSIX message queue API Implements a high-level API on top of POSIX message queues (mq_overview(7)). This API can be used to perform local RPC between processes that need to exchange messages *fast* (or *easy*) with priority ordering. The methods are mostly documented but there are still two corner cases that need to be looked at and a lot of tests missing. --- src/error.rs | 37 +++++++- src/lib.rs | 261 ++++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 292 insertions(+), 6 deletions(-) diff --git a/src/error.rs b/src/error.rs index b288723e5..1a0c069a8 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,6 +1,8 @@ use nix; use std::error; use std::fmt; +use std::io; +use std::num; /// This module implements a simple error type to match the errors that can be thrown from the C /// functions as well as some extra errors resulting from internal validations. @@ -19,6 +21,13 @@ use std::fmt; #[derive(Debug)] pub enum Error { + // These errors are raised inside of the library + InvalidQueueName(&'static str), + ValueReadingError(io::Error), + MessageSizeExceeded(), + MaximumMessageSizeExceeded(), + MaximumMessageCountExceeded(), + // These errors match what is described in the man pages (from mq_overview(7) onwards). PermissionDenied(), InvalidQueueDescriptor(), @@ -45,6 +54,12 @@ impl error::Error for Error { fn description(&self) -> &str { use Error::*; match *self { + // This error contains more sensible description strings already + InvalidQueueName(e) => e, + ValueReadingError(_) => "error reading system configuration for message queues", + MessageSizeExceeded() => "message is larger than maximum size for specified queue", + MaximumMessageSizeExceeded() => "specified queue message size exceeds system maximum", + MaximumMessageCountExceeded() => "specified queue message count exceeds system maximum", PermissionDenied() => "permission to the specified queue was denied", InvalidQueueDescriptor() => "the internal queue descriptor was invalid", QueueCallInterrupted() => "queue method interrupted by signal", @@ -52,8 +67,10 @@ impl error::Error for Error { QueueNotFound() => "the specified queue could not be found", InsufficientMemory() => "insufficient memory to call queue method", InsufficientSpace() => "insufficient space to call queue method", - ProcessFileDescriptorLimitReached() => "max. number of process file descriptors reached", - SystemFileDescriptorLimitReached() => "max. number of system file descriptors reached", + ProcessFileDescriptorLimitReached() => + "maximum number of process file descriptors reached", + SystemFileDescriptorLimitReached() => + "maximum number of system file descriptors reached", UnknownForeignError(_) => "unknown foreign error occured: please report a bug!", UnknownInternalError(_) => "unknown internal error occured: please report a bug!", } @@ -79,6 +96,22 @@ impl From for Error { } } +// This implementation is used when reading system queue settings. +impl From for Error { + fn from(e: io::Error) -> Self { + Error::ValueReadingError(e) + } +} + +// This implementation is used when parsing system queue settings. The unknown error is returned +// here because the system is probably seriously broken if those files don't contain numbers. +impl From for Error { + fn from(_: num::ParseIntError) -> Self { + Error::UnknownInternalError(None) + } +} + + fn match_errno(err: nix::Errno) -> Error { use nix::errno::*; diff --git a/src/lib.rs b/src/lib.rs index 31e1bb209..d84d84526 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,260 @@ +extern crate nix; +extern crate libc; + +use nix::mqueue; +use nix::sys::stat; +use std::ffi::CString; +use libc::mqd_t; +use error::Error; +use std::string::ToString; +use std::fs::File; +use std::io::Read; + +mod error; + #[cfg(test)] -mod tests { - #[test] - fn it_works() { - assert_eq!(2 + 2, 4); +mod tests; + +/* +TODO: + +* what happens if permissions change after FD was opened? +* drop dependency on nix crate? + +*/ + +/// Wrapper type for queue names that performs basic validation of queue names before calling +/// out to C code. +pub struct Name(CString); + +impl Name { + pub fn new(s: S) -> Result { + let string = s.to_string(); + + if !string.starts_with('/') { + return Err(Error::InvalidQueueName("Queue name must not start with '/'")) + } + + // The C library has a special error return for this case, so I assume people must actually + // have tried just using '/' as a queue name. + if string.len() == 1 { + return Err(Error::InvalidQueueName( + "Queue name must be a slash followed by one or more characters" + )) + } + + if string.len() > 255 { + return Err(Error::InvalidQueueName("Queue name must not exceed 255 characters")) + } + + if string.matches('/').count() > 1 { + return Err(Error::InvalidQueueName("Queue name can not contain more than one slash")) + } + + // TODO: What error is being thrown away here? Is it possible? + Ok(Name(CString::new(string).unwrap())) } } + +#[derive(Debug)] +pub struct Message { + pub data: Vec, + pub priority: u32, +} + +/// Represents an open queue descriptor to a POSIX message queue. This carries information +/// about the queue's limitations (i.e. maximum message size and maximum message count). +pub struct Queue { + name: Name, + + /// Internal file/queue descriptor. + queue_descriptor: mqd_t, + + /// Maximum number of pending messages in this queue. + max_pending: i64, + + /// Maximum size of this queue. + max_size: usize, +} + +impl Queue { + /// Creates a new queue and fails if it already exists. + /// By default the queue will be read/writable by the current user with no access for other + /// users. + /// Linux users can change this setting themselves by modifying the queue file in /dev/mqueue. + pub fn create(name: Name, max_pending: i64, max_size: i64) -> Result { + if max_pending > read_i64_from_file(MSG_MAX)? { + return Err(Error::MaximumMessageCountExceeded()) + } + + if max_size > read_i64_from_file(MSGSIZE_MAX)? { + return Err(Error::MaximumMessageSizeExceeded()) + } + + let oflags = { + let mut flags = mqueue::MQ_OFlag::empty(); + // Put queue in r/w mode + flags.toggle(mqueue::O_RDWR); + // Enable queue creation + flags.toggle(mqueue::O_CREAT); + // Fail if queue exists already + flags.toggle(mqueue::O_EXCL); + flags + }; + + let attr = mqueue::MqAttr::new( + 0, max_pending, max_size, 0 + ); + + let queue_descriptor = mqueue::mq_open( + &name.0, + oflags, + default_mode(), + Some(&attr), + )?; + + Ok(Queue { + name, + queue_descriptor, + max_pending, + max_size: max_size as usize, + }) + } + + /// Opens an existing queue. + pub fn open(name: Name) -> Result { + // No extra flags need to be constructed as the default is to open and fail if the + // queue does not exist yet - which is what we want here. + let oflags = mqueue::O_RDWR; + let queue_descriptor = mqueue::mq_open( + &name.0, + oflags, + default_mode(), + None, + )?; + + let attr = mq_getattr(queue_descriptor)?; + + Ok(Queue{ + name, + queue_descriptor, + max_pending: attr.mq_maxmsg, + max_size: attr.mq_msgsize as usize, + }) + } + + /// Opens an existing queue or creates a new queue with the OS default settings. + pub fn open_or_create(name: Name) -> Result { + let oflags = { + let mut flags = mqueue::MQ_OFlag::empty(); + // Put queue in r/w mode + flags.toggle(mqueue::O_RDWR); + // Enable queue creation + flags.toggle(mqueue::O_CREAT); + flags + }; + + let default_pending = read_i64_from_file(MSG_DEFAULT)?; + let default_size = read_i64_from_file(MSGSIZE_DEFAULT)?; + let attr = mqueue::MqAttr::new( + 0, default_pending, default_size, 0 + ); + + let queue_descriptor = mqueue::mq_open( + &name.0, + oflags, + default_mode(), + Some(&attr), + )?; + + let actual_attr = mq_getattr(queue_descriptor)?; + + Ok(Queue { + name, + queue_descriptor, + max_pending: actual_attr.mq_maxmsg, + max_size: actual_attr.mq_msgsize as usize, + }) + } + + /// Delete a message queue from the system. This method will make the queue unavailable for + /// other processes, too! + pub fn delete(self) -> Result<(), Error> { + mqueue::mq_unlink(&self.name.0)?; + Ok(()) + } + + pub fn send(self, msg: Message) -> Result<(), Error> { + if msg.data.len() > self.max_size as usize { + return Err(Error::MessageSizeExceeded()) + } + + mqueue::mq_send( + self.queue_descriptor.clone(), + msg.data.as_ref(), + msg.priority, + ).map_err(|e| e.into()) + } + + pub fn receive(self) -> Result { + let mut data: Vec = vec![0; self.max_size as usize]; + let mut priority: u32 = 0; + + let msg_size = mqueue::mq_receive( + self.queue_descriptor.clone(), + data.as_mut(), + &mut priority, + )?; + + data.truncate(msg_size); + Ok(Message { data, priority }) + } + + pub fn max_pending(&self) -> i64 { + self.max_pending + } + + pub fn max_size(&self) -> usize { + self.max_size + } +} + +// Creates the default queue mode (0600). +fn default_mode() -> stat::Mode { + let mut mode = stat::Mode::empty(); + mode.toggle(stat::S_IRUSR); + mode.toggle(stat::S_IWUSR); + mode +} + +/// This file defines the default number of maximum pending messages in a queue. +const MSG_DEFAULT: &'static str = "/proc/sys/fs/mqueue/msg_default"; + +/// This file defines the system maximum number of pending messages in a queue. +const MSG_MAX: &'static str = "/proc/sys/fs/mqueue/msg_max"; + +/// This file defines the default maximum size of messages in a queue. +const MSGSIZE_DEFAULT: &'static str = "/proc/sys/fs/mqueue/msgsize_default"; + +/// This file defines the system maximum size for messages in a queue. +const MSGSIZE_MAX: &'static str = "/proc/sys/fs/mqueue/msgsize_max"; + +/// This method is used in combination with the above constants to find system limits. +fn read_i64_from_file(name: &str) -> Result { + let mut file = File::open(name.to_string())?; + let mut content = String::new(); + file.read_to_string(&mut content)?; + Ok(content.parse()?) +} + +/// The mq_getattr implementation in the nix crate hides the maximum message size and count, which +/// is very impractical. +/// To work around it, this method calls the C-function directly. +fn mq_getattr(mqd: mqd_t) -> Result { + use std::mem; + let mut attr = unsafe { mem::uninitialized::() }; + let res = unsafe { libc::mq_getattr(mqd, &mut attr) }; + nix::Errno::result(res) + .map(|_| attr) + .map_err(|e| e.into()) +} From d6fa62a96847ab447622cbd3db123e58dc320d5a Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Mon, 16 Oct 2017 01:11:33 +0200 Subject: [PATCH 05/23] style: Apply code format --- src/lib.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d84d84526..51d901ec8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,14 +1,14 @@ extern crate nix; extern crate libc; +use error::Error; +use libc::mqd_t; use nix::mqueue; use nix::sys::stat; use std::ffi::CString; -use libc::mqd_t; -use error::Error; -use std::string::ToString; use std::fs::File; use std::io::Read; +use std::string::ToString; mod error; @@ -32,7 +32,7 @@ impl Name { let string = s.to_string(); if !string.starts_with('/') { - return Err(Error::InvalidQueueName("Queue name must not start with '/'")) + return Err(Error::InvalidQueueName("Queue name must not start with '/'")); } // The C library has a special error return for this case, so I assume people must actually @@ -40,15 +40,15 @@ impl Name { if string.len() == 1 { return Err(Error::InvalidQueueName( "Queue name must be a slash followed by one or more characters" - )) + )); } if string.len() > 255 { - return Err(Error::InvalidQueueName("Queue name must not exceed 255 characters")) + return Err(Error::InvalidQueueName("Queue name must not exceed 255 characters")); } if string.matches('/').count() > 1 { - return Err(Error::InvalidQueueName("Queue name can not contain more than one slash")) + return Err(Error::InvalidQueueName("Queue name can not contain more than one slash")); } // TODO: What error is being thrown away here? Is it possible? @@ -84,11 +84,11 @@ impl Queue { /// Linux users can change this setting themselves by modifying the queue file in /dev/mqueue. pub fn create(name: Name, max_pending: i64, max_size: i64) -> Result { if max_pending > read_i64_from_file(MSG_MAX)? { - return Err(Error::MaximumMessageCountExceeded()) + return Err(Error::MaximumMessageCountExceeded()); } if max_size > read_i64_from_file(MSGSIZE_MAX)? { - return Err(Error::MaximumMessageSizeExceeded()) + return Err(Error::MaximumMessageSizeExceeded()); } let oflags = { @@ -135,7 +135,7 @@ impl Queue { let attr = mq_getattr(queue_descriptor)?; - Ok(Queue{ + Ok(Queue { name, queue_descriptor, max_pending: attr.mq_maxmsg, @@ -186,7 +186,7 @@ impl Queue { pub fn send(self, msg: Message) -> Result<(), Error> { if msg.data.len() > self.max_size as usize { - return Err(Error::MessageSizeExceeded()) + return Err(Error::MessageSizeExceeded()); } mqueue::mq_send( From 557655ee1d918579a40f0684cced80aa0390045c Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Mon, 16 Oct 2017 01:14:27 +0200 Subject: [PATCH 06/23] fix(lib): drop(self) after delete() call --- src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/lib.rs b/src/lib.rs index 51d901ec8..ad0ead59e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -181,6 +181,7 @@ impl Queue { /// other processes, too! pub fn delete(self) -> Result<(), Error> { mqueue::mq_unlink(&self.name.0)?; + drop(self); Ok(()) } From 3ee616a25ac3fdd602befd2f91d7e29b7a7fa14c Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Mon, 16 Oct 2017 01:17:05 +0200 Subject: [PATCH 07/23] fix(lib): Borrow &self in send/receive --- src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ad0ead59e..2ae22f162 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -185,7 +185,7 @@ impl Queue { Ok(()) } - pub fn send(self, msg: Message) -> Result<(), Error> { + pub fn send(&self, msg: Message) -> Result<(), Error> { if msg.data.len() > self.max_size as usize { return Err(Error::MessageSizeExceeded()); } @@ -197,7 +197,7 @@ impl Queue { ).map_err(|e| e.into()) } - pub fn receive(self) -> Result { + pub fn receive(&self) -> Result { let mut data: Vec = vec![0; self.max_size as usize]; let mut priority: u32 = 0; From c3cc663ab1eafec38033a845c37d8766b334d321 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Mon, 16 Oct 2017 01:52:38 +0200 Subject: [PATCH 08/23] fix(lib): Trim whitespace from OS limit files --- src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 2ae22f162..877017e89 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -185,7 +185,7 @@ impl Queue { Ok(()) } - pub fn send(&self, msg: Message) -> Result<(), Error> { + pub fn send(&self, msg: &Message) -> Result<(), Error> { if msg.data.len() > self.max_size as usize { return Err(Error::MessageSizeExceeded()); } @@ -245,7 +245,7 @@ fn read_i64_from_file(name: &str) -> Result { let mut file = File::open(name.to_string())?; let mut content = String::new(); file.read_to_string(&mut content)?; - Ok(content.parse()?) + Ok(content.trim().parse()?) } /// The mq_getattr implementation in the nix crate hides the maximum message size and count, which From 6be954393b452275445108098d3ea24af25248b7 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Mon, 16 Oct 2017 01:52:56 +0200 Subject: [PATCH 09/23] feat(tests): Add a simple send/receive test --- src/lib.rs | 2 +- src/tests.rs | 22 ++++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) create mode 100644 src/tests.rs diff --git a/src/lib.rs b/src/lib.rs index 877017e89..475b2c0f4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,7 +56,7 @@ impl Name { } } -#[derive(Debug)] +#[derive(Debug, PartialEq)] pub struct Message { pub data: Vec, pub priority: u32, diff --git a/src/tests.rs b/src/tests.rs new file mode 100644 index 000000000..0018e40da --- /dev/null +++ b/src/tests.rs @@ -0,0 +1,22 @@ +use super::*; + +#[test] +fn test_open_delete() { + // Simple test with default queue settings + let name = Name::new("/test-queue").unwrap(); + let queue = Queue::open_or_create(name) + .expect("Opening queue failed"); + + let message = Message { + data: "test-message".as_bytes().to_vec(), + priority: 0, + }; + + queue.send(&message).expect("message sending failed"); + + let result = queue.receive().expect("message receiving failed"); + + assert_eq!(message, result); + + queue.delete(); +} From 3144b61ccc2ec024c19ccb41ff6964a86203ee14 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Mon, 16 Oct 2017 02:00:32 +0200 Subject: [PATCH 10/23] feat(lib): Implement Drop trait for Queue Implements the Drop trait to take care of closing the queue descriptor when a Queue instance is dropped. --- src/lib.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 475b2c0f4..cc3939d94 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,7 @@ use std::ffi::CString; use std::fs::File; use std::io::Read; use std::string::ToString; +use std::ops::Drop; mod error; @@ -220,6 +221,15 @@ impl Queue { } } +impl Drop for Queue { + fn drop(&mut self) { + // Attempt to close the queue descriptor and discard any possible errors. + // The only error thrown in the C-code is EINVAL, which would mean that the + // descriptor has already been closed. + mqueue::mq_close(self.queue_descriptor).ok(); + } +} + // Creates the default queue mode (0600). fn default_mode() -> stat::Mode { let mut mode = stat::Mode::empty(); From 45d23efb986f344d556333135b6cc80380f5e3da Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Mon, 16 Oct 2017 02:09:15 +0200 Subject: [PATCH 11/23] docs: Add README --- README.md | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 000000000..de112e101 --- /dev/null +++ b/README.md @@ -0,0 +1,27 @@ +posix_mq +======== + +This is a simple, relatively high-level library for the POSIX [message queue API][]. It wraps the lower-level API in a +simpler interface with more robust error handling. + +Usage example: + +```rust +// Values that need to undergo validation are wrapped in safe types: +let name = Name::new("/test-queue").unwrap(); + +// Queue creation with system defaults is simple: +let queue = Queue::open_or_create(name).expect("Opening queue failed"); + +// Sending a message: +let message = Message { + data: "test-message".as_bytes().to_vec(), + priority: 0, +}; +queue.send(&message).expect("message sending failed"); + +// ... and receiving it! +let result = queue.receive().expect("message receiving failed"); +``` + +[message queue API]: https://linux.die.net/man/7/mq_overview From 629268ef4aaeb1886d6669903dbb1a812f2f3a6c Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Mon, 16 Oct 2017 02:13:44 +0200 Subject: [PATCH 12/23] feat(build): Add automated Travis builds --- .travis.yml | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .travis.yml diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 000000000..a493f815a --- /dev/null +++ b/.travis.yml @@ -0,0 +1,3 @@ +language: rust +rust: + - stable From 8ea424f5337c4cce5822b494db6123d4a690cd29 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Mon, 16 Oct 2017 02:29:50 +0200 Subject: [PATCH 13/23] docs(Cargo): Add more info to Cargo manifest --- Cargo.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index ab5d30215..26c668f57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,9 @@ name = "posix_mq" version = "0.1.0" authors = ["Vincent Ambo "] +description = "(Higher-level) Rust bindings to POSIX message queues" +license = "MIT" +repository = "https://github.com/aprilabank/posix_mq.rs" [dependencies] nix = "0.9" From 773d6eec9dc5d2a20e50b18d2f428fa0919310b6 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Mon, 16 Oct 2017 13:59:27 +0200 Subject: [PATCH 14/23] docs(README): Add Travis-CI & crates.io badges --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index de112e101..af7920b13 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,9 @@ posix_mq ======== +[![Build Status](https://travis-ci.org/aprilabank/posix_mq.rs.svg?branch=master)](https://travis-ci.org/aprilabank/posix_mq.rs) +[![crates.io](https://img.shields.io/crates/v/posix_mq.svg)](https://crates.io/crates/posix_mq) + This is a simple, relatively high-level library for the POSIX [message queue API][]. It wraps the lower-level API in a simpler interface with more robust error handling. From 1b795840880c9a2ae0afc320040e2e634b8e596b Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Mon, 16 Oct 2017 17:42:03 +0200 Subject: [PATCH 15/23] fix(lib): Fix incorrect error message for name validation The message should have been the exact opposite, duh! --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index cc3939d94..87ca9af78 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,7 +33,7 @@ impl Name { let string = s.to_string(); if !string.starts_with('/') { - return Err(Error::InvalidQueueName("Queue name must not start with '/'")); + return Err(Error::InvalidQueueName("Queue name must start with '/'")); } // The C library has a special error return for this case, so I assume people must actually From b75306e58fad9f4456e141436205ce0a4caa00e8 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Mon, 16 Oct 2017 17:42:38 +0200 Subject: [PATCH 16/23] chore(lib): Derive Debug traits on Queue, Message --- src/lib.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/lib.rs b/src/lib.rs index 87ca9af78..75846a6de 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,6 +26,7 @@ TODO: /// Wrapper type for queue names that performs basic validation of queue names before calling /// out to C code. +#[derive(Debug)] pub struct Name(CString); impl Name { @@ -65,6 +66,7 @@ pub struct Message { /// Represents an open queue descriptor to a POSIX message queue. This carries information /// about the queue's limitations (i.e. maximum message size and maximum message count). +#[derive(Debug)] pub struct Queue { name: Name, From 1b7464f9cff886ba397e1335a0e50d7e45860a5a Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Mon, 16 Oct 2017 17:43:41 +0200 Subject: [PATCH 17/23] refactor(lib): Add some additional documentation * improve delete() docs * add docs for send() & receive() * remove unnecessary clone() calls --- src/lib.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 75846a6de..6cb595a4d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -181,31 +181,35 @@ impl Queue { } /// Delete a message queue from the system. This method will make the queue unavailable for - /// other processes, too! + /// other processes after their current queue descriptors have been closed. pub fn delete(self) -> Result<(), Error> { mqueue::mq_unlink(&self.name.0)?; drop(self); Ok(()) } + /// Send a message to the message queue. + /// If the queue is full this call will block until a message has been consumed. pub fn send(&self, msg: &Message) -> Result<(), Error> { if msg.data.len() > self.max_size as usize { return Err(Error::MessageSizeExceeded()); } mqueue::mq_send( - self.queue_descriptor.clone(), + self.queue_descriptor, msg.data.as_ref(), msg.priority, ).map_err(|e| e.into()) } + /// Receive a message from the message queue. + /// If the queue is empty this call will block until a message arrives. pub fn receive(&self) -> Result { let mut data: Vec = vec![0; self.max_size as usize]; let mut priority: u32 = 0; let msg_size = mqueue::mq_receive( - self.queue_descriptor.clone(), + self.queue_descriptor, data.as_mut(), &mut priority, )?; From 043c80848d1e23b4c40a8b04e2c41cf72b539fbf Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Mon, 16 Oct 2017 17:44:54 +0200 Subject: [PATCH 18/23] chore: Bump version to 0.1.1 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 26c668f57..1b8cf800f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "posix_mq" -version = "0.1.0" +version = "0.1.1" authors = ["Vincent Ambo "] description = "(Higher-level) Rust bindings to POSIX message queues" license = "MIT" From 17e6d52cd689be1ab42220d3125c3396a2963a5d Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Mon, 16 Oct 2017 17:49:29 +0200 Subject: [PATCH 19/23] docs(README): Add link to Kotlin library --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index af7920b13..9370c6c08 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,8 @@ posix_mq This is a simple, relatively high-level library for the POSIX [message queue API][]. It wraps the lower-level API in a simpler interface with more robust error handling. +Check out this project's [sister library][] in Kotlin. + Usage example: ```rust @@ -28,3 +30,4 @@ let result = queue.receive().expect("message receiving failed"); ``` [message queue API]: https://linux.die.net/man/7/mq_overview +[sister library]: https://github.com/aprilabank/posix_mq.kt From e22ad153293cfa9c1d88e212959377814fe50a03 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Fri, 20 Oct 2017 17:18:59 +0200 Subject: [PATCH 20/23] fix(error): Module should be public --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 6cb595a4d..a45322278 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,7 +11,7 @@ use std::io::Read; use std::string::ToString; use std::ops::Drop; -mod error; +pub mod error; #[cfg(test)] mod tests; From 232cf36f20fdd2ea46a4be14b6ba12ad85033c17 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Fri, 20 Oct 2017 17:19:46 +0200 Subject: [PATCH 21/23] chore(cargo): Bump version to 0.1.2 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 1b8cf800f..8ca6c6cc5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "posix_mq" -version = "0.1.1" +version = "0.1.2" authors = ["Vincent Ambo "] description = "(Higher-level) Rust bindings to POSIX message queues" license = "MIT" From d8af25d58b971e885baf7b5437e909d70fa15d98 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Thu, 3 May 2018 12:40:06 +0200 Subject: [PATCH 22/23] chore(lib): Make Name & Message types Clone and PartialEq --- src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index a45322278..b8f2fed10 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,7 @@ TODO: /// Wrapper type for queue names that performs basic validation of queue names before calling /// out to C code. -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq)] pub struct Name(CString); impl Name { @@ -58,7 +58,7 @@ impl Name { } } -#[derive(Debug, PartialEq)] +#[derive(Debug, Clone, PartialEq)] pub struct Message { pub data: Vec, pub priority: u32, From ae1a57f29d0cc6ec9bb0eac4536c212b712bd611 Mon Sep 17 00:00:00 2001 From: Vincent Ambo Date: Thu, 3 May 2018 12:45:33 +0200 Subject: [PATCH 23/23] chore(cargo): Bump crate minor version --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8ca6c6cc5..db471168c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "posix_mq" -version = "0.1.2" -authors = ["Vincent Ambo "] +version = "0.1.3" +authors = ["Vincent Ambo "] description = "(Higher-level) Rust bindings to POSIX message queues" license = "MIT" repository = "https://github.com/aprilabank/posix_mq.rs"