working websocket and bootstrap peer reconnection when connection is lost

This commit is contained in:
2025-05-18 15:11:34 -07:00
parent 404a224bf1
commit 15e595cca1
6 changed files with 257 additions and 93 deletions

View File

@@ -139,6 +139,8 @@ function hello2Handler(m:Hello2Message, socket:WebSocket) {
peerSockets.set(m.peer_id, socket); // TODO:MAYBEBUG - what happens with multiple windows each with their own websocket?
socketPeers.set(socket, m.peer_id);
console.log(userPeers.get(m.user_id));
if (!m.is_bootstrap_peer) {
return JSON.stringify({ type: 'hello2', bootstrapPeers: [...bootstrapPeers.values()] });

View File

@@ -33,11 +33,16 @@ export class PeerManager {
websocket: WebSocket | null = null;
bootstrapPeerID: string | null = null;
connectPromise: { resolve: Function, reject: Function } | null = null;
connectPromiseCallbacks: { resolve: Function, reject: Function } | null = null;
connectPromise: Promise<null> | null = null;
pingPeers: RTCPeerConnection[] = [];
watchdogPeriodSeconds: number = 10;
eventListeners: Map<PeerEventTypes, Function[]> = new Map();
reconnectPeriod: number = 10;
messageSuperlog = false;
watchdogInterval: number = 0;
reconnectTimer: number | null = null;
// async watchdog() {
// // Check that we're connected to at least N peers. If not, reconnect to the bootstrap server.
@@ -60,7 +65,7 @@ export class PeerManager {
return;
}
console.log.apply(null, log("<-signaler:", message));
this.messageSuperlog && console.log.apply(null, log("<-signaler:", message));
this.websocket.send(messageJSON);
}
@@ -77,7 +82,7 @@ export class PeerManager {
throw new Error();
}
console.log.apply(null, log("->signaler:", message));
this.messageSuperlog && console.log.apply(null, log("->signaler:", message));
if (message.type === "hello2") {
@@ -105,7 +110,15 @@ export class PeerManager {
}
if (!peerConnection) {
peerConnection = this.onConnectRequest(message);
let remotePeerID = message.from;
let newPeer = new PeerConnection(this, remotePeerID, this.websocketSendPeerMessage.bind(this));
if (this.isBootstrapPeer) {
newPeer.setPolite(false);
}
peerConnection = newPeer;
this.peers.set(newPeer.remotePeerID, newPeer);
this.onConnectRequest(newPeer);
}
}
@@ -119,35 +132,36 @@ export class PeerManager {
}
}
onConnectRequest(message: any) {
let remotePeerID = message.from;
let newPeer = new PeerConnection(this, remotePeerID, this.websocketSendPeerMessage.bind(this));
if (this.isBootstrapPeer) {
newPeer.setPolite(false);
}
newPeer.connect();
this.peers.set(remotePeerID, newPeer);
async onConnectRequest(newPeer: PeerConnection) {
// let remotePeerID = message.from;
// let newPeer = new PeerConnection(this, remotePeerID, this.websocketSendPeerMessage.bind(this));
// if (this.isBootstrapPeer) {
// newPeer.setPolite(false);
// }
await newPeer.connect();
this.onPeerConnected(newPeer.remotePeerID);
return newPeer;
}
async onHello2Received(bootstrapPeerID: string) {
if (this.isBootstrapPeer) {
this.connectPromise?.resolve();
this.connectPromiseCallbacks?.resolve();
return;
}
if (!bootstrapPeerID) {
console.log.apply(null, log("Didn't get bootstrap peer, waiting 10 seconds..."));
setTimeout(async (e: Event) => { await this.sendHello2() }, 10_000);
// console.log.apply(null, log("Didn't get bootstrap peer, waiting 10 seconds..."));
// let callSendHello2OnTimeout = () => { console.log(this, "jajajajaj");this.sendHello2() };
// setTimeout(callSendHello2OnTimeout, 5_000);
return;
}
this.bootstrapPeerConnection = await this.connectToPeer(bootstrapPeerID);
this.connectPromise?.resolve();
this.connectPromiseCallbacks?.resolve();
}
async sendHello2() {
sendHello2() {
this.websocketSend({
type: "hello2",
user_id: this.userID,
@@ -196,44 +210,33 @@ export class PeerManager {
}
connect() {
// setInterval(this.watchdog.bind(this), this.watchdogPeriodSeconds * 1000);
setInterval(()=>{
if (!this.isBootstrapPeer && this.peers.size === 0) {
console.log.apply(null, log("No peers connected :("));
if (this.websocket?.readyState === WebSocket.OPEN) {
this.sendHello2();
}
}
let output = `local peerID:${logID(this.peerID)}` + "\n";
for (let [peerID, peer] of this.peers) {
output += `${logID(peerID)}: ${peer.rtcPeer?.connectionState}` + "\n";
}
console.log.apply(null, log(output));
}, 5000);
let connectPromise = new Promise((resolve, reject) => {
this.connectPromise = { resolve, reject };
});
connectWebSocket() {
try {
let hostname = globalThis?.location?.hostname ?? 'ddln.app';
let port = globalThis?.location?.port ?? '443';
let wsURL = `wss://${hostname}:${port}/ws`;
console.log(`wsURL: ${wsURL}`);
console.log.apply(null, log(`Attempting to connect websocket to URL: ${wsURL}`));
this.websocket = new WebSocket(wsURL);
// this.websocket.onclose = (e: CloseEvent) => {
// let closedUnexpectedly = !e.wasClean;
// if (closedUnexpectedly) {
// console.log.apply(null, log(`Websocket closed unexpectedly. Will try to reconnect in ${this.reconnectPeriod} seconds`));
// // let alreadyReconnecting = this.reconnectTimer !== null;
// // if (!alreadyReconnecting) {
// this.reconnectTimer = globalThis.setTimeout(() => {
// console.log.apply(null, log(`Reconnecting web socket`));
// this.reconnectTimer = null;
// this.connectWebSocket();
// }, this.reconnectPeriod * 1000)
// };
// }
// }
} catch (error: any) {
throw new Error(error.message);
}
@@ -244,6 +247,52 @@ export class PeerManager {
};
this.websocket.onmessage = this.onWebsocketMessage.bind(this);
}
connect() {
// setInterval(this.watchdog.bind(this), this.watchdogPeriodSeconds * 1000);
// Side effects :(
if (!this.watchdogInterval) {
this.watchdogInterval = setInterval(() => {
if (!this.isBootstrapPeer && this.peers.size === 0) {
console.log.apply(null, log(`No peers connected, will attempt to reconnect in ${this.reconnectPeriod} seconds...`));
// Websocket reconnect
if (this.websocket?.readyState === WebSocket.OPEN) {
this.sendHello2();
}
if (this.websocket?.readyState === WebSocket.CLOSED) {
this.connectWebSocket();
}
}
let output = `local peerID:${logID(this.peerID)}` + "\n";
for (let [peerID, peer] of this.peers) {
output += `${logID(peerID)}: ${peer.rtcPeer?.connectionState}` + "\n";
}
console.log.apply(null, log(output));
}, this.reconnectPeriod * 1000);
}
let connectPromise = this.connectPromise;
if (!connectPromise) {
connectPromise = new Promise((resolve, reject) => {
this.connectPromiseCallbacks = { resolve, reject };
});
this.connectPromise = connectPromise;
}
this.connectWebSocket();
return connectPromise;
@@ -257,7 +306,7 @@ export class PeerManager {
async connectToPeer(remotePeerID: string) {
// Connect to the peer that has the peer id remotePeerID.
// TODO how do multiple windows / tabs from the same peer and user work?
// Need to decide if they shold all get a unique connection. A peer should only be requesting and writing
// Need to decide if they should all get a unique connection. A peer should only be requesting and writing
// Data once though, so it probably need to be solved on the client side as the data is shared obv
// Maybe use BroadcastChannel to proxy all calls to peermanager? That will probably really complicate things.
// What if we just user session+peerID for the connections? Then we might have two windows making requests
@@ -274,6 +323,7 @@ export class PeerManager {
}
onPeerConnected(peerID: PeerID) {
console.log.apply(null, log(`PeerManager: Successfully connected to peer ${peerID}`));
this.dispatchEvent(PeerEventTypes.PEER_CONNECTED, { peerID: peerID });
}
@@ -388,7 +438,7 @@ interface Message {
}
class PeerConnection {
private remotePeerID: string;
remotePeerID: string;
// private signaler: Signaler;
private peerManager: PeerManager;
private dataChannel: RTCDataChannel | null = null;
@@ -398,6 +448,8 @@ class PeerConnection {
private ignoreOffer: boolean = false;
private isSettingRemoteAnswerPending: boolean = false;
private polite = true;
private webRTCSuperlog = false;
private dataChannelSuperlog = false;
// private makingOffer:boolean = false;
// private ignoreOffer:boolean = false;
@@ -407,7 +459,7 @@ class PeerConnection {
static config = {
iceServers: [
{ urls: "stun:ddln.app" },
// { urls: "turn:ddln.app", username: "a", credential: "b" },
{ urls: "turn:ddln.app", username: "a", credential: "b" },
{ urls: "stun:stun.l.google.com" }, // keeping this for now as my STUN server is not return ipv6
// { urls: "stun:stun1.l.google.com" },
// { urls: "stun:stun2.l.google.com" },
@@ -420,7 +472,7 @@ class PeerConnection {
string,
{ resolve: Function; reject: Function; functionName: string }
> = new Map();
messageSuperlog: boolean = true;
messageSuperlog: boolean = false;
connectionPromise: { resolve: (value?: unknown) => void; reject: (reason?: any) => void; } | null = null;
async RPCHandler(message: any) {
@@ -451,37 +503,40 @@ class PeerConnection {
if (!this.dataChannel) {
throw new Error();
}
console.log.apply(null, log("data channel is open to: ", this.remotePeerID, " from: ", this.peerManager.peerID));
this.dataChannelSuperlog && console.log.apply(null, log("data channel is open to: ", this.remotePeerID, " from: ", this.peerManager.peerID));
this.send({ type: "hello datachannel", from: this.peerManager.peerID, to: this.remotePeerID });
// this.dataChannel?.send(`{typeHello datachannel from: ${this.peerManager.peerID}`);
console.log.apply(null, log([...this.peerManager.peers.keys()]));
if (this.peerManager.isBootstrapPeer) {
this.send({ type: 'initial_peers', from: this.peerManager.peerID, peers: [...this.peerManager.peers.keys()].filter(entry => entry !== this.remotePeerID) })
this.send({ type: 'initial_peers', from: this.peerManager.peerID, peers: [...this.peerManager.peers.keys()].filter(entry => entry !== this.remotePeerID) });
// this.dataChannel.send(JSON.stringify());
}
this.connectionPromise?.resolve();
this.connectionPromise?.resolve(this.remotePeerID);
//globalThis.setTimeout(()=>this.connectionPromise?.resolve(this.remotePeerID), 5000);
}
this.dataChannel.onmessage = (e: MessageEvent) => {
console.log.apply(null, log("->datachannel: ", e.data))
this.dataChannelSuperlog && console.log.apply(null, log("->datachannel: ", e.data));
this.onMessage(e.data);
}
this.dataChannel.onclose = (e: Event) => {
console.log.apply(null, log(`datachannel from peer ${this.remotePeerID} closed, disconnecting peer.`));
this.dataChannelSuperlog && console.log.apply(null, log(`datachannel from peer ${this.remotePeerID} closed, disconnecting peer.`));
this.peerManager.disconnectFromPeer(this.remotePeerID);
}
}
async connect() {
let connectionPromise = new Promise((resolve, reject) => this.connectionPromise = { resolve, reject });
let connectionPromise = new Promise((resolve, reject) => { this.connectionPromise = { resolve, reject } });
this.rtcPeer = new RTCPeerConnection(PeerConnection.config);
this.rtcPeer.onconnectionstatechange = async (e: any) => {
console.log.apply(null, log(`rtcPeer: onconnectionstatechange: ${this.rtcPeer?.connectionState}: ${this.remotePeerID}`));
this.webRTCSuperlog && console.log.apply(null, log(`rtcPeer: onconnectionstatechange: ${this.rtcPeer?.connectionState}: ${this.remotePeerID}`));
if (!this.rtcPeer) {
throw new Error("onconnectionstatechange");
@@ -507,7 +562,7 @@ class PeerConnection {
let localCandidate = stats.get(candidatePair.localCandidateId);
let remoteCandidate = stats.get(candidatePair.remoteCandidateId);
console.log.apply(null, log("Connected candidates\n", localCandidate, remoteCandidate));
this.webRTCSuperlog && console.log.apply(null, log("Connected candidates\n", localCandidate, remoteCandidate));
}
}
}
@@ -536,12 +591,12 @@ class PeerConnection {
this.rtcPeer.onicecandidate = ({ candidate }) => {
console.log.apply(null, log(candidate));
this.webRTCSuperlog && console.log.apply(null, log(candidate));
this.sendPeerMessage(this.remotePeerID, { type: "rtc_candidate", candidate: candidate });
}
this.rtcPeer.onnegotiationneeded = async (event) => {
console.log.apply(null, log("on negotiation needed fired"));
this.webRTCSuperlog && console.log.apply(null, log("on negotiation needed fired"));
if (!this.rtcPeer) {
throw new Error();

35
src/Sync.ts Normal file
View File

@@ -0,0 +1,35 @@
export class Sync {
static async getFollowing(userID: string): Promise<string[]> {
// Rob
if (userID === 'b38b623c-c3fa-4351-9cab-50233c99fa4e') {
return [
'b38b623c-c3fa-4351-9cab-50233c99fa4e',
'6d774268-16cd-4e86-8bbe-847a0328893d', // Sean
'05a495a0-0dd8-4186-94c3-b8309ba6fc4c', // Martin
'a0e42390-08b5-4b07-bc2b-787f8e5f1297', // BMO
'bba3ad24-9181-4e22-90c8-c265c80873ea', // Harry
'8f6802be-c3b6-46c1-969c-5f90cbe01479', // Fiona
]
}
// Martin
if (userID === '05a495a0-0dd8-4186-94c3-b8309ba6fc4c') {
return [
'b38b623c-c3fa-4351-9cab-50233c99fa4e',
'a0e42390-08b5-4b07-bc2b-787f8e5f1297', // BMO
]
}
// Fiona
if (userID === '8f6802be-c3b6-46c1-969c-5f90cbe01479') {
return [
'b38b623c-c3fa-4351-9cab-50233c99fa4e', // Rob
'a0e42390-08b5-4b07-bc2b-787f8e5f1297', // BMO
'05a495a0-0dd8-4186-94c3-b8309ba6fc4c', // Martin
]
}
return ['a0e42390-08b5-4b07-bc2b-787f8e5f1297']; // Follow BMO by default :)
}
}

View File

@@ -34,6 +34,7 @@ Restruucture the app around the data. App/WS split is messy. Clean it up.
import { openDatabase, getData, addData, addDataArray, clearData, deleteData, mergeDataArray, getAllData, checkPostIds, getAllIds, getPostsByIds } from "db";
import { generateID } from "IDUtils";
import { PeerManager, PeerEventTypes } from "PeerManager";
import { Sync } from "Sync";
import { log, logID, renderLog, setLogVisibility } from "log"
@@ -759,6 +760,11 @@ class App {
// Would be lovely to show a little display of peers connecting, whether you're connected directly to a friend's peer etc.
// Basically that live "dandelion" display.
this.peerManager.registerRPC('getPeersForUser', (userID: any) => {
return [1, 2, 3, 4, 5];
});
this.peerManager.registerRPC('getPostIDsForUser', (userID: any) => {
return [1, 2, 3, 4, 5]
});
@@ -767,12 +773,45 @@ class App {
console.log.apply(null, log("*************** after peerManager.connect"));;
if (!this.isBootstrapPeer) {
let postIDs = await this.peerManager.rpc.getPostIDsForUser(this.peerManager.bootstrapPeerID, this.userID);
console.log.apply(null, log("peerManager.rpc.getPostIDsForUser", postIDs));
if (this.isBootstrapPeer) {
return;
}
let usersToSync = await Sync.getFollowing(this.userID);
for (let userID of usersToSync) {
console.log(userID);
// this.peerManager.rpc.getPeersForUser(userID);
}
// for (let userID in this.sync.usersToSync()) {
// let peers = await this.peerManager.rpc.getPeersForUser(userID);
// for (let peer in peers) {
// let peer = await this.peerManager.connectToPeer(userID);
// let postIDs = peer.getPostIDsForUser(userID);
// let postIDsNeeded = this.sync.checkPostIds(userID, postIDs);
// if (postIDs.length === 0) {
// continue;
// }
// let posts = peer.rpc.getPostsForUser(userID, postIDs);
// this.sync.writePostsForUser(userID, posts);
// this.render();
// }
// }
let postIDs = await this.peerManager.rpc.getPostIDsForUser(this.peerManager.bootstrapPeerID, this.userID);
console.log.apply(null, log("peerManager.rpc.getPostIDsForUser", postIDs));
}
@@ -1538,10 +1577,10 @@ class App {
return { id: this.peerID, user: this.userID, user_name: this.username, peer_name: this.peername };
});
if (!this.isBootstrapPeer) {
let pong = await this.peerManager.rpc.ping(this.peerManager.bootstrapPeerID);
console.log.apply(null, log('pong from: ', pong));
}
// if (!this.isBootstrapPeer) {
// let pong = await this.peerManager.rpc.ping(this.peerManager.bootstrapPeerID);
// console.log.apply(null, log('pong from: ', pong));
// }
@@ -1557,13 +1596,18 @@ class App {
throw new Error();
}
this.peerManager.registerRPC('getPostIDsForUser', (args: any) => {
return [1, 2, 3, 4, 5]
this.peerManager.registerRPC('getPeersForUser', (userID: any) => {
return [1, 2, 3, 4, 5];
});
let postIDs = await this.peerManager.rpc.getPostIDsForUser("dummy_peer", "bloop");
console.log.apply(null, log("peerManager.rpc.getPostIDsForUser", postIDs));
// this.peerManager.registerRPC('getPostIDsForUser', (args: any) => {
// return [1, 2, 3, 4, 5];
// });
// let postIDs = await this.peerManager.rpc.getPostIDsForUser("dummy_peer", "bloop");
// console.log.apply(null, log("peerManager.rpc.getPostIDsForUser", postIDs));
// this.peerManager.registerSearchQuery('find_peers_for_user', this.query_findPeersForUser);

View File

@@ -13,7 +13,8 @@
"db": "/static/db.js",
"IDUtils": "/static/IDUtils.js",
"PeerManager": "/static/PeerManager.js",
"log": "/static/log.js"
"log": "/static/log.js",
"Sync": "/static/Sync.js"
}
}
</script>

View File

@@ -29,6 +29,7 @@ Restruucture the app around the data. App/WS split is messy. Clean it up.
import { openDatabase, getData, addData, deleteData, mergeDataArray, getAllData, checkPostIds, getAllIds, getPostsByIds } from "db";
import { generateID } from "IDUtils";
import { PeerManager, PeerEventTypes } from "PeerManager";
import { Sync } from "Sync";
import { log, logID, renderLog, setLogVisibility } from "log";
// let posts:any;
// let keyBase = "dandelion_posts_v1_"
@@ -584,17 +585,40 @@ class App {
// we could return progress information as we connect and have the app subscribe to that?
// Would be lovely to show a little display of peers connecting, whether you're connected directly to a friend's peer etc.
// Basically that live "dandelion" display.
this.peerManager.registerRPC('getPeersForUser', (userID) => {
return [1, 2, 3, 4, 5];
});
this.peerManager.registerRPC('getPostIDsForUser', (userID) => {
return [1, 2, 3, 4, 5];
});
await this.peerManager.connect();
console.log.apply(null, log("*************** after peerManager.connect"));
;
if (!this.isBootstrapPeer) {
if (this.isBootstrapPeer) {
return;
}
let usersToSync = await Sync.getFollowing(this.userID);
for (let userID of usersToSync) {
console.log(userID);
// this.peerManager.rpc.getPeersForUser(userID);
}
// for (let userID in this.sync.usersToSync()) {
// let peers = await this.peerManager.rpc.getPeersForUser(userID);
// for (let peer in peers) {
// let peer = await this.peerManager.connectToPeer(userID);
// let postIDs = peer.getPostIDsForUser(userID);
// let postIDsNeeded = this.sync.checkPostIds(userID, postIDs);
// if (postIDs.length === 0) {
// continue;
// }
// let posts = peer.rpc.getPostsForUser(userID, postIDs);
// this.sync.writePostsForUser(userID, posts);
// this.render();
// }
// }
let postIDs = await this.peerManager.rpc.getPostIDsForUser(this.peerManager.bootstrapPeerID, this.userID);
console.log.apply(null, log("peerManager.rpc.getPostIDsForUser", postIDs));
}
}
getPreferentialUserID() {
return this.router.userID.length !== 0 ? this.router.userID : this.userID;
}
@@ -1175,10 +1199,10 @@ class App {
this.peerManager.registerRPC('ping', (args) => {
return { id: this.peerID, user: this.userID, user_name: this.username, peer_name: this.peername };
});
if (!this.isBootstrapPeer) {
let pong = await this.peerManager.rpc.ping(this.peerManager.bootstrapPeerID);
console.log.apply(null, log('pong from: ', pong));
}
// if (!this.isBootstrapPeer) {
// let pong = await this.peerManager.rpc.ping(this.peerManager.bootstrapPeerID);
// console.log.apply(null, log('pong from: ', pong));
// }
// this.peerManager.registerRPC('getPostIDsForUser', (args: any) => {
// this.sync.getPostsForUser
// });
@@ -1187,11 +1211,14 @@ class App {
if (!this.peerManager) {
throw new Error();
}
this.peerManager.registerRPC('getPostIDsForUser', (args) => {
this.peerManager.registerRPC('getPeersForUser', (userID) => {
return [1, 2, 3, 4, 5];
});
let postIDs = await this.peerManager.rpc.getPostIDsForUser("dummy_peer", "bloop");
console.log.apply(null, log("peerManager.rpc.getPostIDsForUser", postIDs));
// this.peerManager.registerRPC('getPostIDsForUser', (args: any) => {
// return [1, 2, 3, 4, 5];
// });
// let postIDs = await this.peerManager.rpc.getPostIDsForUser("dummy_peer", "bloop");
// console.log.apply(null, log("peerManager.rpc.getPostIDsForUser", postIDs));
// this.peerManager.registerSearchQuery('find_peers_for_user', this.query_findPeersForUser);
// let peers = await this.peerManager.search('find_peers_for_user', { 'user_id': 'bloop' });
}