From a3a9682f86d84b538853367328a6d5ecde2b2357 Mon Sep 17 00:00:00 2001 From: bobbydigitales Date: Wed, 21 May 2025 11:39:02 -0700 Subject: [PATCH] Working chuking and image syncing. Still some bugs with syncing some large posts. --- src/PeerManager.ts | 267 ++++++++++++------ src/Sync.ts | 178 ++++++------ src/log.ts | 4 + src/main2.ts | 672 ++++++--------------------------------------- static/main2.js | 529 +++-------------------------------- 5 files changed, 396 insertions(+), 1254 deletions(-) diff --git a/src/PeerManager.ts b/src/PeerManager.ts index c086765..d1cb777 100644 --- a/src/PeerManager.ts +++ b/src/PeerManager.ts @@ -221,20 +221,20 @@ export class PeerManager { this.websocket = new WebSocket(wsURL); // this.websocket.onclose = (e: CloseEvent) => { - // let closedUnexpectedly = !e.wasClean; + // let closedUnexpectedly = !e.wasClean; - // if (closedUnexpectedly) { - // console.log.apply(null, log(`Websocket closed unexpectedly. Will try to reconnect in ${this.reconnectPeriod} seconds`)); - // // let alreadyReconnecting = this.reconnectTimer !== null; - // // if (!alreadyReconnecting) { - // this.reconnectTimer = globalThis.setTimeout(() => { - // console.log.apply(null, log(`Reconnecting web socket`)); + // if (closedUnexpectedly) { + // console.log.apply(null, log(`Websocket closed unexpectedly. Will try to reconnect in ${this.reconnectPeriod} seconds`)); + // // let alreadyReconnecting = this.reconnectTimer !== null; + // // if (!alreadyReconnecting) { + // this.reconnectTimer = globalThis.setTimeout(() => { + // console.log.apply(null, log(`Reconnecting web socket`)); - // this.reconnectTimer = null; - // this.connectWebSocket(); - // }, this.reconnectPeriod * 1000) - // }; - // } + // this.reconnectTimer = null; + // this.connectWebSocket(); + // }, this.reconnectPeriod * 1000) + // }; + // } // } } catch (error: any) { @@ -451,9 +451,10 @@ class PeerConnection { private isSettingRemoteAnswerPending: boolean = false; private polite = true; private webRTCSuperlog = false; - private dataChannelSuperlog = true; - messageSuperlog: boolean = true; - rpcSuperlog: boolean = true; + private dataChannelSuperlog = false; + private chunkSize = (16 * 1024) - 100; + messageSuperlog: boolean = false; + rpcSuperlog: boolean = false; pendingRPCs: Map< string, { resolve: Function; reject: Function; functionName: string } @@ -476,6 +477,9 @@ class PeerConnection { // { urls: "stun:stun4.l.google.com" }, ], }; + // longMessageQueue: string[] = []; + longMessages: Map = new Map(); + chunkSuperlog: boolean = false; async RPCHandler(message: any) { @@ -533,7 +537,7 @@ class PeerConnection { this.peerManager.disconnectFromPeer(this.remotePeerID); } - this.dataChannel.onerror = (e:RTCErrorEvent) => { + this.dataChannel.onerror = (e: RTCErrorEvent) => { this.dataChannelSuperlog && console.log.apply(null, log(`datachannel from peer ${this.remotePeerID} error:`, e.error)); } } @@ -701,96 +705,177 @@ class PeerConnection { this.rtcPeer = null; } - send(message: any) { - - let messageJSON = JSON.stringify(message); - - this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-datachannel[${logID(this.peerManager.peerID)}]:`, message.type, message, `message size:${messageJSON.length}`)); + async send(message: any) { - if (messageJSON.length > (32 * 1024)) { - this.messageSuperlog && console.log.apply(null, log(`[datachannel] Not sending long message: `, messageJSON.length)); - return; + if (!this.dataChannel) { + throw new Error("Send called but datachannel is null"); } - this.dataChannel?.send(messageJSON); + + while (this.dataChannel.bufferedAmount >= 8 * 1024 * 1024) { + await new Promise((resolve, reject) => { setTimeout(()=> resolve(), 1000); + }) + } + + let messageJSON = JSON.stringify(message); + +this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-datachannel[${logID(this.peerManager.peerID)}]:`, message.type, message, `message size:${messageJSON.length}`)); + +if (messageJSON.length > this.chunkSize) { + this.messageSuperlog && console.log.apply(null, log(`[datachannel] sending long message: `, messageJSON.length)); + this.sendLongMessage(messageJSON); + return; +} + + + +try { + this.dataChannel?.send(messageJSON); +} catch (e) { + console.log.apply(null, log(e)); + +} // this.onMessage(messageJSON); } - call(functionName: string, args: any) { - let transactionID = generateID(); // make this faster as we will only ever have a small number of in-flight queries on a peer - // Think about a timeout here to auto reject it after a while. - let promise = new Promise((resolve, reject) => { - this.pendingRPCs.set(transactionID, { resolve, reject, functionName }); - // setTimeout(() => reject("bad"), 1000); - }); - let message = { - type: "rpc_call", - transaction_id: transactionID, - function_name: functionName, - args: args, - }; + // Get a polyfill for browsers that don't have this API + async hashMessage(message: string) { + let msgUint8 = new TextEncoder().encode(message); + const hashBuffer = await crypto.subtle.digest("SHA-256", msgUint8); + const hashArray = Array.from(new Uint8Array(hashBuffer)); + const hashHex = hashArray.map((b) => b.toString(16).padStart(2, "0")).join(''); + return hashHex; +} - this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}]`,message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); + async sendLongMessage(message: string) { + // message = JSON.parse(message); + let chunkSize = this.chunkSize / 2; + // let chunkSize = 1024; + let chunks = Math.ceil(message!.length / chunkSize); + let messageID = generateID(); + let hash = await this.hashMessage(message); - this.send(message); + for (let i = 0; i < chunks; i++) { + let offset = i * chunkSize; + let chunk = message?.substring(offset, offset + chunkSize); + // this.send(message?.substring(offset, offset + chunkSize-1)); + // console.log("[chunk]", chunk); + let chunkHash = await this.hashMessage(chunk); + this.chunkSuperlog && console.log.apply(null, log(`[chunk] chunkHash:${logID(chunkHash)} from:${logID(this.peerManager.peerID)} to:${logID(this.remotePeerID)} messageID:${logID(messageID)} hash:${logID(hash)} ${i + 1}/${chunks}`)); - return promise; + let netMessage = { type: 'chunk', message_id: messageID, hash: hash, chunk_index: i, total_chunks: chunks, chunk: chunk, chunk_hash: chunkHash }; + await this.send(netMessage); } +} + + + + +call(functionName: string, args: any) { + let transactionID = generateID(); // make this faster as we will only ever have a small number of in-flight queries on a peer + // Think about a timeout here to auto reject it after a while. + let promise = new Promise((resolve, reject) => { + this.pendingRPCs.set(transactionID, { resolve, reject, functionName }); + // setTimeout(() => reject("bad"), 1000); + }); + + let message = { + type: "rpc_call", + transaction_id: transactionID, + function_name: functionName, + args: args, + }; + + this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}]`, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); + + + this.send(message); + + return promise; +} async onMessage(messageJSON: any) { - let message: any = {}; - try { - message = JSON.parse(messageJSON); - } catch (e) { - console.log.apply(null, log("PeerConnection.onMessage:", e)); - } - - this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->datachannel[${logID(this.peerManager.peerID)}]`, message.type, message)); - let type = message.type; - - if (type === "rpc_response") { - this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}] response: `,message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); - - let pendingRPC = this.pendingRPCs.get(message.transaction_id); - - if (!pendingRPC) { - throw new Error(); - } - - pendingRPC.resolve(message.response); - } - - if (type === "rpc_call") { - - this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->[rpc][${logID(this.peerManager.peerID)}] call: `,message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); - - - let response = await this.peerManager.callFromRemote(message.function_name, message.args); - - this.rpcSuperlog && console.log.apply(null, log(`[rpc] call: response:`, response)); - - - - if (response === undefined) { - return; - } - - let responseMessage = { type: 'rpc_response', function_name: message.function_name, transaction_id: message.transaction_id, response: response }; - this.send(responseMessage); - - } - - if (type === "initial_peers") { - for (let peerID of message.peers) { - console.log(log("Connecting to initial peer ", peerID)); - this.peerManager.connectToPeer(peerID); - } - } - - // this.peerManger.onMessage(this.remotePeerID, message); + let message: any = {}; + try { + message = JSON.parse(messageJSON); + } catch (e) { + console.log.apply(null, log("PeerConnection.onMessage:", e)); } + + this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->datachannel[${logID(this.peerManager.peerID)}]`, message.type, message)); + let type = message.type; + + if (type === "rpc_response") { + this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}] response: `, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); + + let pendingRPC = this.pendingRPCs.get(message.transaction_id); + + if (!pendingRPC) { + throw new Error(); + } + + pendingRPC.resolve(message.response); + } + + if (type === "rpc_call") { + + this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->[rpc][${logID(this.peerManager.peerID)}] call: `, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2))); + + + let response = await this.peerManager.callFromRemote(message.function_name, message.args); + + this.rpcSuperlog && console.log.apply(null, log(`[rpc] call: response:`, response)); + + + + if (response === undefined) { + return; + } + + let responseMessage = { type: 'rpc_response', function_name: message.function_name, transaction_id: message.transaction_id, response: response }; + this.send(responseMessage); + + } + + if (type === "initial_peers") { + for (let peerID of message.peers) { + console.log(log("Connecting to initial peer ", peerID)); + this.peerManager.connectToPeer(peerID); + } + } + + if (type === "chunk") { + let messageID = message.message_id; + if (!this.longMessages.has(messageID)) { + this.longMessages.set(messageID, { messageChunks: [], totalChunks: message.total_chunks, hash: message.hash }); + } + + let longMessage = this.longMessages.get(messageID); + if (!longMessage) { + return; + } + + let chunkHash = await this.hashMessage(message.chunk_hash); + + longMessage.messageChunks.push(message.chunk); + + this.chunkSuperlog && console.log.apply(null, log(`[chunk] chunked message sent chunkHash:${logID(message.chunk_hash)} computed hash: ${logID(chunkHash)} messageId:${logID(messageID)} chunk ${message.chunk_index + 1}/${longMessage.totalChunks}`)); + if (message.chunk_index === longMessage.totalChunks - 1) { + let completeMessage = longMessage.messageChunks.join(''); + let hash = await this.hashMessage(completeMessage); + this.chunkSuperlog && console.log.apply(null, log(`[chunk] hashes match: ${hash === longMessage.hash} sent hash: ${logID(longMessage.hash)} computed hash: ${logID(hash)}`)); + if (hash !== longMessage.hash) { + throw new Error("[chunk] long message hashes don't match."); + } + this.onMessage(completeMessage); + this.longMessages.delete(messageID); + } + } + + // this.peerManger.onMessage(this.remotePeerID, message); +} } diff --git a/src/Sync.ts b/src/Sync.ts index 07e4bbc..22284ff 100644 --- a/src/Sync.ts +++ b/src/Sync.ts @@ -3,24 +3,30 @@ import { log, logID } from "log"; async function bytesToBase64DataUrl(bytes: Uint8Array, type = "application/octet-stream") { - return await new Promise((resolve, reject) => { - const reader = Object.assign(new FileReader(), { - onload: () => resolve(reader.result), - onerror: () => reject(reader.error), + return await new Promise((resolve, reject) => { + const reader = Object.assign(new FileReader(), { + onload: () => resolve(reader.result), + onerror: () => reject(reader.error), + }); + reader.readAsDataURL(new File([bytes], "", { type })); }); - reader.readAsDataURL(new File([bytes], "", { type })); - }); } async function arrayBufferToBase64(buffer: ArrayBuffer) { - var bytes = new Uint8Array(buffer); - return (await bytesToBase64DataUrl(bytes) as string).replace("data:application/octet-stream;base64,", ""); + var bytes = new Uint8Array(buffer); + return (await bytesToBase64DataUrl(bytes) as string).replace("data:application/octet-stream;base64,", ""); } async function base64ToArrayBuffer(base64String: string) { - let response = await fetch("data:application/octet-stream;base64," + base64String); - let arrayBuffer = await response.arrayBuffer(); - return arrayBuffer; + let response; + try { + response = await fetch("data:application/octet-stream;base64," + base64String); + } catch (e) { + console.log("error", e, base64String); + return null; + } + let arrayBuffer = await response.arrayBuffer(); + return arrayBuffer; } @@ -38,6 +44,7 @@ export class Sync { } setUserID(userID: string) { + this.userID = userID; this.userIDsToSync = new Set(this.getFollowing(userID)); } @@ -50,7 +57,7 @@ export class Sync { return this.userIDsToSync.has(userID); } - getPeersForUser(userID:string) { + getPeersForUser(userID: string) { let peers = this.userPeers.get(userID); if (!peers) { return []; @@ -69,8 +76,8 @@ export class Sync { let peers = this.userPeers.get(userID) as Set; peers.add(peerID); - - this.syncSuperlog && console.log.apply(null, log(this.userPeers));; + + // this.syncSuperlog && console.log.apply(null, log(this.userPeers)); } deleteUserPeer(peerIDToDelete: string) { @@ -120,36 +127,35 @@ export class Sync { getFollowing(userID: string): string[] { + let following = ['a0e42390-08b5-4b07-bc2b-787f8e5f1297']; // Follow BMO by default :) + following.push(this.userID); + // Rob if (userID === 'b38b623c-c3fa-4351-9cab-50233c99fa4e') { - return [ - 'b38b623c-c3fa-4351-9cab-50233c99fa4e', + following.push(...[ '6d774268-16cd-4e86-8bbe-847a0328893d', // Sean '05a495a0-0dd8-4186-94c3-b8309ba6fc4c', // Martin - 'a0e42390-08b5-4b07-bc2b-787f8e5f1297', // BMO 'bba3ad24-9181-4e22-90c8-c265c80873ea', // Harry '8f6802be-c3b6-46c1-969c-5f90cbe01479', // Fiona - ] + ]); } // Martin if (userID === '05a495a0-0dd8-4186-94c3-b8309ba6fc4c') { - return [ - 'b38b623c-c3fa-4351-9cab-50233c99fa4e', - 'a0e42390-08b5-4b07-bc2b-787f8e5f1297', // BMO - ] + following.push(...[ + 'b38b623c-c3fa-4351-9cab-50233c99fa4e', // Rob + ]); } // Fiona if (userID === '8f6802be-c3b6-46c1-969c-5f90cbe01479') { - return [ + following.push(...[ 'b38b623c-c3fa-4351-9cab-50233c99fa4e', // Rob - 'a0e42390-08b5-4b07-bc2b-787f8e5f1297', // BMO '05a495a0-0dd8-4186-94c3-b8309ba6fc4c', // Martin - ] + ]); } - return ['a0e42390-08b5-4b07-bc2b-787f8e5f1297']; // Follow BMO by default :) + return following; } async getPostIdsForUser(userID: string) { @@ -163,79 +169,89 @@ export class Sync { return postIds; } - async checkPostIds(userID:string, peerID:string, postIDs:string[]) { - let startTime = performance.now(); - let neededPostIds = await checkPostIds(userID, postIDs); - console.log.apply(null, log(`ID Check for user ${logID(userID)} took ${(performance.now() - startTime) .toFixed(2)}ms`)); + async checkPostIds(userID: string, peerID: string, postIDs: string[]) { + let startTime = performance.now(); + let neededPostIds = await checkPostIds(userID, postIDs); + console.log.apply(null, log(`ID Check for user ${logID(userID)} took ${(performance.now() - startTime).toFixed(2)}ms`)); - if (neededPostIds.length > 0) { - console.log.apply(null, log(`Need posts (${neededPostIds.length}) for user ${logID(userID)} from peer ${logID(peerID)}`));; - } else { - console.log.apply(null, log(`Don't need any posts for user ${logID(userID)} from peer ${logID(peerID)}`));; + if (neededPostIds.length > 0) { + console.log.apply(null, log(`Need posts (${neededPostIds.length}) for user ${logID(userID)} from peer ${logID(peerID)}`));; + } else { + console.log.apply(null, log(`Don't need any posts for user ${logID(userID)} from peer ${logID(peerID)}`));; - } - - // if (postIds.length === 0) { - // return []; - // } + } - return neededPostIds; + // if (postIds.length === 0) { + // return []; + // } + + return neededPostIds; } - async getPostsForUser(userID:string, postIDs: string[]) { - let posts = await getPostsByIds(userID, postIDs) ?? []; - - console.log.apply(null, log(`[sync] got ${posts.length} posts for user ${logID(userID)}`));; - + async getPostsForUser(userID: string, postIDs: string[]) { + let posts = await getPostsByIds(userID, postIDs) ?? []; + + console.log.apply(null, log(`[sync] got ${posts.length} posts for user ${logID(userID)}`));; + // app.timerStart(); - let output = []; - - console.log.apply(null, log("Serializing images")); - for (let post of posts) { + let output = []; + + console.log.apply(null, log("Serializing images")); + for (let post of posts) { let newPost = (post as any).data; - + if (newPost.image_data) { - // let compressedData = await wsConnection.compressArrayBuffer(newPost.image_data); - // console.log.apply(null, log((newPost.image_data.byteLength - compressedData.byteLength) / 1024 / 1024); - - // TODO don't do this, use Blobs direclty! - // https://developer.chrome.com/blog/blob-support-for-Indexeddb-landed-on-chrome-dev - - newPost.image_data = await arrayBufferToBase64(newPost.image_data); - + // let compressedData = await wsConnection.compressArrayBuffer(newPost.image_data); + // console.log.apply(null, log((newPost.image_data.byteLength - compressedData.byteLength) / 1024 / 1024); + + // TODO don't do this, use Blobs direclty! + // https://developer.chrome.com/blog/blob-support-for-Indexeddb-landed-on-chrome-dev + + newPost.image_data = await arrayBufferToBase64(newPost.image_data); + } - + // let megs = JSON.stringify(newPost).length/1024/1024; // console.log.apply(null, log(`getPostsForUserHandler id:${newPost.post_id} post length:${megs}`); output.push(newPost); - } + } - return output; + return output; // console.log.apply(null, log(`getPostsForUser`,output)); } - async writePostForUser(userID:string, post:any) { - // HACK: Some posts have insanely large images, so I'm gonna skip them. - // Once we support delete then we we could delete these posts in a sensible way. - if (this.postBlockList.has(post.post_id)) { - console.log.apply(null, log(`Skipping blocked post: ${post.post_id}`));; - return; - } - - // HACK - some posts had the wrong author ID - if (userID === this.userID) { - post.author_id = this.userID; - } - - post.post_timestamp = new Date(post.post_timestamp); - if (post.image_data) { - post.image_data = await base64ToArrayBuffer(post.image_data); - } - - console.log.apply(null, log(`Merging same user peer posts...`)); + async writePostForUser(userID: string, post: any) { + // HACK: Some posts have insanely large images, so I'm gonna skip them. + // Once we support delete then we we could delete these posts in a sensible way. + if (this.postBlockList.has(post.post_id)) { + console.log.apply(null, log(`Skipping blocked post: ${post.post_id}`));; + return; + } - await mergeDataArray(userID, [post]); + // HACK - some posts had the wrong author ID + if (userID === this.userID) { + post.author_id = this.userID; + } + + post.post_timestamp = new Date(post.post_timestamp); + if (post.image_data) { + let imageDataArrayBuffer = await base64ToArrayBuffer(post.image_data); + + if (imageDataArrayBuffer === null) { + this.syncSuperlog && console.log(`[sync] Failed to create arraybuffer for image for post userID:${userID} postID:${post.post_id} `); + return; + } + + post.image_data = imageDataArrayBuffer; + + // skip posts with images for now. + // return; + } + + console.log.apply(null, log(`Merging same user peer posts...`)); + + await mergeDataArray(userID, [post]); } // async getPostIdsForUserHandler(data: any) { diff --git a/src/log.ts b/src/log.ts index 96c2caf..d8ed441 100644 --- a/src/log.ts +++ b/src/log.ts @@ -3,6 +3,10 @@ let logLength = 100; let logVisible = false; export function logID(ID: string) { + if (!ID) { + return "badID"; + } + return ID.substring(0, 5); } diff --git a/src/main2.ts b/src/main2.ts index 93592bd..f37c89c 100644 --- a/src/main2.ts +++ b/src/main2.ts @@ -58,57 +58,57 @@ declare let QRCode: any; // second: number, // } -function waitMs(durationMs: number) { - return new Promise(resolve => setTimeout(resolve, durationMs)); -} +// function waitMs(durationMs: number) { +// return new Promise(resolve => setTimeout(resolve, durationMs)); +// } -function uuidToBytes(uuid: string): Uint8Array { - return new Uint8Array(uuid.match(/[a-fA-F0-9]{2}/g)!.map((hex) => parseInt(hex, 16))); -} +// function uuidToBytes(uuid: string): Uint8Array { +// return new Uint8Array(uuid.match(/[a-fA-F0-9]{2}/g)!.map((hex) => parseInt(hex, 16))); +// } // Base58 character set -const BASE58_ALPHABET = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'; +// const BASE58_ALPHABET = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'; // Base58 encoding // Base58 encoding -function encodeBase58(buffer: Uint8Array): string { - let carry; - const digits = [0]; +// function encodeBase58(buffer: Uint8Array): string { +// let carry; +// const digits = [0]; - for (const byte of buffer) { - carry = byte; - for (let i = 0; i < digits.length; i++) { - carry += digits[i] << 8; - digits[i] = carry % 58; - carry = Math.floor(carry / 58); - } - while (carry > 0) { - digits.push(carry % 58); - carry = Math.floor(carry / 58); - } - } +// for (const byte of buffer) { +// carry = byte; +// for (let i = 0; i < digits.length; i++) { +// carry += digits[i] << 8; +// digits[i] = carry % 58; +// carry = Math.floor(carry / 58); +// } +// while (carry > 0) { +// digits.push(carry % 58); +// carry = Math.floor(carry / 58); +// } +// } - let result = ''; - for (const digit of digits.reverse()) { - result += BASE58_ALPHABET[digit]; - } +// let result = ''; +// for (const digit of digits.reverse()) { +// result += BASE58_ALPHABET[digit]; +// } - // Handle leading zero bytes - for (const byte of buffer) { - if (byte === 0x00) { - result = BASE58_ALPHABET[0] + result; - } else { - break; - } - } +// // Handle leading zero bytes +// for (const byte of buffer) { +// if (byte === 0x00) { +// result = BASE58_ALPHABET[0] + result; +// } else { +// break; +// } +// } - return result; -} +// return result; +// } // Convert UUID v4 to Base58 -function uuidToBase58(uuid: string): string { - const bytes = uuidToBytes(uuid); - return encodeBase58(bytes); -} +// function uuidToBase58(uuid: string): string { +// const bytes = uuidToBytes(uuid); +// return encodeBase58(bytes); +// } // function log(message:string) { // console.log.apply(null, log(message); @@ -120,9 +120,6 @@ function uuidToBase58(uuid: string): string { // } - - - interface StoragePost { data: Post; } @@ -210,11 +207,11 @@ async function arrayBufferToBase64(buffer: ArrayBuffer) { return (await bytesToBase64DataUrl(bytes) as string).replace("data:application/octet-stream;base64,", ""); } -async function base64ToArrayBuffer(base64String: string) { - let response = await fetch("data:application/octet-stream;base64," + base64String); - let arrayBuffer = await response.arrayBuffer(); - return arrayBuffer; -} +// async function base64ToArrayBuffer(base64String: string) { +// let response = await fetch("data:application/octet-stream;base64," + base64String); +// let arrayBuffer = await response.arrayBuffer(); +// return arrayBuffer; +// } async function compressString(input: string) { // Convert the string to a Uint8Array @@ -236,492 +233,8 @@ async function compressString(input: string) { return new Uint8Array(compressedArray); } -interface PeerMessage { - type: string; - from: string; - to: string; - from_peername: string; - from_username: string; - message: any; -} -// Connect websocket -// send hello -// get bootstrap peer ID -// WebRTC connect to bootstrap peer -// Bootstrap peer will send the last N peers it saw. -// Connect to those new peers, tell those peers about users we know about -// ask for peers that have users we care about -// WebRTC Connect to peers that might have posts we need -// query those peers and do existing logic. - -class wsConnection { - websocket: WebSocket | null = null; - sessionID = ""; - userID = ""; - peerID = ""; - // rtcPeerDescription: RTCSessionDescription | null = null; - UserIDsToSync: Set; - websocketPingInterval: number = 0; - helloRefreshInterval: number = 0; - retry = 10; - state = 'disconnected'; - // peers: Map = new Map(); - - messageHandlers: Map void> = new Map(); - peerMessageHandlers: Map void> = new Map(); - seenPeers: Map = new Map(); - - - constructor(userID: string, peerID: string, IDsToSync: Set, rtcPeerDescription: RTCSessionDescription) { - // this.rtcPeerDescription = rtcPeerDescription; - this.sessionID = generateID(); - this.userID = userID; - this.peerID = peerID; - this.UserIDsToSync = new Set(IDsToSync); - - this.messageHandlers.set('hello', this.helloResponseHandler.bind(this)); - this.messageHandlers.set('hello2', this.hello2ResponseHandler.bind(this)); - this.messageHandlers.set('pong', this.pongHandler); - this.messageHandlers.set('peer_message', this.peerMessageHandler.bind(this)); - - - // - this.peerMessageHandlers.set('get_post_ids_for_user', this.getPostIdsForUserHandler.bind(this)); - this.peerMessageHandlers.set('get_post_ids_for_user_response', this.getPostIdsForUserResponseHandler.bind(this)); - - this.peerMessageHandlers.set('get_posts_for_user', this.getPostsForUserHandler.bind(this)); - this.peerMessageHandlers.set('get_posts_for_user_response', this.getPostsForUserReponseHandler.bind(this)); - - // this.peerMessageHandlers.set('send_webrtc_offer', this.sendWebRTCOfferHandler.bind(this)); - // this.peerMessageHandlers.set('send_webrtc_offer_response', this.getPostIdsForUserResponseHandler.bind(this)); - - - - window.addEventListener('beforeunload', () => this.disconnect()); - - this.connect(); - } - - // So we don't need custom logic everywhere we use this, I just wrapped it. - shouldSyncUserID(userID: string) { - if (app.isHeadless) { - return true; - } - - return this.UserIDsToSync.has(userID); - } - - async send(message: any) { - let json = "" - try { - json = JSON.stringify(message); - // console.log.apply(null, log("*******", (await compressString(json)).byteLength, json.length); - } catch (e) { - console.log.apply(null, log(e, "wsConnection send: Couldn't serialize message", message)); - } - // log(`ws->${json.slice(0, 240)}`) - this.websocket!.send(json); - - } - - pongHandler(data: any) { - } - - - async sendWebRTCDescription(description: RTCSessionDescription | null) { - - console.log.apply(null, log("description:", description)); - this.send({ type: "rtc_session_description", description: description }); - } - - async getPostIdsForUserResponseHandler(data: any) { - // log(`getPostsForUserResponse: ${data}`) - - let message = data.message; - console.log.apply(null, log(`Net: got ${message.post_ids.length} post IDs for user ${logID(message.user_id)} from peer ${logID(data.from)}`));; - - - let startTime = app.timerStart(); - let postIds = await checkPostIds(message.user_id, message.post_ids); - console.log.apply(null, log(`ID Check for user ${logID(message.user_id)} took ${app.timerDelta().toFixed(2)}ms`));; - console.log.apply(null, log(`Need ${postIds.length} posts for user ${logID(message.user_id)} from peer ${logID(data.from)}`));; - - if (postIds.length === 0) { - return; - } - - console.log.apply(null, log(`Net: Req ${postIds.length} posts for user ${logID(message.user_id)} from peer ${logID(data.from)}`)); - let responseMessage = { type: "peer_message", from: app.peerID, to: data.from, from_username: app.username, from_peername: app.peername, message: { type: "get_posts_for_user", post_ids: postIds, user_id: message.user_id } } - - this.send(responseMessage); - } - - - - // static async compressArrayBuffer(data: ArrayBuffer): Promise { - // const compressionStream = new CompressionStream('gzip'); // You can also use 'deflate', 'deflate-raw', etc. - - // const compressedStream = new Response( - // new Blob([data]).stream().pipeThrough(compressionStream) - // ); - - // const compressedArrayBuffer = await compressedStream.arrayBuffer(); - - // return compressedArrayBuffer; - // } - postBlockList = new Set([ - '1c71f53c-c467-48e4-bc8c-39005b37c0d5', - '64203497-f77b-40d6-9e76-34d17372e72a', - '243130d8-4a41-471e-8898-5075f1bd7aec', - 'e01eff89-5100-4b35-af4c-1c1bcb007dd0', - '194696a2-d850-4bb0-98f7-47416b3d1662', - 'f6b21eb1-a0ff-435b-8efc-6a3dd70c0dca', - 'dd1d92aa-aa24-4166-a925-94ba072a9048' - ]); - - async getPostIdsForUserHandler(data: any) { - let message = data.message; - let postIds = await getAllIds(message.user_id) ?? []; - postIds = postIds.filter((postID: string) => !this.postBlockList.has(postID)); - if (postIds.length === 0) { - console.log.apply(null, log(`Net: I know about user ${logID(message.user_id)} but I have 0 posts, so I'm not sending any to to peer ${logID(data.from)}`));; - return; - } - console.log.apply(null, log(`Net: Sending ${postIds.length} post Ids for user ${logID(message.user_id)} to peer ${logID(data.from)}`)); - - let responseMessage = { type: "peer_message", from: app.peerID, to: data.from, from_username: app.username, from_peername: app.peername, message: { type: "get_post_ids_for_user_response", post_ids: postIds, user_id: message.user_id } } - this.send(responseMessage); - } - - async broadcastNewPost(userID: string, post: any) { - - let newPost = { ...post } - if (post.image_data) { - newPost.image_data = await arrayBufferToBase64(post.image_data); - } - - for (let [peerID, peerInfo] of this.seenPeers.entries()) { - console.log.apply(null, log(`broadcastNewPost: sending new post to ${logID(peerID)}:${peerInfo.peerName}:${peerInfo.userName}`));; - - this.sendPostsForUser(peerID, app.userID, [newPost]) - } - } - - - async sendPostsForUser(toPeerID: string, userID: string, posts: any) { - let responseMessage = { - type: "peer_message", - from: app.peerID, - to: toPeerID, - from_username: app.username, - from_peername: app.peername, - message: { - type: "get_posts_for_user_response", - posts: posts, - user_id: userID - } - } - - return this.send(responseMessage); - - } - - // Send posts to peer - async getPostsForUserHandler(data: any) { - let message = data.message; - let posts = await getPostsByIds(message.user_id, message.post_ids) ?? []; - - console.log.apply(null, log(`Net: Sending ${posts.length} posts for user ${logID(message.user_id)} to peer ${logID(data.from)}`));; - - app.timerStart(); - let output = []; - - console.log.apply(null, log("Serializing images")); - for (let post of posts) { - let newPost = (post as any).data; - - if (newPost.image_data) { - // let compressedData = await wsConnection.compressArrayBuffer(newPost.image_data); - // console.log.apply(null, log((newPost.image_data.byteLength - compressedData.byteLength) / 1024 / 1024); - - // TODO don't do this, use Blobs direclty! - // https://developer.chrome.com/blog/blob-support-for-Indexeddb-landed-on-chrome-dev - - newPost.image_data = await arrayBufferToBase64(newPost.image_data); - - } - - // let megs = JSON.stringify(newPost).length/1024/1024; - // console.log.apply(null, log(`getPostsForUserHandler id:${newPost.post_id} post length:${megs}`); - output.push(newPost); - } - - let responseMessage = { type: "peer_message", from: app.peerID, to: data.from, from_username: app.username, from_peername: app.peername, message: { type: "get_posts_for_user_response", posts: output, user_id: message.user_id } } - - console.log.apply(null, log("Sending posts")); - await this.sendPostsForUser(data.from, message.user_id, output); - let sendTime = app.timerDelta(); - console.log.apply(null, log(`getPostsForUserHandler send took: ${sendTime.toFixed(2)}ms`));; - - } - - - - // Got posts from peer - async getPostsForUserReponseHandler(data: any) { - app.timerStart(); - let message = data.message; - console.log.apply(null, log(`Net: got ${message.posts.length} posts for user ${logID(message.user_id)} from peer ${logID(data.from)}`)); - for (let post of message.posts) { - - // HACK: Some posts have insanely large images, so I'm gonna skip them. - // Once we support delete then we we could delete these posts in a sensible way. - if (this.postBlockList.has(post.post_id)) { - console.log.apply(null, log(`Skipping blocked post: ${post.post_id}`));; - continue; - } - - // HACK - some posts had the wrong author ID - if (message.user_id === app.userID) { - post.author_id = app.userID; - } - - post.post_timestamp = new Date(post.post_timestamp); - if (post.image_data) { - post.image_data = await base64ToArrayBuffer(post.image_data); - } - } - console.log.apply(null, log(`Merging same user peer posts...`)); - await mergeDataArray(message.user_id, data.message.posts); - - let receiveTime = app.timerDelta(); - - console.log.apply(null, log(`getPostsForUserReponseHandler receive took: ${receiveTime.toFixed(2)}ms`));; - - - if (message.user_id === app.getPreferentialUserID() || app.following.has(message.user_id)) { - app.render(); - } - } - - - - - async peerMessageHandler(data: PeerMessage) { - // log(`peerMessageHandler ${JSON.stringify(data)}`) - - this.seenPeers.set(data.from, { peerName: data.from_peername, userName: data.from_username }); - - let peerMessageType = data.message.type; - - let handler = this.peerMessageHandlers.get(peerMessageType); - - if (!handler) { - console.error(`got peer message type we don't have a handler for: ${peerMessageType}`); - return; - } - - handler(data); - } - - userBlockList = new Set([ - '5d63f0b2-a842-41bf-bf06-e0e4f6369271', - '5f1b85c4-b14c-454c-8df1-2cacc93f8a77', - // 'bba3ad24-9181-4e22-90c8-c265c80873ea' - ]) - - - // Hello2 - // Goal, connect to bootstrap peer, ask bootstrap peer for peers that have posts from users that we care about. get peers, connect to those peers, sync. - // how? do "perfect negotiation" with bootstrap peer. All logic here moves to BP. - - async sendHello2() { - this.send({ - type: "hello2", - user_id: this.userID, - user_name: app.username, - peer_id: this.peerID, - session_id: this.sessionID, - peer_name: app.peername, - is_bootstrap_peer: app.isBootstrapPeer, - // peer_description: this.rtcPeerDescription - }); - } - - async getKnownUsers() { - let knownUsers = [...(await indexedDB.databases())].map(db => db.name?.replace('user_', '')).filter(userID => userID !== undefined); - knownUsers = knownUsers - .filter(userID => this.shouldSyncUserID(userID)) - .filter(userID => !this.userBlockList.has(userID)) - .filter(async userID => (await getAllIds(userID)).length > 0); // TODO:EASYOPT getting all the IDs is unecessary, replace it with a test to get a single ID. - } - - async sendHello() { - // TODO only get users you're following here. ✅ - let knownUsers = [...(await indexedDB.databases())].map(db => db.name?.replace('user_', '')).filter(userID => userID !== undefined); - knownUsers = knownUsers - .filter(userID => this.shouldSyncUserID(userID)) - .filter(userID => !this.userBlockList.has(userID)) - .filter(async userID => (await getAllIds(userID)).length > 0); // TODO:EASYOPT getting all the IDs is unecessary, replace it with a test to get a single ID. - - console.log.apply(null, log('Net: Sending known users', knownUsers.map(userID => logID(userID ?? "")))); - return await this.send({ type: "hello", user_id: this.userID, user_name: app.username, peer_id: this.peerID, peer_name: app.peername, known_users: knownUsers }); - } - - hello2ResponseHandler(data: any) { - - } - - helloResponseHandler(data: any) { - - let users = []; - let receivedUsers = Object.entries(data.userPeers); - console.log.apply(null, log(`Net: got ${receivedUsers.length} users from bootstrap peer.`)); - - try { - let preferentialUserID = app.getPreferentialUserID(); - let currentUserPeers = data.userPeers[preferentialUserID]; - users.push([preferentialUserID, currentUserPeers]); - delete data.userPeers[preferentialUserID]; - } catch (e) { - console.log.apply(null, log('helloResponseHandler', e)); - } - - let getAllUsers = app.router.route !== App.Route.USER - if (getAllUsers) { - users = [...users, ...Object.entries(data.userPeers).filter(userID => this.shouldSyncUserID(userID[0]))]; - } - - // log(`Net: got ${users.length} users from bootstrap peer. \n${users.map((user)=>user[0]).join('\n')}`) - - for (let [userID, peerIDs] of users) { - if (this.userBlockList.has(userID)) { - console.log.apply(null, log("Skipping user on blocklist:", userID)); - continue; - } - - // this.peers.set(userID, [...peerIDs]); - - for (let peerID of [...peerIDs]) { - if (peerID === this.peerID) { - continue; - } - - console.log.apply(null, log(`Net: Req post IDs for user ${logID(userID)} from peer ${logID(peerID)}`));; - this.send({ - type: "peer_message", - from: this.peerID, - from_username: app.username, - from_peername: app.peername, - to: peerID, - message: { type: "get_post_ids_for_user", user_id: userID } - }) - } - } - } - - connect(): void { - if (this.websocket?.readyState === WebSocket.OPEN) { - return; - } - - window.clearInterval(this.websocketPingInterval); - if (this.websocket) { this.websocket.close() }; - - try { - this.websocket = new WebSocket(`wss://${window.location.hostname}:${window.location.port}/ws`); - } catch (error: any) { - console.log.apply(null, log(error.message)); - return; - } - - this.websocket.onopen = async (event) => { - console.log.apply(null, log("ws:connected"));; - await this.sendHello2(); - - // If we're running as a headless peer, send a hello message every N seconds to refresh the posts we have. - // let helloRefreshIntervalPeriod = 120; - // if (app.isHeadless) { - // console.log.apply(null, log("wsConnection: Setting hello refresh interval to ", helloRefreshIntervalPeriod) - // this.helloRefreshInterval = window.setInterval(() => { - // console.log.apply(null, log("wsConnection: Hello refresh.") - - // if (!navigator.onLine) { - // return; - // } - // this.sendHello(); - // }, helloRefreshIntervalPeriod * 1000); - // } - - this.websocketPingInterval = window.setInterval(() => { - if (!navigator.onLine) { - return; - } - this.send({ type: "ping", peer_id: this.peerID, peer_name: app.peername, user_id: app.userID, user_name: app.username }); - }, 10_000) - }; - - // this.websocket.onopen = async (event) => { - // console.log.apply(null, log("ws:connected"));; - // await this.sendHello(); - - // // If we're running as a headless peer, send a hello message every N seconds to refresh the posts we have. - // let helloRefreshIntervalPeriod = 120; - // if (app.isHeadless) { - // console.log.apply(null, log("wsConnection: Setting hello refresh interval to ", helloRefreshIntervalPeriod) - // this.helloRefreshInterval = window.setInterval(() => { - // console.log.apply(null, log("wsConnection: Hello refresh.") - - // if (!navigator.onLine) { - // return; - // } - // this.sendHello(); - // }, helloRefreshIntervalPeriod * 1000); - // } - - // this.websocketPingInterval = window.setInterval(() => { - // if (!navigator.onLine) { - // return; - // } - // this.send({ type: "ping", peer_id: this.peerID, peer_name: app.peername, user_id: app.userID, user_name: app.username }); - // }, 10_000) - // }; - - this.websocket.onclose = (event) => { - console.log.apply(null, log("ws:disconnected"));; - // this.retry *= 2; - console.log.apply(null, log(`Retrying in ${this.retry} seconds`));; - window.setTimeout(() => { this.connect(); }, this.retry * 1000); - }; - - this.websocket.onmessage = (event) => { - // log('ws:<-' + event.data.slice(0, 240)); - let data = JSON.parse(event.data); - - let { type } = data; - - let handler = this.messageHandlers.get(type); - if (!handler) { - console.warn(`Got a message we can't handle:`, type); - return; - } - - handler(data); - - }; - - this.websocket.onerror = (event) => { - console.log.apply(null, log('ws:error: ' + event));; - }; - } - - disconnect() { - this.websocket?.close(); - } -} - class App { username: string = ''; peername: string = ''; @@ -735,13 +248,14 @@ class App { showLog: boolean = false; markedAvailable = false; limitPosts = 50; - websocket: wsConnection | null = null; + // websocket: wsConnection | null = null; // vizGraph: any | null = null; qrcode: any = null; connectURL: string = ""; firstRun = false; peerManager: PeerManager | null = null; sync: Sync = new Sync(); + renderTimer: number = 0; async announceUser_rpc_response(sendingPeerID: string, userIDs: string[]) { if (this.isBootstrapPeer) { @@ -751,9 +265,9 @@ class App { console.log.apply(null, log(`announceUsers from ${sendingPeerID}`, userIDs)); for (let userID of userIDs) { - console.log.apply(null, log(`[app] announceUsers, got user:${userID} from peer ${sendingPeerID}`)); + // console.log.apply(null, log(`[app] announceUsers, got user:${userID} from peer ${sendingPeerID}`)); this.sync.addUserPeer(userID, sendingPeerID); - if (this.sync.shouldSyncUserID(userID)) { + if (this.sync.shouldSyncUserID(userID) || (this.router.route === App.Route.USER && userID === this.router.userID)) { let postIDs = await this.peerManager?.rpc.getPostIDsForUser(sendingPeerID, userID); // console.log.apply(null, log(`[app] announceUsers response, gotPostIDs`, postIDs)); let neededPostIDs = await this.sync.checkPostIds(userID, sendingPeerID, postIDs); @@ -827,6 +341,8 @@ class App { for (let post of posts) { + console.log.apply(null, log(`[app] sendPostForUser sending post [${logID(post.id)}] to [${logID(requestingPeerID)}]`, userID, post)); + this.peerManager?.rpc.sendPostForUser(requestingPeerID, userID, post); } // return posts; @@ -834,12 +350,20 @@ class App { // return postIDs; }); - this.peerManager.registerRPC('sendPostForUser', async (userID:string, post:string) => { - console.log.apply(null, log(`[app] sendPostForUser`, userID, post)); + this.peerManager.registerRPC('sendPostForUser', async (userID:string, post:Post) => { + console.log.apply(null, log(`[app] sendPostForUser got post`, userID, post)); + // if (post.text === "image...") { + // debugger; + // } await this.sync.writePostForUser(userID, post); - if (userID === this.userID) { - await this.render(); + // if (userID === this.userID) { + + if (this.renderTimer) { + clearTimeout(this.renderTimer); } + + this.renderTimer = setTimeout(()=>{this.render()}, 1000) + // } }); @@ -1213,7 +737,7 @@ class App { // localStorage.setItem(key, JSON.stringify(posts)); addData(userID, post); - this.websocket?.broadcastNewPost(userID, post); + // this.websocket?.broadcastNewPost(userID, post); this.render(); @@ -1307,22 +831,22 @@ class App { textArea.style.fontSize = fontSize; } - initOffline(connection: wsConnection) { - // Event listener for going offline - window.addEventListener('offline', () => { - console.log.apply(null, log("offline")); - }); + // initOffline(connection: wsConnection) { + // // Event listener for going offline + // window.addEventListener('offline', () => { + // console.log.apply(null, log("offline")); + // }); - // Event listener for going online - window.addEventListener('online', async () => { - console.log.apply(null, log("online")); - // connection.connect(); - this.render(); - }); + // // Event listener for going online + // window.addEventListener('online', async () => { + // console.log.apply(null, log("online")); + // // connection.connect(); + // this.render(); + // }); - console.log.apply(null, log(`Online status: ${navigator.onLine ? "online" : "offline"}`)); + // console.log.apply(null, log(`Online status: ${navigator.onLine ? "online" : "offline"}`)); - } + // } selectFile(contentType: string): Promise { return new Promise(resolve => { @@ -1546,37 +1070,7 @@ class App { } async loadFollowersFromStorage(userID: string): Promise { - - // Rob - if (userID === 'b38b623c-c3fa-4351-9cab-50233c99fa4e') { - return [ - 'b38b623c-c3fa-4351-9cab-50233c99fa4e', - '6d774268-16cd-4e86-8bbe-847a0328893d', // Sean - '05a495a0-0dd8-4186-94c3-b8309ba6fc4c', // Martin - 'a0e42390-08b5-4b07-bc2b-787f8e5f1297', // BMO - 'bba3ad24-9181-4e22-90c8-c265c80873ea', // Harry - '8f6802be-c3b6-46c1-969c-5f90cbe01479', // Fiona - ] - } - - // Martin - if (userID === '05a495a0-0dd8-4186-94c3-b8309ba6fc4c') { - return [ - 'b38b623c-c3fa-4351-9cab-50233c99fa4e', - 'a0e42390-08b5-4b07-bc2b-787f8e5f1297', // BMO - ] - } - - // Fiona - if (userID === '8f6802be-c3b6-46c1-969c-5f90cbe01479') { - return [ - 'b38b623c-c3fa-4351-9cab-50233c99fa4e', // Rob - 'a0e42390-08b5-4b07-bc2b-787f8e5f1297', // BMO - '05a495a0-0dd8-4186-94c3-b8309ba6fc4c', // Martin - ] - } - - return ['a0e42390-08b5-4b07-bc2b-787f8e5f1297']; // Follow BMO by default :) + return this.sync.getFollowing(userID); } async loadPostsFromStorage(userID: string, postID?: string) { @@ -1716,17 +1210,15 @@ class App { this.sync.setUserID(this.userID) this.sync.setArchive(this.isArchivePeer); - - this.connect(); - + this.getRoute(); - if (this.router.route === App.Route.CONNECT) { console.log.apply(null, log('connect', this.router.userID)); localStorage.setItem("dandelion_id", this.router.userID); localStorage.removeItem("dandelion_username"); } - + + this.connect(); await this.initDB(); @@ -2032,7 +1524,7 @@ class App { const blob = new Blob([post.image_data as ArrayBuffer]); const url = URL.createObjectURL(blob); image.onload = () => { - URL.revokeObjectURL(url); + // URL.revokeObjectURL(url); }; image.src = url; diff --git a/static/main2.js b/static/main2.js index 4c22da3..f42d826 100644 --- a/static/main2.js +++ b/static/main2.js @@ -26,67 +26,11 @@ Restruucture the app around the data. App/WS split is messy. Clean it up. */ // import * as ForceGraph3D from "3d-force-graph"; -import { openDatabase, getData, addData, deleteData, mergeDataArray, getAllData, checkPostIds, getAllIds, getPostsByIds } from "db"; +import { openDatabase, getData, addData, deleteData, getAllData } from "db"; import { generateID } from "IDUtils"; import { PeerManager, PeerEventTypes } from "PeerManager"; import { Sync } from "Sync"; import { log, logID, renderLog, setLogVisibility } from "log"; -// let posts:any; -// let keyBase = "dandelion_posts_v1_" -// let key:string = ""; -// interface PostTimestamp { -// year: number, -// month: number, -// day: number, -// hour: number, -// minute: number, -// second: number, -// } -function waitMs(durationMs) { - return new Promise(resolve => setTimeout(resolve, durationMs)); -} -function uuidToBytes(uuid) { - return new Uint8Array(uuid.match(/[a-fA-F0-9]{2}/g).map((hex) => parseInt(hex, 16))); -} -// Base58 character set -const BASE58_ALPHABET = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'; -// Base58 encoding -// Base58 encoding -function encodeBase58(buffer) { - let carry; - const digits = [0]; - for (const byte of buffer) { - carry = byte; - for (let i = 0; i < digits.length; i++) { - carry += digits[i] << 8; - digits[i] = carry % 58; - carry = Math.floor(carry / 58); - } - while (carry > 0) { - digits.push(carry % 58); - carry = Math.floor(carry / 58); - } - } - let result = ''; - for (const digit of digits.reverse()) { - result += BASE58_ALPHABET[digit]; - } - // Handle leading zero bytes - for (const byte of buffer) { - if (byte === 0x00) { - result = BASE58_ALPHABET[0] + result; - } - else { - break; - } - } - return result; -} -// Convert UUID v4 to Base58 -function uuidToBase58(uuid) { - const bytes = uuidToBytes(uuid); - return encodeBase58(bytes); -} class Post { constructor(author, author_id, text, post_timestamp, imageData = null, importedFrom = null, importSource = null) { this.post_timestamp = post_timestamp; @@ -135,11 +79,11 @@ async function arrayBufferToBase64(buffer) { var bytes = new Uint8Array(buffer); return (await bytesToBase64DataUrl(bytes)).replace("data:application/octet-stream;base64,", ""); } -async function base64ToArrayBuffer(base64String) { - let response = await fetch("data:application/octet-stream;base64," + base64String); - let arrayBuffer = await response.arrayBuffer(); - return arrayBuffer; -} +// async function base64ToArrayBuffer(base64String: string) { +// let response = await fetch("data:application/octet-stream;base64," + base64String); +// let arrayBuffer = await response.arrayBuffer(); +// return arrayBuffer; +// } async function compressString(input) { // Convert the string to a Uint8Array const textEncoder = new TextEncoder(); @@ -155,387 +99,6 @@ async function compressString(input) { // Convert the compressed data to a Uint8Array return new Uint8Array(compressedArray); } -// Connect websocket -// send hello -// get bootstrap peer ID -// WebRTC connect to bootstrap peer -// Bootstrap peer will send the last N peers it saw. -// Connect to those new peers, tell those peers about users we know about -// ask for peers that have users we care about -// WebRTC Connect to peers that might have posts we need -// query those peers and do existing logic. -class wsConnection { - constructor(userID, peerID, IDsToSync, rtcPeerDescription) { - this.websocket = null; - this.sessionID = ""; - this.userID = ""; - this.peerID = ""; - this.websocketPingInterval = 0; - this.helloRefreshInterval = 0; - this.retry = 10; - this.state = 'disconnected'; - // peers: Map = new Map(); - this.messageHandlers = new Map(); - this.peerMessageHandlers = new Map(); - this.seenPeers = new Map(); - // static async compressArrayBuffer(data: ArrayBuffer): Promise { - // const compressionStream = new CompressionStream('gzip'); // You can also use 'deflate', 'deflate-raw', etc. - // const compressedStream = new Response( - // new Blob([data]).stream().pipeThrough(compressionStream) - // ); - // const compressedArrayBuffer = await compressedStream.arrayBuffer(); - // return compressedArrayBuffer; - // } - this.postBlockList = new Set([ - '1c71f53c-c467-48e4-bc8c-39005b37c0d5', - '64203497-f77b-40d6-9e76-34d17372e72a', - '243130d8-4a41-471e-8898-5075f1bd7aec', - 'e01eff89-5100-4b35-af4c-1c1bcb007dd0', - '194696a2-d850-4bb0-98f7-47416b3d1662', - 'f6b21eb1-a0ff-435b-8efc-6a3dd70c0dca', - 'dd1d92aa-aa24-4166-a925-94ba072a9048' - ]); - this.userBlockList = new Set([ - '5d63f0b2-a842-41bf-bf06-e0e4f6369271', - '5f1b85c4-b14c-454c-8df1-2cacc93f8a77', - // 'bba3ad24-9181-4e22-90c8-c265c80873ea' - ]); - // this.rtcPeerDescription = rtcPeerDescription; - this.sessionID = generateID(); - this.userID = userID; - this.peerID = peerID; - this.UserIDsToSync = new Set(IDsToSync); - this.messageHandlers.set('hello', this.helloResponseHandler.bind(this)); - this.messageHandlers.set('hello2', this.hello2ResponseHandler.bind(this)); - this.messageHandlers.set('pong', this.pongHandler); - this.messageHandlers.set('peer_message', this.peerMessageHandler.bind(this)); - // - this.peerMessageHandlers.set('get_post_ids_for_user', this.getPostIdsForUserHandler.bind(this)); - this.peerMessageHandlers.set('get_post_ids_for_user_response', this.getPostIdsForUserResponseHandler.bind(this)); - this.peerMessageHandlers.set('get_posts_for_user', this.getPostsForUserHandler.bind(this)); - this.peerMessageHandlers.set('get_posts_for_user_response', this.getPostsForUserReponseHandler.bind(this)); - // this.peerMessageHandlers.set('send_webrtc_offer', this.sendWebRTCOfferHandler.bind(this)); - // this.peerMessageHandlers.set('send_webrtc_offer_response', this.getPostIdsForUserResponseHandler.bind(this)); - window.addEventListener('beforeunload', () => this.disconnect()); - this.connect(); - } - // So we don't need custom logic everywhere we use this, I just wrapped it. - shouldSyncUserID(userID) { - if (app.isHeadless) { - return true; - } - return this.UserIDsToSync.has(userID); - } - async send(message) { - let json = ""; - try { - json = JSON.stringify(message); - // console.log.apply(null, log("*******", (await compressString(json)).byteLength, json.length); - } - catch (e) { - console.log.apply(null, log(e, "wsConnection send: Couldn't serialize message", message)); - } - // log(`ws->${json.slice(0, 240)}`) - this.websocket.send(json); - } - pongHandler(data) { - } - async sendWebRTCDescription(description) { - console.log.apply(null, log("description:", description)); - this.send({ type: "rtc_session_description", description: description }); - } - async getPostIdsForUserResponseHandler(data) { - // log(`getPostsForUserResponse: ${data}`) - let message = data.message; - console.log.apply(null, log(`Net: got ${message.post_ids.length} post IDs for user ${logID(message.user_id)} from peer ${logID(data.from)}`)); - ; - let startTime = app.timerStart(); - let postIds = await checkPostIds(message.user_id, message.post_ids); - console.log.apply(null, log(`ID Check for user ${logID(message.user_id)} took ${app.timerDelta().toFixed(2)}ms`)); - ; - console.log.apply(null, log(`Need ${postIds.length} posts for user ${logID(message.user_id)} from peer ${logID(data.from)}`)); - ; - if (postIds.length === 0) { - return; - } - console.log.apply(null, log(`Net: Req ${postIds.length} posts for user ${logID(message.user_id)} from peer ${logID(data.from)}`)); - let responseMessage = { type: "peer_message", from: app.peerID, to: data.from, from_username: app.username, from_peername: app.peername, message: { type: "get_posts_for_user", post_ids: postIds, user_id: message.user_id } }; - this.send(responseMessage); - } - async getPostIdsForUserHandler(data) { - let message = data.message; - let postIds = await getAllIds(message.user_id) ?? []; - postIds = postIds.filter((postID) => !this.postBlockList.has(postID)); - if (postIds.length === 0) { - console.log.apply(null, log(`Net: I know about user ${logID(message.user_id)} but I have 0 posts, so I'm not sending any to to peer ${logID(data.from)}`)); - ; - return; - } - console.log.apply(null, log(`Net: Sending ${postIds.length} post Ids for user ${logID(message.user_id)} to peer ${logID(data.from)}`)); - let responseMessage = { type: "peer_message", from: app.peerID, to: data.from, from_username: app.username, from_peername: app.peername, message: { type: "get_post_ids_for_user_response", post_ids: postIds, user_id: message.user_id } }; - this.send(responseMessage); - } - async broadcastNewPost(userID, post) { - let newPost = { ...post }; - if (post.image_data) { - newPost.image_data = await arrayBufferToBase64(post.image_data); - } - for (let [peerID, peerInfo] of this.seenPeers.entries()) { - console.log.apply(null, log(`broadcastNewPost: sending new post to ${logID(peerID)}:${peerInfo.peerName}:${peerInfo.userName}`)); - ; - this.sendPostsForUser(peerID, app.userID, [newPost]); - } - } - async sendPostsForUser(toPeerID, userID, posts) { - let responseMessage = { - type: "peer_message", - from: app.peerID, - to: toPeerID, - from_username: app.username, - from_peername: app.peername, - message: { - type: "get_posts_for_user_response", - posts: posts, - user_id: userID - } - }; - return this.send(responseMessage); - } - // Send posts to peer - async getPostsForUserHandler(data) { - let message = data.message; - let posts = await getPostsByIds(message.user_id, message.post_ids) ?? []; - console.log.apply(null, log(`Net: Sending ${posts.length} posts for user ${logID(message.user_id)} to peer ${logID(data.from)}`)); - ; - app.timerStart(); - let output = []; - console.log.apply(null, log("Serializing images")); - for (let post of posts) { - let newPost = post.data; - if (newPost.image_data) { - // let compressedData = await wsConnection.compressArrayBuffer(newPost.image_data); - // console.log.apply(null, log((newPost.image_data.byteLength - compressedData.byteLength) / 1024 / 1024); - // TODO don't do this, use Blobs direclty! - // https://developer.chrome.com/blog/blob-support-for-Indexeddb-landed-on-chrome-dev - newPost.image_data = await arrayBufferToBase64(newPost.image_data); - } - // let megs = JSON.stringify(newPost).length/1024/1024; - // console.log.apply(null, log(`getPostsForUserHandler id:${newPost.post_id} post length:${megs}`); - output.push(newPost); - } - let responseMessage = { type: "peer_message", from: app.peerID, to: data.from, from_username: app.username, from_peername: app.peername, message: { type: "get_posts_for_user_response", posts: output, user_id: message.user_id } }; - console.log.apply(null, log("Sending posts")); - await this.sendPostsForUser(data.from, message.user_id, output); - let sendTime = app.timerDelta(); - console.log.apply(null, log(`getPostsForUserHandler send took: ${sendTime.toFixed(2)}ms`)); - ; - } - // Got posts from peer - async getPostsForUserReponseHandler(data) { - app.timerStart(); - let message = data.message; - console.log.apply(null, log(`Net: got ${message.posts.length} posts for user ${logID(message.user_id)} from peer ${logID(data.from)}`)); - for (let post of message.posts) { - // HACK: Some posts have insanely large images, so I'm gonna skip them. - // Once we support delete then we we could delete these posts in a sensible way. - if (this.postBlockList.has(post.post_id)) { - console.log.apply(null, log(`Skipping blocked post: ${post.post_id}`)); - ; - continue; - } - // HACK - some posts had the wrong author ID - if (message.user_id === app.userID) { - post.author_id = app.userID; - } - post.post_timestamp = new Date(post.post_timestamp); - if (post.image_data) { - post.image_data = await base64ToArrayBuffer(post.image_data); - } - } - console.log.apply(null, log(`Merging same user peer posts...`)); - await mergeDataArray(message.user_id, data.message.posts); - let receiveTime = app.timerDelta(); - console.log.apply(null, log(`getPostsForUserReponseHandler receive took: ${receiveTime.toFixed(2)}ms`)); - ; - if (message.user_id === app.getPreferentialUserID() || app.following.has(message.user_id)) { - app.render(); - } - } - async peerMessageHandler(data) { - // log(`peerMessageHandler ${JSON.stringify(data)}`) - this.seenPeers.set(data.from, { peerName: data.from_peername, userName: data.from_username }); - let peerMessageType = data.message.type; - let handler = this.peerMessageHandlers.get(peerMessageType); - if (!handler) { - console.error(`got peer message type we don't have a handler for: ${peerMessageType}`); - return; - } - handler(data); - } - // Hello2 - // Goal, connect to bootstrap peer, ask bootstrap peer for peers that have posts from users that we care about. get peers, connect to those peers, sync. - // how? do "perfect negotiation" with bootstrap peer. All logic here moves to BP. - async sendHello2() { - this.send({ - type: "hello2", - user_id: this.userID, - user_name: app.username, - peer_id: this.peerID, - session_id: this.sessionID, - peer_name: app.peername, - is_bootstrap_peer: app.isBootstrapPeer, - // peer_description: this.rtcPeerDescription - }); - } - async getKnownUsers() { - let knownUsers = [...(await indexedDB.databases())].map(db => db.name?.replace('user_', '')).filter(userID => userID !== undefined); - knownUsers = knownUsers - .filter(userID => this.shouldSyncUserID(userID)) - .filter(userID => !this.userBlockList.has(userID)) - .filter(async (userID) => (await getAllIds(userID)).length > 0); // TODO:EASYOPT getting all the IDs is unecessary, replace it with a test to get a single ID. - } - async sendHello() { - // TODO only get users you're following here. ✅ - let knownUsers = [...(await indexedDB.databases())].map(db => db.name?.replace('user_', '')).filter(userID => userID !== undefined); - knownUsers = knownUsers - .filter(userID => this.shouldSyncUserID(userID)) - .filter(userID => !this.userBlockList.has(userID)) - .filter(async (userID) => (await getAllIds(userID)).length > 0); // TODO:EASYOPT getting all the IDs is unecessary, replace it with a test to get a single ID. - console.log.apply(null, log('Net: Sending known users', knownUsers.map(userID => logID(userID ?? "")))); - return await this.send({ type: "hello", user_id: this.userID, user_name: app.username, peer_id: this.peerID, peer_name: app.peername, known_users: knownUsers }); - } - hello2ResponseHandler(data) { - } - helloResponseHandler(data) { - let users = []; - let receivedUsers = Object.entries(data.userPeers); - console.log.apply(null, log(`Net: got ${receivedUsers.length} users from bootstrap peer.`)); - try { - let preferentialUserID = app.getPreferentialUserID(); - let currentUserPeers = data.userPeers[preferentialUserID]; - users.push([preferentialUserID, currentUserPeers]); - delete data.userPeers[preferentialUserID]; - } - catch (e) { - console.log.apply(null, log('helloResponseHandler', e)); - } - let getAllUsers = app.router.route !== App.Route.USER; - if (getAllUsers) { - users = [...users, ...Object.entries(data.userPeers).filter(userID => this.shouldSyncUserID(userID[0]))]; - } - // log(`Net: got ${users.length} users from bootstrap peer. \n${users.map((user)=>user[0]).join('\n')}`) - for (let [userID, peerIDs] of users) { - if (this.userBlockList.has(userID)) { - console.log.apply(null, log("Skipping user on blocklist:", userID)); - continue; - } - // this.peers.set(userID, [...peerIDs]); - for (let peerID of [...peerIDs]) { - if (peerID === this.peerID) { - continue; - } - console.log.apply(null, log(`Net: Req post IDs for user ${logID(userID)} from peer ${logID(peerID)}`)); - ; - this.send({ - type: "peer_message", - from: this.peerID, - from_username: app.username, - from_peername: app.peername, - to: peerID, - message: { type: "get_post_ids_for_user", user_id: userID } - }); - } - } - } - connect() { - if (this.websocket?.readyState === WebSocket.OPEN) { - return; - } - window.clearInterval(this.websocketPingInterval); - if (this.websocket) { - this.websocket.close(); - } - ; - try { - this.websocket = new WebSocket(`wss://${window.location.hostname}:${window.location.port}/ws`); - } - catch (error) { - console.log.apply(null, log(error.message)); - return; - } - this.websocket.onopen = async (event) => { - console.log.apply(null, log("ws:connected")); - ; - await this.sendHello2(); - // If we're running as a headless peer, send a hello message every N seconds to refresh the posts we have. - // let helloRefreshIntervalPeriod = 120; - // if (app.isHeadless) { - // console.log.apply(null, log("wsConnection: Setting hello refresh interval to ", helloRefreshIntervalPeriod) - // this.helloRefreshInterval = window.setInterval(() => { - // console.log.apply(null, log("wsConnection: Hello refresh.") - // if (!navigator.onLine) { - // return; - // } - // this.sendHello(); - // }, helloRefreshIntervalPeriod * 1000); - // } - this.websocketPingInterval = window.setInterval(() => { - if (!navigator.onLine) { - return; - } - this.send({ type: "ping", peer_id: this.peerID, peer_name: app.peername, user_id: app.userID, user_name: app.username }); - }, 10000); - }; - // this.websocket.onopen = async (event) => { - // console.log.apply(null, log("ws:connected"));; - // await this.sendHello(); - // // If we're running as a headless peer, send a hello message every N seconds to refresh the posts we have. - // let helloRefreshIntervalPeriod = 120; - // if (app.isHeadless) { - // console.log.apply(null, log("wsConnection: Setting hello refresh interval to ", helloRefreshIntervalPeriod) - // this.helloRefreshInterval = window.setInterval(() => { - // console.log.apply(null, log("wsConnection: Hello refresh.") - // if (!navigator.onLine) { - // return; - // } - // this.sendHello(); - // }, helloRefreshIntervalPeriod * 1000); - // } - // this.websocketPingInterval = window.setInterval(() => { - // if (!navigator.onLine) { - // return; - // } - // this.send({ type: "ping", peer_id: this.peerID, peer_name: app.peername, user_id: app.userID, user_name: app.username }); - // }, 10_000) - // }; - this.websocket.onclose = (event) => { - console.log.apply(null, log("ws:disconnected")); - ; - // this.retry *= 2; - console.log.apply(null, log(`Retrying in ${this.retry} seconds`)); - ; - window.setTimeout(() => { this.connect(); }, this.retry * 1000); - }; - this.websocket.onmessage = (event) => { - // log('ws:<-' + event.data.slice(0, 240)); - let data = JSON.parse(event.data); - let { type } = data; - let handler = this.messageHandlers.get(type); - if (!handler) { - console.warn(`Got a message we can't handle:`, type); - return; - } - handler(data); - }; - this.websocket.onerror = (event) => { - console.log.apply(null, log('ws:error: ' + event)); - ; - }; - } - disconnect() { - this.websocket?.close(); - } -} class App { constructor() { this.username = ''; @@ -550,13 +113,14 @@ class App { this.showLog = false; this.markedAvailable = false; this.limitPosts = 50; - this.websocket = null; + // websocket: wsConnection | null = null; // vizGraph: any | null = null; this.qrcode = null; this.connectURL = ""; this.firstRun = false; this.peerManager = null; this.sync = new Sync(); + this.renderTimer = 0; this.time = 0; this.animals = ['shrew', 'jerboa', 'lemur', 'weasel', 'possum', 'possum', 'marmoset', 'planigale', 'mole', 'narwhal']; this.adjectives = ['snazzy', 'whimsical', 'jazzy', 'bonkers', 'wobbly', 'spiffy', 'chirpy', 'zesty', 'bubbly', 'perky', 'sassy']; @@ -579,9 +143,9 @@ class App { } console.log.apply(null, log(`announceUsers from ${sendingPeerID}`, userIDs)); for (let userID of userIDs) { - console.log.apply(null, log(`[app] announceUsers, got user:${userID} from peer ${sendingPeerID}`)); + // console.log.apply(null, log(`[app] announceUsers, got user:${userID} from peer ${sendingPeerID}`)); this.sync.addUserPeer(userID, sendingPeerID); - if (this.sync.shouldSyncUserID(userID)) { + if (this.sync.shouldSyncUserID(userID) || (this.router.route === App.Route.USER && userID === this.router.userID)) { let postIDs = await this.peerManager?.rpc.getPostIDsForUser(sendingPeerID, userID); // console.log.apply(null, log(`[app] announceUsers response, gotPostIDs`, postIDs)); let neededPostIDs = await this.sync.checkPostIds(userID, sendingPeerID, postIDs); @@ -637,17 +201,24 @@ class App { this.peerManager.registerRPC('getPostsForUser', async (requestingPeerID, userID, postIDs) => { let posts = await this.sync.getPostsForUser(userID, postIDs); for (let post of posts) { + console.log.apply(null, log(`[app] sendPostForUser sending post [${logID(post.id)}] to [${logID(requestingPeerID)}]`, userID, post)); this.peerManager?.rpc.sendPostForUser(requestingPeerID, userID, post); } // return posts; // return postIDs; }); this.peerManager.registerRPC('sendPostForUser', async (userID, post) => { - console.log.apply(null, log(`[app] sendPostForUser`, userID, post)); + console.log.apply(null, log(`[app] sendPostForUser got post`, userID, post)); + // if (post.text === "image...") { + // debugger; + // } await this.sync.writePostForUser(userID, post); - if (userID === this.userID) { - await this.render(); + // if (userID === this.userID) { + if (this.renderTimer) { + clearTimeout(this.renderTimer); } + this.renderTimer = setTimeout(() => { this.render(); }, 1000); + // } }); await this.peerManager.connect(); console.log.apply(null, log("*************** after peerManager.connect")); @@ -917,7 +488,7 @@ class App { // this.posts.push(post); // localStorage.setItem(key, JSON.stringify(posts)); addData(userID, post); - this.websocket?.broadcastNewPost(userID, post); + // this.websocket?.broadcastNewPost(userID, post); this.render(); } getPeerID() { @@ -987,19 +558,19 @@ class App { textArea.style.fontFamily = fontName; textArea.style.fontSize = fontSize; } - initOffline(connection) { - // Event listener for going offline - window.addEventListener('offline', () => { - console.log.apply(null, log("offline")); - }); - // Event listener for going online - window.addEventListener('online', async () => { - console.log.apply(null, log("online")); - // connection.connect(); - this.render(); - }); - console.log.apply(null, log(`Online status: ${navigator.onLine ? "online" : "offline"}`)); - } + // initOffline(connection: wsConnection) { + // // Event listener for going offline + // window.addEventListener('offline', () => { + // console.log.apply(null, log("offline")); + // }); + // // Event listener for going online + // window.addEventListener('online', async () => { + // console.log.apply(null, log("online")); + // // connection.connect(); + // this.render(); + // }); + // console.log.apply(null, log(`Online status: ${navigator.onLine ? "online" : "offline"}`)); + // } selectFile(contentType) { return new Promise(resolve => { let input = document.createElement('input'); @@ -1175,33 +746,7 @@ class App { return posts; } async loadFollowersFromStorage(userID) { - // Rob - if (userID === 'b38b623c-c3fa-4351-9cab-50233c99fa4e') { - return [ - 'b38b623c-c3fa-4351-9cab-50233c99fa4e', - '6d774268-16cd-4e86-8bbe-847a0328893d', // Sean - '05a495a0-0dd8-4186-94c3-b8309ba6fc4c', // Martin - 'a0e42390-08b5-4b07-bc2b-787f8e5f1297', // BMO - 'bba3ad24-9181-4e22-90c8-c265c80873ea', // Harry - '8f6802be-c3b6-46c1-969c-5f90cbe01479', // Fiona - ]; - } - // Martin - if (userID === '05a495a0-0dd8-4186-94c3-b8309ba6fc4c') { - return [ - 'b38b623c-c3fa-4351-9cab-50233c99fa4e', - 'a0e42390-08b5-4b07-bc2b-787f8e5f1297', // BMO - ]; - } - // Fiona - if (userID === '8f6802be-c3b6-46c1-969c-5f90cbe01479') { - return [ - 'b38b623c-c3fa-4351-9cab-50233c99fa4e', // Rob - 'a0e42390-08b5-4b07-bc2b-787f8e5f1297', // BMO - '05a495a0-0dd8-4186-94c3-b8309ba6fc4c', // Martin - ]; - } - return ['a0e42390-08b5-4b07-bc2b-787f8e5f1297']; // Follow BMO by default :) + return this.sync.getFollowing(userID); } async loadPostsFromStorage(userID, postID) { this.timerStart(); @@ -1300,13 +845,13 @@ class App { this.username = this.getUsername(); this.sync.setUserID(this.userID); this.sync.setArchive(this.isArchivePeer); - this.connect(); this.getRoute(); if (this.router.route === App.Route.CONNECT) { console.log.apply(null, log('connect', this.router.userID)); localStorage.setItem("dandelion_id", this.router.userID); localStorage.removeItem("dandelion_username"); } + this.connect(); await this.initDB(); this.connectURL = `${document.location.origin}/connect/${this.userID}`; document.getElementById('connectURL').innerHTML = `connect`; @@ -1530,7 +1075,7 @@ class App { const blob = new Blob([post.image_data]); const url = URL.createObjectURL(blob); image.onload = () => { - URL.revokeObjectURL(url); + // URL.revokeObjectURL(url); }; image.src = url; // image.src = image.src = "data:image/png;base64," + post.image;