1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
use std::fs;
use std::path::Path;
use std::str::FromStr;
use chrono::Utc;
use cron::Schedule;
use tokio::sync::broadcast;
use tokio::time::{interval, sleep, Duration};
use crate::config::OnlineBackup;
use crate::CoreAction;
use crate::actors::v1_read::QueryServerReadV1;
use crate::actors::v1_write::QueryServerWriteV1;
use kanidmd_lib::constants::PURGE_FREQUENCY;
use kanidmd_lib::event::{OnlineBackupEvent, PurgeRecycledEvent, PurgeTombstoneEvent};
pub struct IntervalActor;
impl IntervalActor {
pub fn start(
server: &'static QueryServerWriteV1,
mut rx: broadcast::Receiver<CoreAction>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut inter = interval(Duration::from_secs(PURGE_FREQUENCY));
loop {
tokio::select! {
Ok(action) = rx.recv() => {
match action {
CoreAction::Shutdown => break,
}
}
_ = inter.tick() => {
server
.handle_purgetombstoneevent(PurgeTombstoneEvent::new())
.await;
server
.handle_purgerecycledevent(PurgeRecycledEvent::new())
.await;
}
}
}
info!("Stopped IntervalActor");
})
}
#[allow(clippy::result_unit_err)]
pub fn start_online_backup(
server: &'static QueryServerReadV1,
cfg: &OnlineBackup,
mut rx: broadcast::Receiver<CoreAction>,
) -> Result<tokio::task::JoinHandle<()>, ()> {
let outpath = cfg.path.to_owned();
let versions = cfg.versions;
let cron_expr = Schedule::from_str(cfg.schedule.as_str()).map_err(|e| {
error!("Online backup schedule parse error: {}", e);
error!("valid formats are:");
error!("sec min hour day of month month day of week year");
error!("@hourly | @daily | @weekly");
})?;
info!("Online backup schedule parsed as: {}", cron_expr);
if cron_expr.upcoming(Utc).next().is_none() {
error!(
"Online backup schedule error: '{}' will not match any date.",
cron_expr
);
return Err(());
}
let op = Path::new(&outpath);
if !op.exists() {
info!(
"Online backup output folder '{}' does not exist, trying to create it.",
outpath
);
fs::create_dir_all(&outpath).map_err(|e| {
error!(
"Online backup failed to create output directory '{}': {}",
outpath.clone(),
e
)
})?;
}
if !op.is_dir() {
error!("Online backup output '{}' is not a directory or we are missing permissions to access it.", outpath);
return Err(());
}
let handle = tokio::spawn(async move {
for next_time in cron_expr.upcoming(Utc) {
let wait_seconds = 1 + (next_time - Utc::now()).num_seconds() as u64;
info!(
"Online backup next run on {}, wait_time = {}s",
next_time, wait_seconds
);
tokio::select! {
Ok(action) = rx.recv() => {
match action {
CoreAction::Shutdown => break,
}
}
_ = sleep(Duration::from_secs(wait_seconds)) => {
if let Err(e) = server
.handle_online_backup(
OnlineBackupEvent::new(),
outpath.clone().as_str(),
versions,
)
.await
{
error!(?e, "An online backup error occurred.");
}
}
}
}
info!("Stopped OnlineBackupActor");
});
Ok(handle)
}
}