From 338266a3c6694ee75d2a2780ad049c7c0c266b19 Mon Sep 17 00:00:00 2001 From: bobbydigitales Date: Mon, 19 May 2025 01:21:32 -0700 Subject: [PATCH] Post syncing logic is wokting. As expected datachannels fail to send posts above a certain size. Need to implement chunking next. --- src/PeerManager.ts | 4 +++ src/Sync.ts | 65 +++++++++++++++++++++++++++++++++++++++++++--- src/main2.ts | 36 ++++++++++++++++++++----- static/main2.js | 25 +++++++++++++++--- 4 files changed, 115 insertions(+), 15 deletions(-) diff --git a/src/PeerManager.ts b/src/PeerManager.ts index e1e89ae..f5631b4 100644 --- a/src/PeerManager.ts +++ b/src/PeerManager.ts @@ -532,6 +532,10 @@ class PeerConnection { this.dataChannelSuperlog && console.log.apply(null, log(`datachannel from peer ${this.remotePeerID} closed, disconnecting peer.`)); this.peerManager.disconnectFromPeer(this.remotePeerID); } + + this.dataChannel.onerror = (e:RTCErrorEvent) => { + this.dataChannelSuperlog && console.log.apply(null, log(`datachannel from peer ${this.remotePeerID} error:`, e.error)); + } } async connect() { diff --git a/src/Sync.ts b/src/Sync.ts index e9343e4..27d7aa0 100644 --- a/src/Sync.ts +++ b/src/Sync.ts @@ -1,13 +1,37 @@ import { openDatabase, getData, addData, addDataArray, clearData, deleteData, mergeDataArray, getAllData, checkPostIds, getAllIds, getPostsByIds } from "db"; import { log, logID } from "log"; + +async function bytesToBase64DataUrl(bytes: Uint8Array, type = "application/octet-stream") { + return await new Promise((resolve, reject) => { + const reader = Object.assign(new FileReader(), { + onload: () => resolve(reader.result), + onerror: () => reject(reader.error), + }); + reader.readAsDataURL(new File([bytes], "", { type })); + }); +} + +async function arrayBufferToBase64(buffer: ArrayBuffer) { + var bytes = new Uint8Array(buffer); + return (await bytesToBase64DataUrl(bytes) as string).replace("data:application/octet-stream;base64,", ""); +} + +async function base64ToArrayBuffer(base64String: string) { + let response = await fetch("data:application/octet-stream;base64," + base64String); + let arrayBuffer = await response.arrayBuffer(); + return arrayBuffer; +} + + export class Sync { + isArchivePeer: boolean = false; userID: string = ""; - userPeers: Map = new Map(); + userPeers: Map> = new Map(); userIDsToSync: Set = new Set(); - syncSuperlog: boolean = true; + syncSuperlog: boolean = false; setArchive(isHeadless: boolean) { this.isArchivePeer = isHeadless; @@ -132,7 +156,7 @@ export class Sync { let postIds = await getAllIds(userID) ?? []; postIds = postIds.filter((postID: string) => !this.postBlockList.has(postID)); if (postIds.length === 0) { - console.log.apply(null, log(`Net: I know about user ${logID(userID)} but I have 0 posts`));; + console.log.apply(null, log(`Net: I know about user ${logID(userID)} but I have 0 posts`)); return null; } @@ -155,7 +179,40 @@ export class Sync { // return []; // } - return postIDs; + return neededPostIds; + } + + async getPostsForUser(userID:string, postIDs: string[]) { + let posts = await getPostsByIds(userID, postIDs) ?? []; + + console.log.apply(null, log(`[sync] got ${posts.length} posts for user ${logID(userID)}`));; + + // app.timerStart(); + let output = []; + + console.log.apply(null, log("Serializing images")); + for (let post of posts) { + let newPost = (post as any).data; + + if (newPost.image_data) { + // let compressedData = await wsConnection.compressArrayBuffer(newPost.image_data); + // console.log.apply(null, log((newPost.image_data.byteLength - compressedData.byteLength) / 1024 / 1024); + + // TODO don't do this, use Blobs direclty! + // https://developer.chrome.com/blog/blob-support-for-Indexeddb-landed-on-chrome-dev + + newPost.image_data = await arrayBufferToBase64(newPost.image_data); + + } + + // let megs = JSON.stringify(newPost).length/1024/1024; + // console.log.apply(null, log(`getPostsForUserHandler id:${newPost.post_id} post length:${megs}`); + output.push(newPost); + } + + return output; + + // console.log.apply(null, log(`getPostsForUser`,output)); } // async getPostIdsForUserHandler(data: any) { diff --git a/src/main2.ts b/src/main2.ts index 56978e5..a04cb85 100644 --- a/src/main2.ts +++ b/src/main2.ts @@ -755,10 +755,14 @@ class App { this.sync.addUserPeer(userID, sendingPeerID); if (this.sync.shouldSyncUserID(userID)) { let postIDs = await this.peerManager?.rpc.getPostIDsForUser(sendingPeerID, userID); - console.log.apply(null, log(`[app] announceUsers response, gotPostIDs`, postIDs)); - let neededPosts = await this.sync.checkPostIds(userID, sendingPeerID, postIDs); + // console.log.apply(null, log(`[app] announceUsers response, gotPostIDs`, postIDs)); + let neededPostIDs = await this.sync.checkPostIds(userID, sendingPeerID, postIDs); + // console.log.apply(null, log(`[app] announceUsers needed posts`, neededPostIDs)); - console.log.apply(null, log(`[app] announceUsers needed posts`, neededPosts)); + if (neededPostIDs.length > 0) { + let neededPosts = await this.peerManager?.rpc.getPostsForUser(sendingPeerID, this.peerID, userID, neededPostIDs); + console.log(neededPosts); + } } }; @@ -806,17 +810,35 @@ class App { this.announceUser_rpc_response(sendingPeerID, userIDs); }); - this.peerManager.registerRPC('getPeersForUser', (userID: any) => { + this.peerManager.registerRPC('getPeersForUser', (userID: string) => { return [1, 2, 3, 4, 5]; }); - this.peerManager.registerRPC('getPostIDsForUser', async (userID: any) => { + this.peerManager.registerRPC('getPostIDsForUser', async (userID: string) => { let postIDs = await this.sync.getPostIdsForUser(userID); - - return postIDs; + if (postIDs) { + return postIDs; + } }); + this.peerManager.registerRPC('getPostsForUser', async (requestingPeerID:string, userID:string, postIDs: string[]) => { + let posts = await this.sync.getPostsForUser(userID, postIDs); + + + for (let post of posts) { + this.peerManager?.rpc.sendPostForUser(requestingPeerID, userID, post); + } + // return posts; + + // return postIDs; + }); + + this.peerManager.registerRPC('sendPostForUser', async (requestingPeerID:string, userID:string, post:string) => { + console.log.apply(null, log(`[app] sendPostForUser to [${logID(requestingPeerID)}] `, userID, post)); + }); + + await this.peerManager.connect(); console.log.apply(null, log("*************** after peerManager.connect"));; diff --git a/static/main2.js b/static/main2.js index 1f9bec5..271ad60 100644 --- a/static/main2.js +++ b/static/main2.js @@ -583,9 +583,13 @@ class App { this.sync.addUserPeer(userID, sendingPeerID); if (this.sync.shouldSyncUserID(userID)) { let postIDs = await this.peerManager?.rpc.getPostIDsForUser(sendingPeerID, userID); - console.log.apply(null, log(`[app] announceUsers response, gotPostIDs`, postIDs)); - let neededPosts = await this.sync.checkPostIds(userID, sendingPeerID, postIDs); - console.log.apply(null, log(`[app] announceUsers needed posts`, neededPosts)); + // console.log.apply(null, log(`[app] announceUsers response, gotPostIDs`, postIDs)); + let neededPostIDs = await this.sync.checkPostIds(userID, sendingPeerID, postIDs); + // console.log.apply(null, log(`[app] announceUsers needed posts`, neededPostIDs)); + if (neededPostIDs.length > 0) { + let neededPosts = await this.peerManager?.rpc.getPostsForUser(sendingPeerID, this.peerID, userID, neededPostIDs); + console.log(neededPosts); + } } } ; @@ -626,7 +630,20 @@ class App { }); this.peerManager.registerRPC('getPostIDsForUser', async (userID) => { let postIDs = await this.sync.getPostIdsForUser(userID); - return postIDs; + if (postIDs) { + return postIDs; + } + }); + this.peerManager.registerRPC('getPostsForUser', async (requestingPeerID, userID, postIDs) => { + let posts = await this.sync.getPostsForUser(userID, postIDs); + for (let post of posts) { + this.peerManager?.rpc.sendPostForUser(requestingPeerID, userID, post); + } + // return posts; + // return postIDs; + }); + this.peerManager.registerRPC('sendPostForUser', async (requestingPeerID, userID, post) => { + console.log.apply(null, log(`[app] sendPostForUser to [${logID(requestingPeerID)}] `, userID, post)); }); await this.peerManager.connect(); console.log.apply(null, log("*************** after peerManager.connect"));