about summary refs log tree commit diff
path: root/store.go
diff options
context:
space:
mode:
authorV <v@anomalous.eu>2021-08-27 06:08:09 +0200
committerV <v@anomalous.eu>2021-08-27 06:08:09 +0200
commitd49fe57776e6d19181c8ccb8d5332ed2c62d5ca8 (patch)
treee3036d32e318bfaef519c316de7e7f44b9351fef /store.go
downloadloxy-trunk.tar.zst
Root commit HEAD v0.2.0 trunk
Co-authored-by: edef <edef@edef.eu>
Diffstat (limited to 'store.go')
-rw-r--r--store.go135
1 files changed, 135 insertions, 0 deletions
diff --git a/store.go b/store.go
new file mode 100644
index 0000000..5f228ae
--- /dev/null
+++ b/store.go
@@ -0,0 +1,135 @@
+// SPDX-FileCopyrightText: V <v@anomalous.eu>
+// SPDX-FileCopyrightText: edef <edef@edef.eu>
+// SPDX-License-Identifier: OSL-3.0
+
+package main
+
+import (
+	"database/sql"
+
+	_ "github.com/mattn/go-sqlite3"
+)
+
+const schema = `
+	CREATE TABLE IF NOT EXISTS sessions (
+		id             INTEGER PRIMARY KEY,
+		opened_at      INTEGER NOT NULL,
+		host           TEXT    NOT NULL,
+		closed_at      INTEGER,
+		closed_by      INTEGER,
+		closed_because TEXT
+	);
+
+	CREATE TABLE IF NOT EXISTS messages (
+		session INTEGER NOT NULL REFERENCES sessions(id),
+		time    INTEGER NOT NULL,
+		side    INTEGER NOT NULL,
+		data    TEXT    NOT NULL
+	);
+`
+
+type SessionID int64
+type Timestamp int64
+type Side byte
+
+const (
+	SideProxy Side = iota
+	SideClient
+	SideServer
+)
+
+type Message struct {
+	when Timestamp
+	side Side
+	data string
+}
+
+type Store struct {
+	open, batch, close *sql.Stmt
+
+	q    chan func(*sql.DB)
+	done chan struct{}
+}
+
+func OpenStore(path string) *Store {
+	db, err := sql.Open("sqlite3", path+"?_foreign_keys=yes")
+	check(err)
+
+	must(db.Exec(schema))
+	must(db.Exec(`UPDATE sessions SET closed_at = ? WHERE closed_at IS NULL`, now()))
+
+	prepare := func(query string) *sql.Stmt {
+		stmt, err := db.Prepare(query)
+		check(err)
+		return stmt
+	}
+
+	s := &Store{
+		open:  prepare(`INSERT INTO sessions(opened_at, host) VALUES(?, ?)`),
+		batch: prepare(`INSERT INTO messages(session, time, side, data) VALUES(?, ?, ?, ?)`),
+		close: prepare(`UPDATE sessions SET closed_at = ?, closed_by = ?, closed_because = ? WHERE id = ?`),
+
+		q:    make(chan func(*sql.DB)),
+		done: make(chan struct{}),
+	}
+
+	go func() {
+		for op := range s.q {
+			op(db)
+		}
+		check(db.Close())
+		close(s.done)
+	}()
+
+	return s
+}
+
+func (s *Store) WriteOpen(when Timestamp, host string) SessionID {
+	ch := make(chan int64, 1)
+	s.q <- func(*sql.DB) {
+		id, err := must(s.open.Exec(when, host)).LastInsertId()
+		check(err)
+		ch <- id
+	}
+	return SessionID(<-ch)
+}
+
+func (s *Store) WriteBatch(id SessionID, batch []Message) {
+	ch := make(chan struct{})
+	s.q <- func(db *sql.DB) {
+		tx, err := db.Begin()
+		check(err)
+
+		stmt := tx.Stmt(s.batch)
+		for _, msg := range batch {
+			must(stmt.Exec(id, msg.when, msg.side, msg.data))
+		}
+
+		check(tx.Commit())
+
+		close(ch)
+	}
+	<-ch
+}
+
+func (s *Store) WriteClose(id SessionID, when Timestamp, by Side, reason string) {
+	s.q <- func(*sql.DB) {
+		must(s.close.Exec(when, by, reason, id))
+	}
+}
+
+func (s *Store) Close() {
+	close(s.q)
+	<-s.done
+}
+
+func check(err error) {
+	if err != nil {
+		panic(err)
+	}
+}
+
+func must(res sql.Result, err error) sql.Result {
+	check(err)
+	return res
+}