diff --git a/src/PeerManager.ts b/src/PeerManager.ts index 2db8a88..e1e89ae 100644 --- a/src/PeerManager.ts +++ b/src/PeerManager.ts @@ -390,17 +390,19 @@ export class PeerManager { return; } - return await peer.call(functionName, args); + let returnValues = await peer.call(functionName, args); + return returnValues; } - callFromRemote(functionName: string, args: any) { + async callFromRemote(functionName: string, args: any) { let func = this.RPC_remote.get(functionName); if (!func) { throw new Error(`callFromRemote: got RPC we don't know about: ${functionName}, ${args}`); } - return func.apply(null, args); + let returnValues = await func.apply(null, args); + return returnValues; } registerRPC(functionName: string, func: Function) { @@ -449,12 +451,13 @@ class PeerConnection { private isSettingRemoteAnswerPending: boolean = false; private polite = true; private webRTCSuperlog = false; - private dataChannelSuperlog = false; + private dataChannelSuperlog = true; + messageSuperlog: boolean = true; + rpcSuperlog: boolean = false; pendingRPCs: Map< string, { resolve: Function; reject: Function; functionName: string } > = new Map(); - messageSuperlog: boolean = true; connectionPromise: { resolve: (value?: unknown) => void; reject: (reason?: any) => void; } | null = null; // private makingOffer:boolean = false; @@ -473,7 +476,6 @@ class PeerConnection { // { urls: "stun:stun4.l.google.com" }, ], }; - rpcSuperlog: boolean = true; async RPCHandler(message: any) { @@ -729,7 +731,7 @@ class PeerConnection { return promise; } - onMessage(messageJSON: any) { + async onMessage(messageJSON: any) { let message: any = {}; try { @@ -755,10 +757,14 @@ class PeerConnection { if (type === "rpc_call") { - this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->[rpc][${logID(this.peerManager.peerID)}] response: `,message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); + this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->[rpc][${logID(this.peerManager.peerID)}] call: `,message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); + + + let response = await this.peerManager.callFromRemote(message.function_name, message.args); + + this.rpcSuperlog && console.log.apply(null, log(`[rpc] call: response:`, response)); - let response = this.peerManager.callFromRemote(message.function_name, message.args); if (response === undefined) { return; diff --git a/src/Sync.ts b/src/Sync.ts index 9580660..e9343e4 100644 --- a/src/Sync.ts +++ b/src/Sync.ts @@ -2,12 +2,14 @@ import { openDatabase, getData, addData, addDataArray, clearData, deleteData, me import { log, logID } from "log"; export class Sync { - isArchivePeer:boolean = false; + isArchivePeer: boolean = false; userID: string = ""; + userPeers: Map = new Map(); userIDsToSync: Set = new Set(); + syncSuperlog: boolean = true; - setArchive(isHeadless:boolean) { + setArchive(isHeadless: boolean) { this.isArchivePeer = isHeadless; } @@ -24,6 +26,40 @@ export class Sync { return this.userIDsToSync.has(userID); } + getPeersForUser(userID:string) { + let peers = this.userPeers.get(userID); + if (!peers) { + return []; + } + + return [...peers.keys()]; + } + + addUserPeer(userID: string, peerID: string) { + this.syncSuperlog && console.log.apply(null, log(`[sync] addUserPeer user:${logID(userID)} peer:${logID(peerID)}`));; + + if (!this.userPeers.has(userID)) { + this.userPeers.set(userID, new Set()); + } + + let peers = this.userPeers.get(userID) as Set; + peers.add(peerID); + + + this.syncSuperlog && console.log.apply(null, log(this.userPeers));; + } + + deleteUserPeer(peerIDToDelete: string) { + for (const peers of this.userPeers.values()) { + for (const peerID of peers) { + if (peerID === peerIDToDelete) { + peers.delete(peerIDToDelete); + } + } + } + } + + // shouldSyncUserID(userID: string) { // if (app.isHeadless) { // return true; @@ -42,9 +78,6 @@ export class Sync { return knownUsers; } - userPeers: Map = new Map(); - - userBlockList = new Set([ '5d63f0b2-a842-41bf-bf06-e0e4f6369271', '5f1b85c4-b14c-454c-8df1-2cacc93f8a77', @@ -106,6 +139,25 @@ export class Sync { return postIds; } + async checkPostIds(userID:string, peerID:string, postIDs:string[]) { + let startTime = performance.now(); + let neededPostIds = await checkPostIds(userID, postIDs); + console.log.apply(null, log(`ID Check for user ${logID(userID)} took ${(performance.now() - startTime) .toFixed(2)}ms`)); + + if (neededPostIds.length > 0) { + console.log.apply(null, log(`Need posts (${neededPostIds.length}) for user ${logID(userID)} from peer ${logID(peerID)}`));; + } else { + console.log.apply(null, log(`Don't need any posts for user ${logID(userID)} from peer ${logID(peerID)}`));; + + } + + // if (postIds.length === 0) { + // return []; + // } + + return postIDs; + } + // async getPostIdsForUserHandler(data: any) { // let message = data.message; // let postIds = await getAllIds(message.user_id) ?? []; diff --git a/src/main2.ts b/src/main2.ts index aaa6105..56978e5 100644 --- a/src/main2.ts +++ b/src/main2.ts @@ -741,20 +741,47 @@ class App { connectURL: string = ""; firstRun = false; peerManager: PeerManager | null = null; - sync:Sync = new Sync(); + sync: Sync = new Sync(); + + async announceUser_rpc_response(sendingPeerID: string, userIDs: string[]) { + if (this.isBootstrapPeer) { + return; + } + + console.log.apply(null, log(`announceUsers from ${sendingPeerID}`, userIDs)); + + for (let userID of userIDs) { + console.log.apply(null, log(`[app] announceUsers, got user:${userID} from peer ${sendingPeerID}`)); + this.sync.addUserPeer(userID, sendingPeerID); + if (this.sync.shouldSyncUserID(userID)) { + let postIDs = await this.peerManager?.rpc.getPostIDsForUser(sendingPeerID, userID); + console.log.apply(null, log(`[app] announceUsers response, gotPostIDs`, postIDs)); + let neededPosts = await this.sync.checkPostIds(userID, sendingPeerID, postIDs); + + console.log.apply(null, log(`[app] announceUsers needed posts`, neededPosts)); + + } + }; + } + async connect() { this.peerManager = new PeerManager(this.userID, this.peerID, this.isBootstrapPeer); if (this.peerManager === null) { throw new Error(); } - this.registerRPCs(); + // this.registerRPCs(); this.peerManager.addEventListener(PeerEventTypes.PEER_CONNECTED, async (event: any) => { if (!this.peerManager) { throw new Error(); } console.log.apply(null, log(`[app]: peer connected:${event.peerID}`)); + + if (this.isBootstrapPeer) { + return; + } + let knownUsers = await this.sync.getKnownUsers(); this.peerManager.rpc.announceUsers(event.peerID, this.peerID, knownUsers); // rpc saying what peers we have @@ -776,11 +803,7 @@ class App { // Basically that live "dandelion" display. this.peerManager.registerRPC('announceUsers', (sendingPeerID: string, userIDs: string[]) => { - console.log.apply(null, log(`announceUsers from ${sendingPeerID}`, userIDs)); - - for (let userID of userIDs) { - console.log.apply(null, log(`[app] announceUsers, got user:${userID} from peer ${sendingPeerID}`)); - } + this.announceUser_rpc_response(sendingPeerID, userIDs); }); this.peerManager.registerRPC('getPeersForUser', (userID: any) => { @@ -788,8 +811,8 @@ class App { }); - this.peerManager.registerRPC('getPostIDsForUser', (userID: any) => { - let postIDs = this.sync.getPostIdsForUser(userID); + this.peerManager.registerRPC('getPostIDsForUser', async (userID: any) => { + let postIDs = await this.sync.getPostIdsForUser(userID); return postIDs; }); @@ -1647,7 +1670,7 @@ class App { let urlParams = (new URL(window.location.href)).searchParams; if (urlParams.has('log')) { this.showInfo(); - } + } this.isHeadless = /\bHeadlessChrome\//.test(navigator.userAgent) || urlParams.has('headless'); this.isArchivePeer = urlParams.has('archive'); @@ -1659,7 +1682,7 @@ class App { if (limitPostsParam) { this.limitPosts = parseInt(limitPostsParam); } - + this.peerID = this.getPeerID(); this.peername = this.getPeername(); this.userID = this.getUserID(); @@ -2059,6 +2082,14 @@ namespace App { CONNECT, }; + + export function announceUser_rpc_response(sendingPeerID: string, userIDs: string[]) { + throw new Error("Function not implemented."); + } + + export function announceUser_rpc_response(sendingPeerID: string, userIDs: string[]) { + throw new Error("Function not implemented."); + } // export function connect() { // throw new Error("Function not implemented."); // } diff --git a/static/main2.js b/static/main2.js index 1e2fb86..1f9bec5 100644 --- a/static/main2.js +++ b/static/main2.js @@ -573,17 +573,37 @@ class App { mediaID: '' }; } + async announceUser_rpc_response(sendingPeerID, userIDs) { + if (this.isBootstrapPeer) { + return; + } + console.log.apply(null, log(`announceUsers from ${sendingPeerID}`, userIDs)); + for (let userID of userIDs) { + console.log.apply(null, log(`[app] announceUsers, got user:${userID} from peer ${sendingPeerID}`)); + this.sync.addUserPeer(userID, sendingPeerID); + if (this.sync.shouldSyncUserID(userID)) { + let postIDs = await this.peerManager?.rpc.getPostIDsForUser(sendingPeerID, userID); + console.log.apply(null, log(`[app] announceUsers response, gotPostIDs`, postIDs)); + let neededPosts = await this.sync.checkPostIds(userID, sendingPeerID, postIDs); + console.log.apply(null, log(`[app] announceUsers needed posts`, neededPosts)); + } + } + ; + } async connect() { this.peerManager = new PeerManager(this.userID, this.peerID, this.isBootstrapPeer); if (this.peerManager === null) { throw new Error(); } - this.registerRPCs(); + // this.registerRPCs(); this.peerManager.addEventListener(PeerEventTypes.PEER_CONNECTED, async (event) => { if (!this.peerManager) { throw new Error(); } console.log.apply(null, log(`[app]: peer connected:${event.peerID}`)); + if (this.isBootstrapPeer) { + return; + } let knownUsers = await this.sync.getKnownUsers(); this.peerManager.rpc.announceUsers(event.peerID, this.peerID, knownUsers); // rpc saying what peers we have @@ -599,16 +619,13 @@ class App { // Would be lovely to show a little display of peers connecting, whether you're connected directly to a friend's peer etc. // Basically that live "dandelion" display. this.peerManager.registerRPC('announceUsers', (sendingPeerID, userIDs) => { - console.log.apply(null, log(`announceUsers from ${sendingPeerID}`, userIDs)); - for (let userID of userIDs) { - console.log.apply(null, log(`[app] announceUsers, got user:${userID} from peer ${sendingPeerID}`)); - } + this.announceUser_rpc_response(sendingPeerID, userIDs); }); this.peerManager.registerRPC('getPeersForUser', (userID) => { return [1, 2, 3, 4, 5]; }); - this.peerManager.registerRPC('getPostIDsForUser', (userID) => { - let postIDs = this.sync.getPostIdsForUser(userID); + this.peerManager.registerRPC('getPostIDsForUser', async (userID) => { + let postIDs = await this.sync.getPostIdsForUser(userID); return postIDs; }); await this.peerManager.connect(); @@ -1550,6 +1567,14 @@ class App { Route[Route["CONNECT"] = 5] = "CONNECT"; })(Route = App.Route || (App.Route = {})); ; + function announceUser_rpc_response(sendingPeerID, userIDs) { + throw new Error("Function not implemented."); + } + App.announceUser_rpc_response = announceUser_rpc_response; + function announceUser_rpc_response(sendingPeerID, userIDs) { + throw new Error("Function not implemented."); + } + App.announceUser_rpc_response = announceUser_rpc_response; // export function connect() { // throw new Error("Function not implemented."); // }