diff --git a/src/App.ts b/src/App.ts index a127b4b..4904313 100644 --- a/src/App.ts +++ b/src/App.ts @@ -46,6 +46,14 @@ interface StoragePost { data: Post; } + + +interface SyncItem { + peerID: string; + postIDs: string[]; + +} + export class App { username: string = ''; peername: string = ''; @@ -67,50 +75,59 @@ export class App { peerManager: PeerManager | null = null; sync: Sync = new Sync(); renderTimer: number = 0; - postSyncQueue: any[] = []; - postSyncPromise: any = null; + syncQueues: Map = new Map(); + syncing: Set = new Set(); - async syncPostsInQueue() { + async processSyncQueue(userID: string) { - if (this.postSyncPromise) { + if (this.syncing.has(userID)) { return; } - while (this.postSyncQueue.length !== 0) { + let syncQueue = this.syncQueues.get(userID) as SyncItem[]; - let queueItem = this.postSyncQueue.pop(); + while (syncQueue.length !== 0) { + this.syncing.add(userID); + let syncItem = syncQueue.pop(); - let userID = queueItem.userID; - let peerID = queueItem.peerID; - let postIDs = queueItem.postIDs; + if (!syncItem) { + throw new Error(); + } - new Promise(async (resolve, reject) => { - let neededPostIDs = await this.sync.checkPostIds(userID, peerID, postIDs); + let peerID = syncItem?.peerID; + let postIDs = syncItem?.postIDs; + let neededPostIDs = await this.sync.checkPostIds(userID, peerID, postIDs); - if (neededPostIDs.length > 0) { - console.log.apply(null, log(`[app] Need (${neededPostIDs.length}) posts for user ${logID(userID)} from peer ${logID(peerID)}`)); - let neededPosts = await this.peerManager?.rpc.getPostsForUser(peerID, this.peerID, userID, neededPostIDs); - // console.log(neededPosts); - - } - else { - console.log.apply(null, log(`[app] Don't need any posts for user ${logID(userID)} from peer ${logID(sendingPeerID)}`)); - } - - }) + if (neededPostIDs.length > 0) { + console.log.apply(null, log(`[app] Need (${neededPostIDs.length}) posts for user ${logID(userID)} from peer ${logID(peerID)}`)); + let neededPosts = await this.peerManager?.rpc.getPostsForUser(peerID, this.peerID, userID, neededPostIDs); + // console.log(neededPosts); + } + else { + console.log.apply(null, log(`[app] Don't need any posts for user ${logID(userID)} from peer ${logID(peerID)}`)); + } } - - - - + this.syncing.delete(userID); } addPostIDsToSyncQueue(userID: string, peerID: string, postIDs: string[]) { - this.postSyncQueue.push({ userID: userID, peerID: peerID, postIDs: postIDs }); + + let syncQueue = this.syncQueues.get(userID); + + if (!syncQueue) { + let newArray: SyncItem[] = []; + this.syncQueues.set(userID, newArray); + syncQueue = newArray; + } + + syncQueue.push({ peerID: peerID, postIDs: postIDs }); + + this.processSyncQueue(userID); } + // To avoid reuesting the same posts from multiple peers: // 1. Add incoming IDs to queue // 2. Call a function that tests IDs and then gets posts. @@ -128,18 +145,14 @@ export class App { if (!(this.sync.shouldSyncUserID(userID) || (this.router.route === App.Route.USER && userID === this.router.userID))) { console.log.apply(null, log(`[app] announceUser_rpc_response skipping user[${logID(userID)}] from[${logID(sendingPeerID)}]`)); - continue; + continue; } console.log.apply(null, log(`[app] calling getPostIDsForUser for user [${logID(userID)}] on peer [${logID(sendingPeerID)}]`)); let postIDs = await this.peerManager?.rpc.getPostIDsForUser(sendingPeerID, userID); console.log.apply(null, log(`[app] Got (${postIDs.length}) post IDs for user [${logID(userID)}] from peer [${logID(sendingPeerID)}]`)); - - this.addPostIDsToSyncQueue(userID, sendingPeerID, postIDs); - - } } @@ -167,9 +180,7 @@ export class App { }); this.peerManager.addEventListener(PeerEventTypes.PEER_DISCONNECTED, async (event: any) => { - let peerID = event.peerID; console.log.apply(null, log(`[app]: peer disconnected:${event.peerID}`)); - this.sync.deleteUserPeer(peerID); }); @@ -206,15 +217,17 @@ export class App { for (let post of posts) { console.log.apply(null, log(`[app] sendPostForUser sending post [${logID(post.post_id)}] to [${logID(requestingPeerID)}]`, userID, post.author, post.text)); - this.peerManager?.rpc.sendPostForUser(requestingPeerID, userID, post); + await this.peerManager?.rpc.sendPostForUser(requestingPeerID, this.peerID, userID, post); } + + return true; // return posts; // return postIDs; }); - this.peerManager.registerRPC('sendPostForUser', async (userID: string, post: Post) => { - console.log.apply(null, log(`[app] sendPostForUser got post ${logID(userID)} author ${post.author} text ${post.text}`)); + this.peerManager.registerRPC('sendPostForUser', async (sendingPeerID: string, userID: string, post: Post) => { + console.log.apply(null, log(`[app] sendPostForUser got post[${logID(post.post_id)}] from peer[${logID(sendingPeerID)}] for user[${logID(userID)}] author[${post.author}] text[${post.text}]`)); // if (post.text === "image...") { // debugger; // } @@ -225,7 +238,9 @@ export class App { clearTimeout(this.renderTimer); } - this.renderTimer = setTimeout(() => { this.render() }, 200); + this.renderTimer = setTimeout(() => { this.render() }, 1000); + + return true; // } }); @@ -243,7 +258,7 @@ export class App { // for (let userID of usersToSync) { // console.log(userID); // // this.peerManager.rpc.getPeersForUser(userID); - // } + // } // for (let userID in this.sync.usersToSync()) { @@ -794,7 +809,7 @@ export class App { return document.getElementById(elementName) as HTMLDivElement; } - initButtons(userID: string, posts: StoragePost[], registration: ServiceWorkerRegistration | undefined) { + initButtons(userID: string, posts: StoragePost[]) { // let font1Button = document.getElementById("button_font1") as HTMLButtonElement; // let font2Button = document.getElementById("button_font2") as HTMLButtonElement; // let importTweetsButton = document.getElementById("import_tweets") as HTMLButtonElement; @@ -1067,13 +1082,7 @@ export class App { this.limitPosts = parseInt(limitPostsParam); } - this.peerID = this.getPeerID(); - this.peername = this.getPeername(); - this.userID = this.getUserID(); - this.username = this.getUsername(); - - this.sync.setUserID(this.userID) - this.sync.setArchive(this.isArchivePeer); + this.getRoute(); if (this.router.route === App.Route.CONNECT) { @@ -1082,6 +1091,15 @@ export class App { localStorage.removeItem("dandelion_username"); } + + this.peerID = this.getPeerID(); + this.peername = this.getPeername(); + this.userID = this.getUserID(); + this.username = this.getUsername(); + + this.sync.setUserID(this.userID) + this.sync.setArchive(this.isArchivePeer); + this.connect(); await this.initDB(); @@ -1124,17 +1142,19 @@ export class App { // let storageUsed = (await navigator?.storage?.estimate())?.usage/1024/1024 // } - // if (urlParams.get("sw") === "true") { let registration; - registration = await this.registerServiceWorker(); - // } + let shouldRegisterServiceWorker = !(this.isBootstrapPeer || this.isArchivePeer || this.isHeadless); + + if (shouldRegisterServiceWorker) { + registration = await this.registerServiceWorker(); + } document.getElementById('username')!.innerText = `${this.username}`; document.getElementById('peername')!.innerText = `peername:${this.peername}`; document.getElementById('user_id')!.innerText = `user_id:${this.userID}`; document.getElementById('peer_id')!.innerText = `peer_id:${this.peerID}`; - this.initButtons(this.userID, this.posts, registration); + this.initButtons(this.userID, this.posts); diff --git a/src/PeerManager.ts b/src/PeerManager.ts index d92db1f..0169c9d 100644 --- a/src/PeerManager.ts +++ b/src/PeerManager.ts @@ -714,28 +714,29 @@ class PeerConnection { while (this.dataChannel.bufferedAmount >= 8 * 1024 * 1024) { - await new Promise((resolve, reject) => { setTimeout(()=> resolve(), 1000); - }) - } + await new Promise((resolve, reject) => { + setTimeout(() => resolve(), 1000); + }) + } let messageJSON = JSON.stringify(message); -this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-datachannel[${logID(this.peerManager.peerID)}]:`, message.type, message, `message size:${messageJSON.length}`)); + this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-datachannel[${logID(this.peerManager.peerID)}]:`, message.type, message, `message size:${messageJSON.length}`)); -if (messageJSON.length > this.chunkSize) { - this.messageSuperlog && console.log.apply(null, log(`[datachannel] sending long message: `, messageJSON.length)); - this.sendLongMessage(messageJSON); - return; -} + if (messageJSON.length > this.chunkSize) { + this.messageSuperlog && console.log.apply(null, log(`[datachannel] sending long message: `, messageJSON.length)); + this.sendLongMessage(messageJSON); + return; + } -try { - this.dataChannel?.send(messageJSON); -} catch (e) { - console.log.apply(null, log(e)); + try { + this.dataChannel?.send(messageJSON); + } catch (e) { + console.log.apply(null, log(e)); -} + } // this.onMessage(messageJSON); } @@ -743,140 +744,140 @@ try { // Get a polyfill for browsers that don't have this API async hashMessage(message: string) { - let msgUint8 = new TextEncoder().encode(message); - const hashBuffer = await crypto.subtle.digest("SHA-256", msgUint8); - const hashArray = Array.from(new Uint8Array(hashBuffer)); - const hashHex = hashArray.map((b) => b.toString(16).padStart(2, "0")).join(''); - return hashHex; -} + let msgUint8 = new TextEncoder().encode(message); + const hashBuffer = await crypto.subtle.digest("SHA-256", msgUint8); + const hashArray = Array.from(new Uint8Array(hashBuffer)); + const hashHex = hashArray.map((b) => b.toString(16).padStart(2, "0")).join(''); + return hashHex; + } async sendLongMessage(message: string) { - // message = JSON.parse(message); - let chunkSize = this.chunkSize / 2; - // let chunkSize = 1024; - let chunks = Math.ceil(message!.length / chunkSize); - let messageID = generateID(); + // message = JSON.parse(message); + let chunkSize = this.chunkSize / 2; + // let chunkSize = 1024; + let chunks = Math.ceil(message!.length / chunkSize); + let messageID = generateID(); - let hash = await this.hashMessage(message); + let hash = await this.hashMessage(message); - for (let i = 0; i < chunks; i++) { - let offset = i * chunkSize; - let chunk = message?.substring(offset, offset + chunkSize); - // this.send(message?.substring(offset, offset + chunkSize-1)); - // console.log("[chunk]", chunk); - let chunkHash = await this.hashMessage(chunk); - this.chunkSuperlog && console.log.apply(null, log(`[chunk] chunkHash:${logID(chunkHash)} from:${logID(this.peerManager.peerID)} to:${logID(this.remotePeerID)} messageID:${logID(messageID)} hash:${logID(hash)} ${i + 1}/${chunks}`)); + for (let i = 0; i < chunks; i++) { + let offset = i * chunkSize; + let chunk = message?.substring(offset, offset + chunkSize); + // this.send(message?.substring(offset, offset + chunkSize-1)); + // console.log("[chunk]", chunk); + let chunkHash = await this.hashMessage(chunk); + this.chunkSuperlog && console.log.apply(null, log(`[chunk] chunkHash:${logID(chunkHash)} from:${logID(this.peerManager.peerID)} to:${logID(this.remotePeerID)} messageID:${logID(messageID)} hash:${logID(hash)} ${i + 1}/${chunks}`)); - let netMessage = { type: 'chunk', message_id: messageID, hash: hash, chunk_index: i, total_chunks: chunks, chunk: chunk, chunk_hash: chunkHash }; - await this.send(netMessage); + let netMessage = { type: 'chunk', message_id: messageID, hash: hash, chunk_index: i, total_chunks: chunks, chunk: chunk, chunk_hash: chunkHash }; + await this.send(netMessage); + } } -} -call(functionName: string, args: any) { - let transactionID = generateID(); // make this faster as we will only ever have a small number of in-flight queries on a peer - // Think about a timeout here to auto reject it after a while. - let promise = new Promise((resolve, reject) => { - this.pendingRPCs.set(transactionID, { resolve, reject, functionName }); - // setTimeout(() => reject("bad"), 1000); - }); + call(functionName: string, args: any) { + let transactionID = generateID(); // make this faster as we will only ever have a small number of in-flight queries on a peer + // Think about a timeout here to auto reject it after a while. + let promise = new Promise((resolve, reject) => { + this.pendingRPCs.set(transactionID, { resolve, reject, functionName }); + // setTimeout(() => reject("bad"), 1000); + }); - let message = { - type: "rpc_call", - transaction_id: transactionID, - function_name: functionName, - args: args, - }; + let message = { + type: "rpc_call", + transaction_id: transactionID, + function_name: functionName, + args: args, + }; - this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}]`, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); + this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}]`, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); - this.send(message); + this.send(message); - return promise; -} + return promise; + } async onMessage(messageJSON: any) { - let message: any = {}; - try { - message = JSON.parse(messageJSON); - } catch (e) { - console.log.apply(null, log("PeerConnection.onMessage:", e)); - } - - this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->datachannel[${logID(this.peerManager.peerID)}]`, message.type, message)); - let type = message.type; - - if (type === "rpc_response") { - this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}] response: `, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); - - let pendingRPC = this.pendingRPCs.get(message.transaction_id); - - if (!pendingRPC) { - throw new Error(); + let message: any = {}; + try { + message = JSON.parse(messageJSON); + } catch (e) { + console.log.apply(null, log("PeerConnection.onMessage:", e)); } - pendingRPC.resolve(message.response); - } + this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->datachannel[${logID(this.peerManager.peerID)}]`, message.type, message)); + let type = message.type; - if (type === "rpc_call") { + if (type === "rpc_response") { + this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}] response: `, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); - this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->[rpc][${logID(this.peerManager.peerID)}] call: `, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); + let pendingRPC = this.pendingRPCs.get(message.transaction_id); - - let response = await this.peerManager.callFromRemote(message.function_name, message.args); - - this.rpcSuperlog && console.log.apply(null, log(`[rpc] call: response:`, response)); - - - - if (response === undefined) { - return; - } - - let responseMessage = { type: 'rpc_response', function_name: message.function_name, transaction_id: message.transaction_id, response: response }; - this.send(responseMessage); - - } - - if (type === "initial_peers") { - for (let peerID of message.peers) { - console.log(log("Connecting to initial peer ", peerID)); - this.peerManager.connectToPeer(peerID); - } - } - - if (type === "chunk") { - let messageID = message.message_id; - if (!this.longMessages.has(messageID)) { - this.longMessages.set(messageID, { messageChunks: [], totalChunks: message.total_chunks, hash: message.hash }); - } - - let longMessage = this.longMessages.get(messageID); - if (!longMessage) { - return; - } - - let chunkHash = await this.hashMessage(message.chunk_hash); - - longMessage.messageChunks.push(message.chunk); - - this.chunkSuperlog && console.log.apply(null, log(`[chunk] chunked message sent chunkHash:${logID(message.chunk_hash)} computed hash: ${logID(chunkHash)} messageId:${logID(messageID)} chunk ${message.chunk_index + 1}/${longMessage.totalChunks}`)); - if (message.chunk_index === longMessage.totalChunks - 1) { - let completeMessage = longMessage.messageChunks.join(''); - let hash = await this.hashMessage(completeMessage); - this.chunkSuperlog && console.log.apply(null, log(`[chunk] hashes match: ${hash === longMessage.hash} sent hash: ${logID(longMessage.hash)} computed hash: ${logID(hash)}`)); - if (hash !== longMessage.hash) { - throw new Error("[chunk] long message hashes don't match."); + if (!pendingRPC) { + throw new Error(); } - this.onMessage(completeMessage); - this.longMessages.delete(messageID); - } - } - // this.peerManger.onMessage(this.remotePeerID, message); -} + pendingRPC.resolve(message.response); + } + + if (type === "rpc_call") { + + this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->[rpc][${logID(this.peerManager.peerID)}] call: `, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); + + + let response = await this.peerManager.callFromRemote(message.function_name, message.args); + + this.rpcSuperlog && console.log.apply(null, log(`[rpc] call: response:`, response)); + + + + if (response === undefined) { + return; + } + + let responseMessage = { type: 'rpc_response', function_name: message.function_name, transaction_id: message.transaction_id, response: response }; + this.send(responseMessage); + + } + + if (type === "initial_peers") { + for (let peerID of message.peers) { + console.log(log("Connecting to initial peer ", peerID)); + this.peerManager.connectToPeer(peerID); + } + } + + if (type === "chunk") { + let messageID = message.message_id; + if (!this.longMessages.has(messageID)) { + this.longMessages.set(messageID, { messageChunks: [], totalChunks: message.total_chunks, hash: message.hash }); + } + + let longMessage = this.longMessages.get(messageID); + if (!longMessage) { + return; + } + + let chunkHash = await this.hashMessage(message.chunk_hash); + + longMessage.messageChunks.push(message.chunk); + + this.chunkSuperlog && console.log.apply(null, log(`[chunk] chunked message sent chunkHash:${logID(message.chunk_hash)} computed hash: ${logID(chunkHash)} messageId:${logID(messageID)} chunk ${message.chunk_index + 1}/${longMessage.totalChunks}`)); + if (message.chunk_index === longMessage.totalChunks - 1) { + let completeMessage = longMessage.messageChunks.join(''); + let hash = await this.hashMessage(completeMessage); + this.chunkSuperlog && console.log.apply(null, log(`[chunk] hashes match: ${hash === longMessage.hash} sent hash: ${logID(longMessage.hash)} computed hash: ${logID(hash)}`)); + if (hash !== longMessage.hash) { + throw new Error("[chunk] long message hashes don't match."); + } + this.onMessage(completeMessage); + this.longMessages.delete(messageID); + } + } + + // this.peerManger.onMessage(this.remotePeerID, message); + } } diff --git a/src/Sync.ts b/src/Sync.ts index 08cfe40..5f864bd 100644 --- a/src/Sync.ts +++ b/src/Sync.ts @@ -181,12 +181,12 @@ export class Sync { async checkPostIds(userID: string, peerID: string, postIDs: string[]) { let startTime = performance.now(); let neededPostIds = await checkPostIds(userID, postIDs); - console.log.apply(null, log(`ID Check for user ${logID(userID)} took ${(performance.now() - startTime).toFixed(2)}ms`)); + this.syncSuperlog && console.log.apply(null, log(`[sync] ID Check for user ${logID(userID)} with IDs from peer[${logID(peerID)}] took ${(performance.now() - startTime).toFixed(2)}ms`)); if (neededPostIds.length > 0) { - console.log.apply(null, log(`Need posts (${neededPostIds.length}) for user ${logID(userID)} from peer ${logID(peerID)}`));; + this.syncSuperlog && console.log.apply(null, log(`[sync] Need posts (${neededPostIds.length}) for user[${logID(userID)}] from peer[${logID(peerID)}]`));; } else { - console.log.apply(null, log(`Don't need any posts for user ${logID(userID)} from peer ${logID(peerID)}`));; + this.syncSuperlog && console.log.apply(null, log(`[sync] Don't need any posts for user[${logID(userID)}] from peer[${logID(peerID)}]`));; }