1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
|
// SPDX-FileCopyrightText: V <v@anomalous.eu>
// SPDX-FileCopyrightText: edef <edef@edef.eu>
// SPDX-License-Identifier: OSL-3.0
package main
import (
"time"
)
type Session struct {
store *Store
id SessionID
q chan Message
done chan struct{}
}
func OpenSession(store *Store, host string) *Session {
session := &Session{
store: store,
id: store.WriteOpen(now(), host),
q: make(chan Message, 1024),
done: make(chan struct{}),
}
go session.batcher()
return session
}
func (s *Session) Write(from Side, data string) {
s.q <- Message{now(), from, data}
}
func (s *Session) Close(by Side, reason string) {
close(s.q)
<-s.done
s.store.WriteClose(s.id, now(), by, reason)
}
func now() Timestamp {
return Timestamp(time.Now().UnixNano())
}
func (s *Session) batcher() {
batch := make([]Message, cap(s.q))
for msg := range s.q {
batch = append(batch[:0], msg)
out:
for len(batch) < cap(batch) {
select {
case msg, ok := <-s.q:
if !ok {
break out
}
batch = append(batch, msg)
default:
break out
}
}
s.store.WriteBatch(s.id, batch)
for i := range batch {
batch[i] = Message{}
}
}
close(s.done)
}
|