From d49fe57776e6d19181c8ccb8d5332ed2c62d5ca8 Mon Sep 17 00:00:00 2001 From: V Date: Fri, 27 Aug 2021 06:08:09 +0200 Subject: Root commit Co-authored-by: edef --- session.go | 69 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 session.go (limited to 'session.go') diff --git a/session.go b/session.go new file mode 100644 index 0000000..7a65230 --- /dev/null +++ b/session.go @@ -0,0 +1,69 @@ +// SPDX-FileCopyrightText: V +// SPDX-FileCopyrightText: edef +// SPDX-License-Identifier: OSL-3.0 + +package main + +import ( + "time" +) + +type Session struct { + store *Store + id SessionID + q chan Message + done chan struct{} +} + +func OpenSession(store *Store, host string) *Session { + session := &Session{ + store: store, + id: store.WriteOpen(now(), host), + q: make(chan Message, 1024), + done: make(chan struct{}), + } + + go session.batcher() + return session +} + +func (s *Session) Write(from Side, data string) { + s.q <- Message{now(), from, data} +} + +func (s *Session) Close(by Side, reason string) { + close(s.q) + <-s.done + s.store.WriteClose(s.id, now(), by, reason) +} + +func now() Timestamp { + return Timestamp(time.Now().UnixNano()) +} + +func (s *Session) batcher() { + batch := make([]Message, cap(s.q)) + for msg := range s.q { + batch = append(batch[:0], msg) + out: + for len(batch) < cap(batch) { + select { + case msg, ok := <-s.q: + if !ok { + break out + } + batch = append(batch, msg) + default: + break out + } + } + + s.store.WriteBatch(s.id, batch) + + for i := range batch { + batch[i] = Message{} + } + } + + close(s.done) +} -- cgit 1.4.1