1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
// SPDX-FileCopyrightText: edef <edef@unfathomable.blue>
// SPDX-License-Identifier: OSL-3.0
use std::{
sync::{Arc, Condvar, Mutex},
thread::{self, JoinHandle},
};
/// Public handle that pairs the shared `Link` with the background syncer
/// thread spawned in `Transactor::new`.
///
/// Dropping the handle closes the link and joins the syncer thread.
#[derive(Debug)]
pub struct Transactor {
// State shared with the syncer thread; `write` and `wait` delegate to it.
link: Arc<Link>,
// Join handle of the syncer thread. Wrapped in `Option` so that `Drop` can
// move it out with `take()` before joining.
syncer: Option<JoinHandle<()>>,
}
impl Transactor {
pub fn new(tail: u64, f: impl FnMut(&State) + Send + 'static) -> Arc<Self> {
let link = Link::new(tail);
let syncer = {
let link = link.clone();
thread::spawn(move || link.run(f))
};
Arc::new(Self {
link,
syncer: Some(syncer),
})
}
pub fn write(self: &Arc<Self>, tx: u64, tail: u64) {
self.link.write(tx, tail)
}
pub fn wait(self: &Arc<Self>, tx: u64) {
self.link.wait(tx)
}
}
impl Drop for Transactor {
    /// Closes the link and waits for the syncer thread to exit.
    ///
    /// # Panics
    ///
    /// Panics if the syncer thread itself panicked, unless the current thread
    /// is already unwinding. In that case the syncer's failure is discarded,
    /// because a second panic raised from a destructor would abort the
    /// process.
    fn drop(&mut self) {
        if let Some(syncer) = self.syncer.take() {
            // Closing makes the syncer run its callback one last time and
            // then leave its loop, so the join below can complete.
            self.link.close();

            // Always join so the thread is not left running behind us.
            let outcome = syncer.join();

            // Surface a syncer panic only when that cannot escalate into a
            // double panic.
            if !thread::panicking() {
                outcome.unwrap();
            }
        }
    }
}
/// Rendezvous point between writers, waiters and the syncer thread.
#[derive(Debug)]
struct Link {
    /// Progress counters; every access goes through this mutex.
    state: Mutex<State>,
    /// Notified by `write` and `close` to wake the syncer in `run`.
    written: Condvar,
    /// Notified by `run` after each callback to wake callers of `wait`.
    synced: Condvar,
}

/// Snapshot of write/sync progress, as handed to the sync callback.
#[derive(Debug, Clone)]
pub struct State {
    /// `written_tx` of the snapshot most recently processed by the callback.
    pub synced_tx: u64,
    /// Transaction ID passed to the latest `write`, or `u64::MAX` once closed.
    pub written_tx: u64,
    /// Tail passed to the latest `write` (initially the value given to `new`).
    pub written_tail: u64,
}

impl Link {
    /// Sentinel stored in `written_tx` (and eventually `synced_tx`) once the
    /// link has been closed: all bits set, i.e. the largest `u64`.
    const CLOSED: u64 = !0;

    /// Creates a link with no transactions written or synced and the given
    /// initial tail.
    fn new(tail: u64) -> Arc<Self> {
        Arc::new(Self {
            state: Mutex::new(State {
                synced_tx: 0,
                written_tx: 0,
                written_tail: tail,
            }),
            written: Condvar::new(),
            synced: Condvar::new(),
        })
    }

    /// Publishes transaction `tx` with the new `tail` and wakes the syncer.
    ///
    /// Note that a `tx` of `u64::MAX` is indistinguishable from `close`,
    /// because that value is the closed sentinel.
    ///
    /// # Panics
    ///
    /// Panics with "already closed" if the link has been closed, or with
    /// "duplicate transaction ID" if `tx` is not strictly greater than the
    /// previously written ID. A rejected write leaves the state untouched.
    fn write(&self, tx: u64, tail: u64) {
        let (closed, stale) = {
            let mut state = self.state.lock().unwrap();
            let closed = state.written_tx == Self::CLOSED;
            let stale = state.written_tx >= tx;
            if !closed && !stale {
                state.written_tx = tx;
                state.written_tail = tail;
            }
            (closed, stale)
        };
        // The guard is released before asserting: panicking while holding it
        // would poison the mutex and take down the syncer, every waiter and
        // the owner's destructor along with the offending caller.
        assert!(!closed, "already closed");
        assert!(!stale, "duplicate transaction ID");
        self.written.notify_one();
    }

    /// Marks the link as closed and wakes the syncer so it can run the
    /// callback one final time and exit.
    fn close(&self) {
        {
            let mut state = self.state.lock().unwrap();
            state.written_tx = Self::CLOSED;
        }
        self.written.notify_one();
    }

    /// Blocks until `synced_tx` is at least `tx`.
    ///
    /// Because closing eventually drives `synced_tx` to `u64::MAX`, every
    /// waiter is released once the syncer has processed the close.
    fn wait(&self, tx: u64) {
        let guard = self.state.lock().unwrap();
        let _released = self
            .synced
            .wait_while(guard, |state| state.synced_tx < tx)
            .unwrap();
    }

    /// Syncer loop: waits for unsynced writes, hands a snapshot to `f`, then
    /// records the snapshot's `written_tx` as synced and wakes all waiters.
    ///
    /// Returns after processing the snapshot in which `written_tx` is the
    /// closed sentinel.
    fn run(&self, mut f: impl FnMut(&State)) {
        let mut state = self.state.lock().unwrap();
        while state.synced_tx < Self::CLOSED {
            state = self
                .written
                .wait_while(state, |state| state.synced_tx == state.written_tx)
                .unwrap();
            let state_seen = state.clone();
            // Run the callback without the lock so writers are never blocked
            // behind it (and a panicking callback cannot poison the mutex).
            drop(state);
            f(&state_seen);
            state = self.state.lock().unwrap();
            // Only the snapshot is known to be synced; writes that arrived
            // during the callback are picked up on the next iteration.
            state.synced_tx = state_seen.written_tx;
            self.synced.notify_all();
        }
    }
}
|