From cedc1d7b369a81b2ca97dd17404a69311b798be0 Mon Sep 17 00:00:00 2001 From: bobbydigitales Date: Mon, 19 May 2025 00:06:23 -0700 Subject: [PATCH] request post ids for users that match users we follow when peers announce users they know about. Fix for RPCs being async on the remote end. Check returned postIDs to see if we need any posts from a peer. --- src/PeerManager.ts | 24 +++++++++++------- src/Sync.ts | 62 ++++++++++++++++++++++++++++++++++++++++++---- src/main2.ts | 53 +++++++++++++++++++++++++++++++-------- static/main2.js | 39 +++++++++++++++++++++++------ 4 files changed, 146 insertions(+), 32 deletions(-) diff --git a/src/PeerManager.ts b/src/PeerManager.ts index 2db8a88..e1e89ae 100644 --- a/src/PeerManager.ts +++ b/src/PeerManager.ts @@ -390,17 +390,19 @@ export class PeerManager { return; } - return await peer.call(functionName, args); + let returnValues = await peer.call(functionName, args); + return returnValues; } - callFromRemote(functionName: string, args: any) { + async callFromRemote(functionName: string, args: any) { let func = this.RPC_remote.get(functionName); if (!func) { throw new Error(`callFromRemote: got RPC we don't know about: ${functionName}, ${args}`); } - return func.apply(null, args); + let returnValues = await func.apply(null, args); + return returnValues; } registerRPC(functionName: string, func: Function) { @@ -449,12 +451,13 @@ class PeerConnection { private isSettingRemoteAnswerPending: boolean = false; private polite = true; private webRTCSuperlog = false; - private dataChannelSuperlog = false; + private dataChannelSuperlog = true; + messageSuperlog: boolean = true; + rpcSuperlog: boolean = false; pendingRPCs: Map< string, { resolve: Function; reject: Function; functionName: string } > = new Map(); - messageSuperlog: boolean = true; connectionPromise: { resolve: (value?: unknown) => void; reject: (reason?: any) => void; } | null = null; // private makingOffer:boolean = false; @@ -473,7 +476,6 @@ class PeerConnection { // { urls: "stun:stun4.l.google.com" }, ], }; - rpcSuperlog: boolean = true; async RPCHandler(message: any) { @@ -729,7 +731,7 @@ class PeerConnection { return promise; } - onMessage(messageJSON: any) { + async onMessage(messageJSON: any) { let message: any = {}; try { @@ -755,10 +757,14 @@ class PeerConnection { if (type === "rpc_call") { - this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->[rpc][${logID(this.peerManager.peerID)}] response: `,message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); + this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->[rpc][${logID(this.peerManager.peerID)}] call: `,message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); + + + let response = await this.peerManager.callFromRemote(message.function_name, message.args); + + this.rpcSuperlog && console.log.apply(null, log(`[rpc] call: response:`, response)); - let response = this.peerManager.callFromRemote(message.function_name, message.args); if (response === undefined) { return; diff --git a/src/Sync.ts b/src/Sync.ts index 9580660..e9343e4 100644 --- a/src/Sync.ts +++ b/src/Sync.ts @@ -2,12 +2,14 @@ import { openDatabase, getData, addData, addDataArray, clearData, deleteData, me import { log, logID } from "log"; export class Sync { - isArchivePeer:boolean = false; + isArchivePeer: boolean = false; userID: string = ""; + userPeers: Map = new Map(); userIDsToSync: Set = new Set(); + syncSuperlog: boolean = true; - setArchive(isHeadless:boolean) { + setArchive(isHeadless: boolean) { this.isArchivePeer = isHeadless; } @@ -24,6 +26,40 @@ export class Sync { return this.userIDsToSync.has(userID); } + getPeersForUser(userID:string) { + let peers = this.userPeers.get(userID); + if (!peers) { + return []; + } + + return [...peers.keys()]; + } + + addUserPeer(userID: string, peerID: string) { + this.syncSuperlog && console.log.apply(null, log(`[sync] addUserPeer user:${logID(userID)} peer:${logID(peerID)}`));; + + if (!this.userPeers.has(userID)) { + this.userPeers.set(userID, new Set()); + } + + let peers = this.userPeers.get(userID) as Set; + peers.add(peerID); + + + this.syncSuperlog && console.log.apply(null, log(this.userPeers));; + } + + deleteUserPeer(peerIDToDelete: string) { + for (const peers of this.userPeers.values()) { + for (const peerID of peers) { + if (peerID === peerIDToDelete) { + peers.delete(peerIDToDelete); + } + } + } + } + + // shouldSyncUserID(userID: string) { // if (app.isHeadless) { // return true; @@ -42,9 +78,6 @@ export class Sync { return knownUsers; } - userPeers: Map = new Map(); - - userBlockList = new Set([ '5d63f0b2-a842-41bf-bf06-e0e4f6369271', '5f1b85c4-b14c-454c-8df1-2cacc93f8a77', @@ -106,6 +139,25 @@ export class Sync { return postIds; } + async checkPostIds(userID:string, peerID:string, postIDs:string[]) { + let startTime = performance.now(); + let neededPostIds = await checkPostIds(userID, postIDs); + console.log.apply(null, log(`ID Check for user ${logID(userID)} took ${(performance.now() - startTime) .toFixed(2)}ms`)); + + if (neededPostIds.length > 0) { + console.log.apply(null, log(`Need posts (${neededPostIds.length}) for user ${logID(userID)} from peer ${logID(peerID)}`));; + } else { + console.log.apply(null, log(`Don't need any posts for user ${logID(userID)} from peer ${logID(peerID)}`));; + + } + + // if (postIds.length === 0) { + // return []; + // } + + return postIDs; + } + // async getPostIdsForUserHandler(data: any) { // let message = data.message; // let postIds = await getAllIds(message.user_id) ?? []; diff --git a/src/main2.ts b/src/main2.ts index aaa6105..56978e5 100644 --- a/src/main2.ts +++ b/src/main2.ts @@ -741,20 +741,47 @@ class App { connectURL: string = ""; firstRun = false; peerManager: PeerManager | null = null; - sync:Sync = new Sync(); + sync: Sync = new Sync(); + + async announceUser_rpc_response(sendingPeerID: string, userIDs: string[]) { + if (this.isBootstrapPeer) { + return; + } + + console.log.apply(null, log(`announceUsers from ${sendingPeerID}`, userIDs)); + + for (let userID of userIDs) { + console.log.apply(null, log(`[app] announceUsers, got user:${userID} from peer ${sendingPeerID}`)); + this.sync.addUserPeer(userID, sendingPeerID); + if (this.sync.shouldSyncUserID(userID)) { + let postIDs = await this.peerManager?.rpc.getPostIDsForUser(sendingPeerID, userID); + console.log.apply(null, log(`[app] announceUsers response, gotPostIDs`, postIDs)); + let neededPosts = await this.sync.checkPostIds(userID, sendingPeerID, postIDs); + + console.log.apply(null, log(`[app] announceUsers needed posts`, neededPosts)); + + } + }; + } + async connect() { this.peerManager = new PeerManager(this.userID, this.peerID, this.isBootstrapPeer); if (this.peerManager === null) { throw new Error(); } - this.registerRPCs(); + // this.registerRPCs(); this.peerManager.addEventListener(PeerEventTypes.PEER_CONNECTED, async (event: any) => { if (!this.peerManager) { throw new Error(); } console.log.apply(null, log(`[app]: peer connected:${event.peerID}`)); + + if (this.isBootstrapPeer) { + return; + } + let knownUsers = await this.sync.getKnownUsers(); this.peerManager.rpc.announceUsers(event.peerID, this.peerID, knownUsers); // rpc saying what peers we have @@ -776,11 +803,7 @@ class App { // Basically that live "dandelion" display. this.peerManager.registerRPC('announceUsers', (sendingPeerID: string, userIDs: string[]) => { - console.log.apply(null, log(`announceUsers from ${sendingPeerID}`, userIDs)); - - for (let userID of userIDs) { - console.log.apply(null, log(`[app] announceUsers, got user:${userID} from peer ${sendingPeerID}`)); - } + this.announceUser_rpc_response(sendingPeerID, userIDs); }); this.peerManager.registerRPC('getPeersForUser', (userID: any) => { @@ -788,8 +811,8 @@ class App { }); - this.peerManager.registerRPC('getPostIDsForUser', (userID: any) => { - let postIDs = this.sync.getPostIdsForUser(userID); + this.peerManager.registerRPC('getPostIDsForUser', async (userID: any) => { + let postIDs = await this.sync.getPostIdsForUser(userID); return postIDs; }); @@ -1647,7 +1670,7 @@ class App { let urlParams = (new URL(window.location.href)).searchParams; if (urlParams.has('log')) { this.showInfo(); - } + } this.isHeadless = /\bHeadlessChrome\//.test(navigator.userAgent) || urlParams.has('headless'); this.isArchivePeer = urlParams.has('archive'); @@ -1659,7 +1682,7 @@ class App { if (limitPostsParam) { this.limitPosts = parseInt(limitPostsParam); } - + this.peerID = this.getPeerID(); this.peername = this.getPeername(); this.userID = this.getUserID(); @@ -2059,6 +2082,14 @@ namespace App { CONNECT, }; + + export function announceUser_rpc_response(sendingPeerID: string, userIDs: string[]) { + throw new Error("Function not implemented."); + } + + export function announceUser_rpc_response(sendingPeerID: string, userIDs: string[]) { + throw new Error("Function not implemented."); + } // export function connect() { // throw new Error("Function not implemented."); // } diff --git a/static/main2.js b/static/main2.js index 1e2fb86..1f9bec5 100644 --- a/static/main2.js +++ b/static/main2.js @@ -573,17 +573,37 @@ class App { mediaID: '' }; } + async announceUser_rpc_response(sendingPeerID, userIDs) { + if (this.isBootstrapPeer) { + return; + } + console.log.apply(null, log(`announceUsers from ${sendingPeerID}`, userIDs)); + for (let userID of userIDs) { + console.log.apply(null, log(`[app] announceUsers, got user:${userID} from peer ${sendingPeerID}`)); + this.sync.addUserPeer(userID, sendingPeerID); + if (this.sync.shouldSyncUserID(userID)) { + let postIDs = await this.peerManager?.rpc.getPostIDsForUser(sendingPeerID, userID); + console.log.apply(null, log(`[app] announceUsers response, gotPostIDs`, postIDs)); + let neededPosts = await this.sync.checkPostIds(userID, sendingPeerID, postIDs); + console.log.apply(null, log(`[app] announceUsers needed posts`, neededPosts)); + } + } + ; + } async connect() { this.peerManager = new PeerManager(this.userID, this.peerID, this.isBootstrapPeer); if (this.peerManager === null) { throw new Error(); } - this.registerRPCs(); + // this.registerRPCs(); this.peerManager.addEventListener(PeerEventTypes.PEER_CONNECTED, async (event) => { if (!this.peerManager) { throw new Error(); } console.log.apply(null, log(`[app]: peer connected:${event.peerID}`)); + if (this.isBootstrapPeer) { + return; + } let knownUsers = await this.sync.getKnownUsers(); this.peerManager.rpc.announceUsers(event.peerID, this.peerID, knownUsers); // rpc saying what peers we have @@ -599,16 +619,13 @@ class App { // Would be lovely to show a little display of peers connecting, whether you're connected directly to a friend's peer etc. // Basically that live "dandelion" display. this.peerManager.registerRPC('announceUsers', (sendingPeerID, userIDs) => { - console.log.apply(null, log(`announceUsers from ${sendingPeerID}`, userIDs)); - for (let userID of userIDs) { - console.log.apply(null, log(`[app] announceUsers, got user:${userID} from peer ${sendingPeerID}`)); - } + this.announceUser_rpc_response(sendingPeerID, userIDs); }); this.peerManager.registerRPC('getPeersForUser', (userID) => { return [1, 2, 3, 4, 5]; }); - this.peerManager.registerRPC('getPostIDsForUser', (userID) => { - let postIDs = this.sync.getPostIdsForUser(userID); + this.peerManager.registerRPC('getPostIDsForUser', async (userID) => { + let postIDs = await this.sync.getPostIdsForUser(userID); return postIDs; }); await this.peerManager.connect(); @@ -1550,6 +1567,14 @@ class App { Route[Route["CONNECT"] = 5] = "CONNECT"; })(Route = App.Route || (App.Route = {})); ; + function announceUser_rpc_response(sendingPeerID, userIDs) { + throw new Error("Function not implemented."); + } + App.announceUser_rpc_response = announceUser_rpc_response; + function announceUser_rpc_response(sendingPeerID, userIDs) { + throw new Error("Function not implemented."); + } + App.announceUser_rpc_response = announceUser_rpc_response; // export function connect() { // throw new Error("Function not implemented."); // }