From b46c600d753c5b6ce0d34008d3dd54164d49bee3 Mon Sep 17 00:00:00 2001 From: bobbydigitales Date: Sun, 18 May 2025 20:26:54 -0700 Subject: [PATCH] Announce users the local peers knows to remote peers when they connect. Fix RPC system to correctly serialize and multiple params. Add detailed logging for rpc calls. --- src/PeerManager.ts | 42 ++++++++++++++++-------- src/Sync.ts | 82 +++++++++++++++++++++++++++++++++++++++++++++- src/main2.ts | 46 ++++++++++++++++++++------ static/main2.js | 38 ++++++++++++++++----- 4 files changed, 174 insertions(+), 34 deletions(-) diff --git a/src/PeerManager.ts b/src/PeerManager.ts index d10d7d7..2db8a88 100644 --- a/src/PeerManager.ts +++ b/src/PeerManager.ts @@ -379,7 +379,7 @@ export class PeerManager { console.log.apply(null, log(`PeerManager.disconnect: disconnecting peer ${remotePeerID}`)); await peer.disconnect(); - this.peers.delete(remotePeerID); + this.onPeerDisconnected(remotePeerID); } async call(peerID: string, functionName: string, args: any) { @@ -400,11 +400,11 @@ export class PeerManager { throw new Error(`callFromRemote: got RPC we don't know about: ${functionName}, ${args}`); } - return func(args); + return func.apply(null, args); } registerRPC(functionName: string, func: Function) { - this.rpc[functionName] = (peerID: string, args: any) => { + this.rpc[functionName] = (peerID: string, ...args: any) => { return this.call(peerID, functionName, args); }; this.RPC_remote.set(functionName, func); @@ -450,6 +450,12 @@ class PeerConnection { private polite = true; private webRTCSuperlog = false; private dataChannelSuperlog = false; + pendingRPCs: Map< + string, + { resolve: Function; reject: Function; functionName: string } + > = new Map(); + messageSuperlog: boolean = true; + connectionPromise: { resolve: (value?: unknown) => void; reject: (reason?: any) => void; } | null = null; // private makingOffer:boolean = false; // private ignoreOffer:boolean = false; @@ -467,13 +473,8 @@ class PeerConnection { // { urls: "stun:stun4.l.google.com" }, ], }; + rpcSuperlog: boolean = true; - pendingRPCs: Map< - string, - { resolve: Function; reject: Function; functionName: string } - > = new Map(); - messageSuperlog: boolean = false; - connectionPromise: { resolve: (value?: unknown) => void; reject: (reason?: any) => void; } | null = null; async RPCHandler(message: any) { } @@ -521,7 +522,7 @@ class PeerConnection { } this.dataChannel.onmessage = (e: MessageEvent) => { - this.dataChannelSuperlog && console.log.apply(null, log("->datachannel: ", e.data)); + this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->datachannel[${logID(this.peerManager.peerID)}]: `, e.data)); this.onMessage(e.data); } @@ -695,9 +696,11 @@ class PeerConnection { } send(message: any) { - this.messageSuperlog && console.log.apply(null, log("<-datachannel:", message.type, message)); - + let messageJSON = JSON.stringify(message); + + this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-datachannel[${logID(this.peerManager.peerID)}]:`, message.type, message, `message size:${messageJSON.length}`)); + this.dataChannel?.send(messageJSON); // this.onMessage(messageJSON); @@ -718,6 +721,9 @@ class PeerConnection { args: args, }; + this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}]`,message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); + + this.send(message); return promise; @@ -732,10 +738,12 @@ class PeerConnection { console.log.apply(null, log("PeerConnection.onMessage:", e)); } - this.messageSuperlog && console.log.apply(null, log("->", message.type, message)); + this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->datachannel[${logID(this.peerManager.peerID)}]`, message.type, message)); let type = message.type; if (type === "rpc_response") { + this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}] response: `,message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); + let pendingRPC = this.pendingRPCs.get(message.transaction_id); if (!pendingRPC) { @@ -747,10 +755,16 @@ class PeerConnection { if (type === "rpc_call") { + this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->[rpc][${logID(this.peerManager.peerID)}] response: `,message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); + + let response = this.peerManager.callFromRemote(message.function_name, message.args); - let responseMessage = { type: 'rpc_response', transaction_id: message.transaction_id, response: response }; + if (response === undefined) { + return; + } + let responseMessage = { type: 'rpc_response', function_name: message.function_name, transaction_id: message.transaction_id, response: response }; this.send(responseMessage); } diff --git a/src/Sync.ts b/src/Sync.ts index 076dee8..5cada9b 100644 --- a/src/Sync.ts +++ b/src/Sync.ts @@ -1,5 +1,60 @@ +import { openDatabase, getData, addData, addDataArray, clearData, deleteData, mergeDataArray, getAllData, checkPostIds, getAllIds, getPostsByIds } from "db"; +import { log, logID } from "log"; + export class Sync { - static async getFollowing(userID: string): Promise { + userID: string = ""; + + userIDsToSync: Set = new Set(); + + constructor() { + } + + setUserID(userID: string) { + this.userIDsToSync = new Set(this.getFollowing(userID)); + } + + shouldSyncUserID(userID: string) { + return true; + } + + // shouldSyncUserID(userID: string) { + // if (app.isHeadless) { + // return true; + // } + + // return this.UserIDsTothis.has(userID); + // } + + async getKnownUsers() { + let knownUsers = [...(await indexedDB.databases())].map(db => db.name?.replace('user_', '')).filter(userID => userID !== undefined); + knownUsers = knownUsers + .filter(userID => this.shouldSyncUserID(userID)) + .filter(userID => !this.userBlockList.has(userID)) + .filter(async userID => (await getAllIds(userID)).length > 0); // TODO:EASYOPT getting all the IDs is unecessary, replace it with a test to get a single ID. + + return knownUsers; + } + + userPeers: Map = new Map(); + + + userBlockList = new Set([ + '5d63f0b2-a842-41bf-bf06-e0e4f6369271', + '5f1b85c4-b14c-454c-8df1-2cacc93f8a77', + // 'bba3ad24-9181-4e22-90c8-c265c80873ea' + ]) + + postBlockList = new Set([ + '1c71f53c-c467-48e4-bc8c-39005b37c0d5', + '64203497-f77b-40d6-9e76-34d17372e72a', + '243130d8-4a41-471e-8898-5075f1bd7aec', + 'e01eff89-5100-4b35-af4c-1c1bcb007dd0', + '194696a2-d850-4bb0-98f7-47416b3d1662', + 'f6b21eb1-a0ff-435b-8efc-6a3dd70c0dca', + 'dd1d92aa-aa24-4166-a925-94ba072a9048' + ]); + + getFollowing(userID: string): string[] { // Rob if (userID === 'b38b623c-c3fa-4351-9cab-50233c99fa4e') { @@ -32,4 +87,29 @@ export class Sync { return ['a0e42390-08b5-4b07-bc2b-787f8e5f1297']; // Follow BMO by default :) } + + async getPostIdsForUser(userID: string) { + let postIds = await getAllIds(userID) ?? []; + postIds = postIds.filter((postID: string) => !this.postBlockList.has(postID)); + if (postIds.length === 0) { + console.log.apply(null, log(`Net: I know about user ${logID(userID)} but I have 0 posts`));; + return null; + } + + return postIds; + } + + // async getPostIdsForUserHandler(data: any) { + // let message = data.message; + // let postIds = await getAllIds(message.user_id) ?? []; + // postIds = postIds.filter((postID: string) => !this.postBlockList.has(postID)); + // if (postIds.length === 0) { + // console.log.apply(null, log(`Net: I know about user ${logID(message.user_id)} but I have 0 posts, so I'm not sending any to to peer ${logID(data.from)}`));; + // return; + // } + // console.log.apply(null, log(`Net: Sending ${postIds.length} post Ids for user ${logID(message.user_id)} to peer ${logID(data.from)}`)); + + // let responseMessage = { type: "peer_message", from: app.peerID, to: data.from, from_username: app.username, from_peername: app.peername, message: { type: "get_post_ids_for_user_response", post_ids: postIds, user_id: message.user_id } } + // this.send(responseMessage); + // } } \ No newline at end of file diff --git a/src/main2.ts b/src/main2.ts index 39bb00b..a9c1fca 100644 --- a/src/main2.ts +++ b/src/main2.ts @@ -740,15 +740,29 @@ class App { connectURL: string = ""; firstRun = false; peerManager: PeerManager | null = null; + sync = new Sync(); async connect() { this.peerManager = new PeerManager(this.userID, this.peerID, this.isBootstrapPeer); + if (this.peerManager === null) { + throw new Error(); + } this.registerRPCs(); - this.peerManager.addEventListener(PeerEventTypes.PEER_CONNECTED, (event: any) => { + this.peerManager.addEventListener(PeerEventTypes.PEER_CONNECTED, async (event: any) => { + if (!this.peerManager) { + throw new Error(); + } console.log.apply(null, log(`[app]: peer connected:${event.peerID}`)); + let knownUsers = await this.sync.getKnownUsers(); + this.peerManager.rpc.announceUsers(event.peerID, this.peerID, knownUsers); // rpc saying what peers we have - }) + }); + + this.peerManager.addEventListener(PeerEventTypes.PEER_DISCONNECTED, async (event: any) => { + console.log.apply(null, log(`[app]: peer disconnected:${event.peerID}`)); + }); + console.log.apply(null, log("*************** before peerManager.connect")); @@ -760,13 +774,23 @@ class App { // Would be lovely to show a little display of peers connecting, whether you're connected directly to a friend's peer etc. // Basically that live "dandelion" display. + this.peerManager.registerRPC('announceUsers', (sendingPeerID: string, userIDs: string[]) => { + console.log.apply(null, log(`announceUsers from ${sendingPeerID}`, userIDs)); + + for (let userID of userIDs) { + console.log.apply(null, log(`[app] announceUsers, got user:${userID} from peer ${sendingPeerID}`)); + } + }); + this.peerManager.registerRPC('getPeersForUser', (userID: any) => { return [1, 2, 3, 4, 5]; }); this.peerManager.registerRPC('getPostIDsForUser', (userID: any) => { - return [1, 2, 3, 4, 5] + let postIDs = this.sync.getPostIdsForUser(userID); + + return postIDs; }); await this.peerManager.connect(); @@ -777,12 +801,12 @@ class App { return; } - let usersToSync = await Sync.getFollowing(this.userID); + // let usersToSync = await Sync.getFollowing(this.userID); - for (let userID of usersToSync) { - console.log(userID); - // this.peerManager.rpc.getPeersForUser(userID); - } + // for (let userID of usersToSync) { + // console.log(userID); + // // this.peerManager.rpc.getPeersForUser(userID); + // } // for (let userID in this.sync.usersToSync()) { @@ -808,10 +832,10 @@ class App { // } - let postIDs = await this.peerManager.rpc.getPostIDsForUser(this.peerManager.bootstrapPeerID, this.userID); + // let postIDs = await this.peerManager.rpc.getPostIDsForUser(this.peerManager.bootstrapPeerID, this.userID); - console.log.apply(null, log("peerManager.rpc.getPostIDsForUser", postIDs)); + // console.log.apply(null, log("peerManager.rpc.getPostIDsForUser", postIDs)); } @@ -1635,6 +1659,8 @@ class App { this.userID = this.getUserID(); this.username = this.getUsername(); + this.sync.setUserID(this.userID); + this.connect(); // this.registerRPCs(); diff --git a/static/main2.js b/static/main2.js index 01b18a9..b1c4dc5 100644 --- a/static/main2.js +++ b/static/main2.js @@ -555,6 +555,7 @@ class App { this.connectURL = ""; this.firstRun = false; this.peerManager = null; + this.sync = new Sync(); this.time = 0; this.animals = ['shrew', 'jerboa', 'lemur', 'weasel', 'possum', 'possum', 'marmoset', 'planigale', 'mole', 'narwhal']; this.adjectives = ['snazzy', 'whimsical', 'jazzy', 'bonkers', 'wobbly', 'spiffy', 'chirpy', 'zesty', 'bubbly', 'perky', 'sassy']; @@ -573,11 +574,22 @@ class App { } async connect() { this.peerManager = new PeerManager(this.userID, this.peerID, this.isBootstrapPeer); + if (this.peerManager === null) { + throw new Error(); + } this.registerRPCs(); - this.peerManager.addEventListener(PeerEventTypes.PEER_CONNECTED, (event) => { + this.peerManager.addEventListener(PeerEventTypes.PEER_CONNECTED, async (event) => { + if (!this.peerManager) { + throw new Error(); + } console.log.apply(null, log(`[app]: peer connected:${event.peerID}`)); + let knownUsers = await this.sync.getKnownUsers(); + this.peerManager.rpc.announceUsers(event.peerID, this.peerID, knownUsers); // rpc saying what peers we have }); + this.peerManager.addEventListener(PeerEventTypes.PEER_DISCONNECTED, async (event) => { + console.log.apply(null, log(`[app]: peer disconnected:${event.peerID}`)); + }); console.log.apply(null, log("*************** before peerManager.connect")); // We use promises here to only return from this call once we're connected to the boostrap peer // and the datachannel is open. @@ -585,11 +597,18 @@ class App { // we could return progress information as we connect and have the app subscribe to that? // Would be lovely to show a little display of peers connecting, whether you're connected directly to a friend's peer etc. // Basically that live "dandelion" display. + this.peerManager.registerRPC('announceUsers', (sendingPeerID, userIDs) => { + console.log.apply(null, log(`announceUsers from ${sendingPeerID}`, userIDs)); + for (let userID of userIDs) { + console.log.apply(null, log(`[app] announceUsers, got user:${userID} from peer ${sendingPeerID}`)); + } + }); this.peerManager.registerRPC('getPeersForUser', (userID) => { return [1, 2, 3, 4, 5]; }); this.peerManager.registerRPC('getPostIDsForUser', (userID) => { - return [1, 2, 3, 4, 5]; + let postIDs = this.sync.getPostIdsForUser(userID); + return postIDs; }); await this.peerManager.connect(); console.log.apply(null, log("*************** after peerManager.connect")); @@ -597,11 +616,11 @@ class App { if (this.isBootstrapPeer) { return; } - let usersToSync = await Sync.getFollowing(this.userID); - for (let userID of usersToSync) { - console.log(userID); - // this.peerManager.rpc.getPeersForUser(userID); - } + // let usersToSync = await Sync.getFollowing(this.userID); + // for (let userID of usersToSync) { + // console.log(userID); + // // this.peerManager.rpc.getPeersForUser(userID); + // } // for (let userID in this.sync.usersToSync()) { // let peers = await this.peerManager.rpc.getPeersForUser(userID); // for (let peer in peers) { @@ -616,8 +635,8 @@ class App { // this.render(); // } // } - let postIDs = await this.peerManager.rpc.getPostIDsForUser(this.peerManager.bootstrapPeerID, this.userID); - console.log.apply(null, log("peerManager.rpc.getPostIDsForUser", postIDs)); + // let postIDs = await this.peerManager.rpc.getPostIDsForUser(this.peerManager.bootstrapPeerID, this.userID); + // console.log.apply(null, log("peerManager.rpc.getPostIDsForUser", postIDs)); } getPreferentialUserID() { return this.router.userID.length !== 0 ? this.router.userID : this.userID; @@ -1238,6 +1257,7 @@ class App { this.peername = this.getPeername(); this.userID = this.getUserID(); this.username = this.getUsername(); + this.sync.setUserID(this.userID); this.connect(); // this.registerRPCs(); // this.testPeerManager();