request post ids for users that match users we follow when peers announce users they know about. Fix for RPCs being async on the remote end. Check returned postIDs to see if we need any posts from a peer.
This commit is contained in:
@@ -390,17 +390,19 @@ export class PeerManager {
|
||||
return;
|
||||
}
|
||||
|
||||
return await peer.call(functionName, args);
|
||||
let returnValues = await peer.call(functionName, args);
|
||||
return returnValues;
|
||||
}
|
||||
|
||||
callFromRemote(functionName: string, args: any) {
|
||||
async callFromRemote(functionName: string, args: any) {
|
||||
let func = this.RPC_remote.get(functionName);
|
||||
|
||||
if (!func) {
|
||||
throw new Error(`callFromRemote: got RPC we don't know about: ${functionName}, ${args}`);
|
||||
}
|
||||
|
||||
return func.apply(null, args);
|
||||
let returnValues = await func.apply(null, args);
|
||||
return returnValues;
|
||||
}
|
||||
|
||||
registerRPC(functionName: string, func: Function) {
|
||||
@@ -449,12 +451,13 @@ class PeerConnection {
|
||||
private isSettingRemoteAnswerPending: boolean = false;
|
||||
private polite = true;
|
||||
private webRTCSuperlog = false;
|
||||
private dataChannelSuperlog = false;
|
||||
private dataChannelSuperlog = true;
|
||||
messageSuperlog: boolean = true;
|
||||
rpcSuperlog: boolean = false;
|
||||
pendingRPCs: Map<
|
||||
string,
|
||||
{ resolve: Function; reject: Function; functionName: string }
|
||||
> = new Map();
|
||||
messageSuperlog: boolean = true;
|
||||
connectionPromise: { resolve: (value?: unknown) => void; reject: (reason?: any) => void; } | null = null;
|
||||
|
||||
// private makingOffer:boolean = false;
|
||||
@@ -473,7 +476,6 @@ class PeerConnection {
|
||||
// { urls: "stun:stun4.l.google.com" },
|
||||
],
|
||||
};
|
||||
rpcSuperlog: boolean = true;
|
||||
|
||||
|
||||
async RPCHandler(message: any) {
|
||||
@@ -729,7 +731,7 @@ class PeerConnection {
|
||||
return promise;
|
||||
}
|
||||
|
||||
onMessage(messageJSON: any) {
|
||||
async onMessage(messageJSON: any) {
|
||||
|
||||
let message: any = {};
|
||||
try {
|
||||
@@ -755,10 +757,14 @@ class PeerConnection {
|
||||
|
||||
if (type === "rpc_call") {
|
||||
|
||||
this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->[rpc][${logID(this.peerManager.peerID)}] response: `,message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
|
||||
this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->[rpc][${logID(this.peerManager.peerID)}] call: `,message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
|
||||
|
||||
|
||||
let response = await this.peerManager.callFromRemote(message.function_name, message.args);
|
||||
|
||||
this.rpcSuperlog && console.log.apply(null, log(`[rpc] call: response:`, response));
|
||||
|
||||
|
||||
let response = this.peerManager.callFromRemote(message.function_name, message.args);
|
||||
|
||||
if (response === undefined) {
|
||||
return;
|
||||
|
||||
62
src/Sync.ts
62
src/Sync.ts
@@ -2,12 +2,14 @@ import { openDatabase, getData, addData, addDataArray, clearData, deleteData, me
|
||||
import { log, logID } from "log";
|
||||
|
||||
export class Sync {
|
||||
isArchivePeer:boolean = false;
|
||||
isArchivePeer: boolean = false;
|
||||
userID: string = "";
|
||||
userPeers: Map<string, Set<string> = new Map();
|
||||
|
||||
userIDsToSync: Set<string> = new Set();
|
||||
syncSuperlog: boolean = true;
|
||||
|
||||
setArchive(isHeadless:boolean) {
|
||||
setArchive(isHeadless: boolean) {
|
||||
this.isArchivePeer = isHeadless;
|
||||
}
|
||||
|
||||
@@ -24,6 +26,40 @@ export class Sync {
|
||||
return this.userIDsToSync.has(userID);
|
||||
}
|
||||
|
||||
getPeersForUser(userID:string) {
|
||||
let peers = this.userPeers.get(userID);
|
||||
if (!peers) {
|
||||
return [];
|
||||
}
|
||||
|
||||
return [...peers.keys()];
|
||||
}
|
||||
|
||||
addUserPeer(userID: string, peerID: string) {
|
||||
this.syncSuperlog && console.log.apply(null, log(`[sync] addUserPeer user:${logID(userID)} peer:${logID(peerID)}`));;
|
||||
|
||||
if (!this.userPeers.has(userID)) {
|
||||
this.userPeers.set(userID, new Set());
|
||||
}
|
||||
|
||||
let peers = this.userPeers.get(userID) as Set<string>;
|
||||
peers.add(peerID);
|
||||
|
||||
|
||||
this.syncSuperlog && console.log.apply(null, log(this.userPeers));;
|
||||
}
|
||||
|
||||
deleteUserPeer(peerIDToDelete: string) {
|
||||
for (const peers of this.userPeers.values()) {
|
||||
for (const peerID of peers) {
|
||||
if (peerID === peerIDToDelete) {
|
||||
peers.delete(peerIDToDelete);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// shouldSyncUserID(userID: string) {
|
||||
// if (app.isHeadless) {
|
||||
// return true;
|
||||
@@ -42,9 +78,6 @@ export class Sync {
|
||||
return knownUsers;
|
||||
}
|
||||
|
||||
userPeers: Map<string, string[]> = new Map();
|
||||
|
||||
|
||||
userBlockList = new Set([
|
||||
'5d63f0b2-a842-41bf-bf06-e0e4f6369271',
|
||||
'5f1b85c4-b14c-454c-8df1-2cacc93f8a77',
|
||||
@@ -106,6 +139,25 @@ export class Sync {
|
||||
return postIds;
|
||||
}
|
||||
|
||||
async checkPostIds(userID:string, peerID:string, postIDs:string[]) {
|
||||
let startTime = performance.now();
|
||||
let neededPostIds = await checkPostIds(userID, postIDs);
|
||||
console.log.apply(null, log(`ID Check for user ${logID(userID)} took ${(performance.now() - startTime) .toFixed(2)}ms`));
|
||||
|
||||
if (neededPostIds.length > 0) {
|
||||
console.log.apply(null, log(`Need posts (${neededPostIds.length}) for user ${logID(userID)} from peer ${logID(peerID)}`));;
|
||||
} else {
|
||||
console.log.apply(null, log(`Don't need any posts for user ${logID(userID)} from peer ${logID(peerID)}`));;
|
||||
|
||||
}
|
||||
|
||||
// if (postIds.length === 0) {
|
||||
// return [];
|
||||
// }
|
||||
|
||||
return postIDs;
|
||||
}
|
||||
|
||||
// async getPostIdsForUserHandler(data: any) {
|
||||
// let message = data.message;
|
||||
// let postIds = await getAllIds(message.user_id) ?? [];
|
||||
|
||||
53
src/main2.ts
53
src/main2.ts
@@ -741,20 +741,47 @@ class App {
|
||||
connectURL: string = "";
|
||||
firstRun = false;
|
||||
peerManager: PeerManager | null = null;
|
||||
sync:Sync = new Sync();
|
||||
sync: Sync = new Sync();
|
||||
|
||||
async announceUser_rpc_response(sendingPeerID: string, userIDs: string[]) {
|
||||
if (this.isBootstrapPeer) {
|
||||
return;
|
||||
}
|
||||
|
||||
console.log.apply(null, log(`announceUsers from ${sendingPeerID}`, userIDs));
|
||||
|
||||
for (let userID of userIDs) {
|
||||
console.log.apply(null, log(`[app] announceUsers, got user:${userID} from peer ${sendingPeerID}`));
|
||||
this.sync.addUserPeer(userID, sendingPeerID);
|
||||
if (this.sync.shouldSyncUserID(userID)) {
|
||||
let postIDs = await this.peerManager?.rpc.getPostIDsForUser(sendingPeerID, userID);
|
||||
console.log.apply(null, log(`[app] announceUsers response, gotPostIDs`, postIDs));
|
||||
let neededPosts = await this.sync.checkPostIds(userID, sendingPeerID, postIDs);
|
||||
|
||||
console.log.apply(null, log(`[app] announceUsers needed posts`, neededPosts));
|
||||
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
async connect() {
|
||||
this.peerManager = new PeerManager(this.userID, this.peerID, this.isBootstrapPeer);
|
||||
if (this.peerManager === null) {
|
||||
throw new Error();
|
||||
}
|
||||
this.registerRPCs();
|
||||
// this.registerRPCs();
|
||||
|
||||
this.peerManager.addEventListener(PeerEventTypes.PEER_CONNECTED, async (event: any) => {
|
||||
if (!this.peerManager) {
|
||||
throw new Error();
|
||||
}
|
||||
console.log.apply(null, log(`[app]: peer connected:${event.peerID}`));
|
||||
|
||||
if (this.isBootstrapPeer) {
|
||||
return;
|
||||
}
|
||||
|
||||
let knownUsers = await this.sync.getKnownUsers();
|
||||
this.peerManager.rpc.announceUsers(event.peerID, this.peerID, knownUsers);
|
||||
// rpc saying what peers we have
|
||||
@@ -776,11 +803,7 @@ class App {
|
||||
// Basically that live "dandelion" display.
|
||||
|
||||
this.peerManager.registerRPC('announceUsers', (sendingPeerID: string, userIDs: string[]) => {
|
||||
console.log.apply(null, log(`announceUsers from ${sendingPeerID}`, userIDs));
|
||||
|
||||
for (let userID of userIDs) {
|
||||
console.log.apply(null, log(`[app] announceUsers, got user:${userID} from peer ${sendingPeerID}`));
|
||||
}
|
||||
this.announceUser_rpc_response(sendingPeerID, userIDs);
|
||||
});
|
||||
|
||||
this.peerManager.registerRPC('getPeersForUser', (userID: any) => {
|
||||
@@ -788,8 +811,8 @@ class App {
|
||||
});
|
||||
|
||||
|
||||
this.peerManager.registerRPC('getPostIDsForUser', (userID: any) => {
|
||||
let postIDs = this.sync.getPostIdsForUser(userID);
|
||||
this.peerManager.registerRPC('getPostIDsForUser', async (userID: any) => {
|
||||
let postIDs = await this.sync.getPostIdsForUser(userID);
|
||||
|
||||
return postIDs;
|
||||
});
|
||||
@@ -1647,7 +1670,7 @@ class App {
|
||||
let urlParams = (new URL(window.location.href)).searchParams;
|
||||
if (urlParams.has('log')) {
|
||||
this.showInfo();
|
||||
}
|
||||
}
|
||||
|
||||
this.isHeadless = /\bHeadlessChrome\//.test(navigator.userAgent) || urlParams.has('headless');
|
||||
this.isArchivePeer = urlParams.has('archive');
|
||||
@@ -1659,7 +1682,7 @@ class App {
|
||||
if (limitPostsParam) {
|
||||
this.limitPosts = parseInt(limitPostsParam);
|
||||
}
|
||||
|
||||
|
||||
this.peerID = this.getPeerID();
|
||||
this.peername = this.getPeername();
|
||||
this.userID = this.getUserID();
|
||||
@@ -2059,6 +2082,14 @@ namespace App {
|
||||
CONNECT,
|
||||
};
|
||||
|
||||
|
||||
export function announceUser_rpc_response(sendingPeerID: string, userIDs: string[]) {
|
||||
throw new Error("Function not implemented.");
|
||||
}
|
||||
|
||||
export function announceUser_rpc_response(sendingPeerID: string, userIDs: string[]) {
|
||||
throw new Error("Function not implemented.");
|
||||
}
|
||||
// export function connect() {
|
||||
// throw new Error("Function not implemented.");
|
||||
// }
|
||||
|
||||
Reference in New Issue
Block a user