Use persistent WebTransport frame stream

This commit is contained in:
2026-06-25 10:48:09 -07:00
parent 3e2ca8057b
commit f6e2b7edda
4 changed files with 79 additions and 25 deletions

View File

@@ -73,6 +73,8 @@ const METADATA_REFRESH_INTERVAL_MS = 650;
const WT_STREAM_CONTROL_TO_CLIENT = 1;
const WT_STREAM_FRAME = 2;
const WT_STREAM_CONTROL_TO_SERVER = 3;
const WT_FRAME_RECORD_HEADER_BYTES = 4;
const MAX_FRAME_PACKET_BYTES = 16 * 1024 * 1024;
const FRAME_CONNECTION_OPEN = 1;
const FRAME_CONNECTION_CLOSING = 2;
const FRAME_CONNECTION_CLOSED = 3;
@@ -741,35 +743,64 @@ async function readWebTransportControlStream(reader, initialPayload, events) {
}
async function readWebTransportFrameStream(reader, initialPayload, events) {
const chunks = [];
let byteLength = 0;
let pending = new Uint8Array(0);
if (initialPayload.byteLength > 0) {
chunks.push(initialPayload);
byteLength += initialPayload.byteLength;
pending = appendWebTransportFrameChunk(pending, initialPayload);
pending = dispatchWebTransportFrameRecords(pending, events);
}
while (true) {
const { value, done } = await reader.read();
if (done) {
const packet = new Uint8Array(byteLength);
let offset = 0;
for (const chunk of chunks) {
packet.set(chunk, offset);
offset += chunk.byteLength;
if (pending.byteLength > 0) {
throw new Error('WebTransport frame stream ended with a partial frame record.');
}
events.dispatchEvent(new MessageEvent('message', { data: packet.buffer }));
return;
}
chunks.push(value);
byteLength += value.byteLength;
pending = appendWebTransportFrameChunk(pending, value);
pending = dispatchWebTransportFrameRecords(pending, events);
}
}
function appendWebTransportFrameChunk(pending, chunk) {
if (pending.byteLength === 0) {
return chunk;
}
const merged = new Uint8Array(pending.byteLength + chunk.byteLength);
merged.set(pending, 0);
merged.set(chunk, pending.byteLength);
return merged;
}
function dispatchWebTransportFrameRecords(buffer, events) {
let offset = 0;
while (buffer.byteLength - offset >= WT_FRAME_RECORD_HEADER_BYTES) {
const packetLength = new DataView(buffer.buffer, buffer.byteOffset + offset, WT_FRAME_RECORD_HEADER_BYTES).getUint32(0, true);
if (packetLength <= 8 || packetLength > MAX_FRAME_PACKET_BYTES) {
throw new Error(`Invalid WebTransport frame packet length: ${packetLength}`);
}
const recordBytes = WT_FRAME_RECORD_HEADER_BYTES + packetLength;
if (buffer.byteLength - offset < recordBytes) {
break;
}
const packet = buffer.slice(offset + WT_FRAME_RECORD_HEADER_BYTES, offset + recordBytes);
events.dispatchEvent(new MessageEvent('message', { data: packet.buffer }));
offset += recordBytes;
}
return offset === buffer.byteLength ? new Uint8Array(0) : buffer.slice(offset);
}
function dispatchWebTransportControlLines(text, events) {
const lines = text.split(/\n/);
const pending = lines.pop() ?? '';