Files
dandelion/static/PeerManager.js
2025-06-04 02:23:42 -07:00

702 lines
32 KiB
JavaScript

// connect to WS server, send info, connecto to bootstrap peer
// once connected to bootstrap peer,
// Goal, connect to bootstrap peer, ask bootstrap peer for peers that have posts from users that we care about. get peers, connect to those peers, sync.
// how? do "perfect negotiation" with bootstrap peer. All logic here moves to BP.
import { generateID } from "IDUtils";
import { log, logID } from "log";
export var PeerEventTypes;
(function (PeerEventTypes) {
PeerEventTypes[PeerEventTypes["PEER_CONNECTED"] = 0] = "PEER_CONNECTED";
PeerEventTypes[PeerEventTypes["PEER_DISCONNECTED"] = 1] = "PEER_DISCONNECTED";
})(PeerEventTypes || (PeerEventTypes = {}));
export class PeerManager {
hashIdToIndices(id) {
let indices = [];
for (let char of id) {
if (char !== '0' && char !== '-') {
indices.push(parseInt(char, 16));
if (indices.length == 2) {
break;
}
}
}
return [indices[0], indices[1]];
}
funkyName(id, listOne, listTwo) {
let [one, two] = this.hashIdToIndices(id);
let first = listOne[one % listOne.length];
let second = listTwo[two % listTwo.length];
return { first, second };
}
getPeername(peerID) {
let { first: adjective, second: snake } = this.funkyName(peerID, this.adjectives, this.snakes);
let peername = `${adjective}_${snake}`;
return peername;
}
websocketSend(message) {
if (!this.websocket) {
throw new Error();
}
let messageJSON = "";
try {
messageJSON = JSON.stringify(message);
}
catch (e) {
log(e);
return;
}
this.messageSuperlog && console.log.apply(null, log("<-signaler:", message));
this.websocket.send(messageJSON);
}
onWebsocketMessage(event) {
let messageJSON = event.data;
let message = null;
try {
message = JSON.parse(messageJSON);
}
catch (e) {
log(e);
throw new Error();
}
this.messageSuperlog && console.log.apply(null, log("->signaler:", message));
if (message.type === "hello2") {
if (!this.isBootstrapPeer) {
this.bootstrapPeerID = message.bootstrapPeers[0];
}
this.onHello2Received(this.bootstrapPeerID);
}
if (message.type === "peer_message") {
let peerConnection = this.peers.get(message.from);
if (message.message.type === "rtc_description") {
// let existingConnection = this.peers.get(message.from);
// // We're already connected, so delete the existing connection and make a new one.
if (peerConnection?.rtcPeer?.connectionState === "connected") {
console.log.apply(null, log("Connecting peer is already connected. Deleting existing peer connection and reconnecting."));
peerConnection.disconnect();
this.peers.delete(message.from);
peerConnection = undefined;
}
if (!peerConnection) {
let remotePeerID = message.from;
let newPeer = new PeerConnection(this, remotePeerID, this.websocketSendPeerMessage.bind(this));
if (this.isBootstrapPeer) {
newPeer.setPolite(false);
}
peerConnection = newPeer;
this.peers.set(newPeer.remotePeerID, newPeer);
this.onConnectRequest(newPeer);
}
}
if (!peerConnection) {
console.log.apply(null, log("Can't find peer for peer message:", message));
return;
}
peerConnection.onWebsocketMessage(message.message);
}
}
async onConnectRequest(newPeer) {
// let remotePeerID = message.from;
// let newPeer = new PeerConnection(this, remotePeerID, this.websocketSendPeerMessage.bind(this));
// if (this.isBootstrapPeer) {
// newPeer.setPolite(false);
// }
await newPeer.connect();
this.onPeerConnected(newPeer.remotePeerID);
return newPeer;
}
async onHello2Received(bootstrapPeerID) {
if (this.isBootstrapPeer) {
this.connectPromiseCallbacks?.resolve();
return;
}
if (!bootstrapPeerID) {
console.log.apply(null, log("Didn't get bootstrap peer, waiting 10 seconds..."));
// let callSendHello2OnTimeout = () => { console.log(this, "jajajajaj");this.sendHello2() };
// setTimeout(callSendHello2OnTimeout, 5_000);
return;
}
this.bootstrapPeerConnection = await this.connectToPeer(bootstrapPeerID);
this.connectPromiseCallbacks?.resolve();
}
sendHello2() {
this.websocketSend({
type: "hello2",
user_id: this.userID,
// user_name: app.username,
peer_id: this.peerID,
session_id: this.sessionID,
// peer_name: app.peername,
is_bootstrap_peer: this.isBootstrapPeer,
// peer_description: this.rtcPeerDescription
});
}
websocketSendPeerMessage(remotePeerID, peerMessage) {
this.websocketSend({
type: "peer_message",
from: this.peerID,
to: remotePeerID,
from_username: "blah user",
from_peername: "blah peer",
message: peerMessage
});
// let responseMessage = { type: "peer_message",
// from: app.peerID,
// to: data.from,
// from_username: app.username,
// from_peername: app.peername,
// message: { type: "get_posts_for_user", post_ids: postIds, user_id: message.user_id } }
}
constructor(userID, peerID, isBootstrapPeer) {
// private signaler: Signaler;
this.searchQueryFunctions = new Map();
this.RPC_remote = new Map();
this.rpc = {};
this.isBootstrapPeer = false;
this.bootstrapPeerConnection = null;
this.sessionID = generateID();
this.websocket = null;
this.bootstrapPeerID = null;
this.connectPromiseCallbacks = null;
this.connectPromise = null;
this.pingPeers = [];
this.watchdogPeriodSeconds = 10;
this.eventListeners = new Map();
this.reconnectPeriod = 10;
this.messageSuperlog = false;
this.watchdogInterval = null;
this.reconnectTimer = null;
this.peerStateSuperlog = true;
// async watchdog() {
// // Check that we're connected to at least N peers. If not, reconnect to the bootstrap server.
// if (this.peers.size === 0) {
// await this.sendHello2();
// }
// }
this.animals = ['shrew', 'jerboa', 'lemur', 'weasel', 'possum', 'possum', 'marmoset', 'planigale', 'mole', 'narwhal'];
this.adjectives = ['snazzy', 'whimsical', 'jazzy', 'bonkers', 'wobbly', 'spiffy', 'chirpy', 'zesty', 'bubbly', 'perky', 'sassy'];
this.snakes = ['mamba', 'cobra', 'python', 'viper', 'krait', 'sidewinder', 'constrictor', 'boa', 'asp', 'anaconda', 'krait'];
this.isBootstrapPeer = isBootstrapPeer;
this.peers = new Map();
this.routingTable = new Map();
this.userID = userID;
this.peerID = peerID;
}
disconnect() {
this.websocket?.close();
for (let peer of this.peers.values()) {
peer.disconnect();
}
}
connectWebSocket() {
try {
let hostname = globalThis?.location?.hostname ?? 'ddln.app';
let port = globalThis?.location?.port ?? '443';
let wsURL = `wss://${hostname}:${port}/ws`;
console.log.apply(null, log(`Attempting to connect websocket to URL: ${wsURL}`));
this.websocket = new WebSocket(wsURL);
// this.websocket.onclose = (e: CloseEvent) => {
// let closedUnexpectedly = !e.wasClean;
// if (closedUnexpectedly) {
// console.log.apply(null, log(`Websocket closed unexpectedly. Will try to reconnect in ${this.reconnectPeriod} seconds`));
// // let alreadyReconnecting = this.reconnectTimer !== null;
// // if (!alreadyReconnecting) {
// this.reconnectTimer = globalThis.setTimeout(() => {
// console.log.apply(null, log(`Reconnecting web socket`));
// this.reconnectTimer = null;
// this.connectWebSocket();
// }, this.reconnectPeriod * 1000)
// };
// }
// }
}
catch (error) {
throw new Error(error.message);
}
this.websocket.onopen = async (event) => {
console.log.apply(null, log("peermanager:ws:onopen"));
this.sendHello2();
};
this.websocket.onmessage = this.onWebsocketMessage.bind(this);
}
connect() {
// setInterval(this.watchdog.bind(this), this.watchdogPeriodSeconds * 1000);
// Side effects :(
if (!this.watchdogInterval) {
this.watchdogInterval = setInterval(() => {
let numActive = 0;
for (let [id, peer] of this.peers) {
if ( /*id === this.bootstrapPeerID ||*/peer.rtcPeer?.connectionState === "new" ||
peer.rtcPeer?.connectionState === "connecting") {
continue;
}
numActive++;
}
if (!this.isBootstrapPeer && numActive === 0) {
console.log.apply(null, log(`No peers connected, will attempt to reconnect in ${this.reconnectPeriod} seconds...`));
// Websocket reconnect
if (this.websocket?.readyState === WebSocket.OPEN) {
this.sendHello2();
}
if (this.websocket?.readyState === WebSocket.CLOSED) {
this.connectWebSocket();
}
}
let output = `Current status:` + "\n" + `[${logID(this.peerID)}]${this.getPeername(this.peerID)}[local]` + "\n";
for (let [peerID, peer] of this.peers) {
output += `[${logID(peerID)}]${peer.rtcPeer?.connectionState}:${this.getPeername(peerID)}${(peerID === this.bootstrapPeerID) ? "[Bootstrap]" : ""}` + "\n";
}
output += `numActivePeers: ${numActive}` + "\n";
console.log.apply(null, log(output));
}, this.reconnectPeriod * 1000);
}
let connectPromise = this.connectPromise;
if (!connectPromise) {
connectPromise = new Promise((resolve, reject) => {
this.connectPromiseCallbacks = { resolve, reject };
});
this.connectPromise = connectPromise;
}
this.connectWebSocket();
return connectPromise;
// this.signaler = new Signaler(userID, peerID, isBootstrapPeer, this.onConnected.bind(this));
// Testing
// let dummyPeer = new PeerConnection(this, "dummy_peer", this.websocketSendPeerMessage.bind(this));
// this.peers.set("dummy_peer", dummyPeer);
}
async connectToPeer(remotePeerID) {
// Connect to the peer that has the peer id remotePeerID.
// TODO how do multiple windows / tabs from the same peer and user work?
// Need to decide if they should all get a unique connection. A peer should only be requesting and writing
// Data once though, so it probably need to be solved on the client side as the data is shared obv
// Maybe use BroadcastChannel to proxy all calls to peermanager? That will probably really complicate things.
// What if we just user session+peerID for the connections? Then we might have two windows making requests
// For IDs etc, it would probably be best to proxy everything.
// Maybe once we put this logic in a web worker, we'll need an interface to it that works over postMessage
// anyway, and at that point, we could just use that same interface over a broadcastChannel
// let's keep it simple for now and ignore the problem :)
let peerConnection = new PeerConnection(this, remotePeerID, this.websocketSendPeerMessage.bind(this));
this.peers.set(remotePeerID, peerConnection);
await peerConnection.connect();
this.onPeerConnected(remotePeerID);
return peerConnection;
}
onPeerConnected(peerID) {
this.peerStateSuperlog && console.log.apply(null, log(`PeerManager: Successfully connected to peer ${peerID}`));
this.dispatchEvent(PeerEventTypes.PEER_CONNECTED, { peerID: peerID });
}
dispatchEvent(event, parameters) {
let listeners = this.eventListeners.get(event);
if (!listeners) {
return;
}
for (let listener of listeners) {
listener(parameters);
}
}
addEventListener(eventName, func) {
let listeners = this.eventListeners.get(eventName);
if (!listeners) {
this.eventListeners.set(eventName, [func]);
}
}
onPeerDisconnected(peerID) {
let deleted = this.peers.delete(peerID);
if (!deleted) {
throw new Error(`Can't find peer that disconnected ${peerID}`);
}
// TODO: What do we do if we lose connection to the bootstrap peer?
// If we have other connections, it probably doesn't matter.
// Eventually we want the bootstrap peer to be no different than any other peer anyway.
// We should disconnect from the websocket once we connect to our intial peers.
// If we have no peer connections, try to connect. If connection fails, start a timer to reconnect.
if (peerID === this.bootstrapPeerID) {
this.bootstrapPeerID = null;
this.bootstrapPeerConnection = null;
}
this.peerStateSuperlog && console.log.apply(null, log(`PeerManager: disconnected from peer ${peerID}`));
this.dispatchEvent(PeerEventTypes.PEER_DISCONNECTED, { peerID: peerID });
}
async disconnectFromPeer(remotePeerID) {
let peer = this.peers.get(remotePeerID);
if (!peer) {
console.log.apply(null, log(`PeerManager.disconnect: couldn't find peer ${remotePeerID}`));
return;
}
console.log.apply(null, log(`PeerManager.disconnect: disconnecting peer ${remotePeerID}`));
await peer.disconnect();
this.onPeerDisconnected(remotePeerID);
}
async call(peerID, functionName, args) {
let peer = this.peers.get(peerID);
if (!peer) {
console.log.apply(null, log(`Can't find peer ${peerID}`));
return;
}
let returnValues = await peer.call(functionName, args);
return returnValues;
}
async callFromRemote(functionName, args) {
let func = this.RPC_remote.get(functionName);
if (!func) {
throw new Error(`callFromRemote: got RPC we don't know about: ${functionName}, ${args}`);
}
let returnValues = await func.apply(null, args);
return returnValues;
}
registerRPC(functionName, func) {
this.rpc[functionName] = (peerID, ...args) => {
return this.call(peerID, functionName, args);
};
this.RPC_remote.set(functionName, func);
}
registerSearchQuery(searchType, queryFunction) {
this.searchQueryFunctions.set(searchType, queryFunction);
}
async search(type, message) {
let promises = [];
for (let peer of this.peers.values()) {
promises.push(peer.call(type, message));
}
return await Promise.allSettled(promises);
}
onMessage(remotePeerID, message) {
console.log.apply(null, log(remotePeerID, message));
}
}
class PeerConnection {
async RPCHandler(message) {
}
constructor(peerManager, remotePeerID, sendPeerMessage) {
this.dataChannel = null;
this.messageHandlers = new Map();
this.makingOffer = false;
this.ignoreOffer = false;
this.isSettingRemoteAnswerPending = false;
this.polite = true;
this.webRTCSuperlog = true;
this.dataChannelSuperlog = false;
this.chunkSize = (16 * 1024) - 100;
this.messageSuperlog = false;
this.rpcSuperlog = false;
this.pendingRPCs = new Map();
this.connectionPromise = null;
// private makingOffer:boolean = false;
// private ignoreOffer:boolean = false;
this.rtcPeer = null;
// longMessageQueue: string[] = [];
this.longMessages = new Map();
this.chunkSuperlog = false;
this.sendPeerMessage = sendPeerMessage;
this.peerManager = peerManager;
this.remotePeerID = remotePeerID;
// this.signaler = signaler;
// this.signaler.route(remotePeerID, this);
}
setPolite(polite) {
this.polite = polite;
}
setupDataChannel() {
if (!this.dataChannel) {
throw new Error();
}
this.dataChannel.onopen = (e) => {
if (!this.dataChannel) {
throw new Error();
}
this.dataChannelSuperlog && console.log.apply(null, log("data channel is open to: ", this.remotePeerID, " from: ", this.peerManager.peerID));
this.send({ type: "hello datachannel", from: this.peerManager.peerID, to: this.remotePeerID });
// this.dataChannel?.send(`{typeHello datachannel from: ${this.peerManager.peerID}`);
console.log.apply(null, log([...this.peerManager.peers.keys()]));
if (this.peerManager.isBootstrapPeer) {
this.send({ type: 'initial_peers', from: this.peerManager.peerID, peers: [...this.peerManager.peers.keys()].filter(entry => entry !== this.remotePeerID) });
// this.dataChannel.send(JSON.stringify());
}
this.connectionPromise?.resolve(this.remotePeerID);
//globalThis.setTimeout(()=>this.connectionPromise?.resolve(this.remotePeerID), 5000);
};
this.dataChannel.onmessage = (e) => {
this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->datachannel[${logID(this.peerManager.peerID)}]: `, e.data));
this.onMessage(e.data);
};
this.dataChannel.onclose = (e) => {
this.dataChannelSuperlog && console.log.apply(null, log(`datachannel from peer ${this.remotePeerID} closed, disconnecting peer.`));
this.peerManager.disconnectFromPeer(this.remotePeerID);
};
this.dataChannel.onerror = (e) => {
this.dataChannelSuperlog && console.log.apply(null, log(`datachannel from peer ${this.remotePeerID} error:`, e.error));
};
}
async connect() {
let connectionPromise = new Promise((resolve, reject) => { this.connectionPromise = { resolve, reject }; });
this.rtcPeer = new RTCPeerConnection(PeerConnection.config);
this.rtcPeer.onconnectionstatechange = async (e) => {
this.webRTCSuperlog && console.log.apply(null, log(`rtcPeer: onconnectionstatechange: ${this.rtcPeer?.connectionState}: ${this.remotePeerID}`));
if (!this.rtcPeer) {
throw new Error("onconnectionstatechange");
}
// When the connection is closed, tell the peer manager that this connection has gone away
if (this.rtcPeer.connectionState === "failed") {
this.peerManager.onPeerDisconnected(this.remotePeerID);
// globalThis.setTimeout(async () => { await this.peerManager.connectToPeer(this.remotePeerID) }, 10_000);
}
if (this.rtcPeer.connectionState === "connected") {
// Check the selected candidates
const stats = await this.rtcPeer.getStats();
let localIP = '';
let remoteIP = '';
for (const report of stats.values()) {
if (report.type === 'transport') {
let candidatePair = stats.get(report.selectedCandidatePairId);
let localCandidate = stats.get(candidatePair.localCandidateId);
let remoteCandidate = stats.get(candidatePair.remoteCandidateId);
this.webRTCSuperlog && console.log.apply(null, log("Connected candidates\n", localCandidate, remoteCandidate));
}
}
}
};
this.rtcPeer.ondatachannel = (e) => {
let dataChannel = e.channel;
this.dataChannel = dataChannel;
this.setupDataChannel();
};
if (this.polite) {
this.dataChannel = this.rtcPeer.createDataChannel("ddln_main");
this.setupDataChannel();
}
if (this.rtcPeer === null) {
return;
}
// this.rtcPeer.onicecandidate = ({ candidate }) => this.signaler.send(JSON.stringify({ candidate }));
// this.rtcPeer.onicecandidate = ({ candidate }) => console.log.apply(null, log(candidate);
this.rtcPeer.onicegatheringstatechange = (event) => {
this.webRTCSuperlog && console.log.apply(null, log("onicegatheringstatechange:", this.rtcPeer?.iceGatheringState));
};
this.rtcPeer.oniceconnectionstatechange = (event) => {
this.webRTCSuperlog && console.log.apply(null, log("oniceconnectionstatechange:", this.rtcPeer?.iceConnectionState));
};
this.rtcPeer.onicecandidateerror = (event) => {
this.webRTCSuperlog && console.log.apply(null, log(`onicecandidateerror: ${event.errorCode} ${event.errorText} ${event.address} ${event.url}`));
};
this.rtcPeer.onicecandidate = ({ candidate }) => {
this.webRTCSuperlog && console.log.apply(null, log(`onicecandidate`, candidate));
this.sendPeerMessage(this.remotePeerID, { type: "rtc_candidate", candidate: candidate });
};
this.rtcPeer.onnegotiationneeded = async (event) => {
this.webRTCSuperlog && console.log.apply(null, log("on negotiation needed fired"));
if (!this.rtcPeer) {
throw new Error();
}
try {
this.makingOffer = true;
await this.rtcPeer.setLocalDescription();
if (!this.rtcPeer.localDescription) {
return;
}
this.sendPeerMessage(this.remotePeerID, { type: "rtc_description", description: this.rtcPeer.localDescription });
}
catch (err) {
console.error(err);
}
finally {
this.makingOffer = false;
}
};
return connectionPromise;
}
async onWebsocketMessage(message) {
if (message.type == "rtc_connect") {
this.rtcPeer?.setRemoteDescription(message.description);
}
// /*
// let ignoreOffer = false;
// let isSettingRemoteAnswerPending = false;
// signaler.onmessage = async ({ data: { description, candidate } }) => {
if (!this.rtcPeer) {
throw new Error();
}
let description = null;
if (message.type == "rtc_description") {
description = message.description;
}
let candidate = null;
if (message.type == "rtc_candidate") {
candidate = message.candidate;
}
try {
if (description) {
const readyForOffer = !this.makingOffer &&
(this.rtcPeer.signalingState === "stable" || this.isSettingRemoteAnswerPending);
const offerCollision = description.type === "offer" && !readyForOffer;
this.ignoreOffer = !this.polite && offerCollision;
if (this.ignoreOffer) {
console.warn(">>>>>>>>>>>>>>>>>IGNORING OFFER");
return;
}
this.isSettingRemoteAnswerPending = description.type == "answer";
await this.rtcPeer.setRemoteDescription(description);
this.isSettingRemoteAnswerPending = false;
if (description.type === "offer") {
await this.rtcPeer.setLocalDescription();
this.sendPeerMessage(this.remotePeerID, { type: "rtc_description", description: this.rtcPeer.localDescription });
}
}
else if (candidate) {
try {
await this.rtcPeer.addIceCandidate(candidate);
}
catch (err) {
if (!this.ignoreOffer) {
throw err;
}
}
}
}
catch (err) {
console.error(err);
}
// };
// */
}
disconnect() {
this.rtcPeer?.close();
this.rtcPeer = null;
}
async send(message) {
if (!this.dataChannel) {
throw new Error("Send called but datachannel is null");
}
while (this.dataChannel.bufferedAmount >= 8 * 1024 * 1024) {
await new Promise((resolve, reject) => {
setTimeout(() => resolve(), 1000);
});
}
let messageJSON = JSON.stringify(message);
this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-datachannel[${logID(this.peerManager.peerID)}]:`, message.type, message, `message size:${messageJSON.length}`));
if (messageJSON.length > this.chunkSize) {
this.messageSuperlog && console.log.apply(null, log(`[datachannel] sending long message: `, messageJSON.length));
this.sendLongMessage(messageJSON);
return;
}
try {
this.dataChannel?.send(messageJSON);
}
catch (e) {
console.log.apply(null, log(e));
}
// this.onMessage(messageJSON);
}
// Get a polyfill for browsers that don't have this API
async hashMessage(message) {
let msgUint8 = new TextEncoder().encode(message);
const hashBuffer = await crypto.subtle.digest("SHA-256", msgUint8);
const hashArray = Array.from(new Uint8Array(hashBuffer));
const hashHex = hashArray.map((b) => b.toString(16).padStart(2, "0")).join('');
return hashHex;
}
async sendLongMessage(message) {
// message = JSON.parse(message);
let chunkSize = this.chunkSize / 2;
// let chunkSize = 1024;
let chunks = Math.ceil(message.length / chunkSize);
let messageID = generateID();
let hash = await this.hashMessage(message);
for (let i = 0; i < chunks; i++) {
let offset = i * chunkSize;
let chunk = message?.substring(offset, offset + chunkSize);
// this.send(message?.substring(offset, offset + chunkSize-1));
// console.log("[chunk]", chunk);
let chunkHash = await this.hashMessage(chunk);
this.chunkSuperlog && console.log.apply(null, log(`[chunk] chunkHash:${logID(chunkHash)} from:${logID(this.peerManager.peerID)} to:${logID(this.remotePeerID)} messageID:${logID(messageID)} hash:${logID(hash)} ${i + 1}/${chunks}`));
let netMessage = { type: 'chunk', message_id: messageID, hash: hash, chunk_index: i, total_chunks: chunks, chunk: chunk, chunk_hash: chunkHash };
await this.send(netMessage);
}
}
call(functionName, args) {
let transactionID = generateID(); // make this faster as we will only ever have a small number of in-flight queries on a peer
// Think about a timeout here to auto reject it after a while.
let promise = new Promise((resolve, reject) => {
this.pendingRPCs.set(transactionID, { resolve, reject, functionName });
// setTimeout(() => reject("bad"), 1000);
});
let message = {
type: "rpc_call",
transaction_id: transactionID,
function_name: functionName,
args: args,
};
this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}]`, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
this.send(message);
return promise;
}
async onMessage(messageJSON) {
let message = {};
try {
message = JSON.parse(messageJSON);
}
catch (e) {
console.log.apply(null, log("PeerConnection.onMessage:", e));
}
this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->datachannel[${logID(this.peerManager.peerID)}]`, message.type, message));
let type = message.type;
if (type === "rpc_response") {
this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}] response: `, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
let pendingRPC = this.pendingRPCs.get(message.transaction_id);
if (!pendingRPC) {
throw new Error();
}
pendingRPC.resolve(message.response);
}
if (type === "rpc_call") {
this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->[rpc][${logID(this.peerManager.peerID)}] call: `, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
let response = await this.peerManager.callFromRemote(message.function_name, message.args);
this.rpcSuperlog && console.log.apply(null, log(`[rpc] call: response:`, response));
if (response === undefined) {
return;
}
let responseMessage = { type: 'rpc_response', function_name: message.function_name, transaction_id: message.transaction_id, response: response };
this.send(responseMessage);
}
if (type === "initial_peers") {
for (let peerID of message.peers) {
console.log.apply(null, log("Connecting to initial peer ", peerID));
this.peerManager.connectToPeer(peerID);
}
}
if (type === "chunk") {
let messageID = message.message_id;
if (!this.longMessages.has(messageID)) {
this.longMessages.set(messageID, { messageChunks: [], totalChunks: message.total_chunks, hash: message.hash });
}
let longMessage = this.longMessages.get(messageID);
if (!longMessage) {
return;
}
let chunkHash = await this.hashMessage(message.chunk_hash);
longMessage.messageChunks.push(message.chunk);
this.chunkSuperlog && console.log.apply(null, log(`[chunk] chunked message sent chunkHash:${logID(message.chunk_hash)} computed hash: ${logID(chunkHash)} messageId:${logID(messageID)} chunk ${message.chunk_index + 1}/${longMessage.totalChunks}`));
if (message.chunk_index === longMessage.totalChunks - 1) {
let completeMessage = longMessage.messageChunks.join('');
let hash = await this.hashMessage(completeMessage);
this.chunkSuperlog && console.log.apply(null, log(`[chunk] hashes match: ${hash === longMessage.hash} sent hash: ${logID(longMessage.hash)} computed hash: ${logID(hash)}`));
if (hash !== longMessage.hash) {
throw new Error("[chunk] long message hashes don't match.");
}
this.onMessage(completeMessage);
this.longMessages.delete(messageID);
}
}
// this.peerManger.onMessage(this.remotePeerID, message);
}
}
PeerConnection.config = {
iceServers: [
{ urls: "stun:ddln.app" },
// { urls: "turn:ddln.app", username: "a", credential: "b" },
{ urls: "stun:stun.l.google.com" }, // keeping this for now as my STUN server is not returning ipv6
{ urls: "stun:stun1.l.google.com" },
{ urls: "stun:stun2.l.google.com" },
{ urls: "stun:stun3.l.google.com" },
{ urls: "stun:stun4.l.google.com" },
],
};