From d49fe57776e6d19181c8ccb8d5332ed2c62d5ca8 Mon Sep 17 00:00:00 2001 From: V Date: Fri, 27 Aug 2021 06:08:09 +0200 Subject: Root commit Co-authored-by: edef --- store.go | 135 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 135 insertions(+) create mode 100644 store.go (limited to 'store.go') diff --git a/store.go b/store.go new file mode 100644 index 0000000..5f228ae --- /dev/null +++ b/store.go @@ -0,0 +1,135 @@ +// SPDX-FileCopyrightText: V +// SPDX-FileCopyrightText: edef +// SPDX-License-Identifier: OSL-3.0 + +package main + +import ( + "database/sql" + + _ "github.com/mattn/go-sqlite3" +) + +const schema = ` + CREATE TABLE IF NOT EXISTS sessions ( + id INTEGER PRIMARY KEY, + opened_at INTEGER NOT NULL, + host TEXT NOT NULL, + closed_at INTEGER, + closed_by INTEGER, + closed_because TEXT + ); + + CREATE TABLE IF NOT EXISTS messages ( + session INTEGER NOT NULL REFERENCES sessions(id), + time INTEGER NOT NULL, + side INTEGER NOT NULL, + data TEXT NOT NULL + ); +` + +type SessionID int64 +type Timestamp int64 +type Side byte + +const ( + SideProxy Side = iota + SideClient + SideServer +) + +type Message struct { + when Timestamp + side Side + data string +} + +type Store struct { + open, batch, close *sql.Stmt + + q chan func(*sql.DB) + done chan struct{} +} + +func OpenStore(path string) *Store { + db, err := sql.Open("sqlite3", path+"?_foreign_keys=yes") + check(err) + + must(db.Exec(schema)) + must(db.Exec(`UPDATE sessions SET closed_at = ? WHERE closed_at IS NULL`, now())) + + prepare := func(query string) *sql.Stmt { + stmt, err := db.Prepare(query) + check(err) + return stmt + } + + s := &Store{ + open: prepare(`INSERT INTO sessions(opened_at, host) VALUES(?, ?)`), + batch: prepare(`INSERT INTO messages(session, time, side, data) VALUES(?, ?, ?, ?)`), + close: prepare(`UPDATE sessions SET closed_at = ?, closed_by = ?, closed_because = ? WHERE id = ?`), + + q: make(chan func(*sql.DB)), + done: make(chan struct{}), + } + + go func() { + for op := range s.q { + op(db) + } + check(db.Close()) + close(s.done) + }() + + return s +} + +func (s *Store) WriteOpen(when Timestamp, host string) SessionID { + ch := make(chan int64, 1) + s.q <- func(*sql.DB) { + id, err := must(s.open.Exec(when, host)).LastInsertId() + check(err) + ch <- id + } + return SessionID(<-ch) +} + +func (s *Store) WriteBatch(id SessionID, batch []Message) { + ch := make(chan struct{}) + s.q <- func(db *sql.DB) { + tx, err := db.Begin() + check(err) + + stmt := tx.Stmt(s.batch) + for _, msg := range batch { + must(stmt.Exec(id, msg.when, msg.side, msg.data)) + } + + check(tx.Commit()) + + close(ch) + } + <-ch +} + +func (s *Store) WriteClose(id SessionID, when Timestamp, by Side, reason string) { + s.q <- func(*sql.DB) { + must(s.close.Exec(when, by, reason, id)) + } +} + +func (s *Store) Close() { + close(s.q) + <-s.done +} + +func check(err error) { + if err != nil { + panic(err) + } +} + +func must(res sql.Result, err error) sql.Result { + check(err) + return res +} -- cgit 1.4.1