Working chuking and image syncing. Still some bugs with syncing some large posts.

This commit is contained in:
2025-05-21 11:39:02 -07:00
parent 9df18d7f35
commit a3a9682f86
5 changed files with 396 additions and 1254 deletions

View File

@@ -221,20 +221,20 @@ export class PeerManager {
this.websocket = new WebSocket(wsURL);
// this.websocket.onclose = (e: CloseEvent) => {
// let closedUnexpectedly = !e.wasClean;
// let closedUnexpectedly = !e.wasClean;
// if (closedUnexpectedly) {
// console.log.apply(null, log(`Websocket closed unexpectedly. Will try to reconnect in ${this.reconnectPeriod} seconds`));
// // let alreadyReconnecting = this.reconnectTimer !== null;
// // if (!alreadyReconnecting) {
// this.reconnectTimer = globalThis.setTimeout(() => {
// console.log.apply(null, log(`Reconnecting web socket`));
// if (closedUnexpectedly) {
// console.log.apply(null, log(`Websocket closed unexpectedly. Will try to reconnect in ${this.reconnectPeriod} seconds`));
// // let alreadyReconnecting = this.reconnectTimer !== null;
// // if (!alreadyReconnecting) {
// this.reconnectTimer = globalThis.setTimeout(() => {
// console.log.apply(null, log(`Reconnecting web socket`));
// this.reconnectTimer = null;
// this.connectWebSocket();
// }, this.reconnectPeriod * 1000)
// };
// }
// this.reconnectTimer = null;
// this.connectWebSocket();
// }, this.reconnectPeriod * 1000)
// };
// }
// }
} catch (error: any) {
@@ -451,9 +451,10 @@ class PeerConnection {
private isSettingRemoteAnswerPending: boolean = false;
private polite = true;
private webRTCSuperlog = false;
private dataChannelSuperlog = true;
messageSuperlog: boolean = true;
rpcSuperlog: boolean = true;
private dataChannelSuperlog = false;
private chunkSize = (16 * 1024) - 100;
messageSuperlog: boolean = false;
rpcSuperlog: boolean = false;
pendingRPCs: Map<
string,
{ resolve: Function; reject: Function; functionName: string }
@@ -476,6 +477,9 @@ class PeerConnection {
// { urls: "stun:stun4.l.google.com" },
],
};
// longMessageQueue: string[] = [];
longMessages: Map<string, { messageChunks: string[], totalChunks: number, hash: string }> = new Map();
chunkSuperlog: boolean = false;
async RPCHandler(message: any) {
@@ -533,7 +537,7 @@ class PeerConnection {
this.peerManager.disconnectFromPeer(this.remotePeerID);
}
this.dataChannel.onerror = (e:RTCErrorEvent) => {
this.dataChannel.onerror = (e: RTCErrorEvent) => {
this.dataChannelSuperlog && console.log.apply(null, log(`datachannel from peer ${this.remotePeerID} error:`, e.error));
}
}
@@ -701,96 +705,177 @@ class PeerConnection {
this.rtcPeer = null;
}
send(message: any) {
let messageJSON = JSON.stringify(message);
this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-datachannel[${logID(this.peerManager.peerID)}]:`, message.type, message, `message size:${messageJSON.length}`));
async send(message: any) {
if (messageJSON.length > (32 * 1024)) {
this.messageSuperlog && console.log.apply(null, log(`[datachannel] Not sending long message: `, messageJSON.length));
return;
if (!this.dataChannel) {
throw new Error("Send called but datachannel is null");
}
this.dataChannel?.send(messageJSON);
while (this.dataChannel.bufferedAmount >= 8 * 1024 * 1024) {
await new Promise<void>((resolve, reject) => { setTimeout(()=> resolve(), 1000);
})
}
let messageJSON = JSON.stringify(message);
this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-datachannel[${logID(this.peerManager.peerID)}]:`, message.type, message, `message size:${messageJSON.length}`));
if (messageJSON.length > this.chunkSize) {
this.messageSuperlog && console.log.apply(null, log(`[datachannel] sending long message: `, messageJSON.length));
this.sendLongMessage(messageJSON);
return;
}
try {
this.dataChannel?.send(messageJSON);
} catch (e) {
console.log.apply(null, log(e));
}
// this.onMessage(messageJSON);
}
call(functionName: string, args: any) {
let transactionID = generateID(); // make this faster as we will only ever have a small number of in-flight queries on a peer
// Think about a timeout here to auto reject it after a while.
let promise = new Promise((resolve, reject) => {
this.pendingRPCs.set(transactionID, { resolve, reject, functionName });
// setTimeout(() => reject("bad"), 1000);
});
let message = {
type: "rpc_call",
transaction_id: transactionID,
function_name: functionName,
args: args,
};
// Get a polyfill for browsers that don't have this API
async hashMessage(message: string) {
let msgUint8 = new TextEncoder().encode(message);
const hashBuffer = await crypto.subtle.digest("SHA-256", msgUint8);
const hashArray = Array.from(new Uint8Array(hashBuffer));
const hashHex = hashArray.map((b) => b.toString(16).padStart(2, "0")).join('');
return hashHex;
}
this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}]`,message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
async sendLongMessage(message: string) {
// message = JSON.parse(message);
let chunkSize = this.chunkSize / 2;
// let chunkSize = 1024;
let chunks = Math.ceil(message!.length / chunkSize);
let messageID = generateID();
let hash = await this.hashMessage(message);
this.send(message);
for (let i = 0; i < chunks; i++) {
let offset = i * chunkSize;
let chunk = message?.substring(offset, offset + chunkSize);
// this.send(message?.substring(offset, offset + chunkSize-1));
// console.log("[chunk]", chunk);
let chunkHash = await this.hashMessage(chunk);
this.chunkSuperlog && console.log.apply(null, log(`[chunk] chunkHash:${logID(chunkHash)} from:${logID(this.peerManager.peerID)} to:${logID(this.remotePeerID)} messageID:${logID(messageID)} hash:${logID(hash)} ${i + 1}/${chunks}`));
return promise;
let netMessage = { type: 'chunk', message_id: messageID, hash: hash, chunk_index: i, total_chunks: chunks, chunk: chunk, chunk_hash: chunkHash };
await this.send(netMessage);
}
}
call(functionName: string, args: any) {
let transactionID = generateID(); // make this faster as we will only ever have a small number of in-flight queries on a peer
// Think about a timeout here to auto reject it after a while.
let promise = new Promise((resolve, reject) => {
this.pendingRPCs.set(transactionID, { resolve, reject, functionName });
// setTimeout(() => reject("bad"), 1000);
});
let message = {
type: "rpc_call",
transaction_id: transactionID,
function_name: functionName,
args: args,
};
this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}]`, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
this.send(message);
return promise;
}
async onMessage(messageJSON: any) {
let message: any = {};
try {
message = JSON.parse(messageJSON);
} catch (e) {
console.log.apply(null, log("PeerConnection.onMessage:", e));
}
this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->datachannel[${logID(this.peerManager.peerID)}]`, message.type, message));
let type = message.type;
if (type === "rpc_response") {
this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}] response: `,message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
let pendingRPC = this.pendingRPCs.get(message.transaction_id);
if (!pendingRPC) {
throw new Error();
}
pendingRPC.resolve(message.response);
}
if (type === "rpc_call") {
this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->[rpc][${logID(this.peerManager.peerID)}] call: `,message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
let response = await this.peerManager.callFromRemote(message.function_name, message.args);
this.rpcSuperlog && console.log.apply(null, log(`[rpc] call: response:`, response));
if (response === undefined) {
return;
}
let responseMessage = { type: 'rpc_response', function_name: message.function_name, transaction_id: message.transaction_id, response: response };
this.send(responseMessage);
}
if (type === "initial_peers") {
for (let peerID of message.peers) {
console.log(log("Connecting to initial peer ", peerID));
this.peerManager.connectToPeer(peerID);
}
}
// this.peerManger.onMessage(this.remotePeerID, message);
let message: any = {};
try {
message = JSON.parse(messageJSON);
} catch (e) {
console.log.apply(null, log("PeerConnection.onMessage:", e));
}
this.messageSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->datachannel[${logID(this.peerManager.peerID)}]`, message.type, message));
let type = message.type;
if (type === "rpc_response") {
this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]<-[rpc][${logID(this.peerManager.peerID)}] response: `, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
let pendingRPC = this.pendingRPCs.get(message.transaction_id);
if (!pendingRPC) {
throw new Error();
}
pendingRPC.resolve(message.response);
}
if (type === "rpc_call") {
this.rpcSuperlog && console.log.apply(null, log(`[${logID(this.remotePeerID)}]->[rpc][${logID(this.peerManager.peerID)}] call: `, message.function_name, message.transaction_id, JSON.stringify(message.args, null, 2)));
let response = await this.peerManager.callFromRemote(message.function_name, message.args);
this.rpcSuperlog && console.log.apply(null, log(`[rpc] call: response:`, response));
if (response === undefined) {
return;
}
let responseMessage = { type: 'rpc_response', function_name: message.function_name, transaction_id: message.transaction_id, response: response };
this.send(responseMessage);
}
if (type === "initial_peers") {
for (let peerID of message.peers) {
console.log(log("Connecting to initial peer ", peerID));
this.peerManager.connectToPeer(peerID);
}
}
if (type === "chunk") {
let messageID = message.message_id;
if (!this.longMessages.has(messageID)) {
this.longMessages.set(messageID, { messageChunks: [], totalChunks: message.total_chunks, hash: message.hash });
}
let longMessage = this.longMessages.get(messageID);
if (!longMessage) {
return;
}
let chunkHash = await this.hashMessage(message.chunk_hash);
longMessage.messageChunks.push(message.chunk);
this.chunkSuperlog && console.log.apply(null, log(`[chunk] chunked message sent chunkHash:${logID(message.chunk_hash)} computed hash: ${logID(chunkHash)} messageId:${logID(messageID)} chunk ${message.chunk_index + 1}/${longMessage.totalChunks}`));
if (message.chunk_index === longMessage.totalChunks - 1) {
let completeMessage = longMessage.messageChunks.join('');
let hash = await this.hashMessage(completeMessage);
this.chunkSuperlog && console.log.apply(null, log(`[chunk] hashes match: ${hash === longMessage.hash} sent hash: ${logID(longMessage.hash)} computed hash: ${logID(hash)}`));
if (hash !== longMessage.hash) {
throw new Error("[chunk] long message hashes don't match.");
}
this.onMessage(completeMessage);
this.longMessages.delete(messageID);
}
}
// this.peerManger.onMessage(this.remotePeerID, message);
}
}