Announce users the local peers knows to remote peers when they connect. Fix RPC system to correctly serialize and multiple params. Add detailed logging for rpc calls.

This commit is contained in:
2025-05-18 20:26:54 -07:00
parent 15e595cca1
commit b46c600d75
4 changed files with 174 additions and 34 deletions

View File

@@ -379,7 +379,7 @@ export class PeerManager {
console.log.apply(null, log(`PeerManager.disconnect: disconnecting peer ${remotePeerID}`)); console.log.apply(null, log(`PeerManager.disconnect: disconnecting peer ${remotePeerID}`));
await peer.disconnect(); await peer.disconnect();
this.peers.delete(remotePeerID); this.onPeerDisconnected(remotePeerID);
} }
async call(peerID: string, functionName: string, args: any) { async call(peerID: string, functionName: string, args: any) {
@@ -400,11 +400,11 @@ export class PeerManager {
throw new Error(`callFromRemote: got RPC we don't know about: ${functionName}, ${args}`); throw new Error(`callFromRemote: got RPC we don't know about: ${functionName}, ${args}`);
} }
return func(args); return func.apply(null, args);
} }
registerRPC(functionName: string, func: Function) { registerRPC(functionName: string, func: Function) {
this.rpc[functionName] = (peerID: string, args: any) => { this.rpc[functionName] = (peerID: string, ...args: any) => {
return this.call(peerID, functionName, args); return this.call(peerID, functionName, args);
}; };
this.RPC_remote.set(functionName, func); this.RPC_remote.set(functionName, func);
@@ -450,6 +450,12 @@ class PeerConnection {
private polite = true; private polite = true;
private webRTCSuperlog = false; private webRTCSuperlog = false;
private dataChannelSuperlog = false; private dataChannelSuperlog = false;
pendingRPCs: Map<
string,
{ resolve: Function; reject: Function; functionName: string }
> = new Map();
messageSuperlog: boolean = true;
connectionPromise: { resolve: (value?: unknown) => void; reject: (reason?: any) => void; } | null = null;
// private makingOffer:boolean = false; // private makingOffer:boolean = false;
// private ignoreOffer:boolean = false; // private ignoreOffer:boolean = false;
@@ -467,13 +473,8 @@ class PeerConnection {
// { urls: "stun:stun4.l.google.com" }, // { urls: "stun:stun4.l.google.com" },
], ],
}; };
rpcSuperlog: boolean = true;
pendingRPCs: Map<
string,
{ resolve: Function; reject: Function; functionName: string }
> = new Map();
messageSuperlog: boolean = false;
connectionPromise: { resolve: (value?: unknown) => void; reject: (reason?: any) => void; } | null = null;
async RPCHandler(message: any) { async RPCHandler(message: any) {
} }
@@ -521,7 +522,7 @@ class PeerConnection {
} }
this.dataChannel.onmessage = (e: MessageEvent) => { this.dataChannel.onmessage = (e: MessageEvent) => {
this.dataChannelSuperlog && console.log.apply(null, log("->datachannel: ", e.data)); this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->datachannel[${logID(this.peerManager.peerID)}]: `, e.data));
this.onMessage(e.data); this.onMessage(e.data);
} }
@@ -695,9 +696,11 @@ class PeerConnection {
} }
send(message: any) { send(message: any) {
this.messageSuperlog && console.log.apply(null, log("<-datachannel:", message.type, message));
let messageJSON = JSON.stringify(message); let messageJSON = JSON.stringify(message);
this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-datachannel[${logID(this.peerManager.peerID)}]:`, message.type, message, `message size:${messageJSON.length}`));
this.dataChannel?.send(messageJSON); this.dataChannel?.send(messageJSON);
// this.onMessage(messageJSON); // this.onMessage(messageJSON);
@@ -718,6 +721,9 @@ class PeerConnection {
args: args, args: args,
}; };
this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}]`,message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
this.send(message); this.send(message);
return promise; return promise;
@@ -732,10 +738,12 @@ class PeerConnection {
console.log.apply(null, log("PeerConnection.onMessage:", e)); console.log.apply(null, log("PeerConnection.onMessage:", e));
} }
this.messageSuperlog && console.log.apply(null, log("->", message.type, message)); this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->datachannel[${logID(this.peerManager.peerID)}]`, message.type, message));
let type = message.type; let type = message.type;
if (type === "rpc_response") { if (type === "rpc_response") {
this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}] response: `,message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
let pendingRPC = this.pendingRPCs.get(message.transaction_id); let pendingRPC = this.pendingRPCs.get(message.transaction_id);
if (!pendingRPC) { if (!pendingRPC) {
@@ -747,10 +755,16 @@ class PeerConnection {
if (type === "rpc_call") { if (type === "rpc_call") {
this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->[rpc][${logID(this.peerManager.peerID)}] response: `,message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
let response = this.peerManager.callFromRemote(message.function_name, message.args); let response = this.peerManager.callFromRemote(message.function_name, message.args);
let responseMessage = { type: 'rpc_response', transaction_id: message.transaction_id, response: response }; if (response === undefined) {
return;
}
let responseMessage = { type: 'rpc_response', function_name: message.function_name, transaction_id: message.transaction_id, response: response };
this.send(responseMessage); this.send(responseMessage);
} }

View File

@@ -1,5 +1,60 @@
import { openDatabase, getData, addData, addDataArray, clearData, deleteData, mergeDataArray, getAllData, checkPostIds, getAllIds, getPostsByIds } from "db";
import { log, logID } from "log";
export class Sync { export class Sync {
static async getFollowing(userID: string): Promise<string[]> { userID: string = "";
userIDsToSync: Set<string> = new Set();
constructor() {
}
setUserID(userID: string) {
this.userIDsToSync = new Set(this.getFollowing(userID));
}
shouldSyncUserID(userID: string) {
return true;
}
// shouldSyncUserID(userID: string) {
// if (app.isHeadless) {
// return true;
// }
// return this.UserIDsTothis.has(userID);
// }
async getKnownUsers() {
let knownUsers = [...(await indexedDB.databases())].map(db => db.name?.replace('user_', '')).filter(userID => userID !== undefined);
knownUsers = knownUsers
.filter(userID => this.shouldSyncUserID(userID))
.filter(userID => !this.userBlockList.has(userID))
.filter(async userID => (await getAllIds(userID)).length > 0); // TODO:EASYOPT getting all the IDs is unecessary, replace it with a test to get a single ID.
return knownUsers;
}
userPeers: Map<string, string[]> = new Map();
userBlockList = new Set([
'5d63f0b2-a842-41bf-bf06-e0e4f6369271',
'5f1b85c4-b14c-454c-8df1-2cacc93f8a77',
// 'bba3ad24-9181-4e22-90c8-c265c80873ea'
])
postBlockList = new Set([
'1c71f53c-c467-48e4-bc8c-39005b37c0d5',
'64203497-f77b-40d6-9e76-34d17372e72a',
'243130d8-4a41-471e-8898-5075f1bd7aec',
'e01eff89-5100-4b35-af4c-1c1bcb007dd0',
'194696a2-d850-4bb0-98f7-47416b3d1662',
'f6b21eb1-a0ff-435b-8efc-6a3dd70c0dca',
'dd1d92aa-aa24-4166-a925-94ba072a9048'
]);
getFollowing(userID: string): string[] {
// Rob // Rob
if (userID === 'b38b623c-c3fa-4351-9cab-50233c99fa4e') { if (userID === 'b38b623c-c3fa-4351-9cab-50233c99fa4e') {
@@ -32,4 +87,29 @@ export class Sync {
return ['a0e42390-08b5-4b07-bc2b-787f8e5f1297']; // Follow BMO by default :) return ['a0e42390-08b5-4b07-bc2b-787f8e5f1297']; // Follow BMO by default :)
} }
async getPostIdsForUser(userID: string) {
let postIds = await getAllIds(userID) ?? [];
postIds = postIds.filter((postID: string) => !this.postBlockList.has(postID));
if (postIds.length === 0) {
console.log.apply(null, log(`Net: I know about user ${logID(userID)} but I have 0 posts`));;
return null;
}
return postIds;
}
// async getPostIdsForUserHandler(data: any) {
// let message = data.message;
// let postIds = await getAllIds(message.user_id) ?? [];
// postIds = postIds.filter((postID: string) => !this.postBlockList.has(postID));
// if (postIds.length === 0) {
// console.log.apply(null, log(`Net: I know about user ${logID(message.user_id)} but I have 0 posts, so I'm not sending any to to peer ${logID(data.from)}`));;
// return;
// }
// console.log.apply(null, log(`Net: Sending ${postIds.length} post Ids for user ${logID(message.user_id)} to peer ${logID(data.from)}`));
// let responseMessage = { type: "peer_message", from: app.peerID, to: data.from, from_username: app.username, from_peername: app.peername, message: { type: "get_post_ids_for_user_response", post_ids: postIds, user_id: message.user_id } }
// this.send(responseMessage);
// }
} }

View File

@@ -740,15 +740,29 @@ class App {
connectURL: string = ""; connectURL: string = "";
firstRun = false; firstRun = false;
peerManager: PeerManager | null = null; peerManager: PeerManager | null = null;
sync = new Sync();
async connect() { async connect() {
this.peerManager = new PeerManager(this.userID, this.peerID, this.isBootstrapPeer); this.peerManager = new PeerManager(this.userID, this.peerID, this.isBootstrapPeer);
if (this.peerManager === null) {
throw new Error();
}
this.registerRPCs(); this.registerRPCs();
this.peerManager.addEventListener(PeerEventTypes.PEER_CONNECTED, (event: any) => { this.peerManager.addEventListener(PeerEventTypes.PEER_CONNECTED, async (event: any) => {
if (!this.peerManager) {
throw new Error();
}
console.log.apply(null, log(`[app]: peer connected:${event.peerID}`)); console.log.apply(null, log(`[app]: peer connected:${event.peerID}`));
let knownUsers = await this.sync.getKnownUsers();
this.peerManager.rpc.announceUsers(event.peerID, this.peerID, knownUsers);
// rpc saying what peers we have // rpc saying what peers we have
}) });
this.peerManager.addEventListener(PeerEventTypes.PEER_DISCONNECTED, async (event: any) => {
console.log.apply(null, log(`[app]: peer disconnected:${event.peerID}`));
});
console.log.apply(null, log("*************** before peerManager.connect")); console.log.apply(null, log("*************** before peerManager.connect"));
@@ -760,13 +774,23 @@ class App {
// Would be lovely to show a little display of peers connecting, whether you're connected directly to a friend's peer etc. // Would be lovely to show a little display of peers connecting, whether you're connected directly to a friend's peer etc.
// Basically that live "dandelion" display. // Basically that live "dandelion" display.
this.peerManager.registerRPC('announceUsers', (sendingPeerID: string, userIDs: string[]) => {
console.log.apply(null, log(`announceUsers from ${sendingPeerID}`, userIDs));
for (let userID of userIDs) {
console.log.apply(null, log(`[app] announceUsers, got user:${userID} from peer ${sendingPeerID}`));
}
});
this.peerManager.registerRPC('getPeersForUser', (userID: any) => { this.peerManager.registerRPC('getPeersForUser', (userID: any) => {
return [1, 2, 3, 4, 5]; return [1, 2, 3, 4, 5];
}); });
this.peerManager.registerRPC('getPostIDsForUser', (userID: any) => { this.peerManager.registerRPC('getPostIDsForUser', (userID: any) => {
return [1, 2, 3, 4, 5] let postIDs = this.sync.getPostIdsForUser(userID);
return postIDs;
}); });
await this.peerManager.connect(); await this.peerManager.connect();
@@ -777,12 +801,12 @@ class App {
return; return;
} }
let usersToSync = await Sync.getFollowing(this.userID); // let usersToSync = await Sync.getFollowing(this.userID);
for (let userID of usersToSync) { // for (let userID of usersToSync) {
console.log(userID); // console.log(userID);
// this.peerManager.rpc.getPeersForUser(userID); // // this.peerManager.rpc.getPeersForUser(userID);
} // }
// for (let userID in this.sync.usersToSync()) { // for (let userID in this.sync.usersToSync()) {
@@ -808,10 +832,10 @@ class App {
// } // }
let postIDs = await this.peerManager.rpc.getPostIDsForUser(this.peerManager.bootstrapPeerID, this.userID); // let postIDs = await this.peerManager.rpc.getPostIDsForUser(this.peerManager.bootstrapPeerID, this.userID);
console.log.apply(null, log("peerManager.rpc.getPostIDsForUser", postIDs)); // console.log.apply(null, log("peerManager.rpc.getPostIDsForUser", postIDs));
} }
@@ -1635,6 +1659,8 @@ class App {
this.userID = this.getUserID(); this.userID = this.getUserID();
this.username = this.getUsername(); this.username = this.getUsername();
this.sync.setUserID(this.userID);
this.connect(); this.connect();
// this.registerRPCs(); // this.registerRPCs();

View File

@@ -555,6 +555,7 @@ class App {
this.connectURL = ""; this.connectURL = "";
this.firstRun = false; this.firstRun = false;
this.peerManager = null; this.peerManager = null;
this.sync = new Sync();
this.time = 0; this.time = 0;
this.animals = ['shrew', 'jerboa', 'lemur', 'weasel', 'possum', 'possum', 'marmoset', 'planigale', 'mole', 'narwhal']; this.animals = ['shrew', 'jerboa', 'lemur', 'weasel', 'possum', 'possum', 'marmoset', 'planigale', 'mole', 'narwhal'];
this.adjectives = ['snazzy', 'whimsical', 'jazzy', 'bonkers', 'wobbly', 'spiffy', 'chirpy', 'zesty', 'bubbly', 'perky', 'sassy']; this.adjectives = ['snazzy', 'whimsical', 'jazzy', 'bonkers', 'wobbly', 'spiffy', 'chirpy', 'zesty', 'bubbly', 'perky', 'sassy'];
@@ -573,11 +574,22 @@ class App {
} }
async connect() { async connect() {
this.peerManager = new PeerManager(this.userID, this.peerID, this.isBootstrapPeer); this.peerManager = new PeerManager(this.userID, this.peerID, this.isBootstrapPeer);
if (this.peerManager === null) {
throw new Error();
}
this.registerRPCs(); this.registerRPCs();
this.peerManager.addEventListener(PeerEventTypes.PEER_CONNECTED, (event) => { this.peerManager.addEventListener(PeerEventTypes.PEER_CONNECTED, async (event) => {
if (!this.peerManager) {
throw new Error();
}
console.log.apply(null, log(`[app]: peer connected:${event.peerID}`)); console.log.apply(null, log(`[app]: peer connected:${event.peerID}`));
let knownUsers = await this.sync.getKnownUsers();
this.peerManager.rpc.announceUsers(event.peerID, this.peerID, knownUsers);
// rpc saying what peers we have // rpc saying what peers we have
}); });
this.peerManager.addEventListener(PeerEventTypes.PEER_DISCONNECTED, async (event) => {
console.log.apply(null, log(`[app]: peer disconnected:${event.peerID}`));
});
console.log.apply(null, log("*************** before peerManager.connect")); console.log.apply(null, log("*************** before peerManager.connect"));
// We use promises here to only return from this call once we're connected to the boostrap peer // We use promises here to only return from this call once we're connected to the boostrap peer
// and the datachannel is open. // and the datachannel is open.
@@ -585,11 +597,18 @@ class App {
// we could return progress information as we connect and have the app subscribe to that? // we could return progress information as we connect and have the app subscribe to that?
// Would be lovely to show a little display of peers connecting, whether you're connected directly to a friend's peer etc. // Would be lovely to show a little display of peers connecting, whether you're connected directly to a friend's peer etc.
// Basically that live "dandelion" display. // Basically that live "dandelion" display.
this.peerManager.registerRPC('announceUsers', (sendingPeerID, userIDs) => {
console.log.apply(null, log(`announceUsers from ${sendingPeerID}`, userIDs));
for (let userID of userIDs) {
console.log.apply(null, log(`[app] announceUsers, got user:${userID} from peer ${sendingPeerID}`));
}
});
this.peerManager.registerRPC('getPeersForUser', (userID) => { this.peerManager.registerRPC('getPeersForUser', (userID) => {
return [1, 2, 3, 4, 5]; return [1, 2, 3, 4, 5];
}); });
this.peerManager.registerRPC('getPostIDsForUser', (userID) => { this.peerManager.registerRPC('getPostIDsForUser', (userID) => {
return [1, 2, 3, 4, 5]; let postIDs = this.sync.getPostIdsForUser(userID);
return postIDs;
}); });
await this.peerManager.connect(); await this.peerManager.connect();
console.log.apply(null, log("*************** after peerManager.connect")); console.log.apply(null, log("*************** after peerManager.connect"));
@@ -597,11 +616,11 @@ class App {
if (this.isBootstrapPeer) { if (this.isBootstrapPeer) {
return; return;
} }
let usersToSync = await Sync.getFollowing(this.userID); // let usersToSync = await Sync.getFollowing(this.userID);
for (let userID of usersToSync) { // for (let userID of usersToSync) {
console.log(userID); // console.log(userID);
// this.peerManager.rpc.getPeersForUser(userID); // // this.peerManager.rpc.getPeersForUser(userID);
} // }
// for (let userID in this.sync.usersToSync()) { // for (let userID in this.sync.usersToSync()) {
// let peers = await this.peerManager.rpc.getPeersForUser(userID); // let peers = await this.peerManager.rpc.getPeersForUser(userID);
// for (let peer in peers) { // for (let peer in peers) {
@@ -616,8 +635,8 @@ class App {
// this.render(); // this.render();
// } // }
// } // }
let postIDs = await this.peerManager.rpc.getPostIDsForUser(this.peerManager.bootstrapPeerID, this.userID); // let postIDs = await this.peerManager.rpc.getPostIDsForUser(this.peerManager.bootstrapPeerID, this.userID);
console.log.apply(null, log("peerManager.rpc.getPostIDsForUser", postIDs)); // console.log.apply(null, log("peerManager.rpc.getPostIDsForUser", postIDs));
} }
getPreferentialUserID() { getPreferentialUserID() {
return this.router.userID.length !== 0 ? this.router.userID : this.userID; return this.router.userID.length !== 0 ? this.router.userID : this.userID;
@@ -1238,6 +1257,7 @@ class App {
this.peername = this.getPeername(); this.peername = this.getPeername();
this.userID = this.getUserID(); this.userID = this.getUserID();
this.username = this.getUsername(); this.username = this.getUsername();
this.sync.setUserID(this.userID);
this.connect(); this.connect();
// this.registerRPCs(); // this.registerRPCs();
// this.testPeerManager(); // this.testPeerManager();