This commit is contained in:
2025-04-13 15:10:07 -07:00
parent 5cd701ca2b
commit aade48a7b8
18 changed files with 3185 additions and 299 deletions

544
src/PeerManager.ts Normal file
View File

@@ -0,0 +1,544 @@
// connect to WS server, send info, connecto to bootstrap peer
// once connected to bootstrap peer,
// Goal, connect to bootstrap peer, ask bootstrap peer for peers that have posts from users that we care about. get peers, connect to those peers, sync.
// how? do "perfect negotiation" with bootstrap peer. All logic here moves to BP.
import { generateID } from "IDUtils";
// Use a broadcast channel to only have one peer manager for multiple tabs,
// then we won't need to have a session ID as all queries for a peerID will be coming from the same peer manager
export class PeerManager {
routingTable: Map<string, string>;
private peers: Map<string, PeerConnection>;
private signaler: Signaler;
searchQueryFunctions: Map<string, Function> = new Map();
RPC_remote: Map<string, Function> = new Map();
rpc: { [key: string]: Function } = {};
isBootstrapPeer: boolean = false;
bootstrapPeerConnection: PeerConnection | null = null;
async onConnected(bootstrapPeerID: string) {
this.bootstrapPeerConnection = await this.connect(bootstrapPeerID);
}
constructor(userID: string, peerID: string, isBootstrapPeer: boolean) {
this.isBootstrapPeer = isBootstrapPeer;
this.peers = new Map();
this.routingTable = new Map();
this.signaler = new Signaler(userID, peerID, isBootstrapPeer, this.onConnected.bind(this));
// Testing
let dummyPeer = new PeerConnection(this, "dummy_peer", this.signaler);
this.peers.set("dummy_peer", dummyPeer);
}
async connect(remotePeerID: string) {
// Connect to the peer that has the peer id peerID
let peerConnection = new PeerConnection(this, remotePeerID, this.signaler);
await peerConnection.connect();
this.peers.set(remotePeerID, peerConnection);
return peerConnection;
}
async disconnect(remotePeerID: string) {
let peer = this.peers.get(remotePeerID);
if (!peer) {
console.log(`PeerManager.disconnect: couln't find peer ${remotePeerID}`);
return;
}
await peer.disconnect();
this.peers.delete(remotePeerID);
}
async call(peerID: string, functionName: string, args: any) {
let peer = this.peers.get(peerID);
if (!peer) {
console.log(`Can't find peer ${peerID}`);
return;
}
return await peer.call(functionName, args);
}
callFromRemote(functionName: string, args: any) {
let func = this.RPC_remote.get(functionName);
if (!func) {
throw new Error();
}
return func(args);
}
registerRPC(functionName: string, func: Function) {
this.rpc[functionName] = (peerID: string, args: any) => {
return this.call(peerID, functionName, args);
};
this.RPC_remote.set(functionName, func);
}
registerSearchQuery(searchType: string, queryFunction: Function) {
this.searchQueryFunctions.set(searchType, queryFunction);
}
async search(type: string, message: any) {
let promises = [];
for (let peer of this.peers.values()) {
promises.push(peer.call(type, message));
}
return await Promise.allSettled(promises);
}
onMessage(remotePeerID: string, message: any) {
console.log(remotePeerID, message);
}
}
function log(...args: any[]): void {
for (let arg of args) {
console.log("[LOG]", arg);
}
}
interface Message {
type: string;
from_peer: string;
to_peer: string;
from_username: string;
from_peername: string;
peer_message: any;
}
// Initially this wil be the bootstrap peer, We'll keep a connection to it and it will keep a list of all connected peers.
// Eventually we will replace this with connecting via other peers.
class Signaler {
websocket: WebSocket;
sessionID: string;
userID: string;
peerID: string;
bootstrapPeerID: string = "";
private isBootstrapPeer: boolean = false;
private onConnected: Function;
constructor(userID: string, peerID: string, isBootstrapPeer: boolean, onConnected: Function) {
this.onConnected = onConnected;
this.isBootstrapPeer = isBootstrapPeer;
this.sessionID = generateID();
this.userID = userID;
this.peerID = peerID;
try {
this.websocket = new WebSocket(
`wss://${window.location.hostname}:${window.location.port}/ws`,
);
} catch (error: any) {
throw new Error(error.message);
}
this.websocket.onopen = async (event) => {
log("signaler:ws:onopen");
await this.sendHello2();
};
this.websocket.onmessage = this.onMessage.bind(this);
}
sendPeerMessage(remotePeerID: string, peerMessage: { type: string; description: RTCSessionDescription; }) {
this.send({
type: "peer_message",
from: this.peerID,
to: remotePeerID,
from_username: "blah user",
from_peername: "blah peer",
message: peerMessage
})
// let responseMessage = { type: "peer_message",
// from: app.peerID,
// to: data.from,
// from_username: app.username,
// from_peername: app.peername,
// message: { type: "get_posts_for_user", post_ids: postIds, user_id: message.user_id } }
}
async sendHello2() {
this.send({
type: "hello2",
user_id: this.userID,
// user_name: app.username,
peer_id: this.peerID,
session_id: this.sessionID,
// peer_name: app.peername,
is_bootstrap_peer: this.isBootstrapPeer,
// peer_description: this.rtcPeerDescription
});
}
connect() {
}
onMessage(event: MessageEvent) {
let messageJSON = event.data;
let message: any = null;
try {
message = JSON.parse(messageJSON);
} catch (e) {
console.log(e);
throw new Error();
}
console.log("->signaler:", message);
if (message.type === "hello2") {
if (!this.isBootstrapPeer) {
this.bootstrapPeerID = message.bootstrapPeers[0];
}
this.onConnected(this.bootstrapPeerID);
}
}
send(message: any) {
let messageJSON = "";
try {
messageJSON = JSON.stringify(message);
} catch (e) {
console.log(e);
return;
}
console.log("<-signaler:", message);
this.websocket.send(messageJSON);
}
// sendPeerMessage
}
class PeerConnection {
private remotePeerID: string;
private signaler: Signaler;
private peerManager: PeerManager;
private dataChannel: RTCDataChannel | null = null;
private messageHandlers: Map<string, Function> = new Map();
// private makingOffer:boolean = false;
// private ignoreOffer:boolean = false;
rtcPeer: RTCPeerConnection | null = null;
static config = {
iceServers: [
{ urls: "stun:ddln.app" },
// { urls: "turn:ddln.app", username: "a", credential: "b" },
{ urls: "stun:stun.l.google.com" }, // keeping this for now as my STUN server is not return ipv6
// { urls: "stun:stun1.l.google.com" },
// { urls: "stun:stun2.l.google.com" },
// { urls: "stun:stun3.l.google.com" },
// { urls: "stun:stun4.l.google.com" },
],
};
pendingRPCs: Map<
string,
{ resolve: Function; reject: Function; functionName: string }
> = new Map();
messageSuperlog: boolean = true;
async RPCHandler(message: any) {
}
constructor(
peerManager: PeerManager,
remotePeerID: string,
signaler: Signaler,
) {
this.peerManager = peerManager;
this.remotePeerID = remotePeerID;
this.signaler = signaler;
}
async connect() {
this.rtcPeer = new RTCPeerConnection(PeerConnection.config);
this.dataChannel = this.rtcPeer.createDataChannel("ddln_main");
if (this.rtcPeer === null) {
return;
}
// this.rtcPeer.onicecandidate = ({ candidate }) => this.signaler.send(JSON.stringify({ candidate }));
this.rtcPeer.onicecandidate = ({ candidate }) => console.log(candidate);
this.rtcPeer.onnegotiationneeded = async (event) => {
log("on negotiation needed fired");
if (!this.rtcPeer) {
throw new Error();
}
let makingOffer = false;
try {
makingOffer = true;
await this.rtcPeer.setLocalDescription();
if (!this.rtcPeer.localDescription) {
return;
}
this.signaler.sendPeerMessage(this.remotePeerID, { type: "rtcDescription", description: this.rtcPeer.localDescription });
} catch (err) {
console.error(err);
} finally {
makingOffer = false;
}
};
}
async disconnect() {
}
send(message: any) {
this.messageSuperlog && console.log("<-", message.type, message);
let messageJSON = JSON.stringify(message);
// this.dataChannel?.send();
this.onMessage(messageJSON);
}
call(functionName: string, args: any) {
let transactionID = generateID(); // make this faster as we will only ever have a small number of in-flight queries on a peer
// Think about a timeout here to auto reject it after a while.
let promise = new Promise((resolve, reject) => {
this.pendingRPCs.set(transactionID, { resolve, reject, functionName });
// setTimeout(() => reject("bad"), 1000);
});
let message = {
type: "rpc_call",
transaction_id: transactionID,
function_name: functionName,
args: args,
};
this.send(message);
return promise;
}
onMessage(messageJSON: any) {
let message: any = {};
try {
message = JSON.parse(messageJSON);
} catch (e) {
console.log("PeerConnection.onMessage:", e);
}
this.messageSuperlog && console.log("->", message.type, message);
let type = message.type;
if (type === "rpc_response") {
let pendingRPC = this.pendingRPCs.get(message.transaction_id);
if (!pendingRPC) {
throw new Error();
}
pendingRPC.resolve(message.response);
}
if (type === "rpc_call") {
let response = this.peerManager.callFromRemote(message.function_name, message.args);
let responseMessage = { type: 'rpc_response', transaction_id: message.transaction_id, response: response };
this.send(responseMessage);
}
// this.peerManger.onMessage(this.remotePeerID, message);
}
}
// export class PeerConnection2 {
// id: string;
// private makingOffer:boolean = false;
// private ignoreOffer:boolean = false;
// rtcPeer: RTCPeerConnection;
// signaler:WebSocket;
// static config = {
// iceServers: [
// { urls: "stun:stun.l.google.com" },
// { urls: "stun:stun1.l.google.com" },
// { urls: "stun:stun2.l.google.com" },
// { urls: "stun:stun3.l.google.com" },
// { urls: "stun:stun4.l.google.com" },
// ],
// };
// constructor(remotePeerID: string, signaler:WebSocket) {
// this.id = remotePeerID;
// this.rtcPeer = new RTCPeerConnection(PeerConnection2.config);
// this.signaler = signaler;;
// this.rtcPeer.onnegotiationneeded = async () => {
// try {
// this.makingOffer = true;
// await this.rtcPeer.setLocalDescription();
// signaler.send(JSON.stringify({ description: this.rtcPeer.localDescription }));
// } catch (err) {
// console.error(err);
// } finally {
// this.makingOffer = false;
// }
// };
// this.rtcPeer.onicecandidate = ({ candidate }) => signaler.send(JSON.stringify({ candidate }));
// this.ignoreOffer = false;
// }
// onSignallerMessage = async ({ data: { description, candidate } }: MessageEvent) => {
// try {
// if (description) {
// // const offerCollision =
// // description.type === "offer" &&
// // (this.makingOffer || this.rtcPeer.signalingState !== "stable");
// // this.ignoreOffer = !polite && offerCollision;
// if (this.ignoreOffer) {
// return;
// }
// await this.rtcPeer.setRemoteDescription(description);
// if (description.type === "offer") {
// await this.rtcPeer.setLocalDescription();
// this.signaler.send(JSON.stringify({ description: this.rtcPeer.localDescription }));
// }
// } else if (candidate) {
// try {
// await this.rtcPeer.addIceCandidate(candidate);
// } catch (err) {
// if (!this.ignoreOffer) {
// throw err;
// }
// }
// }
// } catch (err) {
// console.error(err);
// }
// };
// }
// // const config = {
// // iceServers: [{ urls: "stun:stun.mystunserver.tld" }],
// // };
// // let polite = true;
// // const signaler = new SignalingChannel();
// // const signaler: any = {}
// // const rtcPeer = new RTCPeerConnection(config);
// // // const constraints = { audio: true, video: true };
// // const selfVideo = document.querySelector("video.selfview");
// // const remoteVideo = document.querySelector("video.remoteview");
// // async function start() {
// // try {
// // const stream = await navigator.mediaDevices.getUserMedia(constraints);
// // for (const track of stream.getTracks()) {
// // pc.addTrack(track, stream);
// // }
// // selfVideo.srcObject = stream;
// // } catch (err) {
// // console.error(err);
// // }
// // }
// // rtcPeer.ontrack = ({ track, streams }) => {
// // track.onunmute = () => {
// // if (remoteVideo.srcObject) {
// // return;
// // }
// // remoteVideo.srcObject = streams[0];
// // };
// // };
// // makingOffer = false;
// // rtcPeer.onnegotiationneeded = async () => {
// // try {
// // // makingOffer = true;
// // await rtcPeer.setLocalDescription();
// // signaler.send({ description: rtcPeer.localDescription });
// // } catch (err) {
// // console.error(err);
// // } finally {
// // makingOffer = false;
// // }
// // };
// // rtcPeer.onicecandidate = ({ candidate }) => signaler.send({ candidate });
// // let ignoreOffer = false;
// // signaler.onmessage = async ({ data: { description, candidate } }: MessageEvent) => {
// // try {
// // if (description) {
// // const offerCollision =
// // description.type === "offer" &&
// // // (makingOffer || rtcPeer.signalingState !== "stable");
// // ignoreOffer = !polite && offerCollision;
// // if (ignoreOffer) {
// // return;
// // }
// // await rtcPeer.setRemoteDescription(description);
// // if (description.type === "offer") {
// // await rtcPeer.setLocalDescription();
// // signaler.send({ description: rtcPeer.localDescription });
// // }
// // } else if (candidate) {
// // try {
// // await rtcPeer.addIceCandidate(candidate);
// // } catch (err) {
// // if (!ignoreOffer) {
// // throw err;
// // }
// // }
// // }
// // } catch (err) {
// // console.error(err);
// // }
// // };