Merge branch 'bobbyd-multiple-bootstrap-peers'

This commit is contained in:
2025-06-04 02:24:28 -07:00
2 changed files with 77 additions and 37 deletions

View File

@@ -27,13 +27,13 @@ export class PeerManager {
RPC_remote: Map<string, Function> = new Map(); RPC_remote: Map<string, Function> = new Map();
rpc: { [key: string]: Function } = {}; rpc: { [key: string]: Function } = {};
isBootstrapPeer: boolean = false; isBootstrapPeer: boolean = false;
bootstrapPeerConnection: PeerConnection | null = null; bootstrapPeerConnections: Map<string, PeerConnection> | null = null;
sessionID = generateID(); sessionID = generateID();
userID: string; userID: string;
peerID: PeerID; peerID: PeerID;
websocket: WebSocket | null = null; websocket: WebSocket | null = null;
bootstrapPeerID: string | null = null; bootstrapPeerIDs: Set<string> | null = null;
connectPromiseCallbacks: { resolve: Function, reject: Function } | null = null; connectPromiseCallbacks: { resolve: Function, reject: Function } | null = null;
connectPromise: Promise<null> | null = null; connectPromise: Promise<null> | null = null;
@@ -42,7 +42,7 @@ export class PeerManager {
eventListeners: Map<PeerEventTypes, Function[]> = new Map(); eventListeners: Map<PeerEventTypes, Function[]> = new Map();
reconnectPeriod: number = 10; reconnectPeriod: number = 10;
messageSuperlog = false; messageSuperlog = false;
watchdogInterval: ReturnType<typeof setTimeout> |null = null; watchdogInterval: ReturnType<typeof setTimeout> | null = null;
reconnectTimer: number | null = null; reconnectTimer: number | null = null;
peerStateSuperlog: boolean = true; peerStateSuperlog: boolean = true;
@@ -118,11 +118,11 @@ export class PeerManager {
if (message.type === "hello2") { if (message.type === "hello2") {
if (!this.isBootstrapPeer) { if (!this.isBootstrapPeer && Array.isArray(message?.bootstrapPeers)) {
this.bootstrapPeerID = message.bootstrapPeers[0]; this.bootstrapPeerIDs = new Set(message.bootstrapPeers);
} }
this.onHello2Received(this.bootstrapPeerID as string); this.onHello2Received(this.bootstrapPeerIDs);
} }
if (message.type === "peer_message") { if (message.type === "peer_message") {
@@ -175,21 +175,41 @@ export class PeerManager {
return newPeer; return newPeer;
} }
async onHello2Received(bootstrapPeerID: string) { async onHello2Received(bootstrapPeerIDs: Set<string> | null) {
if (this.isBootstrapPeer) { if (this.isBootstrapPeer) {
this.connectPromiseCallbacks?.resolve(); this.connectPromiseCallbacks?.resolve();
return; return;
} }
if (!bootstrapPeerID) { if (!bootstrapPeerIDs) {
console.log.apply(null, log("Didn't get bootstrap peer, waiting 10 seconds...")); console.log.apply(null, log("Didn't get any bootstrap peer, waiting 10 seconds..."));
// let callSendHello2OnTimeout = () => { console.log(this, "jajajajaj");this.sendHello2() }; // let callSendHello2OnTimeout = () => { console.log(this, "jajajajaj");this.sendHello2() };
// setTimeout(callSendHello2OnTimeout, 5_000); // setTimeout(callSendHello2OnTimeout, 5_000);
return; return;
} }
this.bootstrapPeerConnection = await this.connectToPeer(bootstrapPeerID); let bootstrapPeerConnectionPromises = [];
for (let peerID of bootstrapPeerIDs.keys()) {
// this.bootstrapPeerConnection = await this.connectToPeer(peerID);
let bootstrapPeerConnectionPromise = new Promise( async (resolve, reject)=>{
let peerConnection = await this.connectToPeer(peerID);
if (!peerConnection) {
reject(peerConnection);
}
this.bootstrapPeerConnections?.set(peerID, peerConnection);
resolve(peerConnection);
})
bootstrapPeerConnectionPromises.push(bootstrapPeerConnectionPromise);
}
await Promise.race(bootstrapPeerConnectionPromises);
this.connectPromiseCallbacks?.resolve(); this.connectPromiseCallbacks?.resolve();
} }
@@ -321,7 +341,7 @@ export class PeerManager {
let output = `Current status:` + "\n" + `[${logID(this.peerID)}]${this.getPeername(this.peerID)}[local]` + "\n"; let output = `Current status:` + "\n" + `[${logID(this.peerID)}]${this.getPeername(this.peerID)}[local]` + "\n";
for (let [peerID, peer] of this.peers) { for (let [peerID, peer] of this.peers) {
output += `[${logID(peerID)}]${peer.rtcPeer?.connectionState}:${this.getPeername(peerID)}${(peerID === this.bootstrapPeerID) ? "[Bootstrap]" : ""}` + "\n"; output += `[${logID(peerID)}]${peer.rtcPeer?.connectionState}:${this.getPeername(peerID)}${this.bootstrapPeerIDs?.has(peerID) ? "[Bootstrap]" : ""}` + "\n";
} }
output += `numActivePeers: ${numActive}` + "\n"; output += `numActivePeers: ${numActive}` + "\n";
@@ -409,9 +429,9 @@ export class PeerManager {
// We should disconnect from the websocket once we connect to our intial peers. // We should disconnect from the websocket once we connect to our intial peers.
// If we have no peer connections, try to connect. If connection fails, start a timer to reconnect. // If we have no peer connections, try to connect. If connection fails, start a timer to reconnect.
if (peerID === this.bootstrapPeerID) { if (this.bootstrapPeerIDs?.has(peerID)) {
this.bootstrapPeerID = null; this.bootstrapPeerIDs.delete(peerID);
this.bootstrapPeerConnection = null; this.bootstrapPeerConnections?.delete(peerID);
} }
this.peerStateSuperlog && console.log.apply(null, log(`PeerManager: disconnected from peer ${peerID}`)); this.peerStateSuperlog && console.log.apply(null, log(`PeerManager: disconnected from peer ${peerID}`));
@@ -501,10 +521,11 @@ class PeerConnection {
private ignoreOffer: boolean = false; private ignoreOffer: boolean = false;
private isSettingRemoteAnswerPending: boolean = false; private isSettingRemoteAnswerPending: boolean = false;
private polite = true; private polite = true;
private webRTCSuperlog = true; private webRTCSuperlog = false;
private dataChannelSuperlog = false; private dataChannelSuperlog = false;
private chunkSize = (16 * 1024) - 100; private chunkSize = (16 * 1024) - 100;
messageSuperlog: boolean = false; messageSuperlog: boolean = false;
sendQueueSuperLog: boolean = false;
rpcSuperlog: boolean = false; rpcSuperlog: boolean = false;
pendingRPCs: Map< pendingRPCs: Map<
string, string,
@@ -565,7 +586,7 @@ class PeerConnection {
this.send({ type: "hello datachannel", from: this.peerManager.peerID, to: this.remotePeerID }); this.send({ type: "hello datachannel", from: this.peerManager.peerID, to: this.remotePeerID });
// this.dataChannel?.send(`{typeHello datachannel from: ${this.peerManager.peerID}`); // this.dataChannel?.send(`{typeHello datachannel from: ${this.peerManager.peerID}`);
console.log.apply(null, log([...this.peerManager.peers.keys()])); // console.log.apply(null, log([...this.peerManager.peers.keys()]));
if (this.peerManager.isBootstrapPeer) { if (this.peerManager.isBootstrapPeer) {
this.send({ type: 'initial_peers', from: this.peerManager.peerID, peers: [...this.peerManager.peers.keys()].filter(entry => entry !== this.remotePeerID) }); this.send({ type: 'initial_peers', from: this.peerManager.peerID, peers: [...this.peerManager.peers.keys()].filter(entry => entry !== this.remotePeerID) });
@@ -656,7 +677,7 @@ class PeerConnection {
} }
this.rtcPeer.oniceconnectionstatechange = (event:Event) => { this.rtcPeer.oniceconnectionstatechange = (event: Event) => {
this.webRTCSuperlog && console.log.apply(null, log("oniceconnectionstatechange:", this.rtcPeer?.iceConnectionState)); this.webRTCSuperlog && console.log.apply(null, log("oniceconnectionstatechange:", this.rtcPeer?.iceConnectionState));
} }
@@ -776,8 +797,11 @@ class PeerConnection {
throw new Error("Send called but datachannel is null"); throw new Error("Send called but datachannel is null");
} }
// this.sendQueueSuperLog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-datachannel[${logID(this.peerManager.peerID)}]: bufferedAmount ${this.dataChannel.bufferedAmount}`));
while (this.dataChannel.bufferedAmount >= 8 * 1024 * 1024) { while (this.dataChannel.bufferedAmount >= 8 * 1024 * 1024) {
this.sendQueueSuperLog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-datachannel[${logID(this.peerManager.peerID)}]: send buffer full, waiting 1 second`));
await new Promise<void>((resolve, reject) => { await new Promise<void>((resolve, reject) => {
setTimeout(() => resolve(), 1000); setTimeout(() => resolve(), 1000);
}) })

View File

@@ -60,10 +60,10 @@ export class PeerManager {
} }
this.messageSuperlog && console.log.apply(null, log("->signaler:", message)); this.messageSuperlog && console.log.apply(null, log("->signaler:", message));
if (message.type === "hello2") { if (message.type === "hello2") {
if (!this.isBootstrapPeer) { if (!this.isBootstrapPeer && Array.isArray(message?.bootstrapPeers)) {
this.bootstrapPeerID = message.bootstrapPeers[0]; this.bootstrapPeerIDs = new Set(message.bootstrapPeers);
} }
this.onHello2Received(this.bootstrapPeerID); this.onHello2Received(this.bootstrapPeerIDs);
} }
if (message.type === "peer_message") { if (message.type === "peer_message") {
let peerConnection = this.peers.get(message.from); let peerConnection = this.peers.get(message.from);
@@ -104,18 +104,31 @@ export class PeerManager {
this.onPeerConnected(newPeer.remotePeerID); this.onPeerConnected(newPeer.remotePeerID);
return newPeer; return newPeer;
} }
async onHello2Received(bootstrapPeerID) { async onHello2Received(bootstrapPeerIDs) {
if (this.isBootstrapPeer) { if (this.isBootstrapPeer) {
this.connectPromiseCallbacks?.resolve(); this.connectPromiseCallbacks?.resolve();
return; return;
} }
if (!bootstrapPeerID) { if (!bootstrapPeerIDs) {
console.log.apply(null, log("Didn't get bootstrap peer, waiting 10 seconds...")); console.log.apply(null, log("Didn't get any bootstrap peer, waiting 10 seconds..."));
// let callSendHello2OnTimeout = () => { console.log(this, "jajajajaj");this.sendHello2() }; // let callSendHello2OnTimeout = () => { console.log(this, "jajajajaj");this.sendHello2() };
// setTimeout(callSendHello2OnTimeout, 5_000); // setTimeout(callSendHello2OnTimeout, 5_000);
return; return;
} }
this.bootstrapPeerConnection = await this.connectToPeer(bootstrapPeerID); let bootstrapPeerConnectionPromises = [];
for (let peerID of bootstrapPeerIDs.keys()) {
// this.bootstrapPeerConnection = await this.connectToPeer(peerID);
let bootstrapPeerConnectionPromise = new Promise(async (resolve, reject) => {
let peerConnection = await this.connectToPeer(peerID);
if (!peerConnection) {
reject(peerConnection);
}
this.bootstrapPeerConnections?.set(peerID, peerConnection);
resolve(peerConnection);
});
bootstrapPeerConnectionPromises.push(bootstrapPeerConnectionPromise);
}
await Promise.race(bootstrapPeerConnectionPromises);
this.connectPromiseCallbacks?.resolve(); this.connectPromiseCallbacks?.resolve();
} }
sendHello2() { sendHello2() {
@@ -152,10 +165,10 @@ export class PeerManager {
this.RPC_remote = new Map(); this.RPC_remote = new Map();
this.rpc = {}; this.rpc = {};
this.isBootstrapPeer = false; this.isBootstrapPeer = false;
this.bootstrapPeerConnection = null; this.bootstrapPeerConnections = null;
this.sessionID = generateID(); this.sessionID = generateID();
this.websocket = null; this.websocket = null;
this.bootstrapPeerID = null; this.bootstrapPeerIDs = null;
this.connectPromiseCallbacks = null; this.connectPromiseCallbacks = null;
this.connectPromise = null; this.connectPromise = null;
this.pingPeers = []; this.pingPeers = [];
@@ -243,7 +256,7 @@ export class PeerManager {
} }
let output = `Current status:` + "\n" + `[${logID(this.peerID)}]${this.getPeername(this.peerID)}[local]` + "\n"; let output = `Current status:` + "\n" + `[${logID(this.peerID)}]${this.getPeername(this.peerID)}[local]` + "\n";
for (let [peerID, peer] of this.peers) { for (let [peerID, peer] of this.peers) {
output += `[${logID(peerID)}]${peer.rtcPeer?.connectionState}:${this.getPeername(peerID)}${(peerID === this.bootstrapPeerID) ? "[Bootstrap]" : ""}` + "\n"; output += `[${logID(peerID)}]${peer.rtcPeer?.connectionState}:${this.getPeername(peerID)}${this.bootstrapPeerIDs?.has(peerID) ? "[Bootstrap]" : ""}` + "\n";
} }
output += `numActivePeers: ${numActive}` + "\n"; output += `numActivePeers: ${numActive}` + "\n";
console.log.apply(null, log(output)); console.log.apply(null, log(output));
@@ -309,9 +322,9 @@ export class PeerManager {
// Eventually we want the bootstrap peer to be no different than any other peer anyway. // Eventually we want the bootstrap peer to be no different than any other peer anyway.
// We should disconnect from the websocket once we connect to our intial peers. // We should disconnect from the websocket once we connect to our intial peers.
// If we have no peer connections, try to connect. If connection fails, start a timer to reconnect. // If we have no peer connections, try to connect. If connection fails, start a timer to reconnect.
if (peerID === this.bootstrapPeerID) { if (this.bootstrapPeerIDs?.has(peerID)) {
this.bootstrapPeerID = null; this.bootstrapPeerIDs.delete(peerID);
this.bootstrapPeerConnection = null; this.bootstrapPeerConnections?.delete(peerID);
} }
this.peerStateSuperlog && console.log.apply(null, log(`PeerManager: disconnected from peer ${peerID}`)); this.peerStateSuperlog && console.log.apply(null, log(`PeerManager: disconnected from peer ${peerID}`));
this.dispatchEvent(PeerEventTypes.PEER_DISCONNECTED, { peerID: peerID }); this.dispatchEvent(PeerEventTypes.PEER_DISCONNECTED, { peerID: peerID });
@@ -373,10 +386,11 @@ class PeerConnection {
this.ignoreOffer = false; this.ignoreOffer = false;
this.isSettingRemoteAnswerPending = false; this.isSettingRemoteAnswerPending = false;
this.polite = true; this.polite = true;
this.webRTCSuperlog = true; this.webRTCSuperlog = false;
this.dataChannelSuperlog = false; this.dataChannelSuperlog = false;
this.chunkSize = (16 * 1024) - 100; this.chunkSize = (16 * 1024) - 100;
this.messageSuperlog = false; this.messageSuperlog = false;
this.sendQueueSuperLog = false;
this.rpcSuperlog = false; this.rpcSuperlog = false;
this.pendingRPCs = new Map(); this.pendingRPCs = new Map();
this.connectionPromise = null; this.connectionPromise = null;
@@ -406,7 +420,7 @@ class PeerConnection {
this.dataChannelSuperlog && console.log.apply(null, log("data channel is open to: ", this.remotePeerID, " from: ", this.peerManager.peerID)); this.dataChannelSuperlog && console.log.apply(null, log("data channel is open to: ", this.remotePeerID, " from: ", this.peerManager.peerID));
this.send({ type: "hello datachannel", from: this.peerManager.peerID, to: this.remotePeerID }); this.send({ type: "hello datachannel", from: this.peerManager.peerID, to: this.remotePeerID });
// this.dataChannel?.send(`{typeHello datachannel from: ${this.peerManager.peerID}`); // this.dataChannel?.send(`{typeHello datachannel from: ${this.peerManager.peerID}`);
console.log.apply(null, log([...this.peerManager.peers.keys()])); // console.log.apply(null, log([...this.peerManager.peers.keys()]));
if (this.peerManager.isBootstrapPeer) { if (this.peerManager.isBootstrapPeer) {
this.send({ type: 'initial_peers', from: this.peerManager.peerID, peers: [...this.peerManager.peers.keys()].filter(entry => entry !== this.remotePeerID) }); this.send({ type: 'initial_peers', from: this.peerManager.peerID, peers: [...this.peerManager.peers.keys()].filter(entry => entry !== this.remotePeerID) });
// this.dataChannel.send(JSON.stringify()); // this.dataChannel.send(JSON.stringify());
@@ -565,7 +579,9 @@ class PeerConnection {
if (!this.dataChannel) { if (!this.dataChannel) {
throw new Error("Send called but datachannel is null"); throw new Error("Send called but datachannel is null");
} }
// this.sendQueueSuperLog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-datachannel[${logID(this.peerManager.peerID)}]: bufferedAmount ${this.dataChannel.bufferedAmount}`));
while (this.dataChannel.bufferedAmount >= 8 * 1024 * 1024) { while (this.dataChannel.bufferedAmount >= 8 * 1024 * 1024) {
this.sendQueueSuperLog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-datachannel[${logID(this.peerManager.peerID)}]: send buffer full, waiting 1 second`));
await new Promise((resolve, reject) => { await new Promise((resolve, reject) => {
setTimeout(() => resolve(), 1000); setTimeout(() => resolve(), 1000);
}); });