1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#![warn(missing_docs, unreachable_pub, rust_2018_idioms)]
#![forbid(unsafe_code)]
use futures::prelude::*;
use migrate_state::StateLock;
use std::time;
/// Time window used by the `locking` test: a contended non-forced lock attempt must stay
/// unresolved for this long, and a forced lock attempt must resolve before it elapses.
const STATE_LOCK_MIN_DURATION: time::Duration = time::Duration::from_secs(3);
/// Deadline enforced by `expect_within_timeout`; a future that has not resolved by then
/// makes the test panic.
const TEST_TIMEOUT: time::Duration = time::Duration::from_secs(30);
/// Drives `fut` to completion while racing it against a `TEST_TIMEOUT` timer.
///
/// Returns the output of `fut` when it resolves before the timer fires.
///
/// # Panics
///
/// Panics when the timer wins the race, i.e. `fut` did not resolve within `TEST_TIMEOUT`.
async fn expect_within_timeout<F>(fut: F) -> F::Output
where
    F: Future,
{
    futures::select! {
        output = fut.fuse() => output,
        _ = tokio::time::sleep(TEST_TIMEOUT).fuse() => {
            panic!("Timed out ({:?}) waiting for the future to resolve", TEST_TIMEOUT)
        }
    }
}
pub async fn run_all<F>(mut create_state_lock_factory: impl FnMut() -> F)
where
F: Fn() -> Box<dyn StateLock>,
{
let factories = (create_state_lock_factory(), create_state_lock_factory());
futures::join!(storage(factories.0()), locking(&factories.1));
}
/// Checks that state can be stored and read back through a locked state.
///
/// The check acquires a non-forced lock, expects the initially fetched state to be empty,
/// writes the single-element vector `[42]`, expects to fetch exactly that value back and
/// finally releases the lock.
///
/// # Panics
///
/// Panics if locking, fetching, updating or unlocking returns an error, if any of the two
/// equality assertions fails, or if the lock is not acquired within `TEST_TIMEOUT`.
pub async fn storage(state_lock: Box<dyn StateLock>) {
    let mut guard = expect_within_timeout(state_lock.lock(false))
        .await
        .unwrap();
    let client = guard.client();

    // Nothing has been written yet, so the storage must report an empty state.
    let before_update = client.fetch().await.unwrap();
    assert_eq!(before_update, vec![]);

    // Round-trip: what was written must be exactly what is read afterwards.
    let written = vec![42];
    client.update(written.clone()).await.unwrap();
    let after_update = client.fetch().await.unwrap();
    assert_eq!(after_update, written);

    guard.unlock().await.unwrap();
}
/// Checks the mutual exclusion and force-locking behavior of the state lock.
///
/// Every lock is taken through a fresh state lock obtained from `create_state_lock`.
/// The check goes through the following steps:
///
/// 1. Acquire a non-forced lock.
/// 2. Start a second non-forced lock attempt and require that it does not resolve
///    (neither successfully nor with an error) during `STATE_LOCK_MIN_DURATION`.
/// 3. Release the first lock.
/// 4. Twice in a row: acquire a non-forced lock, then a forced one, which has to resolve
///    successfully before `STATE_LOCK_MIN_DURATION` elapses. The first time the non-forced
///    lock is released before the forced one, the second time in the opposite order.
///
/// # Panics
///
/// Panics if any of the expectations above is violated, if acquiring or releasing a lock
/// returns an error, or if an awaited lock attempt exceeds `TEST_TIMEOUT`.
pub async fn locking(create_state_lock: &dyn Fn() -> Box<dyn StateLock>) {
    // Every acquisition is guarded by the global test timeout.
    let acquire = |force| expect_within_timeout(create_state_lock().lock(force));

    let held = acquire(false).await.unwrap();

    // While `held` is alive a competing non-forced attempt has to stay pending,
    // so the timer is the only arm allowed to finish here.
    futures::select! {
        _ = tokio::time::sleep(STATE_LOCK_MIN_DURATION).fuse() => {}
        contender = acquire(false).fuse() => {
            let contender = match contender {
                Err(err) => format!("{:?}", err),
                Ok(_) => "<resolved state lock>".to_owned(),
            };
            panic!("Unexpected resolution of the state lock future: {}", contender);
        }
    }

    held.unlock().await.unwrap();

    // Takes a regular lock and then forces a second one on top of it.
    let lock_then_force = || async {
        let regular = acquire(false).await.unwrap();
        let forced = futures::select! {
            _ = tokio::time::sleep(STATE_LOCK_MIN_DURATION).fuse() => {
                panic!("Force-lock the state hung up ({:?})", STATE_LOCK_MIN_DURATION);
            }
            outcome = acquire(true).fuse() => outcome.unwrap(),
        };
        (regular, forced)
    };

    // Release order: regular lock first, forced lock second.
    let (regular, forced) = lock_then_force().await;
    regular.unlock().await.unwrap();
    forced.unlock().await.unwrap();

    // Release order: forced lock first, regular lock second.
    let (regular, forced) = lock_then_force().await;
    forced.unlock().await.unwrap();
    regular.unlock().await.unwrap();
}