Implement per-user message queues to prevent duplicate post syncing. Correclty initialize from a user or connect URL. Dont register the service worker when we\'re headless or an archive peer. Wrap sync logging with superlog

This commit is contained in:
2025-05-27 00:01:52 -07:00
parent 818451397f
commit 1eb86a2c7a
3 changed files with 200 additions and 179 deletions

View File

@@ -46,6 +46,14 @@ interface StoragePost {
data: Post;
}
interface SyncItem {
peerID: string;
postIDs: string[];
}
export class App {
username: string = '';
peername: string = '';
@@ -67,50 +75,59 @@ export class App {
peerManager: PeerManager | null = null;
sync: Sync = new Sync();
renderTimer: number = 0;
postSyncQueue: any[] = [];
postSyncPromise: any = null;
syncQueues: Map<string, SyncItem[]> = new Map();
syncing: Set<string> = new Set();
async syncPostsInQueue() {
async processSyncQueue(userID: string) {
if (this.postSyncPromise) {
if (this.syncing.has(userID)) {
return;
}
while (this.postSyncQueue.length !== 0) {
let syncQueue = this.syncQueues.get(userID) as SyncItem[];
let queueItem = this.postSyncQueue.pop();
while (syncQueue.length !== 0) {
this.syncing.add(userID);
let syncItem = syncQueue.pop();
let userID = queueItem.userID;
let peerID = queueItem.peerID;
let postIDs = queueItem.postIDs;
if (!syncItem) {
throw new Error();
}
new Promise(async (resolve, reject) => {
let neededPostIDs = await this.sync.checkPostIds(userID, peerID, postIDs);
let peerID = syncItem?.peerID;
let postIDs = syncItem?.postIDs;
let neededPostIDs = await this.sync.checkPostIds(userID, peerID, postIDs);
if (neededPostIDs.length > 0) {
console.log.apply(null, log(`[app] Need (${neededPostIDs.length}) posts for user ${logID(userID)} from peer ${logID(peerID)}`));
let neededPosts = await this.peerManager?.rpc.getPostsForUser(peerID, this.peerID, userID, neededPostIDs);
// console.log(neededPosts);
}
else {
console.log.apply(null, log(`[app] Don't need any posts for user ${logID(userID)} from peer ${logID(sendingPeerID)}`));
}
})
if (neededPostIDs.length > 0) {
console.log.apply(null, log(`[app] Need (${neededPostIDs.length}) posts for user ${logID(userID)} from peer ${logID(peerID)}`));
let neededPosts = await this.peerManager?.rpc.getPostsForUser(peerID, this.peerID, userID, neededPostIDs);
// console.log(neededPosts);
}
else {
console.log.apply(null, log(`[app] Don't need any posts for user ${logID(userID)} from peer ${logID(peerID)}`));
}
}
this.syncing.delete(userID);
}
addPostIDsToSyncQueue(userID: string, peerID: string, postIDs: string[]) {
this.postSyncQueue.push({ userID: userID, peerID: peerID, postIDs: postIDs });
let syncQueue = this.syncQueues.get(userID);
if (!syncQueue) {
let newArray: SyncItem[] = [];
this.syncQueues.set(userID, newArray);
syncQueue = newArray;
}
syncQueue.push({ peerID: peerID, postIDs: postIDs });
this.processSyncQueue(userID);
}
// To avoid reuesting the same posts from multiple peers:
// 1. Add incoming IDs to queue
// 2. Call a function that tests IDs and then gets posts.
@@ -128,18 +145,14 @@ export class App {
if (!(this.sync.shouldSyncUserID(userID) || (this.router.route === App.Route.USER && userID === this.router.userID))) {
console.log.apply(null, log(`[app] announceUser_rpc_response skipping user[${logID(userID)}] from[${logID(sendingPeerID)}]`));
continue;
continue;
}
console.log.apply(null, log(`[app] calling getPostIDsForUser for user [${logID(userID)}] on peer [${logID(sendingPeerID)}]`));
let postIDs = await this.peerManager?.rpc.getPostIDsForUser(sendingPeerID, userID);
console.log.apply(null, log(`[app] Got (${postIDs.length}) post IDs for user [${logID(userID)}] from peer [${logID(sendingPeerID)}]`));
this.addPostIDsToSyncQueue(userID, sendingPeerID, postIDs);
}
}
@@ -167,9 +180,7 @@ export class App {
});
this.peerManager.addEventListener(PeerEventTypes.PEER_DISCONNECTED, async (event: any) => {
let peerID = event.peerID;
console.log.apply(null, log(`[app]: peer disconnected:${event.peerID}`));
this.sync.deleteUserPeer(peerID);
});
@@ -206,15 +217,17 @@ export class App {
for (let post of posts) {
console.log.apply(null, log(`[app] sendPostForUser sending post [${logID(post.post_id)}] to [${logID(requestingPeerID)}]`, userID, post.author, post.text));
this.peerManager?.rpc.sendPostForUser(requestingPeerID, userID, post);
await this.peerManager?.rpc.sendPostForUser(requestingPeerID, this.peerID, userID, post);
}
return true;
// return posts;
// return postIDs;
});
this.peerManager.registerRPC('sendPostForUser', async (userID: string, post: Post) => {
console.log.apply(null, log(`[app] sendPostForUser got post ${logID(userID)} author ${post.author} text ${post.text}`));
this.peerManager.registerRPC('sendPostForUser', async (sendingPeerID: string, userID: string, post: Post) => {
console.log.apply(null, log(`[app] sendPostForUser got post[${logID(post.post_id)}] from peer[${logID(sendingPeerID)}] for user[${logID(userID)}] author[${post.author}] text[${post.text}]`));
// if (post.text === "image...") {
// debugger;
// }
@@ -225,7 +238,9 @@ export class App {
clearTimeout(this.renderTimer);
}
this.renderTimer = setTimeout(() => { this.render() }, 200);
this.renderTimer = setTimeout(() => { this.render() }, 1000);
return true;
// }
});
@@ -243,7 +258,7 @@ export class App {
// for (let userID of usersToSync) {
// console.log(userID);
// // this.peerManager.rpc.getPeersForUser(userID);
// }
// }
// for (let userID in this.sync.usersToSync()) {
@@ -794,7 +809,7 @@ export class App {
return document.getElementById(elementName) as HTMLDivElement;
}
initButtons(userID: string, posts: StoragePost[], registration: ServiceWorkerRegistration | undefined) {
initButtons(userID: string, posts: StoragePost[]) {
// let font1Button = document.getElementById("button_font1") as HTMLButtonElement;
// let font2Button = document.getElementById("button_font2") as HTMLButtonElement;
// let importTweetsButton = document.getElementById("import_tweets") as HTMLButtonElement;
@@ -1067,13 +1082,7 @@ export class App {
this.limitPosts = parseInt(limitPostsParam);
}
this.peerID = this.getPeerID();
this.peername = this.getPeername();
this.userID = this.getUserID();
this.username = this.getUsername();
this.sync.setUserID(this.userID)
this.sync.setArchive(this.isArchivePeer);
this.getRoute();
if (this.router.route === App.Route.CONNECT) {
@@ -1082,6 +1091,15 @@ export class App {
localStorage.removeItem("dandelion_username");
}
this.peerID = this.getPeerID();
this.peername = this.getPeername();
this.userID = this.getUserID();
this.username = this.getUsername();
this.sync.setUserID(this.userID)
this.sync.setArchive(this.isArchivePeer);
this.connect();
await this.initDB();
@@ -1124,17 +1142,19 @@ export class App {
// let storageUsed = (await navigator?.storage?.estimate())?.usage/1024/1024
// }
// if (urlParams.get("sw") === "true") {
let registration;
registration = await this.registerServiceWorker();
// }
let shouldRegisterServiceWorker = !(this.isBootstrapPeer || this.isArchivePeer || this.isHeadless);
if (shouldRegisterServiceWorker) {
registration = await this.registerServiceWorker();
}
document.getElementById('username')!.innerText = `${this.username}`;
document.getElementById('peername')!.innerText = `peername:${this.peername}`;
document.getElementById('user_id')!.innerText = `user_id:${this.userID}`;
document.getElementById('peer_id')!.innerText = `peer_id:${this.peerID}`;
this.initButtons(this.userID, this.posts, registration);
this.initButtons(this.userID, this.posts);