// Connect to the WS (signalling) server, send our info, then connect to a bootstrap peer.
// Goal: once connected to a bootstrap peer, ask it for peers that have posts from users that we
// care about, get those peers, connect to them, and sync.
// How? Do "perfect negotiation" with the bootstrap peer. All logic here moves to BP.
import { generateID } from "IDUtils";
import { log, logID } from "log";

/** How long an RPC may stay unanswered before its promise is rejected. */
const RPC_TIMEOUT_MS = 10000;
/** `PeerConnection.send` pauses while the data channel has at least this many bytes queued. */
const SEND_BUFFER_LIMIT_BYTES = 8 * 1024 * 1024;
/** How long `PeerConnection.send` sleeps between checks of a full send buffer. */
const SEND_BUFFER_POLL_MS = 1000;

/**
 * Resolves with the value of the first promise that fulfils.
 * Rejects only when the list is empty or every promise has rejected; in the latter case the
 * individual rejection reasons are attached to the error as `reasons`.
 *
 * @param {Promise<any>[]} promises
 * @returns {Promise<any>}
 */
function firstFulfilled(promises) {
    return new Promise((resolve, reject) => {
        const reasons = [];
        let remaining = promises.length;
        if (remaining === 0) {
            reject(new Error("firstFulfilled: no promises given"));
            return;
        }
        for (const promise of promises) {
            Promise.resolve(promise).then(resolve, (reason) => {
                reasons.push(reason);
                remaining -= 1;
                if (remaining === 0) {
                    const error = new Error("firstFulfilled: every promise rejected");
                    error.reasons = reasons;
                    reject(error);
                }
            });
        }
    });
}

/**
 * Events dispatched by PeerManager. Numeric enum with reverse mapping
 * (`PeerEventTypes[0] === "PEER_CONNECTED"`), as emitted by the TypeScript compiler.
 */
export const PeerEventTypes = {
    PEER_CONNECTED: 0,
    PEER_DISCONNECTED: 1,
    0: "PEER_CONNECTED",
    1: "PEER_DISCONNECTED",
};

/**
 * Owns the signalling websocket and the set of WebRTC peer connections.
 *
 * Typical use: `await new PeerManager(userID, peerID, false).connect()`, then `registerRPC` /
 * `call` to talk to peers and `addEventListener` to observe peers coming and going.
 */
export class PeerManager {
    /**
     * @param {string} userID - ID of the local user, sent to the signalling server.
     * @param {string} peerID - ID of this peer (device/browser).
     * @param {boolean} isBootstrapPeer - true when this instance acts as a bootstrap peer.
     */
    constructor(userID, peerID, isBootstrapPeer) {
        this.searchQueryFunctions = new Map();
        this.RPC_remote = new Map();
        this.rpc = {};
        this._isBootstrapPeer = false;
        this.bootstrapPeerConnections = null;
        this.sessionID = generateID();
        this.websocket = null;
        this.bootstrapPeerIDs = null;
        this.connectPromiseCallbacks = null;
        this.connectPromise = null;
        this.pingPeers = [];
        this.watchdogPeriodSeconds = 10;
        this.eventListeners = new Map();
        this.reconnectPeriod = 10;
        this.messageSuperlog = false;
        this.watchdogInterval = null;
        this.reconnectTimer = null;
        this.peerStateSuperlog = true;
        this.animals = ['shrew', 'jerboa', 'lemur', 'weasel', 'possum', 'possum', 'marmoset', 'planigale', 'mole', 'narwhal'];
        this.adjectives = ['snazzy', 'whimsical', 'jazzy', 'bonkers', 'wobbly', 'spiffy', 'chirpy', 'zesty', 'bubbly', 'perky', 'sassy'];
        this.snakes = ['mamba', 'cobra', 'python', 'viper', 'krait', 'sidewinder', 'constrictor', 'boa', 'asp', 'anaconda', 'krait'];
        this._isBootstrapPeer = isBootstrapPeer;
        this.peers = new Map();
        this.routingTable = new Map();
        this.userID = userID;
        this.peerID = peerID;
    }

    /**
     * Derives two small numbers from an ID: the values of its first two hex digits, skipping
     * '0', '-' and anything that is not a hex digit. Missing digits default to 0 so the result
     * is always a pair of valid numbers.
     *
     * @param {string} id
     * @returns {[number, number]}
     */
    hashIdToIndices(id) {
        const indices = [];
        for (const char of String(id ?? "")) {
            if (char === '0' || char === '-') {
                continue;
            }
            const value = Number.parseInt(char, 16);
            if (Number.isNaN(value)) {
                continue;
            }
            indices.push(value);
            if (indices.length === 2) {
                break;
            }
        }
        while (indices.length < 2) {
            indices.push(0);
        }
        return [indices[0], indices[1]];
    }

    /**
     * Picks one entry from each list, deterministically, based on `id`.
     *
     * @returns {{first: any, second: any}}
     */
    funkyName(id, listOne, listTwo) {
        const [one, two] = this.hashIdToIndices(id);
        const first = listOne[one % listOne.length];
        const second = listTwo[two % listTwo.length];
        return { first, second };
    }

    /** Returns a human-friendly `adjective_snake` name derived from a peer ID. */
    getPeername(peerID) {
        const { first: adjective, second: snake } = this.funkyName(peerID, this.adjectives, this.snakes);
        return `${adjective}_${snake}`;
    }

    /** True when `peerID` is in the bootstrap list received from the signalling server. */
    isBootstrapPeer(peerID) {
        return this.bootstrapPeerIDs?.has(peerID);
    }

    /**
     * Serialises `message` and sends it over the signalling websocket.
     * A message that cannot be serialised is logged and dropped.
     *
     * @throws {Error} when no websocket has been created yet.
     */
    websocketSend(message) {
        if (!this.websocket) {
            throw new Error("PeerManager.websocketSend: websocket has not been created");
        }
        let messageJSON = "";
        try {
            messageJSON = JSON.stringify(message);
        }
        catch (e) {
            console.log.apply(null, log("PeerManager.websocketSend: unable to serialise message:", e));
            return;
        }
        if (this.messageSuperlog) {
            console.log.apply(null, log("<-signaler:", message));
        }
        this.websocket.send(messageJSON);
    }

    /**
     * Handles one message from the signalling server.
     * `hello2` carries the bootstrap peer list; `peer_message` relays WebRTC signalling from
     * another peer and may create a new PeerConnection for an incoming `rtc_description`.
     * Malformed messages are logged and ignored: this runs as an event handler, so throwing
     * would only produce an uncaught error.
     */
    onWebsocketMessage(event) {
        let message = null;
        try {
            message = JSON.parse(event.data);
        }
        catch (e) {
            console.log.apply(null, log("PeerManager.onWebsocketMessage: unable to parse message:", e));
            return;
        }
        if (message === null || typeof message !== "object") {
            console.log.apply(null, log("PeerManager.onWebsocketMessage: ignoring non-object message:", message));
            return;
        }
        if (this.messageSuperlog) {
            console.log.apply(null, log("->signaler:", message));
        }
        if (message.type === "hello2") {
            if (!this._isBootstrapPeer && Array.isArray(message.bootstrapPeers)) {
                this.bootstrapPeerIDs = new Set(message.bootstrapPeers);
            }
            this.onHello2Received(this.bootstrapPeerIDs).catch((err) => {
                console.log.apply(null, log("PeerManager: handling hello2 failed:", err));
            });
        }
        if (message.type === "peer_message") {
            const peerMessage = message.message;
            if (peerMessage === null || typeof peerMessage !== "object") {
                console.log.apply(null, log("PeerManager: ignoring peer message without a payload:", message));
                return;
            }
            let peerConnection = this.peers.get(message.from);
            if (peerMessage.type === "rtc_description") {
                // We're already connected, so delete the existing connection and make a new one.
                if (peerConnection?.rtcPeer?.connectionState === "connected") {
                    console.log.apply(null, log("Connecting peer is already connected. Deleting existing peer connection and reconnecting."));
                    peerConnection.disconnect();
                    this.peers.delete(message.from);
                    peerConnection = undefined;
                }
                if (!peerConnection) {
                    const newPeer = new PeerConnection(this, message.from, this.websocketSendPeerMessage.bind(this));
                    if (this._isBootstrapPeer) {
                        newPeer.setPolite(false);
                    }
                    peerConnection = newPeer;
                    this.peers.set(newPeer.remotePeerID, newPeer);
                    // connect() creates the RTCPeerConnection synchronously, so the description
                    // below can be applied straight away; completion is reported asynchronously.
                    this.onConnectRequest(newPeer).catch((err) => {
                        console.log.apply(null, log(`PeerManager: incoming connection from ${newPeer.remotePeerID} failed:`, err));
                    });
                }
            }
            if (!peerConnection) {
                console.log.apply(null, log("Can't find peer for peer message:", message));
                return;
            }
            peerConnection.onWebsocketMessage(peerMessage).catch((err) => {
                console.log.apply(null, log("PeerManager: peer failed to handle signalling message:", err));
            });
        }
    }

    /**
     * Completes an incoming connection request: waits for the peer's data channel to open and
     * then announces the peer. If connecting fails the peer is dropped from `peers` again.
     *
     * @returns {Promise<PeerConnection>}
     */
    async onConnectRequest(newPeer) {
        try {
            await newPeer.connect();
        }
        catch (err) {
            if (this.peers.get(newPeer.remotePeerID) === newPeer) {
                this.peers.delete(newPeer.remotePeerID);
            }
            throw err;
        }
        this.onPeerConnected(newPeer.remotePeerID);
        return newPeer;
    }

    /**
     * Reacts to the signalling server's hello: a bootstrap peer is immediately "connected";
     * any other peer connects to all advertised bootstrap peers in parallel and resolves the
     * `connect()` promise as soon as the first of them is reachable.
     * When every attempt fails nothing is resolved; the watchdog will send hello2 again.
     *
     * @param {Set<string>|null} bootstrapPeerIDs
     */
    async onHello2Received(bootstrapPeerIDs) {
        if (this._isBootstrapPeer) {
            this.connectPromiseCallbacks?.resolve();
            return;
        }
        if (!bootstrapPeerIDs || bootstrapPeerIDs.size === 0) {
            console.log.apply(null, log("Didn't get any bootstrap peer, waiting 10 seconds..."));
            return;
        }
        if (!this.bootstrapPeerConnections) {
            this.bootstrapPeerConnections = new Map();
        }
        const attempts = [...bootstrapPeerIDs.keys()].map(async (peerID) => {
            const peerConnection = await this.connectToPeer(peerID);
            this.bootstrapPeerConnections?.set(peerID, peerConnection);
            return peerConnection;
        });
        try {
            await firstFulfilled(attempts);
        }
        catch (err) {
            console.log.apply(null, log("PeerManager: unable to connect to any bootstrap peer:", err));
            return;
        }
        this.connectPromiseCallbacks?.resolve();
    }

    /** Announces this peer to the signalling server. */
    sendHello2() {
        this.websocketSend({
            type: "hello2",
            user_id: this.userID,
            peer_id: this.peerID,
            session_id: this.sessionID,
            is_bootstrap_peer: this._isBootstrapPeer,
        });
    }

    /** Relays `peerMessage` to `remotePeerID` through the signalling server. */
    websocketSendPeerMessage(remotePeerID, peerMessage) {
        this.websocketSend({
            type: "peer_message",
            from: this.peerID,
            to: remotePeerID,
            from_username: "blah user",
            from_peername: "blah peer",
            message: peerMessage
        });
    }

    /**
     * Closes the websocket and every peer connection and stops the watchdog; without stopping
     * it the watchdog would see a closed websocket and reconnect right away.
     */
    disconnect() {
        if (this.watchdogInterval) {
            clearInterval(this.watchdogInterval);
            this.watchdogInterval = null;
        }
        this.websocket?.close();
        for (const peer of this.peers.values()) {
            peer.disconnect();
        }
    }

    /**
     * Opens the signalling websocket to `wss://<host>:<port>/ws` of the current page (falling
     * back to ddln.app:443 when there is no page location) and sends hello2 once it is open.
     *
     * @throws {Error} when the websocket cannot be created.
     */
    connectWebSocket() {
        try {
            // `||` rather than `??`: location.port is the empty string on default ports.
            const hostname = globalThis?.location?.hostname || 'ddln.app';
            const port = globalThis?.location?.port || '443';
            const wsURL = `wss://${hostname}:${port}/ws`;
            console.log.apply(null, log(`Attempting to connect websocket to URL: ${wsURL}`));
            this.websocket = new WebSocket(wsURL);
        }
        catch (error) {
            throw new Error(error.message, { cause: error });
        }
        this.websocket.onopen = () => {
            console.log.apply(null, log("peermanager:ws:onopen"));
            this.sendHello2();
        };
        this.websocket.onmessage = this.onWebsocketMessage.bind(this);
    }

    /**
     * One watchdog pass: when no peer is active, re-announce ourselves (or re-open a closed
     * websocket), then log the status of every known peer.
     * A peer counts as active unless its connection is still "new" or "connecting".
     */
    watchdogTick() {
        let numActive = 0;
        for (const peer of this.peers.values()) {
            const state = peer.rtcPeer?.connectionState;
            if (state === "new" || state === "connecting") {
                continue;
            }
            numActive++;
        }
        if (!this._isBootstrapPeer && numActive === 0) {
            console.log.apply(null, log(`No peers connected, will attempt to reconnect in ${this.reconnectPeriod} seconds...`));
            // Websocket reconnect
            if (this.websocket?.readyState === WebSocket.OPEN) {
                this.sendHello2();
            }
            if (this.websocket?.readyState === WebSocket.CLOSED) {
                this.connectWebSocket();
            }
        }
        let output = `Current status:` + "\n" + `[${logID(this.peerID)}]${this.getPeername(this.peerID)}[local]` + "\n";
        for (const [peerID, peer] of this.peers) {
            output += `[${logID(peerID)}]${peer.rtcPeer?.connectionState}:${this.getPeername(peerID)}${this.bootstrapPeerIDs?.has(peerID) ? "[Bootstrap]" : ""}` + "\n";
            if (peer.rpcSuperlog) {
                for (const transactionID of peer.pendingRPCs.keys()) {
                    output += `[${logID(transactionID)}]`;
                }
            }
        }
        output += `numActivePeers: ${numActive}` + "\n";
        console.log.apply(null, log(output));
    }

    /**
     * Starts the watchdog (once), opens the signalling websocket and returns a promise that
     * resolves when this peer is usable: immediately after hello2 for a bootstrap peer,
     * otherwise once the first bootstrap peer connection is open.
     *
     * @returns {Promise<void>}
     */
    connect() {
        if (!this.watchdogInterval) {
            this.watchdogInterval = setInterval(() => {
                try {
                    this.watchdogTick();
                }
                catch (err) {
                    console.log.apply(null, log("PeerManager: watchdog failed:", err));
                }
            }, this.reconnectPeriod * 1000);
        }
        let connectPromise = this.connectPromise;
        if (!connectPromise) {
            connectPromise = new Promise((resolve, reject) => {
                this.connectPromiseCallbacks = { resolve, reject };
            });
            this.connectPromise = connectPromise;
        }
        this.connectWebSocket();
        return connectPromise;
    }

    /**
     * Connects to the peer that has the peer id `remotePeerID` and resolves once its data
     * channel is open. Rejects (and forgets the peer) when the connection cannot be set up.
     *
     * TODO how do multiple windows / tabs from the same peer and user work?
     * Need to decide if they should all get a unique connection. A peer should only be
     * requesting and writing data once though, so it probably needs to be solved on the client
     * side as the data is shared. Maybe use BroadcastChannel to proxy all calls to PeerManager?
     * That will probably really complicate things. What if we just use session+peerID for the
     * connections? Then we might have two windows making requests. For IDs etc. it would
     * probably be best to proxy everything. Maybe once we put this logic in a web worker we'll
     * need an interface to it that works over postMessage anyway, and at that point we could
     * use that same interface over a BroadcastChannel.
     * Let's keep it simple for now and ignore the problem :)
     *
     * @returns {Promise<PeerConnection>}
     */
    async connectToPeer(remotePeerID) {
        const peerConnection = new PeerConnection(this, remotePeerID, this.websocketSendPeerMessage.bind(this));
        this.peers.set(remotePeerID, peerConnection);
        try {
            await peerConnection.connect();
        }
        catch (err) {
            if (this.peers.get(remotePeerID) === peerConnection) {
                this.peers.delete(remotePeerID);
            }
            throw err;
        }
        this.onPeerConnected(remotePeerID);
        return peerConnection;
    }

    /** Logs and dispatches PEER_CONNECTED for `peerID`. */
    onPeerConnected(peerID) {
        if (this.peerStateSuperlog) {
            console.log.apply(null, log(`PeerManager: Successfully connected to peer ${peerID}`));
        }
        this.dispatchEvent(PeerEventTypes.PEER_CONNECTED, { peerID: peerID });
    }

    /** Calls every listener registered for `event` with `parameters`. */
    dispatchEvent(event, parameters) {
        const listeners = this.eventListeners.get(event);
        if (!listeners) {
            return;
        }
        for (const listener of listeners) {
            listener(parameters);
        }
    }

    /**
     * Registers `func` to be called whenever `eventName` (a PeerEventTypes value) is dispatched.
     * Any number of listeners may be registered per event.
     */
    addEventListener(eventName, func) {
        const listeners = this.eventListeners.get(eventName);
        if (listeners) {
            listeners.push(func);
            return;
        }
        this.eventListeners.set(eventName, [func]);
    }

    /**
     * Forgets `peerID` and dispatches PEER_DISCONNECTED.
     *
     * TODO: What do we do if we lose connection to the bootstrap peer?
     * If we have other connections, it probably doesn't matter.
     * Eventually we want the bootstrap peer to be no different than any other peer anyway.
     * We should disconnect from the websocket once we connect to our initial peers.
     * If we have no peer connections, try to connect. If connection fails, start a timer to reconnect.
     *
     * @throws {Error} when `peerID` is not a known peer.
     */
    onPeerDisconnected(peerID) {
        const deleted = this.peers.delete(peerID);
        if (!deleted) {
            throw new Error(`Can't find peer that disconnected ${peerID}`);
        }
        if (this.bootstrapPeerIDs?.has(peerID)) {
            this.bootstrapPeerIDs.delete(peerID);
            this.bootstrapPeerConnections?.delete(peerID);
        }
        if (this.peerStateSuperlog) {
            console.log.apply(null, log(`PeerManager: disconnected from peer ${peerID}`));
        }
        this.dispatchEvent(PeerEventTypes.PEER_DISCONNECTED, { peerID: peerID });
    }

    /** Closes the connection to `remotePeerID` (if known) and announces the disconnect. */
    async disconnectFromPeer(remotePeerID) {
        const peer = this.peers.get(remotePeerID);
        if (!peer) {
            console.log.apply(null, log(`PeerManager.disconnect: couldn't find peer ${remotePeerID}`));
            return;
        }
        console.log.apply(null, log(`PeerManager.disconnect: disconnecting peer ${remotePeerID}`));
        await peer.disconnect();
        // The entry may have been removed or replaced while we were waiting.
        if (this.peers.get(remotePeerID) === peer) {
            this.onPeerDisconnected(remotePeerID);
        }
    }

    /**
     * Calls the RPC `functionName` on `peerID`.
     *
     * @returns {Promise<any>} the remote return value, or undefined when the peer is unknown.
     */
    async call(peerID, functionName, args) {
        const peer = this.peers.get(peerID);
        if (!peer) {
            console.log.apply(null, log(`Can't find peer ${peerID}`));
            return;
        }
        return await peer.call(functionName, args);
    }

    /**
     * Runs a locally registered RPC on behalf of a remote peer.
     *
     * @throws {Error} when no RPC named `functionName` has been registered.
     */
    async callFromRemote(functionName, args) {
        const func = this.RPC_remote.get(functionName);
        if (!func) {
            throw new Error(`callFromRemote: got RPC we don't know about: ${functionName}, ${args}`);
        }
        return await func.apply(null, args);
    }

    /**
     * Registers `func` as the local implementation of `functionName` and adds
     * `this.rpc[functionName](peerID, ...args)` as a shortcut for calling it on a remote peer.
     */
    registerRPC(functionName, func) {
        this.rpc[functionName] = (peerID, ...args) => {
            return this.call(peerID, functionName, args);
        };
        this.RPC_remote.set(functionName, func);
    }

    /** Stores `queryFunction` under `searchType`. */
    registerSearchQuery(searchType, queryFunction) {
        this.searchQueryFunctions.set(searchType, queryFunction);
    }

    /**
     * Calls `type` on every known peer with `message` as the RPC arguments.
     * NOTE(review): `message` is forwarded as the RPC `args`, which the remote side spreads with
     * `apply` - presumably it is expected to be an array; confirm against callers.
     *
     * @returns {Promise<PromiseSettledResult<any>[]>} one settled result per peer.
     */
    async search(type, message) {
        const promises = [];
        for (const peer of this.peers.values()) {
            promises.push(peer.call(type, message));
        }
        return await Promise.allSettled(promises);
    }

    /** Logs a message received from `remotePeerID`. */
    onMessage(remotePeerID, message) {
        console.log.apply(null, log(remotePeerID, message));
    }
}

/**
 * One WebRTC connection to a remote peer: negotiates via the signalling channel ("perfect
 * negotiation"), then exchanges JSON messages, chunked long messages and RPCs over a single
 * data channel.
 */
class PeerConnection {
    async RPCHandler(message) { }

    /**
     * @param {PeerManager} peerManager - owner; receives disconnect notifications and RPC calls.
     * @param {string} remotePeerID - ID of the peer at the other end.
     * @param {(remotePeerID: string, message: object) => void} sendPeerMessage - sends a
     *        signalling message to the remote peer.
     */
    constructor(peerManager, remotePeerID, sendPeerMessage) {
        this.dataChannel = null;
        this.messageHandlers = new Map();
        this.makingOffer = false;
        this.ignoreOffer = false;
        this.isSettingRemoteAnswerPending = false;
        this.polite = true;
        this.webRTCSuperlog = true;
        this.dataChannelSuperlog = true;
        this.chunkSize = (16 * 1024) - 100;
        this.messageSuperlog = false;
        this.sendQueueSuperLog = false;
        this.rpcSuperlog = false;
        this.pendingRPCs = new Map();
        this.connectionPromise = null;
        this.rtcPeer = null;
        this.longMessages = new Map();
        this.chunkSuperlog = false;
        this.sendPeerMessage = sendPeerMessage;
        this.peerManager = peerManager;
        this.remotePeerID = remotePeerID;
    }

    /** Sets this side's role in perfect negotiation (the impolite side ignores colliding offers). */
    setPolite(polite) {
        this.polite = polite;
    }

    /** True while the owning PeerManager still maps our remote peer ID to this very connection. */
    isCurrentConnection() {
        return this.peerManager.peers.get(this.remotePeerID) === this;
    }

    /**
     * Wires up the handlers of `this.dataChannel`.
     *
     * @throws {Error} when there is no data channel yet.
     */
    setupDataChannel() {
        if (!this.dataChannel) {
            throw new Error("PeerConnection.setupDataChannel: no data channel");
        }
        const logSendFailure = (err) => {
            console.log.apply(null, log(`datachannel to peer ${this.remotePeerID}: send failed:`, err));
        };
        this.dataChannel.onopen = () => {
            if (this.dataChannelSuperlog) {
                console.log.apply(null, log("data channel is open to: ", this.remotePeerID, " from: ", this.peerManager.peerID));
            }
            this.send({ type: "hello datachannel", from: this.peerManager.peerID, to: this.remotePeerID }).catch(logSendFailure);
            if (this.peerManager._isBootstrapPeer) {
                // Tell the newcomer about everyone else we know.
                this.send({
                    type: 'initial_peers',
                    from: this.peerManager.peerID,
                    peers: [...this.peerManager.peers.keys()].filter(entry => entry !== this.remotePeerID)
                }).catch(logSendFailure);
            }
            this.connectionPromise?.resolve(this.remotePeerID);
        };
        this.dataChannel.onmessage = (e) => {
            if (this.messageSuperlog) {
                console.log.apply(null, log(`[${logID(this.remotePeerID)}]->datachannel[${logID(this.peerManager.peerID)}]: `, e.data));
            }
            this.onMessage(e.data).catch((err) => {
                console.log.apply(null, log(`datachannel from peer ${this.remotePeerID}: handling message failed:`, err));
            });
        };
        this.dataChannel.onclose = () => {
            if (this.dataChannelSuperlog) {
                console.log.apply(null, log(`datachannel from peer ${this.remotePeerID} closed, disconnecting peer.`));
            }
            // A connection that has been replaced must not tear down its replacement.
            if (!this.isCurrentConnection()) {
                return;
            }
            this.peerManager.disconnectFromPeer(this.remotePeerID).catch((err) => {
                console.log.apply(null, log(`datachannel from peer ${this.remotePeerID}: disconnect failed:`, err));
            });
        };
        this.dataChannel.onerror = (e) => {
            if (this.dataChannelSuperlog) {
                console.log.apply(null, log(`datachannel from peer ${this.remotePeerID} error:`, e.error));
            }
        };
    }

    /**
     * Creates the RTCPeerConnection and starts negotiating. The polite side opens the data
     * channel; the other side receives it via `ondatachannel`.
     *
     * @returns {Promise<string>} resolves with the remote peer ID once the data channel is open;
     *          rejects when the connection state becomes "failed" before that.
     * @throws {Error} when RTCPeerConnection is not available in this environment.
     */
    async connect() {
        if (!(typeof RTCPeerConnection === "function")) {
            throw new Error("RTCPeerConnection is not a function, exiting.");
        }
        const connectionPromise = new Promise((resolve, reject) => {
            this.connectionPromise = { resolve, reject };
        });
        const rtcPeer = new RTCPeerConnection(PeerConnection.config);
        this.rtcPeer = rtcPeer;

        rtcPeer.onconnectionstatechange = async () => {
            const state = rtcPeer.connectionState;
            if (this.webRTCSuperlog) {
                console.log.apply(null, log(`rtcPeer: onconnectionstatechange: ${state}: ${this.remotePeerID}`));
            }
            if (state === "failed") {
                // Tell the peer manager that this connection has gone away, unless it has
                // already forgotten or replaced us.
                if (this.isCurrentConnection()) {
                    this.peerManager.onPeerDisconnected(this.remotePeerID);
                }
                // No-op when the data channel had already opened.
                this.connectionPromise?.reject(new Error(`PeerConnection: connection to ${this.remotePeerID} failed`));
                return;
            }
            if (state === "connected" && this.webRTCSuperlog) {
                // Log the selected candidates. Purely diagnostic, so never let it throw.
                try {
                    const stats = await rtcPeer.getStats();
                    for (const report of stats.values()) {
                        if (report.type !== 'transport') {
                            continue;
                        }
                        const candidatePair = stats.get(report.selectedCandidatePairId);
                        if (!candidatePair) {
                            continue;
                        }
                        const localCandidate = stats.get(candidatePair.localCandidateId);
                        const remoteCandidate = stats.get(candidatePair.remoteCandidateId);
                        console.log.apply(null, log("Connected candidates\n", localCandidate, remoteCandidate));
                    }
                }
                catch (err) {
                    console.log.apply(null, log("rtcPeer: unable to read connection stats:", err));
                }
            }
        };
        rtcPeer.ondatachannel = (e) => {
            this.dataChannel = e.channel;
            this.setupDataChannel();
        };
        rtcPeer.onicegatheringstatechange = () => {
            if (this.webRTCSuperlog) {
                console.log.apply(null, log("onicegatheringstatechange:", this.rtcPeer?.iceGatheringState));
            }
        };
        rtcPeer.oniceconnectionstatechange = () => {
            if (this.webRTCSuperlog) {
                console.log.apply(null, log("oniceconnectionstatechange:", this.rtcPeer?.iceConnectionState));
            }
        };
        rtcPeer.onicecandidateerror = (event) => {
            if (this.webRTCSuperlog) {
                console.log.apply(null, log(`onicecandidateerror: ${event.errorCode} ${event.errorText} ${event.address} ${event.url}`));
            }
        };
        rtcPeer.onicecandidate = ({ candidate }) => {
            if (this.webRTCSuperlog) {
                console.log.apply(null, log(`onicecandidate`, candidate));
            }
            this.sendPeerMessage(this.remotePeerID, { type: "rtc_candidate", candidate: candidate });
        };
        rtcPeer.onnegotiationneeded = async () => {
            if (this.webRTCSuperlog) {
                console.log.apply(null, log("on negotiation needed fired"));
            }
            try {
                this.makingOffer = true;
                await rtcPeer.setLocalDescription();
                if (!rtcPeer.localDescription) {
                    return;
                }
                this.sendPeerMessage(this.remotePeerID, { type: "rtc_description", description: rtcPeer.localDescription });
            }
            catch (err) {
                console.error(err);
            }
            finally {
                this.makingOffer = false;
            }
        };
        // Creating the channel is what triggers negotiation, so do it once all handlers exist.
        if (this.polite) {
            this.dataChannel = rtcPeer.createDataChannel("ddln_main");
            this.setupDataChannel();
        }
        return connectionPromise;
    }

    /**
     * Handles one signalling message (`rtc_description`, `rtc_candidate` or `rtc_connect`)
     * relayed from the remote peer, following the perfect negotiation pattern.
     * Errors are logged, never thrown: the caller does not wait for the result.
     */
    async onWebsocketMessage(message) {
        if (!this.rtcPeer) {
            console.log.apply(null, log(`PeerConnection: no RTCPeerConnection for ${this.remotePeerID}, ignoring signalling message:`, message?.type));
            return;
        }
        const rtcPeer = this.rtcPeer;
        if (message.type === "rtc_connect") {
            try {
                await rtcPeer.setRemoteDescription(message.description);
            }
            catch (e) {
                console.log(e);
            }
        }
        const description = message.type === "rtc_description" ? message.description : null;
        const candidate = message.type === "rtc_candidate" ? message.candidate : null;
        try {
            if (description) {
                const readyForOffer = !this.makingOffer && (rtcPeer.signalingState === "stable" || this.isSettingRemoteAnswerPending);
                const offerCollision = description.type === "offer" && !readyForOffer;
                this.ignoreOffer = !this.polite && offerCollision;
                if (this.ignoreOffer) {
                    console.warn(">>>>>>>>>>>>>>>>>IGNORING OFFER");
                    return;
                }
                this.isSettingRemoteAnswerPending = description.type === "answer";
                try {
                    await rtcPeer.setRemoteDescription(description);
                }
                catch (e) {
                    console.log("PeerConnection:setRemoteDescription:failed:", e, description);
                    // Without the remote description there is nothing to answer.
                    return;
                }
                finally {
                    this.isSettingRemoteAnswerPending = false;
                }
                if (description.type === "offer") {
                    await rtcPeer.setLocalDescription();
                    this.sendPeerMessage(this.remotePeerID, { type: "rtc_description", description: rtcPeer.localDescription });
                }
            }
            else if (candidate) {
                try {
                    await rtcPeer.addIceCandidate(candidate);
                }
                catch (e) {
                    // Candidates belonging to an offer we ignored are expected to fail.
                    if (!this.ignoreOffer) {
                        console.log("PeerConnection:addIceCandidate:failed:", e, candidate);
                        throw e;
                    }
                }
            }
        }
        catch (err) {
            console.error(err);
        }
    }

    /** Closes the underlying RTCPeerConnection (and with it the data channel). */
    disconnect() {
        this.rtcPeer?.close();
        this.rtcPeer = null;
    }

    /**
     * Sends `message` as JSON over the data channel. Waits while the channel's send buffer is
     * full, and splits messages longer than `chunkSize` characters into chunk messages.
     * Failures of the underlying `RTCDataChannel.send` are logged, not thrown.
     *
     * @throws {Error} when there is no data channel.
     */
    async send(message) {
        if (!this.dataChannel) {
            throw new Error("Send called but datachannel is null");
        }
        while (this.dataChannel.bufferedAmount >= SEND_BUFFER_LIMIT_BYTES) {
            // A closing/closed channel never drains; stop waiting and let the send below fail.
            const readyState = this.dataChannel.readyState;
            if (readyState === "closing" || readyState === "closed") {
                break;
            }
            if (this.sendQueueSuperLog) {
                console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-datachannel[${logID(this.peerManager.peerID)}]: send buffer full, waiting 1 second`));
            }
            await new Promise((resolve) => {
                setTimeout(resolve, SEND_BUFFER_POLL_MS);
            });
            if (!this.dataChannel) {
                throw new Error("Send called but datachannel is null");
            }
        }
        const messageJSON = JSON.stringify(message);
        if (this.messageSuperlog) {
            console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-datachannel[${logID(this.peerManager.peerID)}]:`, message.type, message, `message size:${messageJSON.length}`));
        }
        if (messageJSON.length > this.chunkSize) {
            if (this.messageSuperlog) {
                console.log.apply(null, log(`[datachannel] sending long message: `, messageJSON.length));
            }
            await this.sendLongMessage(messageJSON);
            return;
        }
        try {
            this.dataChannel?.send(messageJSON);
        }
        catch (e) {
            console.log.apply(null, log(e));
        }
    }

    /**
     * Returns the SHA-256 of `message` as a lowercase hex string.
     * TODO: get a polyfill for browsers that don't have the SubtleCrypto API.
     */
    async hashMessage(message) {
        const msgUint8 = new TextEncoder().encode(message);
        const hashBuffer = await crypto.subtle.digest("SHA-256", msgUint8);
        const hashArray = Array.from(new Uint8Array(hashBuffer));
        return hashArray.map((b) => b.toString(16).padStart(2, "0")).join('');
    }

    /**
     * Sends an already serialised message as a sequence of `chunk` messages. Every chunk
     * carries the message ID, the hash of the whole message and the hash of the chunk so the
     * receiver can reassemble and verify it.
     *
     * @param {string} message - the JSON text to send.
     */
    async sendLongMessage(message) {
        // Half the limit leaves room for the chunk envelope and for JSON escaping of the chunk.
        const chunkSize = Math.max(1, Math.floor(this.chunkSize / 2));
        const chunks = Math.ceil(message.length / chunkSize);
        const messageID = generateID();
        const hash = await this.hashMessage(message);
        for (let i = 0; i < chunks; i++) {
            const offset = i * chunkSize;
            const chunk = message.substring(offset, offset + chunkSize);
            const chunkHash = await this.hashMessage(chunk);
            if (this.chunkSuperlog) {
                console.log.apply(null, log(`[chunk] chunkHash:${logID(chunkHash)} from:${logID(this.peerManager.peerID)} to:${logID(this.remotePeerID)} messageID:${logID(messageID)} hash:${logID(hash)} ${i + 1}/${chunks}`));
            }
            await this.send({
                type: 'chunk',
                message_id: messageID,
                hash: hash,
                chunk_index: i,
                total_chunks: chunks,
                chunk: chunk,
                chunk_hash: chunkHash
            });
        }
    }

    /**
     * Calls `functionName` on the remote peer.
     *
     * @param {string} functionName
     * @param {any[]} args
     * @returns {Promise<any>} resolves with the remote return value. Rejects with a string
     *          when no response arrives within 10 seconds, or with the send error when the
     *          request could not be sent.
     */
    call(functionName, args) {
        const transactionID = generateID();
        const promise = new Promise((resolve, reject) => {
            const timer = setTimeout(() => {
                this.pendingRPCs.delete(transactionID);
                reject(`function:${functionName}[${transactionID}] failed to resolve after ${RPC_TIMEOUT_MS / 1000} seconds.`);
            }, RPC_TIMEOUT_MS);
            this.pendingRPCs.set(transactionID, {
                resolve: (value) => {
                    clearTimeout(timer);
                    resolve(value);
                },
                reject: (reason) => {
                    clearTimeout(timer);
                    reject(reason);
                },
                functionName
            });
        });
        const message = {
            type: "rpc_call",
            transaction_id: transactionID,
            function_name: functionName,
            args: args,
        };
        if (this.rpcSuperlog) {
            console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}]`, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
        }
        this.send(message).catch((err) => {
            // The request never left, so fail now instead of waiting for the timeout.
            const pendingRPC = this.pendingRPCs.get(transactionID);
            if (pendingRPC) {
                this.pendingRPCs.delete(transactionID);
                pendingRPC.reject(err);
            }
        });
        return promise;
    }

    /**
     * Handles one JSON message received on the data channel: RPC responses and calls, the
     * bootstrap peer's `initial_peers` list, and chunks of long messages.
     * Unparseable messages are logged and ignored.
     *
     * @param {string} messageJSON
     * @throws {Error} when a reassembled long message does not match its advertised hash.
     */
    async onMessage(messageJSON) {
        let message = null;
        try {
            message = JSON.parse(messageJSON);
        }
        catch (e) {
            console.log.apply(null, log("PeerConnection.onMessage:", e));
            return;
        }
        if (message === null || typeof message !== "object") {
            return;
        }
        if (this.messageSuperlog) {
            console.log.apply(null, log(`[${logID(this.remotePeerID)}]->datachannel[${logID(this.peerManager.peerID)}]`, message.type, message));
        }
        switch (message.type) {
            case "rpc_response":
                this.handleRPCResponse(message);
                break;
            case "rpc_call":
                await this.handleRPCCall(message);
                break;
            case "initial_peers":
                this.handleInitialPeers(message);
                break;
            case "chunk":
                await this.handleChunk(message);
                break;
            default:
                break;
        }
    }

    /** Settles the pending RPC a response belongs to; late or unknown responses are ignored. */
    handleRPCResponse(message) {
        if (this.rpcSuperlog) {
            console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}] response: `, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
        }
        const pendingRPC = this.pendingRPCs.get(message.transaction_id);
        if (!pendingRPC) {
            // Typically a response that arrived after the call timed out.
            console.log.apply(null, log(`[rpc] ignoring response for unknown transaction ${message.transaction_id}`));
            return;
        }
        this.pendingRPCs.delete(message.transaction_id);
        pendingRPC.resolve(message.response);
    }

    /** Runs a remote call locally and sends the result back; a failing call is logged. */
    async handleRPCCall(message) {
        if (this.rpcSuperlog) {
            console.log.apply(null, log(`[${logID(this.remotePeerID)}]->[rpc][${logID(this.peerManager.peerID)}] call: `, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
        }
        let response;
        try {
            response = await this.peerManager.callFromRemote(message.function_name, message.args);
        }
        catch (err) {
            // No response is sent; the caller's request runs into its timeout.
            console.log.apply(null, log(`[rpc] call ${message.function_name} failed:`, err));
            return;
        }
        if (this.rpcSuperlog) {
            console.log.apply(null, log(`[rpc] call: response:`, response));
        }
        await this.send({
            type: 'rpc_response',
            function_name: message.function_name,
            transaction_id: message.transaction_id,
            response: response
        });
    }

    /** Connects to every peer in the list that we are not already talking to. */
    handleInitialPeers(message) {
        if (!Array.isArray(message.peers)) {
            return;
        }
        for (const peerID of message.peers) {
            // Never connect to ourselves or clobber an existing connection.
            if (peerID === this.peerManager.peerID || this.peerManager.peers.has(peerID)) {
                continue;
            }
            console.log.apply(null, log("Connecting to initial peer ", peerID));
            this.peerManager.connectToPeer(peerID).catch((err) => {
                console.log.apply(null, log(`Connecting to initial peer ${peerID} failed:`, err));
            });
        }
    }

    /**
     * Stores one chunk of a long message and, once all chunks are present, verifies the
     * reassembled message against its hash and processes it like any other message.
     * Chunks are stored by index before any `await` so that concurrent handler invocations
     * cannot reorder or double-deliver them.
     *
     * @throws {Error} when the reassembled message does not match its advertised hash.
     */
    async handleChunk(message) {
        const messageID = message.message_id;
        const totalChunks = message.total_chunks;
        const chunkIndex = message.chunk_index;
        const isWellFormed = typeof message.chunk === "string"
            && Number.isInteger(totalChunks)
            && Number.isInteger(chunkIndex)
            && chunkIndex >= 0
            && chunkIndex < totalChunks;
        if (!isWellFormed) {
            console.log.apply(null, log(`[chunk] ignoring malformed chunk for message ${messageID}`));
            return;
        }
        let longMessage = this.longMessages.get(messageID);
        if (!longMessage) {
            longMessage = { messageChunks: [], receivedChunks: 0, totalChunks: totalChunks, hash: message.hash };
            this.longMessages.set(messageID, longMessage);
        }
        if (chunkIndex >= longMessage.totalChunks) {
            console.log.apply(null, log(`[chunk] ignoring out-of-range chunk for message ${messageID}`));
            return;
        }
        if (longMessage.messageChunks[chunkIndex] === undefined) {
            longMessage.messageChunks[chunkIndex] = message.chunk;
            longMessage.receivedChunks++;
        }
        const isComplete = longMessage.receivedChunks === longMessage.totalChunks;
        if (isComplete) {
            // Remove the entry now so it is released even if verification fails below.
            this.longMessages.delete(messageID);
        }
        const chunkHash = await this.hashMessage(message.chunk);
        if (chunkHash !== message.chunk_hash) {
            console.log.apply(null, log(`[chunk] chunk hash mismatch for message ${messageID} chunk ${chunkIndex + 1}/${longMessage.totalChunks}`));
        }
        if (this.chunkSuperlog) {
            console.log.apply(null, log(`[chunk] chunked message sent chunkHash:${logID(message.chunk_hash)} computed hash: ${logID(chunkHash)} messageId:${logID(messageID)} chunk ${message.chunk_index + 1}/${longMessage.totalChunks}`));
        }
        if (!isComplete) {
            return;
        }
        const completeMessage = longMessage.messageChunks.join('');
        const hash = await this.hashMessage(completeMessage);
        if (this.chunkSuperlog) {
            console.log.apply(null, log(`[chunk] hashes match: ${hash === longMessage.hash} sent hash: ${logID(longMessage.hash)} computed hash: ${logID(hash)}`));
        }
        if (hash !== longMessage.hash) {
            throw new Error("[chunk] long message hashes don't match.");
        }
        await this.onMessage(completeMessage);
    }
}

/** ICE configuration shared by every PeerConnection. */
PeerConnection.config = {
    iceServers: [
        { urls: "stun:ddln.app" },
        // { urls: "turn:ddln.app", username: "a", credential: "b" },
        { urls: "stun:stun.l.google.com" }, // keeping this for now as my STUN server is not returning ipv6
        { urls: "stun:stun1.l.google.com" },
        { urls: "stun:stun2.l.google.com" },
        { urls: "stun:stun3.l.google.com" },
        { urls: "stun:stun4.l.google.com" },
    ],
};
//# sourceMappingURL=PeerManager.js.map