peers syncing for a single user
This commit is contained in:
322
main.go
322
main.go
@@ -1,58 +1,20 @@
|
||||
// TODO
|
||||
// 1. Convery network messages to flatbuffers so we don't need to parse JSON in Go.
|
||||
// Use binary WS messages. Generate for Go and JS and use that everywhere. We can
|
||||
// also use this for WebRTC messages.
|
||||
// 2. Keep a list of all nodes that bootstrapped in the last N minutes.
|
||||
// 3. When a node bootstraps, send it a random list of the nodes we know about.
|
||||
// 4. Do the signalling to connect those nodes.
|
||||
// 5. Each node will know about N other nodes. To find get the data for a feed,
|
||||
// a node will need to find another node that has that data. To do that we'll need to
|
||||
// implement a search message that is sent to all currently connected nodes, and they
|
||||
// forward to all their nodes, passing back the address of the node that has the data.
|
||||
// Once we find it, we'll do the signalling to connect to it via Web RTC via our existing connected nodes.
|
||||
// -----
|
||||
// Feeds. People can curate feeds which can be any combination of hashtags, serch terms and users.
|
||||
// Invite-only communities. Just block everyone else even if they post to it.
|
||||
// Limit to friends and friends of friends
|
||||
|
||||
// MVP
|
||||
// You connect to the person you want to get the post from to get the post
|
||||
// they give you the post
|
||||
// If they're offline, you can't get their updates.
|
||||
// This is very stupid, but it's simplest thing.
|
||||
// The bootstrap server connects you to them directly via WebRTC
|
||||
// This will make the thing actually function as a little toy for people to play with.
|
||||
// This will let us test whether background tabs respond to webrtc requests.
|
||||
|
||||
// THEN
|
||||
// Need to have identity sorted out
|
||||
// When you read someone's posts, you also cache them locally
|
||||
// cache priority goes mutuals->people you follow->people who you folllow, follow, so you're always
|
||||
// caching your mutual's posts
|
||||
// Posts are samll, so caching per-post will work fine.
|
||||
// Then the process is for the bootstrap server to remember all nodes and what they're caching
|
||||
// This will allow distributed content delivery but put a memory and bendwidth strain on the
|
||||
// bootstrap sever. Look into Web Transport for the raspberry pi overhead. Could buy a few more RPIs
|
||||
// and make a little cluster
|
||||
|
||||
// ✅ Domain name so we can get a certificate and serve HTTPS / HTTP3
|
||||
|
||||
// Think about compiling Typescript on initial access and caching the JS in a service worker
|
||||
// so you don't need a build system to change things.
|
||||
// Think about self-hosting the client so the system can be completely self-hosted
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
// "strings"
|
||||
|
||||
"github.com/andybalholm/brotli"
|
||||
"github.com/gorilla/websocket"
|
||||
@@ -61,7 +23,7 @@ import (
|
||||
var upgrader = websocket.Upgrader{
|
||||
CheckOrigin: func(r *http.Request) bool {
|
||||
origin := r.Header.Get("Origin")
|
||||
return origin == "https://ddlion.net"
|
||||
return origin == "https://ddlion.net" || origin == "https://ddln.app"
|
||||
},
|
||||
}
|
||||
|
||||
@@ -74,7 +36,7 @@ type Message struct {
|
||||
Type string `json:"type"`
|
||||
}
|
||||
|
||||
type MessageHandler func([]byte, *websocket.Conn) error
|
||||
type MessageHandler func([]byte, *Peer) ([]byte, error)
|
||||
|
||||
var messageHandlers = make(map[string]MessageHandler)
|
||||
|
||||
@@ -82,19 +44,45 @@ func registerHandler(messageType string, handler MessageHandler) {
|
||||
messageHandlers[messageType] = handler
|
||||
}
|
||||
|
||||
func dispatchMessage(message []byte, conn *websocket.Conn) error {
|
||||
func dispatchMessage(message []byte, peer *Peer) ([]byte, error) {
|
||||
var msg Message
|
||||
if err := json.Unmarshal(message, &msg); err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
handler, ok := messageHandlers[msg.Type]
|
||||
if !ok {
|
||||
log.Printf("No handler registered for message type: %s", msg.Type)
|
||||
return nil
|
||||
err := fmt.Errorf("no handler registered for message type: %s", msg.Type)
|
||||
return []byte(fmt.Sprintf(`{"type":"error", "message": "%s"}`, err.Error())), nil
|
||||
}
|
||||
|
||||
return handler(message, conn)
|
||||
return handler(message, peer)
|
||||
}
|
||||
|
||||
const (
|
||||
writeWait = 10 * time.Second
|
||||
)
|
||||
|
||||
type Peer struct {
|
||||
conn *websocket.Conn
|
||||
send chan []byte
|
||||
lastActive time.Time
|
||||
}
|
||||
|
||||
func removePeer(peerID string, peer *Peer) {
|
||||
delete(peerConnections, peerID)
|
||||
|
||||
for userID, peers := range userPeers {
|
||||
delete(peers, peerID)
|
||||
if len(peers) == 0 {
|
||||
delete(userPeers, userID)
|
||||
}
|
||||
}
|
||||
|
||||
delete(connectionPeers, peer.conn)
|
||||
|
||||
// Close the peer's send channel
|
||||
close(peer.send)
|
||||
}
|
||||
|
||||
func handleWebSocket(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -109,39 +97,93 @@ func handleWebSocket(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
conn.SetCloseHandler(websocketCloseHandler)
|
||||
|
||||
// Create a Peer object with a buffered channel for sending messages
|
||||
peer := &Peer{
|
||||
conn: conn,
|
||||
send: make(chan []byte, 256),
|
||||
lastActive: time.Now(),
|
||||
}
|
||||
|
||||
// Start the write loop in a separate goroutine
|
||||
go writePump(peer)
|
||||
|
||||
for {
|
||||
_, message, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
log.Println("ReadMessage error:", err)
|
||||
break
|
||||
}
|
||||
log.Printf("recv: %s", message)
|
||||
|
||||
if err := dispatchMessage(message, conn); err != nil {
|
||||
peer.lastActive = time.Now()
|
||||
|
||||
fmt.Println("ws<-", connectionPeers[conn], ":", string(message[:min(80, len(message))]))
|
||||
|
||||
response, err := dispatchMessage(message, peer)
|
||||
|
||||
if err != nil {
|
||||
log.Printf("Error dispatching message: %v", err)
|
||||
}
|
||||
|
||||
if response != nil {
|
||||
// Send the response to the write loop
|
||||
peer.send <- response
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up when the connection is closed
|
||||
close(peer.send)
|
||||
peerID := connectionPeers[peer.conn]
|
||||
if peerID != "" {
|
||||
delete(peerConnections, peerID)
|
||||
}
|
||||
}
|
||||
|
||||
func writePump(peer *Peer) {
|
||||
defer func() {
|
||||
peer.conn.Close()
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case message, ok := <-peer.send:
|
||||
if !ok {
|
||||
// Channel closed, close the connection
|
||||
peer.conn.WriteMessage(websocket.CloseMessage, []byte{})
|
||||
return
|
||||
}
|
||||
peer.conn.SetWriteDeadline(time.Now().Add(writeWait))
|
||||
fmt.Println("ws->", connectionPeers[peer.conn], ":", string(message[:min(80, len(message))]))
|
||||
|
||||
err := peer.conn.WriteMessage(websocket.TextMessage, message)
|
||||
if err != nil {
|
||||
log.Println("WriteMessage error:", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Example handlers
|
||||
func handlePing(message []byte, conn *websocket.Conn) error {
|
||||
func handlePing(message []byte, peer *Peer) ([]byte, error) {
|
||||
var pingMsg struct {
|
||||
Type string `json:"type"`
|
||||
PeerID string `json:"peer_id"`
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(message, &pingMsg); err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
log.Printf("Received ping from peer: %s", pingMsg.PeerID)
|
||||
return nil
|
||||
|
||||
// log.Printf("Received ping from peer: %s", pingMsg.PeerID)
|
||||
|
||||
return []byte(`{"type":"pong"}`), nil
|
||||
}
|
||||
|
||||
type PeerSet map[string]struct{}
|
||||
|
||||
var userPeers = make(map[string]PeerSet)
|
||||
var peerConnections = make(map[string]*websocket.Conn)
|
||||
var peerConnections = make(map[string]*Peer)
|
||||
var connectionPeers = make(map[*websocket.Conn]string)
|
||||
|
||||
func handleHello(message []byte, conn *websocket.Conn) error {
|
||||
func handleHello(message []byte, peer *Peer) ([]byte, error) {
|
||||
|
||||
var m struct {
|
||||
Type string `json:"type"`
|
||||
@@ -150,32 +192,65 @@ func handleHello(message []byte, conn *websocket.Conn) error {
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(message, &m); err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// log.Printf("Received hello from peer: %s, user:%s", m.PeerID, m.UserID)
|
||||
if userPeers[m.UserID] == nil {
|
||||
userPeers[m.UserID] = make(PeerSet)
|
||||
}
|
||||
|
||||
userPeers[m.UserID][m.PeerID] = struct{}{}
|
||||
peerConnections[m.PeerID] = conn
|
||||
peerConnections[m.PeerID] = peer
|
||||
connectionPeers[peer.conn] = m.PeerID
|
||||
|
||||
jsonData, _ := json.MarshalIndent(userPeers, "", " ")
|
||||
fmt.Println(string(jsonData), peerConnections)
|
||||
|
||||
log.Printf("Received connect from peer: %s, user:%s", m.PeerID, m.UserID)
|
||||
return nil
|
||||
// return all the peers we know about, with their user_id and peer_id
|
||||
|
||||
return []byte(fmt.Sprintf(`{"type":"hello", "userPeers": %s}`, string(jsonData))), nil
|
||||
}
|
||||
|
||||
// LoggingHandler logs requests and delegates them to the underlying handler.
|
||||
// type LoggingHandler struct {
|
||||
// handler http.Handler
|
||||
// }
|
||||
func handlePeerMessage(message []byte, peer *Peer) ([]byte, error) {
|
||||
|
||||
// func (lh *LoggingHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// log.Printf("Serving file: %s", r.URL.Path)
|
||||
// lh.handler.ServeHTTP(w, r)
|
||||
// }
|
||||
type InnerMessage struct {
|
||||
Type string `json:"type"`
|
||||
UserID string `json:"user_id"`
|
||||
}
|
||||
|
||||
type PeerMessage struct {
|
||||
Type string `json:"type"`
|
||||
From string `json:"from"`
|
||||
To string `json:"to"`
|
||||
Message InnerMessage `json:"message"`
|
||||
}
|
||||
var m PeerMessage
|
||||
|
||||
if err := json.Unmarshal(message, &m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fmt.Printf("peer message type %s from %s to %s with message length %d\n", m.Message.Type, m.From, m.To, len(message))
|
||||
|
||||
toPeer := peerConnections[m.To]
|
||||
|
||||
if toPeer == nil {
|
||||
fmt.Printf("Couldn't find peer %s\n", m.To)
|
||||
fmt.Println(peerConnections)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Send the message to the recipient's send channel
|
||||
select {
|
||||
case toPeer.send <- message:
|
||||
default:
|
||||
fmt.Println("Could not send message to peer; channel full or closed")
|
||||
}
|
||||
|
||||
// No response for this type of message
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// BrotliResponseWriter wraps http.ResponseWriter to support Brotli compression
|
||||
type brotliResponseWriter struct {
|
||||
@@ -207,8 +282,11 @@ func noDirListing(h http.Handler, root string) http.HandlerFunc {
|
||||
|
||||
log.Printf("Serving: %s to ip %s, useragent %s", r.URL.Path, r.RemoteAddr, r.UserAgent())
|
||||
|
||||
// w.Header().Set("Cache-Control", "no-cache")
|
||||
|
||||
// Check if client supports Brotli encoding
|
||||
if strings.Contains(r.Header.Get("Accept-Encoding"), "br") {
|
||||
// if strings.Contains(r.Header.Get("Accept-Encoding"), "br") {
|
||||
if false {
|
||||
w.Header().Set("Content-Encoding", "br")
|
||||
w.Header().Del("Content-Length") // Cannot know content length with compressed data
|
||||
|
||||
@@ -231,31 +309,115 @@ func noDirListing(h http.Handler, root string) http.HandlerFunc {
|
||||
}
|
||||
}
|
||||
|
||||
func min(a, b int) int {
|
||||
if a < b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func main() {
|
||||
// Create a channel to receive OS signals for graceful shutdown
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
// Create a channel to signal when the program should shut down
|
||||
done := make(chan bool)
|
||||
|
||||
// Define the directory to serve and the port to listen on
|
||||
dir := "./"
|
||||
port := 6789
|
||||
|
||||
addr := ":" + strconv.Itoa(port)
|
||||
log.Printf("Starting server on %s", addr)
|
||||
|
||||
// Register handlers
|
||||
// Register message handlers
|
||||
registerHandler("hello", handleHello)
|
||||
registerHandler("ping", handlePing)
|
||||
registerHandler("peer_message", handlePeerMessage)
|
||||
|
||||
// Set up file server and WebSocket endpoint
|
||||
fs := http.FileServer(http.Dir(dir))
|
||||
// loggingHandler := &LoggingHandler{handler: fs}
|
||||
// http.Handle("/", loggingHandler)
|
||||
http.Handle("/", noDirListing(fs, dir))
|
||||
|
||||
http.HandleFunc("/ws", handleWebSocket)
|
||||
|
||||
// Configure and start the HTTP server
|
||||
// Configure the HTTP server
|
||||
server := &http.Server{
|
||||
Addr: addr,
|
||||
Handler: nil, // nil uses the default ServeMux, which we configured above
|
||||
Handler: nil, // Use the default ServeMux
|
||||
}
|
||||
|
||||
log.Printf("Server is configured and serving on port %d...", port)
|
||||
log.Fatal(server.ListenAndServeTLS("/etc/letsencrypt/live/ddlion.net/fullchain.pem", "/etc/letsencrypt/live/ddlion.net/privkey.pem"))
|
||||
// Start the inactivity monitor goroutine
|
||||
go func() {
|
||||
ticker := time.NewTicker(10 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case <-ticker.C:
|
||||
now := time.Now()
|
||||
|
||||
// Collect inactive peers
|
||||
var inactivePeers []string
|
||||
for peerID, peer := range peerConnections {
|
||||
if now.Sub(peer.lastActive) > 60*time.Second {
|
||||
inactivePeers = append(inactivePeers, peerID)
|
||||
}
|
||||
}
|
||||
|
||||
// Remove inactive peers
|
||||
for _, peerID := range inactivePeers {
|
||||
peer := peerConnections[peerID]
|
||||
|
||||
if peer != nil {
|
||||
log.Printf("Peer %s inactive for more than 60 seconds. Closing connection.", peerID)
|
||||
peer.conn.Close()
|
||||
removePeer(peerID, peer)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Run a goroutine to handle graceful shutdown
|
||||
go func() {
|
||||
sig := <-sigChan
|
||||
fmt.Println()
|
||||
fmt.Println("Received signal:", sig)
|
||||
|
||||
// Perform cleanup here
|
||||
fmt.Println("Shutting down gracefully...")
|
||||
|
||||
// Create a context with timeout for the shutdown
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// Attempt to gracefully shut down the server
|
||||
if err := server.Shutdown(ctx); err != nil {
|
||||
log.Fatalf("Server Shutdown Failed:%+v", err)
|
||||
}
|
||||
|
||||
// Signal that shutdown is complete
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// Start the HTTP server in a separate goroutine
|
||||
go func() {
|
||||
log.Printf("Server is configured and serving on port %d...", port)
|
||||
if err := server.ListenAndServeTLS(
|
||||
"/etc/letsencrypt/live/ddlion.net/fullchain.pem",
|
||||
"/etc/letsencrypt/live/ddlion.net/privkey.pem",
|
||||
); err != nil && err != http.ErrServerClosed {
|
||||
log.Fatalf("Could not listen on %s: %v\n", addr, err)
|
||||
}
|
||||
}()
|
||||
|
||||
fmt.Println("Program is running. Press Ctrl+C to exit.")
|
||||
|
||||
// Wait for the shutdown signal
|
||||
<-done
|
||||
fmt.Println("Program has exited.")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user