// connect to WS server, send info, connecto to bootstrap peer // once connected to bootstrap peer, // Goal, connect to bootstrap peer, ask bootstrap peer for peers that have posts from users that we care about. get peers, connect to those peers, sync. // how? do "perfect negotiation" with bootstrap peer. All logic here moves to BP. import { generateID } from "IDUtils"; import { log, logID } from "log"; // Use a broadcast channel to only have one peer manager for multiple tabs, // then we won't need to have a session ID as all queries for a peerID will be coming from the same peer manager type PeerID = string; export enum PeerEventTypes { PEER_CONNECTED, PEER_DISCONNECTED, } export class PeerManager { routingTable: Map; peers: Map; // private signaler: Signaler; searchQueryFunctions: Map = new Map(); RPC_remote: Map = new Map(); rpc: { [key: string]: Function } = {}; isBootstrapPeer: boolean = false; bootstrapPeerConnection: PeerConnection | null = null; sessionID = generateID(); userID: string; peerID: PeerID; websocket: WebSocket | null = null; bootstrapPeerID: string | null = null; connectPromiseCallbacks: { resolve: Function, reject: Function } | null = null; connectPromise: Promise | null = null; pingPeers: RTCPeerConnection[] = []; watchdogPeriodSeconds: number = 10; eventListeners: Map = new Map(); reconnectPeriod: number = 10; messageSuperlog = false; watchdogInterval: number = 0; reconnectTimer: number | null = null; // async watchdog() { // // Check that we're connected to at least N peers. If not, reconnect to the bootstrap server. // if (this.peers.size === 0) { // await this.sendHello2(); // } // } websocketSend(message: any) { if (!this.websocket) { throw new Error(); } let messageJSON = ""; try { messageJSON = JSON.stringify(message); } catch (e) { log(e); return; } this.messageSuperlog && console.log.apply(null, log("<-signaler:", message)); this.websocket.send(messageJSON); } onWebsocketMessage(event: MessageEvent) { let messageJSON = event.data; let message: any = null; try { message = JSON.parse(messageJSON); } catch (e) { log(e); throw new Error(); } this.messageSuperlog && console.log.apply(null, log("->signaler:", message)); if (message.type === "hello2") { if (!this.isBootstrapPeer) { this.bootstrapPeerID = message.bootstrapPeers[0]; } this.onHello2Received(this.bootstrapPeerID as string); } if (message.type === "peer_message") { let peerConnection = this.peers.get(message.from); if (message.message.type === "rtc_description") { // let existingConnection = this.peers.get(message.from); // // We're already connected, so delete the existing connection and make a new one. if (peerConnection?.rtcPeer?.connectionState === "connected") { console.log.apply(null, log("Connecting peer is already connected. Deleting existing peer connection and reconnecting.")); peerConnection.disconnect(); this.peers.delete(message.from); peerConnection = undefined; } if (!peerConnection) { let remotePeerID = message.from; let newPeer = new PeerConnection(this, remotePeerID, this.websocketSendPeerMessage.bind(this)); if (this.isBootstrapPeer) { newPeer.setPolite(false); } peerConnection = newPeer; this.peers.set(newPeer.remotePeerID, newPeer); this.onConnectRequest(newPeer); } } if (!peerConnection) { console.log.apply(null, log("Can't find peer for peer message:", message)); return; } peerConnection.onWebsocketMessage(message.message); } } async onConnectRequest(newPeer: PeerConnection) { // let remotePeerID = message.from; // let newPeer = new PeerConnection(this, remotePeerID, this.websocketSendPeerMessage.bind(this)); // if (this.isBootstrapPeer) { // newPeer.setPolite(false); // } await newPeer.connect(); this.onPeerConnected(newPeer.remotePeerID); return newPeer; } async onHello2Received(bootstrapPeerID: string) { if (this.isBootstrapPeer) { this.connectPromiseCallbacks?.resolve(); return; } if (!bootstrapPeerID) { // console.log.apply(null, log("Didn't get bootstrap peer, waiting 10 seconds...")); // let callSendHello2OnTimeout = () => { console.log(this, "jajajajaj");this.sendHello2() }; // setTimeout(callSendHello2OnTimeout, 5_000); return; } this.bootstrapPeerConnection = await this.connectToPeer(bootstrapPeerID); this.connectPromiseCallbacks?.resolve(); } sendHello2() { this.websocketSend({ type: "hello2", user_id: this.userID, // user_name: app.username, peer_id: this.peerID, session_id: this.sessionID, // peer_name: app.peername, is_bootstrap_peer: this.isBootstrapPeer, // peer_description: this.rtcPeerDescription }); } websocketSendPeerMessage(remotePeerID: string, peerMessage: { type: string; description: RTCSessionDescription; }) { this.websocketSend({ type: "peer_message", from: this.peerID, to: remotePeerID, from_username: "blah user", from_peername: "blah peer", message: peerMessage }); // let responseMessage = { type: "peer_message", // from: app.peerID, // to: data.from, // from_username: app.username, // from_peername: app.peername, // message: { type: "get_posts_for_user", post_ids: postIds, user_id: message.user_id } } } constructor(userID: string, peerID: string, isBootstrapPeer: boolean) { this.isBootstrapPeer = isBootstrapPeer; this.peers = new Map(); this.routingTable = new Map(); this.userID = userID; this.peerID = peerID; } disconnect() { this.websocket?.close(); for (let peer of this.peers.values()) { peer.disconnect(); } } connectWebSocket() { try { let hostname = globalThis?.location?.hostname ?? 'ddln.app'; let port = globalThis?.location?.port ?? '443'; let wsURL = `wss://${hostname}:${port}/ws`; console.log.apply(null, log(`Attempting to connect websocket to URL: ${wsURL}`)); this.websocket = new WebSocket(wsURL); // this.websocket.onclose = (e: CloseEvent) => { // let closedUnexpectedly = !e.wasClean; // if (closedUnexpectedly) { // console.log.apply(null, log(`Websocket closed unexpectedly. Will try to reconnect in ${this.reconnectPeriod} seconds`)); // // let alreadyReconnecting = this.reconnectTimer !== null; // // if (!alreadyReconnecting) { // this.reconnectTimer = globalThis.setTimeout(() => { // console.log.apply(null, log(`Reconnecting web socket`)); // this.reconnectTimer = null; // this.connectWebSocket(); // }, this.reconnectPeriod * 1000) // }; // } // } } catch (error: any) { throw new Error(error.message); } this.websocket.onopen = async (event) => { console.log.apply(null, log("peermanager:ws:onopen")); this.sendHello2(); }; this.websocket.onmessage = this.onWebsocketMessage.bind(this); } connect() { // setInterval(this.watchdog.bind(this), this.watchdogPeriodSeconds * 1000); // Side effects :( if (!this.watchdogInterval) { this.watchdogInterval = setInterval(() => { if (!this.isBootstrapPeer && this.peers.size === 0) { console.log.apply(null, log(`No peers connected, will attempt to reconnect in ${this.reconnectPeriod} seconds...`)); // Websocket reconnect if (this.websocket?.readyState === WebSocket.OPEN) { this.sendHello2(); } if (this.websocket?.readyState === WebSocket.CLOSED) { this.connectWebSocket(); } } let output = `local peerID:${logID(this.peerID)}` + "\n"; for (let [peerID, peer] of this.peers) { output += `${logID(peerID)}: ${peer.rtcPeer?.connectionState}` + "\n"; } console.log.apply(null, log(output)); }, this.reconnectPeriod * 1000); } let connectPromise = this.connectPromise; if (!connectPromise) { connectPromise = new Promise((resolve, reject) => { this.connectPromiseCallbacks = { resolve, reject }; }); this.connectPromise = connectPromise; } this.connectWebSocket(); return connectPromise; // this.signaler = new Signaler(userID, peerID, isBootstrapPeer, this.onConnected.bind(this)); // Testing // let dummyPeer = new PeerConnection(this, "dummy_peer", this.websocketSendPeerMessage.bind(this)); // this.peers.set("dummy_peer", dummyPeer); } async connectToPeer(remotePeerID: string) { // Connect to the peer that has the peer id remotePeerID. // TODO how do multiple windows / tabs from the same peer and user work? // Need to decide if they should all get a unique connection. A peer should only be requesting and writing // Data once though, so it probably need to be solved on the client side as the data is shared obv // Maybe use BroadcastChannel to proxy all calls to peermanager? That will probably really complicate things. // What if we just user session+peerID for the connections? Then we might have two windows making requests // For IDs etc, it would probably be best to proxy everything. // Maybe once we put this logic in a web worker, we'll need an interface to it that works over postMessage // anyway, and at that point, we could just use that same interface over a broadcastChannel // let's keep it simple for now and ignore the problem :) let peerConnection = new PeerConnection(this, remotePeerID, this.websocketSendPeerMessage.bind(this)); this.peers.set(remotePeerID, peerConnection); await peerConnection.connect(); this.onPeerConnected(remotePeerID); return peerConnection; } onPeerConnected(peerID: PeerID) { console.log.apply(null, log(`PeerManager: Successfully connected to peer ${peerID}`)); this.dispatchEvent(PeerEventTypes.PEER_CONNECTED, { peerID: peerID }); } dispatchEvent(event: PeerEventTypes, parameters: any) { let listeners = this.eventListeners.get(event); if (!listeners) { return; } for (let listener of listeners) { listener(parameters); } } addEventListener(eventName: PeerEventTypes, func: Function) { let listeners = this.eventListeners.get(eventName); if (!listeners) { this.eventListeners.set(eventName, [func]); } } onPeerDisconnected(remotePeerID: PeerID) { let deleted = this.peers.delete(remotePeerID); if (!deleted) { throw new Error(`Can't find peer that disconnected ${remotePeerID}`); } // TODO: What do we do if we lose connection to the bootstrap peer? // If we have other connections, it probably doesn't matter. // Eventually we want the bootstrap peer to be no different than any other peer anyway. // We should disconnect from the websocket once we connect to our intial peers. // If we have no peer connections, try to connect. If connection fails, start a timer to reconnect. if (remotePeerID === this.bootstrapPeerID) { this.bootstrapPeerID = null; this.bootstrapPeerConnection = null; } this.dispatchEvent(PeerEventTypes.PEER_DISCONNECTED, { peerID: remotePeerID }); } async disconnectFromPeer(remotePeerID: string) { let peer = this.peers.get(remotePeerID); if (!peer) { console.log.apply(null, log(`PeerManager.disconnect: couldn't find peer ${remotePeerID}`)); return; } console.log.apply(null, log(`PeerManager.disconnect: disconnecting peer ${remotePeerID}`)); await peer.disconnect(); this.onPeerDisconnected(remotePeerID); } async call(peerID: string, functionName: string, args: any) { let peer = this.peers.get(peerID); if (!peer) { console.log.apply(null, log(`Can't find peer ${peerID}`)); return; } let returnValues = await peer.call(functionName, args); return returnValues; } async callFromRemote(functionName: string, args: any) { let func = this.RPC_remote.get(functionName); if (!func) { throw new Error(`callFromRemote: got RPC we don't know about: ${functionName}, ${args}`); } let returnValues = await func.apply(null, args); return returnValues; } registerRPC(functionName: string, func: Function) { this.rpc[functionName] = (peerID: string, ...args: any) => { return this.call(peerID, functionName, args); }; this.RPC_remote.set(functionName, func); } registerSearchQuery(searchType: string, queryFunction: Function) { this.searchQueryFunctions.set(searchType, queryFunction); } async search(type: string, message: any) { let promises = []; for (let peer of this.peers.values()) { promises.push(peer.call(type, message)); } return await Promise.allSettled(promises); } onMessage(remotePeerID: string, message: any) { console.log.apply(null, log(remotePeerID, message)); } } interface Message { type: string; from_peer: string; to_peer: string; from_username: string; from_peername: string; peer_message: any; } class PeerConnection { remotePeerID: string; // private signaler: Signaler; private peerManager: PeerManager; private dataChannel: RTCDataChannel | null = null; private messageHandlers: Map = new Map(); private sendPeerMessage: Function; private makingOffer: boolean = false; private ignoreOffer: boolean = false; private isSettingRemoteAnswerPending: boolean = false; private polite = true; private webRTCSuperlog = false; private dataChannelSuperlog = true; messageSuperlog: boolean = true; rpcSuperlog: boolean = false; pendingRPCs: Map< string, { resolve: Function; reject: Function; functionName: string } > = new Map(); connectionPromise: { resolve: (value?: unknown) => void; reject: (reason?: any) => void; } | null = null; // private makingOffer:boolean = false; // private ignoreOffer:boolean = false; rtcPeer: RTCPeerConnection | null = null; static config = { iceServers: [ { urls: "stun:ddln.app" }, { urls: "turn:ddln.app", username: "a", credential: "b" }, { urls: "stun:stun.l.google.com" }, // keeping this for now as my STUN server is not return ipv6 // { urls: "stun:stun1.l.google.com" }, // { urls: "stun:stun2.l.google.com" }, // { urls: "stun:stun3.l.google.com" }, // { urls: "stun:stun4.l.google.com" }, ], }; async RPCHandler(message: any) { } constructor( peerManager: PeerManager, remotePeerID: string, sendPeerMessage: Function, ) { this.sendPeerMessage = sendPeerMessage; this.peerManager = peerManager; this.remotePeerID = remotePeerID; // this.signaler = signaler; // this.signaler.route(remotePeerID, this); } setPolite(polite: boolean) { this.polite = polite; } setupDataChannel() { if (!this.dataChannel) { throw new Error(); } this.dataChannel.onopen = (e: any) => { if (!this.dataChannel) { throw new Error(); } this.dataChannelSuperlog && console.log.apply(null, log("data channel is open to: ", this.remotePeerID, " from: ", this.peerManager.peerID)); this.send({ type: "hello datachannel", from: this.peerManager.peerID, to: this.remotePeerID }); // this.dataChannel?.send(`{typeHello datachannel from: ${this.peerManager.peerID}`); console.log.apply(null, log([...this.peerManager.peers.keys()])); if (this.peerManager.isBootstrapPeer) { this.send({ type: 'initial_peers', from: this.peerManager.peerID, peers: [...this.peerManager.peers.keys()].filter(entry => entry !== this.remotePeerID) }); // this.dataChannel.send(JSON.stringify()); } this.connectionPromise?.resolve(this.remotePeerID); //globalThis.setTimeout(()=>this.connectionPromise?.resolve(this.remotePeerID), 5000); } this.dataChannel.onmessage = (e: MessageEvent) => { this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->datachannel[${logID(this.peerManager.peerID)}]: `, e.data)); this.onMessage(e.data); } this.dataChannel.onclose = (e: Event) => { this.dataChannelSuperlog && console.log.apply(null, log(`datachannel from peer ${this.remotePeerID} closed, disconnecting peer.`)); this.peerManager.disconnectFromPeer(this.remotePeerID); } this.dataChannel.onerror = (e:RTCErrorEvent) => { this.dataChannelSuperlog && console.log.apply(null, log(`datachannel from peer ${this.remotePeerID} error:`, e.error)); } } async connect() { let connectionPromise = new Promise((resolve, reject) => { this.connectionPromise = { resolve, reject } }); this.rtcPeer = new RTCPeerConnection(PeerConnection.config); this.rtcPeer.onconnectionstatechange = async (e: any) => { this.webRTCSuperlog && console.log.apply(null, log(`rtcPeer: onconnectionstatechange: ${this.rtcPeer?.connectionState}: ${this.remotePeerID}`)); if (!this.rtcPeer) { throw new Error("onconnectionstatechange"); } // When the connection is closed, tell the peer manager that this connection has gone away if (this.rtcPeer.connectionState === "failed") { this.peerManager.onPeerDisconnected(this.remotePeerID); // globalThis.setTimeout(async () => { await this.peerManager.connectToPeer(this.remotePeerID) }, 10_000); } if (this.rtcPeer.connectionState === "connected") { // Check the selected candidates const stats = await this.rtcPeer.getStats() as any; let localIP = ''; let remoteIP = ''; for (const report of stats.values()) { if (report.type === 'transport') { let candidatePair = stats.get(report.selectedCandidatePairId) as RTCIceCandidatePairStats; let localCandidate = stats.get(candidatePair.localCandidateId); let remoteCandidate = stats.get(candidatePair.remoteCandidateId); this.webRTCSuperlog && console.log.apply(null, log("Connected candidates\n", localCandidate, remoteCandidate)); } } } } this.rtcPeer.ondatachannel = (e: any) => { let dataChannel = e.channel; this.dataChannel = dataChannel as RTCDataChannel; this.setupDataChannel(); } if (this.polite) { this.dataChannel = this.rtcPeer.createDataChannel("ddln_main"); this.setupDataChannel(); } if (this.rtcPeer === null) { return; } // this.rtcPeer.onicecandidate = ({ candidate }) => this.signaler.send(JSON.stringify({ candidate })); // this.rtcPeer.onicecandidate = ({ candidate }) => console.log.apply(null, log(candidate); this.rtcPeer.onicecandidate = ({ candidate }) => { this.webRTCSuperlog && console.log.apply(null, log(candidate)); this.sendPeerMessage(this.remotePeerID, { type: "rtc_candidate", candidate: candidate }); } this.rtcPeer.onnegotiationneeded = async (event) => { this.webRTCSuperlog && console.log.apply(null, log("on negotiation needed fired")); if (!this.rtcPeer) { throw new Error(); } try { this.makingOffer = true; await this.rtcPeer.setLocalDescription(); if (!this.rtcPeer.localDescription) { return; } this.sendPeerMessage(this.remotePeerID, { type: "rtc_description", description: this.rtcPeer.localDescription }); } catch (err) { console.error(err); } finally { this.makingOffer = false; } }; return connectionPromise; } async onWebsocketMessage(message: any) { if (message.type == "rtc_connect") { this.rtcPeer?.setRemoteDescription(message.description); } // /* // let ignoreOffer = false; // let isSettingRemoteAnswerPending = false; // signaler.onmessage = async ({ data: { description, candidate } }) => { if (!this.rtcPeer) { throw new Error(); } let description = null; if (message.type == "rtc_description") { description = message.description; } let candidate = null; if (message.type == "rtc_candidate") { candidate = message.candidate; } try { if (description) { const readyForOffer = !this.makingOffer && (this.rtcPeer.signalingState === "stable" || this.isSettingRemoteAnswerPending); const offerCollision = description.type === "offer" && !readyForOffer; this.ignoreOffer = !this.polite && offerCollision; if (this.ignoreOffer) { console.warn(">>>>>>>>>>>>>>>>>IGNORING OFFER"); return; } this.isSettingRemoteAnswerPending = description.type == "answer"; await this.rtcPeer.setRemoteDescription(description); this.isSettingRemoteAnswerPending = false; if (description.type === "offer") { await this.rtcPeer.setLocalDescription(); this.sendPeerMessage(this.remotePeerID, { type: "rtc_description", description: this.rtcPeer.localDescription }); } } else if (candidate) { try { await this.rtcPeer.addIceCandidate(candidate); } catch (err) { if (!this.ignoreOffer) { throw err; } } } } catch (err) { console.error(err); } // }; // */ } disconnect() { this.rtcPeer?.close(); this.rtcPeer = null; } send(message: any) { let messageJSON = JSON.stringify(message); this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-datachannel[${logID(this.peerManager.peerID)}]:`, message.type, message, `message size:${messageJSON.length}`)); this.dataChannel?.send(messageJSON); // this.onMessage(messageJSON); } call(functionName: string, args: any) { let transactionID = generateID(); // make this faster as we will only ever have a small number of in-flight queries on a peer // Think about a timeout here to auto reject it after a while. let promise = new Promise((resolve, reject) => { this.pendingRPCs.set(transactionID, { resolve, reject, functionName }); // setTimeout(() => reject("bad"), 1000); }); let message = { type: "rpc_call", transaction_id: transactionID, function_name: functionName, args: args, }; this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}]`,message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); this.send(message); return promise; } async onMessage(messageJSON: any) { let message: any = {}; try { message = JSON.parse(messageJSON); } catch (e) { console.log.apply(null, log("PeerConnection.onMessage:", e)); } this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->datachannel[${logID(this.peerManager.peerID)}]`, message.type, message)); let type = message.type; if (type === "rpc_response") { this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}] response: `,message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); let pendingRPC = this.pendingRPCs.get(message.transaction_id); if (!pendingRPC) { throw new Error(); } pendingRPC.resolve(message.response); } if (type === "rpc_call") { this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->[rpc][${logID(this.peerManager.peerID)}] call: `,message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); let response = await this.peerManager.callFromRemote(message.function_name, message.args); this.rpcSuperlog && console.log.apply(null, log(`[rpc] call: response:`, response)); if (response === undefined) { return; } let responseMessage = { type: 'rpc_response', function_name: message.function_name, transaction_id: message.transaction_id, response: response }; this.send(responseMessage); } if (type === "initial_peers") { for (let peerID of message.peers) { console.log(log("Connecting to initial peer ", peerID)); this.peerManager.connectToPeer(peerID); } } // this.peerManger.onMessage(this.remotePeerID, message); } }