feat: Implement high-level POSIX message queue API
Implements a high-level API on top of POSIX message queues (mq_overview(7)). This API can be used to perform local RPC between processes that need to exchange messages *fast* (or *easy*) with priority ordering. The methods are mostly documented but there are still two corner cases that need to be looked at and a lot of tests missing.
This commit is contained in:
parent
1f1a74108e
commit
7dc6144e3f
2 changed files with 292 additions and 6 deletions
37
src/error.rs
37
src/error.rs
|
@ -1,6 +1,8 @@
|
||||||
use nix;
|
use nix;
|
||||||
use std::error;
|
use std::error;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
use std::io;
|
||||||
|
use std::num;
|
||||||
|
|
||||||
/// This module implements a simple error type to match the errors that can be thrown from the C
|
/// This module implements a simple error type to match the errors that can be thrown from the C
|
||||||
/// functions as well as some extra errors resulting from internal validations.
|
/// functions as well as some extra errors resulting from internal validations.
|
||||||
|
@ -19,6 +21,13 @@ use std::fmt;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
|
// These errors are raised inside of the library
|
||||||
|
InvalidQueueName(&'static str),
|
||||||
|
ValueReadingError(io::Error),
|
||||||
|
MessageSizeExceeded(),
|
||||||
|
MaximumMessageSizeExceeded(),
|
||||||
|
MaximumMessageCountExceeded(),
|
||||||
|
|
||||||
// These errors match what is described in the man pages (from mq_overview(7) onwards).
|
// These errors match what is described in the man pages (from mq_overview(7) onwards).
|
||||||
PermissionDenied(),
|
PermissionDenied(),
|
||||||
InvalidQueueDescriptor(),
|
InvalidQueueDescriptor(),
|
||||||
|
@ -45,6 +54,12 @@ impl error::Error for Error {
|
||||||
fn description(&self) -> &str {
|
fn description(&self) -> &str {
|
||||||
use Error::*;
|
use Error::*;
|
||||||
match *self {
|
match *self {
|
||||||
|
// This error contains more sensible description strings already
|
||||||
|
InvalidQueueName(e) => e,
|
||||||
|
ValueReadingError(_) => "error reading system configuration for message queues",
|
||||||
|
MessageSizeExceeded() => "message is larger than maximum size for specified queue",
|
||||||
|
MaximumMessageSizeExceeded() => "specified queue message size exceeds system maximum",
|
||||||
|
MaximumMessageCountExceeded() => "specified queue message count exceeds system maximum",
|
||||||
PermissionDenied() => "permission to the specified queue was denied",
|
PermissionDenied() => "permission to the specified queue was denied",
|
||||||
InvalidQueueDescriptor() => "the internal queue descriptor was invalid",
|
InvalidQueueDescriptor() => "the internal queue descriptor was invalid",
|
||||||
QueueCallInterrupted() => "queue method interrupted by signal",
|
QueueCallInterrupted() => "queue method interrupted by signal",
|
||||||
|
@ -52,8 +67,10 @@ impl error::Error for Error {
|
||||||
QueueNotFound() => "the specified queue could not be found",
|
QueueNotFound() => "the specified queue could not be found",
|
||||||
InsufficientMemory() => "insufficient memory to call queue method",
|
InsufficientMemory() => "insufficient memory to call queue method",
|
||||||
InsufficientSpace() => "insufficient space to call queue method",
|
InsufficientSpace() => "insufficient space to call queue method",
|
||||||
ProcessFileDescriptorLimitReached() => "max. number of process file descriptors reached",
|
ProcessFileDescriptorLimitReached() =>
|
||||||
SystemFileDescriptorLimitReached() => "max. number of system file descriptors reached",
|
"maximum number of process file descriptors reached",
|
||||||
|
SystemFileDescriptorLimitReached() =>
|
||||||
|
"maximum number of system file descriptors reached",
|
||||||
UnknownForeignError(_) => "unknown foreign error occured: please report a bug!",
|
UnknownForeignError(_) => "unknown foreign error occured: please report a bug!",
|
||||||
UnknownInternalError(_) => "unknown internal error occured: please report a bug!",
|
UnknownInternalError(_) => "unknown internal error occured: please report a bug!",
|
||||||
}
|
}
|
||||||
|
@ -79,6 +96,22 @@ impl From<nix::Error> for Error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This implementation is used when reading system queue settings.
|
||||||
|
impl From<io::Error> for Error {
|
||||||
|
fn from(e: io::Error) -> Self {
|
||||||
|
Error::ValueReadingError(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// This implementation is used when parsing system queue settings. The unknown error is returned
|
||||||
|
// here because the system is probably seriously broken if those files don't contain numbers.
|
||||||
|
impl From<num::ParseIntError> for Error {
|
||||||
|
fn from(_: num::ParseIntError) -> Self {
|
||||||
|
Error::UnknownInternalError(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
fn match_errno(err: nix::Errno) -> Error {
|
fn match_errno(err: nix::Errno) -> Error {
|
||||||
use nix::errno::*;
|
use nix::errno::*;
|
||||||
|
|
||||||
|
|
261
src/lib.rs
261
src/lib.rs
|
@ -1,7 +1,260 @@
|
||||||
|
extern crate nix;
|
||||||
|
extern crate libc;
|
||||||
|
|
||||||
|
use nix::mqueue;
|
||||||
|
use nix::sys::stat;
|
||||||
|
use std::ffi::CString;
|
||||||
|
use libc::mqd_t;
|
||||||
|
use error::Error;
|
||||||
|
use std::string::ToString;
|
||||||
|
use std::fs::File;
|
||||||
|
use std::io::Read;
|
||||||
|
|
||||||
|
mod error;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests;
|
||||||
#[test]
|
|
||||||
fn it_works() {
|
/*
|
||||||
assert_eq!(2 + 2, 4);
|
TODO:
|
||||||
|
|
||||||
|
* what happens if permissions change after FD was opened?
|
||||||
|
* drop dependency on nix crate?
|
||||||
|
|
||||||
|
*/
|
||||||
|
|
||||||
|
/// Wrapper type for queue names that performs basic validation of queue names before calling
|
||||||
|
/// out to C code.
|
||||||
|
pub struct Name(CString);
|
||||||
|
|
||||||
|
impl Name {
|
||||||
|
pub fn new<S: ToString>(s: S) -> Result<Self, Error> {
|
||||||
|
let string = s.to_string();
|
||||||
|
|
||||||
|
if !string.starts_with('/') {
|
||||||
|
return Err(Error::InvalidQueueName("Queue name must not start with '/'"))
|
||||||
|
}
|
||||||
|
|
||||||
|
// The C library has a special error return for this case, so I assume people must actually
|
||||||
|
// have tried just using '/' as a queue name.
|
||||||
|
if string.len() == 1 {
|
||||||
|
return Err(Error::InvalidQueueName(
|
||||||
|
"Queue name must be a slash followed by one or more characters"
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
if string.len() > 255 {
|
||||||
|
return Err(Error::InvalidQueueName("Queue name must not exceed 255 characters"))
|
||||||
|
}
|
||||||
|
|
||||||
|
if string.matches('/').count() > 1 {
|
||||||
|
return Err(Error::InvalidQueueName("Queue name can not contain more than one slash"))
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: What error is being thrown away here? Is it possible?
|
||||||
|
Ok(Name(CString::new(string).unwrap()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Message {
|
||||||
|
pub data: Vec<u8>,
|
||||||
|
pub priority: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Represents an open queue descriptor to a POSIX message queue. This carries information
|
||||||
|
/// about the queue's limitations (i.e. maximum message size and maximum message count).
|
||||||
|
pub struct Queue {
|
||||||
|
name: Name,
|
||||||
|
|
||||||
|
/// Internal file/queue descriptor.
|
||||||
|
queue_descriptor: mqd_t,
|
||||||
|
|
||||||
|
/// Maximum number of pending messages in this queue.
|
||||||
|
max_pending: i64,
|
||||||
|
|
||||||
|
/// Maximum size of this queue.
|
||||||
|
max_size: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Queue {
|
||||||
|
/// Creates a new queue and fails if it already exists.
|
||||||
|
/// By default the queue will be read/writable by the current user with no access for other
|
||||||
|
/// users.
|
||||||
|
/// Linux users can change this setting themselves by modifying the queue file in /dev/mqueue.
|
||||||
|
pub fn create(name: Name, max_pending: i64, max_size: i64) -> Result<Queue, Error> {
|
||||||
|
if max_pending > read_i64_from_file(MSG_MAX)? {
|
||||||
|
return Err(Error::MaximumMessageCountExceeded())
|
||||||
|
}
|
||||||
|
|
||||||
|
if max_size > read_i64_from_file(MSGSIZE_MAX)? {
|
||||||
|
return Err(Error::MaximumMessageSizeExceeded())
|
||||||
|
}
|
||||||
|
|
||||||
|
let oflags = {
|
||||||
|
let mut flags = mqueue::MQ_OFlag::empty();
|
||||||
|
// Put queue in r/w mode
|
||||||
|
flags.toggle(mqueue::O_RDWR);
|
||||||
|
// Enable queue creation
|
||||||
|
flags.toggle(mqueue::O_CREAT);
|
||||||
|
// Fail if queue exists already
|
||||||
|
flags.toggle(mqueue::O_EXCL);
|
||||||
|
flags
|
||||||
|
};
|
||||||
|
|
||||||
|
let attr = mqueue::MqAttr::new(
|
||||||
|
0, max_pending, max_size, 0
|
||||||
|
);
|
||||||
|
|
||||||
|
let queue_descriptor = mqueue::mq_open(
|
||||||
|
&name.0,
|
||||||
|
oflags,
|
||||||
|
default_mode(),
|
||||||
|
Some(&attr),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(Queue {
|
||||||
|
name,
|
||||||
|
queue_descriptor,
|
||||||
|
max_pending,
|
||||||
|
max_size: max_size as usize,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Opens an existing queue.
|
||||||
|
pub fn open(name: Name) -> Result<Queue, Error> {
|
||||||
|
// No extra flags need to be constructed as the default is to open and fail if the
|
||||||
|
// queue does not exist yet - which is what we want here.
|
||||||
|
let oflags = mqueue::O_RDWR;
|
||||||
|
let queue_descriptor = mqueue::mq_open(
|
||||||
|
&name.0,
|
||||||
|
oflags,
|
||||||
|
default_mode(),
|
||||||
|
None,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let attr = mq_getattr(queue_descriptor)?;
|
||||||
|
|
||||||
|
Ok(Queue{
|
||||||
|
name,
|
||||||
|
queue_descriptor,
|
||||||
|
max_pending: attr.mq_maxmsg,
|
||||||
|
max_size: attr.mq_msgsize as usize,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Opens an existing queue or creates a new queue with the OS default settings.
|
||||||
|
pub fn open_or_create(name: Name) -> Result<Queue, Error> {
|
||||||
|
let oflags = {
|
||||||
|
let mut flags = mqueue::MQ_OFlag::empty();
|
||||||
|
// Put queue in r/w mode
|
||||||
|
flags.toggle(mqueue::O_RDWR);
|
||||||
|
// Enable queue creation
|
||||||
|
flags.toggle(mqueue::O_CREAT);
|
||||||
|
flags
|
||||||
|
};
|
||||||
|
|
||||||
|
let default_pending = read_i64_from_file(MSG_DEFAULT)?;
|
||||||
|
let default_size = read_i64_from_file(MSGSIZE_DEFAULT)?;
|
||||||
|
let attr = mqueue::MqAttr::new(
|
||||||
|
0, default_pending, default_size, 0
|
||||||
|
);
|
||||||
|
|
||||||
|
let queue_descriptor = mqueue::mq_open(
|
||||||
|
&name.0,
|
||||||
|
oflags,
|
||||||
|
default_mode(),
|
||||||
|
Some(&attr),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let actual_attr = mq_getattr(queue_descriptor)?;
|
||||||
|
|
||||||
|
Ok(Queue {
|
||||||
|
name,
|
||||||
|
queue_descriptor,
|
||||||
|
max_pending: actual_attr.mq_maxmsg,
|
||||||
|
max_size: actual_attr.mq_msgsize as usize,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Delete a message queue from the system. This method will make the queue unavailable for
|
||||||
|
/// other processes, too!
|
||||||
|
pub fn delete(self) -> Result<(), Error> {
|
||||||
|
mqueue::mq_unlink(&self.name.0)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send(self, msg: Message) -> Result<(), Error> {
|
||||||
|
if msg.data.len() > self.max_size as usize {
|
||||||
|
return Err(Error::MessageSizeExceeded())
|
||||||
|
}
|
||||||
|
|
||||||
|
mqueue::mq_send(
|
||||||
|
self.queue_descriptor.clone(),
|
||||||
|
msg.data.as_ref(),
|
||||||
|
msg.priority,
|
||||||
|
).map_err(|e| e.into())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn receive(self) -> Result<Message, Error> {
|
||||||
|
let mut data: Vec<u8> = vec![0; self.max_size as usize];
|
||||||
|
let mut priority: u32 = 0;
|
||||||
|
|
||||||
|
let msg_size = mqueue::mq_receive(
|
||||||
|
self.queue_descriptor.clone(),
|
||||||
|
data.as_mut(),
|
||||||
|
&mut priority,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
data.truncate(msg_size);
|
||||||
|
Ok(Message { data, priority })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn max_pending(&self) -> i64 {
|
||||||
|
self.max_pending
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn max_size(&self) -> usize {
|
||||||
|
self.max_size
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Creates the default queue mode (0600).
|
||||||
|
fn default_mode() -> stat::Mode {
|
||||||
|
let mut mode = stat::Mode::empty();
|
||||||
|
mode.toggle(stat::S_IRUSR);
|
||||||
|
mode.toggle(stat::S_IWUSR);
|
||||||
|
mode
|
||||||
|
}
|
||||||
|
|
||||||
|
/// This file defines the default number of maximum pending messages in a queue.
|
||||||
|
const MSG_DEFAULT: &'static str = "/proc/sys/fs/mqueue/msg_default";
|
||||||
|
|
||||||
|
/// This file defines the system maximum number of pending messages in a queue.
|
||||||
|
const MSG_MAX: &'static str = "/proc/sys/fs/mqueue/msg_max";
|
||||||
|
|
||||||
|
/// This file defines the default maximum size of messages in a queue.
|
||||||
|
const MSGSIZE_DEFAULT: &'static str = "/proc/sys/fs/mqueue/msgsize_default";
|
||||||
|
|
||||||
|
/// This file defines the system maximum size for messages in a queue.
|
||||||
|
const MSGSIZE_MAX: &'static str = "/proc/sys/fs/mqueue/msgsize_max";
|
||||||
|
|
||||||
|
/// This method is used in combination with the above constants to find system limits.
|
||||||
|
fn read_i64_from_file(name: &str) -> Result<i64, Error> {
|
||||||
|
let mut file = File::open(name.to_string())?;
|
||||||
|
let mut content = String::new();
|
||||||
|
file.read_to_string(&mut content)?;
|
||||||
|
Ok(content.parse()?)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The mq_getattr implementation in the nix crate hides the maximum message size and count, which
|
||||||
|
/// is very impractical.
|
||||||
|
/// To work around it, this method calls the C-function directly.
|
||||||
|
fn mq_getattr(mqd: mqd_t) -> Result<libc::mq_attr, Error> {
|
||||||
|
use std::mem;
|
||||||
|
let mut attr = unsafe { mem::uninitialized::<libc::mq_attr>() };
|
||||||
|
let res = unsafe { libc::mq_getattr(mqd, &mut attr) };
|
||||||
|
nix::Errno::result(res)
|
||||||
|
.map(|_| attr)
|
||||||
|
.map_err(|e| e.into())
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue