Implement per-user message queues to prevent duplicate post syncing. Correctly initialize from a user or connect URL. Don't register the service worker when we're headless or an archive peer. Wrap sync logging with superlog
This commit is contained in:
122
src/App.ts
122
src/App.ts
@@ -46,6 +46,14 @@ interface StoragePost {
|
|||||||
data: Post;
|
data: Post;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
interface SyncItem {
|
||||||
|
peerID: string;
|
||||||
|
postIDs: string[];
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
export class App {
|
export class App {
|
||||||
username: string = '';
|
username: string = '';
|
||||||
peername: string = '';
|
peername: string = '';
|
||||||
@@ -67,50 +75,59 @@ export class App {
|
|||||||
peerManager: PeerManager | null = null;
|
peerManager: PeerManager | null = null;
|
||||||
sync: Sync = new Sync();
|
sync: Sync = new Sync();
|
||||||
renderTimer: number = 0;
|
renderTimer: number = 0;
|
||||||
postSyncQueue: any[] = [];
|
syncQueues: Map<string, SyncItem[]> = new Map();
|
||||||
postSyncPromise: any = null;
|
syncing: Set<string> = new Set();
|
||||||
|
|
||||||
async syncPostsInQueue() {
|
async processSyncQueue(userID: string) {
|
||||||
|
|
||||||
if (this.postSyncPromise) {
|
if (this.syncing.has(userID)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
while (this.postSyncQueue.length !== 0) {
|
let syncQueue = this.syncQueues.get(userID) as SyncItem[];
|
||||||
|
|
||||||
let queueItem = this.postSyncQueue.pop();
|
while (syncQueue.length !== 0) {
|
||||||
|
this.syncing.add(userID);
|
||||||
|
let syncItem = syncQueue.pop();
|
||||||
|
|
||||||
let userID = queueItem.userID;
|
if (!syncItem) {
|
||||||
let peerID = queueItem.peerID;
|
throw new Error();
|
||||||
let postIDs = queueItem.postIDs;
|
}
|
||||||
|
|
||||||
new Promise(async (resolve, reject) => {
|
let peerID = syncItem?.peerID;
|
||||||
let neededPostIDs = await this.sync.checkPostIds(userID, peerID, postIDs);
|
let postIDs = syncItem?.postIDs;
|
||||||
|
let neededPostIDs = await this.sync.checkPostIds(userID, peerID, postIDs);
|
||||||
|
|
||||||
if (neededPostIDs.length > 0) {
|
if (neededPostIDs.length > 0) {
|
||||||
console.log.apply(null, log(`[app] Need (${neededPostIDs.length}) posts for user ${logID(userID)} from peer ${logID(peerID)}`));
|
console.log.apply(null, log(`[app] Need (${neededPostIDs.length}) posts for user ${logID(userID)} from peer ${logID(peerID)}`));
|
||||||
let neededPosts = await this.peerManager?.rpc.getPostsForUser(peerID, this.peerID, userID, neededPostIDs);
|
let neededPosts = await this.peerManager?.rpc.getPostsForUser(peerID, this.peerID, userID, neededPostIDs);
|
||||||
// console.log(neededPosts);
|
// console.log(neededPosts);
|
||||||
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
console.log.apply(null, log(`[app] Don't need any posts for user ${logID(userID)} from peer ${logID(sendingPeerID)}`));
|
|
||||||
}
|
|
||||||
|
|
||||||
})
|
|
||||||
|
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
console.log.apply(null, log(`[app] Don't need any posts for user ${logID(userID)} from peer ${logID(peerID)}`));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.syncing.delete(userID);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
addPostIDsToSyncQueue(userID: string, peerID: string, postIDs: string[]) {
|
addPostIDsToSyncQueue(userID: string, peerID: string, postIDs: string[]) {
|
||||||
this.postSyncQueue.push({ userID: userID, peerID: peerID, postIDs: postIDs });
|
|
||||||
|
let syncQueue = this.syncQueues.get(userID);
|
||||||
|
|
||||||
|
if (!syncQueue) {
|
||||||
|
let newArray: SyncItem[] = [];
|
||||||
|
this.syncQueues.set(userID, newArray);
|
||||||
|
syncQueue = newArray;
|
||||||
|
}
|
||||||
|
|
||||||
|
syncQueue.push({ peerID: peerID, postIDs: postIDs });
|
||||||
|
|
||||||
|
this.processSyncQueue(userID);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// To avoid reuesting the same posts from multiple peers:
|
// To avoid reuesting the same posts from multiple peers:
|
||||||
// 1. Add incoming IDs to queue
|
// 1. Add incoming IDs to queue
|
||||||
// 2. Call a function that tests IDs and then gets posts.
|
// 2. Call a function that tests IDs and then gets posts.
|
||||||
@@ -128,18 +145,14 @@ export class App {
|
|||||||
if (!(this.sync.shouldSyncUserID(userID) || (this.router.route === App.Route.USER && userID === this.router.userID))) {
|
if (!(this.sync.shouldSyncUserID(userID) || (this.router.route === App.Route.USER && userID === this.router.userID))) {
|
||||||
console.log.apply(null, log(`[app] announceUser_rpc_response skipping user[${logID(userID)}] from[${logID(sendingPeerID)}]`));
|
console.log.apply(null, log(`[app] announceUser_rpc_response skipping user[${logID(userID)}] from[${logID(sendingPeerID)}]`));
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
console.log.apply(null, log(`[app] calling getPostIDsForUser for user [${logID(userID)}] on peer [${logID(sendingPeerID)}]`));
|
console.log.apply(null, log(`[app] calling getPostIDsForUser for user [${logID(userID)}] on peer [${logID(sendingPeerID)}]`));
|
||||||
|
|
||||||
let postIDs = await this.peerManager?.rpc.getPostIDsForUser(sendingPeerID, userID);
|
let postIDs = await this.peerManager?.rpc.getPostIDsForUser(sendingPeerID, userID);
|
||||||
console.log.apply(null, log(`[app] Got (${postIDs.length}) post IDs for user [${logID(userID)}] from peer [${logID(sendingPeerID)}]`));
|
console.log.apply(null, log(`[app] Got (${postIDs.length}) post IDs for user [${logID(userID)}] from peer [${logID(sendingPeerID)}]`));
|
||||||
|
|
||||||
|
|
||||||
this.addPostIDsToSyncQueue(userID, sendingPeerID, postIDs);
|
this.addPostIDsToSyncQueue(userID, sendingPeerID, postIDs);
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -167,9 +180,7 @@ export class App {
|
|||||||
});
|
});
|
||||||
|
|
||||||
this.peerManager.addEventListener(PeerEventTypes.PEER_DISCONNECTED, async (event: any) => {
|
this.peerManager.addEventListener(PeerEventTypes.PEER_DISCONNECTED, async (event: any) => {
|
||||||
let peerID = event.peerID;
|
|
||||||
console.log.apply(null, log(`[app]: peer disconnected:${event.peerID}`));
|
console.log.apply(null, log(`[app]: peer disconnected:${event.peerID}`));
|
||||||
this.sync.deleteUserPeer(peerID);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
@@ -206,15 +217,17 @@ export class App {
|
|||||||
for (let post of posts) {
|
for (let post of posts) {
|
||||||
console.log.apply(null, log(`[app] sendPostForUser sending post [${logID(post.post_id)}] to [${logID(requestingPeerID)}]`, userID, post.author, post.text));
|
console.log.apply(null, log(`[app] sendPostForUser sending post [${logID(post.post_id)}] to [${logID(requestingPeerID)}]`, userID, post.author, post.text));
|
||||||
|
|
||||||
this.peerManager?.rpc.sendPostForUser(requestingPeerID, userID, post);
|
await this.peerManager?.rpc.sendPostForUser(requestingPeerID, this.peerID, userID, post);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
// return posts;
|
// return posts;
|
||||||
|
|
||||||
// return postIDs;
|
// return postIDs;
|
||||||
});
|
});
|
||||||
|
|
||||||
this.peerManager.registerRPC('sendPostForUser', async (userID: string, post: Post) => {
|
this.peerManager.registerRPC('sendPostForUser', async (sendingPeerID: string, userID: string, post: Post) => {
|
||||||
console.log.apply(null, log(`[app] sendPostForUser got post ${logID(userID)} author ${post.author} text ${post.text}`));
|
console.log.apply(null, log(`[app] sendPostForUser got post[${logID(post.post_id)}] from peer[${logID(sendingPeerID)}] for user[${logID(userID)}] author[${post.author}] text[${post.text}]`));
|
||||||
// if (post.text === "image...") {
|
// if (post.text === "image...") {
|
||||||
// debugger;
|
// debugger;
|
||||||
// }
|
// }
|
||||||
@@ -225,7 +238,9 @@ export class App {
|
|||||||
clearTimeout(this.renderTimer);
|
clearTimeout(this.renderTimer);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.renderTimer = setTimeout(() => { this.render() }, 200);
|
this.renderTimer = setTimeout(() => { this.render() }, 1000);
|
||||||
|
|
||||||
|
return true;
|
||||||
// }
|
// }
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -243,7 +258,7 @@ export class App {
|
|||||||
// for (let userID of usersToSync) {
|
// for (let userID of usersToSync) {
|
||||||
// console.log(userID);
|
// console.log(userID);
|
||||||
// // this.peerManager.rpc.getPeersForUser(userID);
|
// // this.peerManager.rpc.getPeersForUser(userID);
|
||||||
// }
|
// }
|
||||||
|
|
||||||
|
|
||||||
// for (let userID in this.sync.usersToSync()) {
|
// for (let userID in this.sync.usersToSync()) {
|
||||||
@@ -794,7 +809,7 @@ export class App {
|
|||||||
return document.getElementById(elementName) as HTMLDivElement;
|
return document.getElementById(elementName) as HTMLDivElement;
|
||||||
}
|
}
|
||||||
|
|
||||||
initButtons(userID: string, posts: StoragePost[], registration: ServiceWorkerRegistration | undefined) {
|
initButtons(userID: string, posts: StoragePost[]) {
|
||||||
// let font1Button = document.getElementById("button_font1") as HTMLButtonElement;
|
// let font1Button = document.getElementById("button_font1") as HTMLButtonElement;
|
||||||
// let font2Button = document.getElementById("button_font2") as HTMLButtonElement;
|
// let font2Button = document.getElementById("button_font2") as HTMLButtonElement;
|
||||||
// let importTweetsButton = document.getElementById("import_tweets") as HTMLButtonElement;
|
// let importTweetsButton = document.getElementById("import_tweets") as HTMLButtonElement;
|
||||||
@@ -1067,13 +1082,7 @@ export class App {
|
|||||||
this.limitPosts = parseInt(limitPostsParam);
|
this.limitPosts = parseInt(limitPostsParam);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.peerID = this.getPeerID();
|
|
||||||
this.peername = this.getPeername();
|
|
||||||
this.userID = this.getUserID();
|
|
||||||
this.username = this.getUsername();
|
|
||||||
|
|
||||||
this.sync.setUserID(this.userID)
|
|
||||||
this.sync.setArchive(this.isArchivePeer);
|
|
||||||
|
|
||||||
this.getRoute();
|
this.getRoute();
|
||||||
if (this.router.route === App.Route.CONNECT) {
|
if (this.router.route === App.Route.CONNECT) {
|
||||||
@@ -1082,6 +1091,15 @@ export class App {
|
|||||||
localStorage.removeItem("dandelion_username");
|
localStorage.removeItem("dandelion_username");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
this.peerID = this.getPeerID();
|
||||||
|
this.peername = this.getPeername();
|
||||||
|
this.userID = this.getUserID();
|
||||||
|
this.username = this.getUsername();
|
||||||
|
|
||||||
|
this.sync.setUserID(this.userID)
|
||||||
|
this.sync.setArchive(this.isArchivePeer);
|
||||||
|
|
||||||
this.connect();
|
this.connect();
|
||||||
|
|
||||||
await this.initDB();
|
await this.initDB();
|
||||||
@@ -1124,17 +1142,19 @@ export class App {
|
|||||||
// let storageUsed = (await navigator?.storage?.estimate())?.usage/1024/1024
|
// let storageUsed = (await navigator?.storage?.estimate())?.usage/1024/1024
|
||||||
// }
|
// }
|
||||||
|
|
||||||
// if (urlParams.get("sw") === "true") {
|
|
||||||
let registration;
|
let registration;
|
||||||
registration = await this.registerServiceWorker();
|
let shouldRegisterServiceWorker = !(this.isBootstrapPeer || this.isArchivePeer || this.isHeadless);
|
||||||
// }
|
|
||||||
|
if (shouldRegisterServiceWorker) {
|
||||||
|
registration = await this.registerServiceWorker();
|
||||||
|
}
|
||||||
|
|
||||||
document.getElementById('username')!.innerText = `${this.username}`;
|
document.getElementById('username')!.innerText = `${this.username}`;
|
||||||
document.getElementById('peername')!.innerText = `peername:${this.peername}`;
|
document.getElementById('peername')!.innerText = `peername:${this.peername}`;
|
||||||
document.getElementById('user_id')!.innerText = `user_id:${this.userID}`;
|
document.getElementById('user_id')!.innerText = `user_id:${this.userID}`;
|
||||||
document.getElementById('peer_id')!.innerText = `peer_id:${this.peerID}`;
|
document.getElementById('peer_id')!.innerText = `peer_id:${this.peerID}`;
|
||||||
|
|
||||||
this.initButtons(this.userID, this.posts, registration);
|
this.initButtons(this.userID, this.posts);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -714,28 +714,29 @@ class PeerConnection {
|
|||||||
|
|
||||||
|
|
||||||
while (this.dataChannel.bufferedAmount >= 8 * 1024 * 1024) {
|
while (this.dataChannel.bufferedAmount >= 8 * 1024 * 1024) {
|
||||||
await new Promise<void>((resolve, reject) => { setTimeout(()=> resolve(), 1000);
|
await new Promise<void>((resolve, reject) => {
|
||||||
})
|
setTimeout(() => resolve(), 1000);
|
||||||
}
|
})
|
||||||
|
}
|
||||||
|
|
||||||
let messageJSON = JSON.stringify(message);
|
let messageJSON = JSON.stringify(message);
|
||||||
|
|
||||||
this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-datachannel[${logID(this.peerManager.peerID)}]:`, message.type, message, `message size:${messageJSON.length}`));
|
this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-datachannel[${logID(this.peerManager.peerID)}]:`, message.type, message, `message size:${messageJSON.length}`));
|
||||||
|
|
||||||
if (messageJSON.length > this.chunkSize) {
|
if (messageJSON.length > this.chunkSize) {
|
||||||
this.messageSuperlog && console.log.apply(null, log(`[datachannel] sending long message: `, messageJSON.length));
|
this.messageSuperlog && console.log.apply(null, log(`[datachannel] sending long message: `, messageJSON.length));
|
||||||
this.sendLongMessage(messageJSON);
|
this.sendLongMessage(messageJSON);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
this.dataChannel?.send(messageJSON);
|
this.dataChannel?.send(messageJSON);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.log.apply(null, log(e));
|
console.log.apply(null, log(e));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// this.onMessage(messageJSON);
|
// this.onMessage(messageJSON);
|
||||||
}
|
}
|
||||||
@@ -743,140 +744,140 @@ try {
|
|||||||
|
|
||||||
// Get a polyfill for browsers that don't have this API
|
// Get a polyfill for browsers that don't have this API
|
||||||
async hashMessage(message: string) {
|
async hashMessage(message: string) {
|
||||||
let msgUint8 = new TextEncoder().encode(message);
|
let msgUint8 = new TextEncoder().encode(message);
|
||||||
const hashBuffer = await crypto.subtle.digest("SHA-256", msgUint8);
|
const hashBuffer = await crypto.subtle.digest("SHA-256", msgUint8);
|
||||||
const hashArray = Array.from(new Uint8Array(hashBuffer));
|
const hashArray = Array.from(new Uint8Array(hashBuffer));
|
||||||
const hashHex = hashArray.map((b) => b.toString(16).padStart(2, "0")).join('');
|
const hashHex = hashArray.map((b) => b.toString(16).padStart(2, "0")).join('');
|
||||||
return hashHex;
|
return hashHex;
|
||||||
}
|
}
|
||||||
|
|
||||||
async sendLongMessage(message: string) {
|
async sendLongMessage(message: string) {
|
||||||
// message = JSON.parse(message);
|
// message = JSON.parse(message);
|
||||||
let chunkSize = this.chunkSize / 2;
|
let chunkSize = this.chunkSize / 2;
|
||||||
// let chunkSize = 1024;
|
// let chunkSize = 1024;
|
||||||
let chunks = Math.ceil(message!.length / chunkSize);
|
let chunks = Math.ceil(message!.length / chunkSize);
|
||||||
let messageID = generateID();
|
let messageID = generateID();
|
||||||
|
|
||||||
let hash = await this.hashMessage(message);
|
let hash = await this.hashMessage(message);
|
||||||
|
|
||||||
for (let i = 0; i < chunks; i++) {
|
for (let i = 0; i < chunks; i++) {
|
||||||
let offset = i * chunkSize;
|
let offset = i * chunkSize;
|
||||||
let chunk = message?.substring(offset, offset + chunkSize);
|
let chunk = message?.substring(offset, offset + chunkSize);
|
||||||
// this.send(message?.substring(offset, offset + chunkSize-1));
|
// this.send(message?.substring(offset, offset + chunkSize-1));
|
||||||
// console.log("[chunk]", chunk);
|
// console.log("[chunk]", chunk);
|
||||||
let chunkHash = await this.hashMessage(chunk);
|
let chunkHash = await this.hashMessage(chunk);
|
||||||
this.chunkSuperlog && console.log.apply(null, log(`[chunk] chunkHash:${logID(chunkHash)} from:${logID(this.peerManager.peerID)} to:${logID(this.remotePeerID)} messageID:${logID(messageID)} hash:${logID(hash)} ${i + 1}/${chunks}`));
|
this.chunkSuperlog && console.log.apply(null, log(`[chunk] chunkHash:${logID(chunkHash)} from:${logID(this.peerManager.peerID)} to:${logID(this.remotePeerID)} messageID:${logID(messageID)} hash:${logID(hash)} ${i + 1}/${chunks}`));
|
||||||
|
|
||||||
let netMessage = { type: 'chunk', message_id: messageID, hash: hash, chunk_index: i, total_chunks: chunks, chunk: chunk, chunk_hash: chunkHash };
|
let netMessage = { type: 'chunk', message_id: messageID, hash: hash, chunk_index: i, total_chunks: chunks, chunk: chunk, chunk_hash: chunkHash };
|
||||||
await this.send(netMessage);
|
await this.send(netMessage);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
call(functionName: string, args: any) {
|
call(functionName: string, args: any) {
|
||||||
let transactionID = generateID(); // make this faster as we will only ever have a small number of in-flight queries on a peer
|
let transactionID = generateID(); // make this faster as we will only ever have a small number of in-flight queries on a peer
|
||||||
// Think about a timeout here to auto reject it after a while.
|
// Think about a timeout here to auto reject it after a while.
|
||||||
let promise = new Promise((resolve, reject) => {
|
let promise = new Promise((resolve, reject) => {
|
||||||
this.pendingRPCs.set(transactionID, { resolve, reject, functionName });
|
this.pendingRPCs.set(transactionID, { resolve, reject, functionName });
|
||||||
// setTimeout(() => reject("bad"), 1000);
|
// setTimeout(() => reject("bad"), 1000);
|
||||||
});
|
});
|
||||||
|
|
||||||
let message = {
|
let message = {
|
||||||
type: "rpc_call",
|
type: "rpc_call",
|
||||||
transaction_id: transactionID,
|
transaction_id: transactionID,
|
||||||
function_name: functionName,
|
function_name: functionName,
|
||||||
args: args,
|
args: args,
|
||||||
};
|
};
|
||||||
|
|
||||||
this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}]`, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
|
this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}]`, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
|
||||||
|
|
||||||
|
|
||||||
this.send(message);
|
this.send(message);
|
||||||
|
|
||||||
return promise;
|
return promise;
|
||||||
}
|
}
|
||||||
|
|
||||||
async onMessage(messageJSON: any) {
|
async onMessage(messageJSON: any) {
|
||||||
|
|
||||||
let message: any = {};
|
let message: any = {};
|
||||||
try {
|
try {
|
||||||
message = JSON.parse(messageJSON);
|
message = JSON.parse(messageJSON);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.log.apply(null, log("PeerConnection.onMessage:", e));
|
console.log.apply(null, log("PeerConnection.onMessage:", e));
|
||||||
}
|
|
||||||
|
|
||||||
this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->datachannel[${logID(this.peerManager.peerID)}]`, message.type, message));
|
|
||||||
let type = message.type;
|
|
||||||
|
|
||||||
if (type === "rpc_response") {
|
|
||||||
this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}] response: `, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
|
|
||||||
|
|
||||||
let pendingRPC = this.pendingRPCs.get(message.transaction_id);
|
|
||||||
|
|
||||||
if (!pendingRPC) {
|
|
||||||
throw new Error();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pendingRPC.resolve(message.response);
|
this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->datachannel[${logID(this.peerManager.peerID)}]`, message.type, message));
|
||||||
}
|
let type = message.type;
|
||||||
|
|
||||||
if (type === "rpc_call") {
|
if (type === "rpc_response") {
|
||||||
|
this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}] response: `, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
|
||||||
|
|
||||||
this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->[rpc][${logID(this.peerManager.peerID)}] call: `, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
|
let pendingRPC = this.pendingRPCs.get(message.transaction_id);
|
||||||
|
|
||||||
|
if (!pendingRPC) {
|
||||||
let response = await this.peerManager.callFromRemote(message.function_name, message.args);
|
throw new Error();
|
||||||
|
|
||||||
this.rpcSuperlog && console.log.apply(null, log(`[rpc] call: response:`, response));
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if (response === undefined) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let responseMessage = { type: 'rpc_response', function_name: message.function_name, transaction_id: message.transaction_id, response: response };
|
|
||||||
this.send(responseMessage);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
if (type === "initial_peers") {
|
|
||||||
for (let peerID of message.peers) {
|
|
||||||
console.log(log("Connecting to initial peer ", peerID));
|
|
||||||
this.peerManager.connectToPeer(peerID);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (type === "chunk") {
|
|
||||||
let messageID = message.message_id;
|
|
||||||
if (!this.longMessages.has(messageID)) {
|
|
||||||
this.longMessages.set(messageID, { messageChunks: [], totalChunks: message.total_chunks, hash: message.hash });
|
|
||||||
}
|
|
||||||
|
|
||||||
let longMessage = this.longMessages.get(messageID);
|
|
||||||
if (!longMessage) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let chunkHash = await this.hashMessage(message.chunk_hash);
|
|
||||||
|
|
||||||
longMessage.messageChunks.push(message.chunk);
|
|
||||||
|
|
||||||
this.chunkSuperlog && console.log.apply(null, log(`[chunk] chunked message sent chunkHash:${logID(message.chunk_hash)} computed hash: ${logID(chunkHash)} messageId:${logID(messageID)} chunk ${message.chunk_index + 1}/${longMessage.totalChunks}`));
|
|
||||||
if (message.chunk_index === longMessage.totalChunks - 1) {
|
|
||||||
let completeMessage = longMessage.messageChunks.join('');
|
|
||||||
let hash = await this.hashMessage(completeMessage);
|
|
||||||
this.chunkSuperlog && console.log.apply(null, log(`[chunk] hashes match: ${hash === longMessage.hash} sent hash: ${logID(longMessage.hash)} computed hash: ${logID(hash)}`));
|
|
||||||
if (hash !== longMessage.hash) {
|
|
||||||
throw new Error("[chunk] long message hashes don't match.");
|
|
||||||
}
|
}
|
||||||
this.onMessage(completeMessage);
|
|
||||||
this.longMessages.delete(messageID);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// this.peerManger.onMessage(this.remotePeerID, message);
|
pendingRPC.resolve(message.response);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (type === "rpc_call") {
|
||||||
|
|
||||||
|
this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->[rpc][${logID(this.peerManager.peerID)}] call: `, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
|
||||||
|
|
||||||
|
|
||||||
|
let response = await this.peerManager.callFromRemote(message.function_name, message.args);
|
||||||
|
|
||||||
|
this.rpcSuperlog && console.log.apply(null, log(`[rpc] call: response:`, response));
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
if (response === undefined) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let responseMessage = { type: 'rpc_response', function_name: message.function_name, transaction_id: message.transaction_id, response: response };
|
||||||
|
this.send(responseMessage);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
if (type === "initial_peers") {
|
||||||
|
for (let peerID of message.peers) {
|
||||||
|
console.log(log("Connecting to initial peer ", peerID));
|
||||||
|
this.peerManager.connectToPeer(peerID);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (type === "chunk") {
|
||||||
|
let messageID = message.message_id;
|
||||||
|
if (!this.longMessages.has(messageID)) {
|
||||||
|
this.longMessages.set(messageID, { messageChunks: [], totalChunks: message.total_chunks, hash: message.hash });
|
||||||
|
}
|
||||||
|
|
||||||
|
let longMessage = this.longMessages.get(messageID);
|
||||||
|
if (!longMessage) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let chunkHash = await this.hashMessage(message.chunk_hash);
|
||||||
|
|
||||||
|
longMessage.messageChunks.push(message.chunk);
|
||||||
|
|
||||||
|
this.chunkSuperlog && console.log.apply(null, log(`[chunk] chunked message sent chunkHash:${logID(message.chunk_hash)} computed hash: ${logID(chunkHash)} messageId:${logID(messageID)} chunk ${message.chunk_index + 1}/${longMessage.totalChunks}`));
|
||||||
|
if (message.chunk_index === longMessage.totalChunks - 1) {
|
||||||
|
let completeMessage = longMessage.messageChunks.join('');
|
||||||
|
let hash = await this.hashMessage(completeMessage);
|
||||||
|
this.chunkSuperlog && console.log.apply(null, log(`[chunk] hashes match: ${hash === longMessage.hash} sent hash: ${logID(longMessage.hash)} computed hash: ${logID(hash)}`));
|
||||||
|
if (hash !== longMessage.hash) {
|
||||||
|
throw new Error("[chunk] long message hashes don't match.");
|
||||||
|
}
|
||||||
|
this.onMessage(completeMessage);
|
||||||
|
this.longMessages.delete(messageID);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// this.peerManger.onMessage(this.remotePeerID, message);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -181,12 +181,12 @@ export class Sync {
|
|||||||
async checkPostIds(userID: string, peerID: string, postIDs: string[]) {
|
async checkPostIds(userID: string, peerID: string, postIDs: string[]) {
|
||||||
let startTime = performance.now();
|
let startTime = performance.now();
|
||||||
let neededPostIds = await checkPostIds(userID, postIDs);
|
let neededPostIds = await checkPostIds(userID, postIDs);
|
||||||
console.log.apply(null, log(`ID Check for user ${logID(userID)} took ${(performance.now() - startTime).toFixed(2)}ms`));
|
this.syncSuperlog && console.log.apply(null, log(`[sync] ID Check for user ${logID(userID)} with IDs from peer[${logID(peerID)}] took ${(performance.now() - startTime).toFixed(2)}ms`));
|
||||||
|
|
||||||
if (neededPostIds.length > 0) {
|
if (neededPostIds.length > 0) {
|
||||||
console.log.apply(null, log(`Need posts (${neededPostIds.length}) for user ${logID(userID)} from peer ${logID(peerID)}`));;
|
this.syncSuperlog && console.log.apply(null, log(`[sync] Need posts (${neededPostIds.length}) for user[${logID(userID)}] from peer[${logID(peerID)}]`));;
|
||||||
} else {
|
} else {
|
||||||
console.log.apply(null, log(`Don't need any posts for user ${logID(userID)} from peer ${logID(peerID)}`));;
|
this.syncSuperlog && console.log.apply(null, log(`[sync] Don't need any posts for user[${logID(userID)}] from peer[${logID(peerID)}]`));;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user