Add optional WebTransport frame transport

This commit is contained in:
2026-06-24 21:28:15 -07:00
parent d37694e4d3
commit 3e2ca8057b
9 changed files with 829 additions and 31 deletions

1
.gitignore vendored
View File

@@ -1,5 +1,6 @@
node_modules/ node_modules/
data/recent-urls.json data/recent-urls.json
data/webtransport-*.pem
.env .env
.DS_Store .DS_Store
npm-debug.log* npm-debug.log*

View File

@@ -19,7 +19,7 @@ The app is plain Node/Express plus browser JavaScript:
- `server/index.js`: API, WebSocket, source proxy/relay, ffmpeg process lifecycle, recent URL and favorites persistence. - `server/index.js`: API, WebSocket, source proxy/relay, ffmpeg process lifecycle, recent URL and favorites persistence.
- `public/index.html`: frontend markup. - `public/index.html`: frontend markup.
- `public/app.js`: URL submission, WebSocket frame receiving, audio element coordination, canvas drawing, overlay controls. - `public/app.js`: URL submission, WebSocket/WebTransport frame receiving, audio element coordination, canvas drawing, overlay controls.
- `public/styles.css`: two-screen player UI. - `public/styles.css`: two-screen player UI.
- `Dockerfile`: production image with Node and ffmpeg. - `Dockerfile`: production image with Node and ffmpeg.
- `docker-compose-example.yml`: operational example and default env knobs. - `docker-compose-example.yml`: operational example and default env knobs.
@@ -34,6 +34,7 @@ Main public endpoints:
- `PUT /api/favorites`: replaces the global favorites list. Each favorite has a user-provided `title` and stream `url`. - `PUT /api/favorites`: replaces the global favorites list. Each favorite has a user-provided `title` and stream `url`.
- `GET /audio/:sessionId`: serves MP3 audio to the browser audio element. - `GET /audio/:sessionId`: serves MP3 audio to the browser audio element.
- `WS /frames/:sessionId`: sends timed JPEG frame packets to the browser. - `WS /frames/:sessionId`: sends timed JPEG frame packets to the browser.
- `WebTransport /frames/:sessionId`: optional QUIC frame path on the separate WebTransport UDP listener.
- `GET /api/health`: exposes basic health and active playback connection mode. - `GET /api/health`: exposes basic health and active playback connection mode.
Internal endpoint: Internal endpoint:
@@ -42,7 +43,7 @@ Internal endpoint:
## Browser Playback Model ## Browser Playback Model
Audio is the playback clock. The server sends JPEG frames over WebSocket. Each binary frame packet is: Audio is the playback clock. The server sends JPEG frames over WebSocket by default, or over WebTransport when explicitly enabled. Each binary frame packet is:
- First 8 bytes: little-endian float64 timestamp in seconds. - First 8 bytes: little-endian float64 timestamp in seconds.
- Remaining bytes: one complete JPEG image. - Remaining bytes: one complete JPEG image.
@@ -191,6 +192,15 @@ Runtime:
- `FAVORITES_PATH`: favorites JSON path. - `FAVORITES_PATH`: favorites JSON path.
- `FAVORITES_LIMIT`: favorites count, default `50`. - `FAVORITES_LIMIT`: favorites count, default `50`.
- `LOCAL_VIDEOS`: optional local video directory. When set, the UI shows `Play Local` and lists regular files under this directory recursively. - `LOCAL_VIDEOS`: optional local video directory. When set, the UI shows `Play Local` and lists regular files under this directory recursively.
- `FRAME_TRANSPORT`: `websocket`, `webtransport`, or `auto`, default `websocket`.
- `WEBTRANSPORT_ENABLED`: enables the WebTransport listener, default true only when `FRAME_TRANSPORT` is `webtransport` or `auto`.
- `WEBTRANSPORT_HOST`: WebTransport UDP bind host, default `0.0.0.0`.
- `WEBTRANSPORT_PORT`: WebTransport UDP port, default `PORT + 1`.
- `WEBTRANSPORT_PUBLIC_HOST`: optional host advertised to browsers for WebTransport.
- `WEBTRANSPORT_PUBLIC_PORT`: port advertised to browsers for WebTransport, default `WEBTRANSPORT_PORT`.
- `WEBTRANSPORT_CERT_PATH`: optional WebTransport certificate path. If unset, the server generates a short-lived ECDSA cert under `data/`.
- `WEBTRANSPORT_KEY_PATH`: optional WebTransport private key path. Must be set together with `WEBTRANSPORT_CERT_PATH`.
- Browser WebTransport requires a secure context. Localhost is usually treated as secure, but plain HTTP over a LAN address may not expose `window.WebTransport`; the frontend must fall back to WebSocket in that case.
- `DEFAULT_FPS`: default frame rate, fallback `24`, clamped `1..30`. - `DEFAULT_FPS`: default frame rate, fallback `24`, clamped `1..30`.
- `DEFAULT_FRAME_WIDTH`: default maximum frame width, fallback `960`, clamped `160..1920`. - `DEFAULT_FRAME_WIDTH`: default maximum frame width, fallback `960`, clamped `160..1920`.
- `JPEG_QUALITY`: default JPEG quality, fallback `7`, clamped `2..18`; lower is better for ffmpeg `-q:v`. - `JPEG_QUALITY`: default JPEG quality, fallback `7`, clamped `2..18`; lower is better for ffmpeg `-q:v`.

View File

@@ -1,4 +1,4 @@
FROM node:22-bookworm-slim FROM node:22-trixie-slim
ENV NODE_ENV=production ENV NODE_ENV=production
ENV PORT=3000 ENV PORT=3000
@@ -10,7 +10,7 @@ WORKDIR /app
ADD https://api.github.com/repos/yt-dlp/yt-dlp/commits/master /tmp/yt-dlp-master.json ADD https://api.github.com/repos/yt-dlp/yt-dlp/commits/master /tmp/yt-dlp-master.json
RUN apt-get update \ RUN apt-get update \
&& apt-get install -y --no-install-recommends ca-certificates ffmpeg python3 python3-pip \ && apt-get install -y --no-install-recommends ca-certificates ffmpeg openssl python3 python3-pip \
&& python3 -m pip install --no-cache-dir --break-system-packages --upgrade "$YT_DLP_PIP_SPEC" \ && python3 -m pip install --no-cache-dir --break-system-packages --upgrade "$YT_DLP_PIP_SPEC" \
&& yt-dlp --version \ && yt-dlp --version \
&& apt-get purge -y --auto-remove python3-pip \ && apt-get purge -y --auto-remove python3-pip \
@@ -30,6 +30,7 @@ RUN mkdir -p /app/data \
USER node USER node
EXPOSE 3000 EXPOSE 3000
EXPOSE 3001/udp
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \ HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
CMD node -e "fetch('http://127.0.0.1:' + (process.env.PORT || 3000) + '/api/health').then(r => process.exit(r.ok ? 0 : 1)).catch(() => process.exit(1))" CMD node -e "fetch('http://127.0.0.1:' + (process.env.PORT || 3000) + '/api/health').then(r => process.exit(r.ok ? 0 : 1)).catch(() => process.exit(1))"

View File

@@ -3,7 +3,7 @@
A small web app that plays a remote video stream without using browser video decoding. The server uses `ffmpeg` to decode the input URL into: A small web app that plays a remote video stream without using browser video decoding. The server uses `ffmpeg` to decode the input URL into:
- an MP3 audio stream served to a normal `<audio>` element - an MP3 audio stream served to a normal `<audio>` element
- timed JPEG image frames sent over a WebSocket and painted onto a `<canvas>` - timed JPEG image frames sent over a WebSocket, or optionally WebTransport/QUIC, and painted onto a `<canvas>`
This is meant for machines where image and audio decoding work but browser video decoding is unavailable or unreliable. This is meant for machines where image and audio decoding work but browser video decoding is unavailable or unreliable.
@@ -58,6 +58,8 @@ YouTube URLs are resolved server-side with `yt-dlp` before they enter the existi
JPEG frames are dropped when the browser WebSocket falls behind instead of letting stale frames queue indefinitely. Tune the server-side backlog cap with `MAX_WS_BUFFER_BYTES`; the default is `2097152`. JPEG frames are dropped when the browser WebSocket falls behind instead of letting stale frames queue indefinitely. Tune the server-side backlog cap with `MAX_WS_BUFFER_BYTES`; the default is `2097152`.
WebSocket remains the default frame transport. To try WebTransport/QUIC for frame delivery, set `FRAME_TRANSPORT=webtransport` or `FRAME_TRANSPORT=auto` and expose the WebTransport UDP port. The default WebTransport port is `PORT + 1`; override it with `WEBTRANSPORT_PORT` and, when needed behind Docker or a proxy, `WEBTRANSPORT_PUBLIC_HOST` and `WEBTRANSPORT_PUBLIC_PORT`. The server generates a short-lived local ECDSA certificate under `data/` unless `WEBTRANSPORT_CERT_PATH` and `WEBTRANSPORT_KEY_PATH` are both set. Browser WebTransport requires a secure context: localhost is usually allowed, but plain HTTP over a LAN address may not expose the API. Browsers that cannot connect with WebTransport fall back to WebSocket.
In single mode, audio output from `ffmpeg` is buffered before it is written to the browser so short HTTP backpressure pauses are less likely to stall frame generation. Tune the cap with `MAX_AUDIO_QUEUE_BYTES`; the default is `4194304`. In single mode, audio output from `ffmpeg` is buffered before it is written to the browser so short HTTP backpressure pauses are less likely to stall frame generation. Tune the cap with `MAX_AUDIO_QUEUE_BYTES`; the default is `4194304`.
Playback uses `PLAYBACK_CONNECTION_MODE=split` by default. The Docker Compose example sets `PLAYBACK_CONNECTION_MODE=relay` so IPTV-style streams can be tested with one upstream connection. Playback uses `PLAYBACK_CONNECTION_MODE=split` by default. The Docker Compose example sets `PLAYBACK_CONNECTION_MODE=relay` so IPTV-style streams can be tested with one upstream connection.

View File

@@ -6,6 +6,8 @@ services:
restart: unless-stopped restart: unless-stopped
ports: ports:
- "3000:3000" - "3000:3000"
# Uncomment to try WebTransport frame delivery over QUIC/UDP.
# - "3001:3001/udp"
environment: environment:
PORT: "3000" PORT: "3000"
NODE_ENV: production NODE_ENV: production
@@ -31,6 +33,10 @@ services:
MAX_WS_BUFFER_BYTES: "2097152" MAX_WS_BUFFER_BYTES: "2097152"
MAX_AUDIO_QUEUE_BYTES: "4194304" MAX_AUDIO_QUEUE_BYTES: "4194304"
MAX_RELAY_BRANCH_QUEUE_BYTES: "8388608" MAX_RELAY_BRANCH_QUEUE_BYTES: "8388608"
# websocket: default. webtransport: try QUIC frames first. auto: try QUIC when available, fall back to WebSocket.
FRAME_TRANSPORT: websocket
WEBTRANSPORT_PORT: "3001"
WEBTRANSPORT_PUBLIC_PORT: "3001"
RECENT_URLS_PATH: /app/data/recent-urls.json RECENT_URLS_PATH: /app/data/recent-urls.json
FAVORITES_PATH: /app/data/favorites.json FAVORITES_PATH: /app/data/favorites.json
# Set this and mount a host directory below to enable Play Local. # Set this and mount a host directory below to enable Play Local.

116
package-lock.json generated
View File

@@ -9,10 +9,89 @@
"version": "1.0.0", "version": "1.0.0",
"license": "ISC", "license": "ISC",
"dependencies": { "dependencies": {
"@moq/web-transport": "^0.1.3",
"express": "^5.2.1", "express": "^5.2.1",
"ws": "^8.20.0" "ws": "^8.20.0"
} }
}, },
"node_modules/@moq/web-transport": {
"version": "0.1.3",
"resolved": "https://registry.npmjs.org/@moq/web-transport/-/web-transport-0.1.3.tgz",
"integrity": "sha512-hg3mKWUJaEwtJDGf8Ck62XP8ya+7wyMV/9ycV1A+M3iVly3xBOTPHpT8BrpOLomsBIB9iXQY6Z85MjknhUJW8w==",
"license": "(MIT OR Apache-2.0)",
"optionalDependencies": {
"@moq/web-transport-darwin-arm64": "0.1.3",
"@moq/web-transport-darwin-x64": "0.1.3",
"@moq/web-transport-linux-arm64-gnu": "0.1.3",
"@moq/web-transport-linux-x64-gnu": "0.1.3",
"@moq/web-transport-win32-x64-msvc": "0.1.3"
}
},
"node_modules/@moq/web-transport-darwin-arm64": {
"version": "0.1.3",
"resolved": "https://registry.npmjs.org/@moq/web-transport-darwin-arm64/-/web-transport-darwin-arm64-0.1.3.tgz",
"integrity": "sha512-MSKd2lmgjABS6A4CoaYA2m5XibJFUi3hzz/5TTrWAiRHU8t6WV5iHcPKLMblgXi+QSZ7iLlEL0tXzmN9u0qkJw==",
"cpu": [
"arm64"
],
"license": "(MIT OR Apache-2.0)",
"optional": true,
"os": [
"darwin"
]
},
"node_modules/@moq/web-transport-darwin-x64": {
"version": "0.1.3",
"resolved": "https://registry.npmjs.org/@moq/web-transport-darwin-x64/-/web-transport-darwin-x64-0.1.3.tgz",
"integrity": "sha512-Kml/EUk8EWiKNkBRcLR8JEpMOqxEpDEHHaDlLd+ro84BY/MAgiJGiVKebGZI6QP2C2OalfAqZZQ0Eh8bB6w62Q==",
"cpu": [
"x64"
],
"license": "(MIT OR Apache-2.0)",
"optional": true,
"os": [
"darwin"
]
},
"node_modules/@moq/web-transport-linux-arm64-gnu": {
"version": "0.1.3",
"resolved": "https://registry.npmjs.org/@moq/web-transport-linux-arm64-gnu/-/web-transport-linux-arm64-gnu-0.1.3.tgz",
"integrity": "sha512-Y3/Uko9N3X1b68hTgKVrLwpbMgtbvert0ADvIoziE1wczMcoORsys6dE9APH1jeZaXYgLAN5sVf7fdSMFI6HAQ==",
"cpu": [
"arm64"
],
"license": "(MIT OR Apache-2.0)",
"optional": true,
"os": [
"linux"
]
},
"node_modules/@moq/web-transport-linux-x64-gnu": {
"version": "0.1.3",
"resolved": "https://registry.npmjs.org/@moq/web-transport-linux-x64-gnu/-/web-transport-linux-x64-gnu-0.1.3.tgz",
"integrity": "sha512-WVFRgrhrQUWEKM4uBgBwfz1vGmGOU7DtM38VqlW1+6+bEm6+a0jMJ/6HUBz1bEdzzKZsVcVoT66cZoymyvzjHA==",
"cpu": [
"x64"
],
"license": "(MIT OR Apache-2.0)",
"optional": true,
"os": [
"linux"
]
},
"node_modules/@moq/web-transport-win32-x64-msvc": {
"version": "0.1.3",
"resolved": "https://registry.npmjs.org/@moq/web-transport-win32-x64-msvc/-/web-transport-win32-x64-msvc-0.1.3.tgz",
"integrity": "sha512-yPWBiujSW4bOByb1kfVyBd28MWoLpKneqel4k5KqW941Ofc6H/czegvtmDiFswx1KH531EjKljmuyilH+2g8Rw==",
"cpu": [
"x64"
],
"license": "(MIT OR Apache-2.0)",
"optional": true,
"os": [
"win32"
]
},
"node_modules/accepts": { "node_modules/accepts": {
"version": "2.0.0", "version": "2.0.0",
"resolved": "https://registry.npmjs.org/accepts/-/accepts-2.0.0.tgz", "resolved": "https://registry.npmjs.org/accepts/-/accepts-2.0.0.tgz",
@@ -202,9 +281,9 @@
} }
}, },
"node_modules/es-object-atoms": { "node_modules/es-object-atoms": {
"version": "1.1.1", "version": "1.1.2",
"resolved": "https://registry.npmjs.org/es-object-atoms/-/es-object-atoms-1.1.1.tgz", "resolved": "https://registry.npmjs.org/es-object-atoms/-/es-object-atoms-1.1.2.tgz",
"integrity": "sha512-FGgH2h8zKNim9ljj7dankFPcICIK9Cp5bm+c2gQSYePhpaG5+esrLODihIorn+Pe6FGJzWhXQotPv73jTaldXA==", "integrity": "sha512-HWcBoN6NileqtSydK2FqHbS/LoDd2pqrnQHLyJzBj4kOp/ky2MWMN694xOfkK8/SnUsW2DH7EfyVlydKCsm1Zw==",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"es-errors": "^1.3.0" "es-errors": "^1.3.0"
@@ -381,9 +460,9 @@
} }
}, },
"node_modules/hasown": { "node_modules/hasown": {
"version": "2.0.3", "version": "2.0.4",
"resolved": "https://registry.npmjs.org/hasown/-/hasown-2.0.3.tgz", "resolved": "https://registry.npmjs.org/hasown/-/hasown-2.0.4.tgz",
"integrity": "sha512-ej4AhfhfL2Q2zpMmLo7U1Uv9+PyhIZpgQLGT1F9miIGmiCJIoCgSmczFdrc97mWT4kVY72KA+WnnhJ5pghSvSg==", "integrity": "sha512-T2UbfbBEF32wiepXIsMlTW9+dDYC6wMh/t/vYA4tuOMKqWz/n3vr1NFSxQiyP+zk2mXsoMA/i/7qV6LKut1t1A==",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"function-bind": "^1.1.2" "function-bind": "^1.1.2"
@@ -585,12 +664,13 @@
} }
}, },
"node_modules/qs": { "node_modules/qs": {
"version": "6.15.1", "version": "6.15.3",
"resolved": "https://registry.npmjs.org/qs/-/qs-6.15.1.tgz", "resolved": "https://registry.npmjs.org/qs/-/qs-6.15.3.tgz",
"integrity": "sha512-6YHEFRL9mfgcAvql/XhwTvf5jKcOiiupt2FiJxHkiX1z4j7WL8J/jRHYLluORvc1XxB5rV20KoeK00gVJamspg==", "integrity": "sha512-O9gl3zCl5h5blw1KGUzQKhA5oUXSl8rwUIM5o0S3nCXMliSvy5Dzx7/DJcI+SwgICv+IneSZwhBh1oSyEHA71A==",
"license": "BSD-3-Clause", "license": "BSD-3-Clause",
"dependencies": { "dependencies": {
"side-channel": "^1.1.0" "es-define-property": "^1.0.1",
"side-channel": "^1.1.1"
}, },
"engines": { "engines": {
"node": ">=0.6" "node": ">=0.6"
@@ -697,14 +777,14 @@
"license": "ISC" "license": "ISC"
}, },
"node_modules/side-channel": { "node_modules/side-channel": {
"version": "1.1.0", "version": "1.1.1",
"resolved": "https://registry.npmjs.org/side-channel/-/side-channel-1.1.0.tgz", "resolved": "https://registry.npmjs.org/side-channel/-/side-channel-1.1.1.tgz",
"integrity": "sha512-ZX99e6tRweoUXqR+VBrslhda51Nh5MTQwou5tnUDgbtyM0dBgmhEDtWGP/xbKn6hqfPRHujUNwz5fy/wbbhnpw==", "integrity": "sha512-6x6dK6zJdpTzF4sQeNYxwtvBzf6Eg4GtlesS94HOvTudUeyK2WXAaIfmDgsyslYrRBeFIlsi54AYsFGUuhmvrQ==",
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
"es-errors": "^1.3.0", "es-errors": "^1.3.0",
"object-inspect": "^1.13.3", "object-inspect": "^1.13.4",
"side-channel-list": "^1.0.0", "side-channel-list": "^1.0.1",
"side-channel-map": "^1.0.1", "side-channel-map": "^1.0.1",
"side-channel-weakmap": "^1.0.2" "side-channel-weakmap": "^1.0.2"
}, },
@@ -825,9 +905,9 @@
"license": "ISC" "license": "ISC"
}, },
"node_modules/ws": { "node_modules/ws": {
"version": "8.20.0", "version": "8.21.0",
"resolved": "https://registry.npmjs.org/ws/-/ws-8.20.0.tgz", "resolved": "https://registry.npmjs.org/ws/-/ws-8.21.0.tgz",
"integrity": "sha512-sAt8BhgNbzCtgGbt2OxmpuryO63ZoDk/sqaB/znQm94T4fCEsy/yV+7CdC1kJhOU9lboAEU7R3kquuycDoibVA==", "integrity": "sha512-Vsp28b7DRcimFQvrqu2Wek3z1iYxDCWqHYB8Qsnk/S4RfaCQzPGPyBNuVjJV3cd6UiKtUtp6sNM77gWvzcCH+g==",
"license": "MIT", "license": "MIT",
"engines": { "engines": {
"node": ">=10.0.0" "node": ">=10.0.0"

View File

@@ -12,6 +12,7 @@
"author": "", "author": "",
"license": "ISC", "license": "ISC",
"dependencies": { "dependencies": {
"@moq/web-transport": "^0.1.3",
"express": "^5.2.1", "express": "^5.2.1",
"ws": "^8.20.0" "ws": "^8.20.0"
} }

View File

@@ -70,6 +70,13 @@ const PLAYBACK_RESTART_DELAY_MS = 750;
const MIN_RECOVERY_RESUME_SECONDS = 2; const MIN_RECOVERY_RESUME_SECONDS = 2;
const METADATA_REFRESH_ATTEMPTS = 20; const METADATA_REFRESH_ATTEMPTS = 20;
const METADATA_REFRESH_INTERVAL_MS = 650; const METADATA_REFRESH_INTERVAL_MS = 650;
const WT_STREAM_CONTROL_TO_CLIENT = 1;
const WT_STREAM_FRAME = 2;
const WT_STREAM_CONTROL_TO_SERVER = 3;
const FRAME_CONNECTION_OPEN = 1;
const FRAME_CONNECTION_CLOSING = 2;
const FRAME_CONNECTION_CLOSED = 3;
const textEncoder = new TextEncoder();
const state = { const state = {
generation: 0, generation: 0,
@@ -508,10 +515,26 @@ function connectPlaybackStreams() {
state.websocket = null; state.websocket = null;
} }
const websocketProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; void openPlaybackFrameConnection(session, streamGeneration);
const websocketUrl = `${websocketProtocol}//${window.location.host}/frames/${session.id}?g=${session.seekGeneration ?? 0}`; }
const websocket = new WebSocket(websocketUrl);
websocket.binaryType = 'arraybuffer'; async function openPlaybackFrameConnection(session, streamGeneration) {
let websocket;
try {
websocket = await createFrameConnection(session);
} catch (error) {
console.warn('Frame transport failed', error);
showPlayerMessage('Stream failed');
setControlsVisible(true);
return;
}
if (streamGeneration !== state.streamGeneration || state.session?.id !== session.id) {
websocket.close(1000, 'stale frame connection');
return;
}
state.websocket = websocket; state.websocket = websocket;
websocket.addEventListener('message', (event) => { websocket.addEventListener('message', (event) => {
@@ -548,6 +571,249 @@ function connectPlaybackStreams() {
startRenderLoop(); startRenderLoop();
} }
async function createFrameConnection(session) {
const preferred = session.frameTransport?.preferred ?? 'websocket';
const webTransportConfig = session.frameTransport?.webTransport;
const shouldTryWebTransport = (preferred === 'webtransport' || preferred === 'auto') && webTransportConfig && 'WebTransport' in window;
if (shouldTryWebTransport) {
try {
return await createWebTransportFrameConnection(session, webTransportConfig);
} catch (error) {
console.warn('WebTransport frame connection failed; falling back to WebSocket', error);
}
}
return createWebSocketFrameConnection(session);
}
function createWebSocketFrameConnection(session) {
const websocketProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const websocketUrl = `${websocketProtocol}//${window.location.host}/frames/${session.id}?g=${session.seekGeneration ?? 0}`;
const websocket = new WebSocket(websocketUrl);
websocket.binaryType = 'arraybuffer';
return websocket;
}
async function createWebTransportFrameConnection(session, config) {
const url = buildWebTransportFrameUrl(session, config);
const options = {
allowPooling: false,
congestionControl: 'low-latency',
};
if (config.certificateHash) {
options.serverCertificateHashes = [{
algorithm: config.certificateHashAlgorithm ?? 'sha-256',
value: base64ToArrayBuffer(config.certificateHash),
}];
}
const transport = new WebTransport(url, options);
await transport.ready;
return createWebTransportFrameConnectionAdapter(transport);
}
function createWebTransportFrameConnectionAdapter(transport) {
const events = new EventTarget();
let readyState = FRAME_CONNECTION_OPEN;
let clientControlWriterPromise = null;
let clientControlWrite = Promise.resolve();
const connection = {
get readyState() {
return readyState;
},
addEventListener(type, listener, options) {
events.addEventListener(type, listener, options);
},
removeEventListener(type, listener, options) {
events.removeEventListener(type, listener, options);
},
send(data) {
if (readyState !== FRAME_CONNECTION_OPEN) {
throw new Error('Frame connection is closed.');
}
clientControlWriterPromise ??= openWebTransportClientControlWriter(transport);
const payload = textEncoder.encode(`${String(data)}\n`);
clientControlWrite = clientControlWrite.then(async () => {
const writer = await clientControlWriterPromise;
await writer.write(payload);
}).catch((error) => {
dispatchFrameConnectionError(events, error);
});
},
close(code = 1000, reason = '') {
if (readyState === FRAME_CONNECTION_CLOSING || readyState === FRAME_CONNECTION_CLOSED) {
return;
}
readyState = FRAME_CONNECTION_CLOSING;
transport.close({ closeCode: code, reason });
},
};
void readWebTransportIncomingStreams(transport, events);
transport.closed.then(() => {
readyState = FRAME_CONNECTION_CLOSED;
events.dispatchEvent(new Event('close'));
}).catch(() => {
readyState = FRAME_CONNECTION_CLOSED;
events.dispatchEvent(new Event('close'));
});
return connection;
}
async function openWebTransportClientControlWriter(transport) {
const stream = await transport.createUnidirectionalStream();
const writer = stream.getWriter();
await writer.write(new Uint8Array([WT_STREAM_CONTROL_TO_SERVER]));
return writer;
}
async function readWebTransportIncomingStreams(transport, events) {
const reader = transport.incomingUnidirectionalStreams.getReader();
try {
while (true) {
const { value: stream, done } = await reader.read();
if (done) {
return;
}
void handleWebTransportIncomingStream(stream, events);
}
} catch (error) {
dispatchFrameConnectionError(events, error);
} finally {
reader.releaseLock();
}
}
async function handleWebTransportIncomingStream(stream, events) {
const reader = stream.getReader();
try {
const first = await reader.read();
if (first.done || !first.value || first.value.byteLength === 0) {
return;
}
const chunk = first.value;
const streamType = chunk[0];
const initialPayload = chunk.subarray(1);
if (streamType === WT_STREAM_CONTROL_TO_CLIENT) {
await readWebTransportControlStream(reader, initialPayload, events);
return;
}
if (streamType === WT_STREAM_FRAME) {
await readWebTransportFrameStream(reader, initialPayload, events);
}
} catch (error) {
dispatchFrameConnectionError(events, error);
} finally {
reader.releaseLock();
}
}
async function readWebTransportControlStream(reader, initialPayload, events) {
const decoder = new TextDecoder();
let pending = initialPayload.byteLength > 0 ? decoder.decode(initialPayload, { stream: true }) : '';
while (true) {
pending = dispatchWebTransportControlLines(pending, events);
const { value, done } = await reader.read();
if (done) {
pending += decoder.decode();
dispatchWebTransportControlLines(`${pending}\n`, events);
return;
}
pending += decoder.decode(value, { stream: true });
}
}
async function readWebTransportFrameStream(reader, initialPayload, events) {
const chunks = [];
let byteLength = 0;
if (initialPayload.byteLength > 0) {
chunks.push(initialPayload);
byteLength += initialPayload.byteLength;
}
while (true) {
const { value, done } = await reader.read();
if (done) {
const packet = new Uint8Array(byteLength);
let offset = 0;
for (const chunk of chunks) {
packet.set(chunk, offset);
offset += chunk.byteLength;
}
events.dispatchEvent(new MessageEvent('message', { data: packet.buffer }));
return;
}
chunks.push(value);
byteLength += value.byteLength;
}
}
function dispatchWebTransportControlLines(text, events) {
const lines = text.split(/\n/);
const pending = lines.pop() ?? '';
for (const line of lines) {
if (line.trim()) {
events.dispatchEvent(new MessageEvent('message', { data: line }));
}
}
return pending;
}
function dispatchFrameConnectionError(events, error) {
if (isExpectedWebTransportCloseError(error)) {
return;
}
console.warn('Frame connection error', error);
events.dispatchEvent(new Event('error'));
}
function isExpectedWebTransportCloseError(error) {
return error && String(error.message ?? error).toLowerCase().includes('session is closed');
}
function buildWebTransportFrameUrl(session, config) {
const host = config.host || window.location.hostname;
const port = config.port || window.location.port;
const urlHost = host.includes(':') && !host.startsWith('[') ? `[${host}]` : host;
return `https://${urlHost}:${port}/frames/${session.id}?g=${session.seekGeneration ?? 0}`;
}
function base64ToArrayBuffer(value) {
const binary = atob(value);
const bytes = new Uint8Array(binary.length);
for (let index = 0; index < binary.length; index += 1) {
bytes[index] = binary.charCodeAt(index);
}
return bytes.buffer;
}
async function playAudio() { async function playAudio() {
try { try {
await elements.audio.play(); await elements.audio.play();

View File

@@ -1,16 +1,21 @@
import { spawn } from 'node:child_process'; import { execFile, spawn } from 'node:child_process';
import { randomUUID } from 'node:crypto'; import { createHash, randomUUID, X509Certificate } from 'node:crypto';
import { EventEmitter } from 'node:events';
import fs from 'node:fs/promises'; import fs from 'node:fs/promises';
import { createServer } from 'node:http'; import { createServer } from 'node:http';
import { createRequire } from 'node:module';
import path from 'node:path'; import path from 'node:path';
import { Readable } from 'node:stream'; import { Readable } from 'node:stream';
import { fileURLToPath } from 'node:url'; import { fileURLToPath } from 'node:url';
import { promisify } from 'node:util';
import express from 'express'; import express from 'express';
import { WebSocket, WebSocketServer } from 'ws'; import { WebSocket, WebSocketServer } from 'ws';
const __dirname = path.dirname(fileURLToPath(import.meta.url)); const __dirname = path.dirname(fileURLToPath(import.meta.url));
const publicDir = path.join(__dirname, '..', 'public'); const publicDir = path.join(__dirname, '..', 'public');
const requireFromHere = createRequire(import.meta.url);
const execFileAsync = promisify(execFile);
const app = express(); const app = express();
const server = createServer(app); const server = createServer(app);
@@ -30,6 +35,7 @@ const FFMPEG_HTTP_RECONNECT_MAX_RETRIES = clampInteger(process.env.FFMPEG_HTTP_R
const FFMPEG_HTTP_RECONNECT_ON_HTTP_ERROR = process.env.FFMPEG_HTTP_RECONNECT_ON_HTTP_ERROR ?? '5xx'; const FFMPEG_HTTP_RECONNECT_ON_HTTP_ERROR = process.env.FFMPEG_HTTP_RECONNECT_ON_HTTP_ERROR ?? '5xx';
const FFMPEG_CAPABILITY_TIMEOUT_MS = 3000; const FFMPEG_CAPABILITY_TIMEOUT_MS = 3000;
const PLAYBACK_CONNECTION_MODE = parsePlaybackConnectionMode(process.env.PLAYBACK_CONNECTION_MODE ?? process.env.PLAYBACK_MODE); const PLAYBACK_CONNECTION_MODE = parsePlaybackConnectionMode(process.env.PLAYBACK_CONNECTION_MODE ?? process.env.PLAYBACK_MODE);
const FRAME_TRANSPORT = parseFrameTransport(process.env.FRAME_TRANSPORT);
const METADATA_PROBE_ENABLED = parseBoolean(process.env.METADATA_PROBE_ENABLED, PLAYBACK_CONNECTION_MODE !== 'relay'); const METADATA_PROBE_ENABLED = parseBoolean(process.env.METADATA_PROBE_ENABLED, PLAYBACK_CONNECTION_MODE !== 'relay');
const METADATA_PROBE_TIMEOUT_MS = clampInteger(process.env.METADATA_PROBE_TIMEOUT_MS, 4 * 1000, 1000, 30 * 1000); const METADATA_PROBE_TIMEOUT_MS = clampInteger(process.env.METADATA_PROBE_TIMEOUT_MS, 4 * 1000, 1000, 30 * 1000);
const RECENT_URLS_PATH = process.env.RECENT_URLS_PATH ?? path.join(__dirname, '..', 'data', 'recent-urls.json'); const RECENT_URLS_PATH = process.env.RECENT_URLS_PATH ?? path.join(__dirname, '..', 'data', 'recent-urls.json');
@@ -37,6 +43,14 @@ const RECENT_URL_LIMIT = clampInteger(process.env.RECENT_URL_LIMIT, 12, 1, 50);
const FAVORITES_PATH = process.env.FAVORITES_PATH ?? path.join(__dirname, '..', 'data', 'favorites.json'); const FAVORITES_PATH = process.env.FAVORITES_PATH ?? path.join(__dirname, '..', 'data', 'favorites.json');
const FAVORITES_LIMIT = clampInteger(process.env.FAVORITES_LIMIT, 50, 1, 200); const FAVORITES_LIMIT = clampInteger(process.env.FAVORITES_LIMIT, 50, 1, 200);
const LOCAL_VIDEOS_ROOT = parseLocalVideosRoot(process.env.LOCAL_VIDEOS); const LOCAL_VIDEOS_ROOT = parseLocalVideosRoot(process.env.LOCAL_VIDEOS);
const WEBTRANSPORT_ENABLED = parseBoolean(process.env.WEBTRANSPORT_ENABLED, FRAME_TRANSPORT === 'webtransport' || FRAME_TRANSPORT === 'auto');
const WEBTRANSPORT_HOST = process.env.WEBTRANSPORT_HOST || '0.0.0.0';
const WEBTRANSPORT_PORT = clampInteger(process.env.WEBTRANSPORT_PORT, Math.min(PORT + 1, 65535), 1, 65535);
const WEBTRANSPORT_PUBLIC_HOST = process.env.WEBTRANSPORT_PUBLIC_HOST || '';
const WEBTRANSPORT_PUBLIC_PORT = clampInteger(process.env.WEBTRANSPORT_PUBLIC_PORT, WEBTRANSPORT_PORT, 1, 65535);
const WEBTRANSPORT_CERT_PATH = process.env.WEBTRANSPORT_CERT_PATH ?? path.join(__dirname, '..', 'data', 'webtransport-cert.pem');
const WEBTRANSPORT_KEY_PATH = process.env.WEBTRANSPORT_KEY_PATH ?? path.join(__dirname, '..', 'data', 'webtransport-key.pem');
const WEBTRANSPORT_CERT_RENEW_MS = 12 * 24 * 60 * 60 * 1000;
const SESSION_TTL_MS = 60 * 60 * 1000; const SESSION_TTL_MS = 60 * 60 * 1000;
const PLAYBACK_READY_TIMEOUT_MS = 15 * 1000; const PLAYBACK_READY_TIMEOUT_MS = 15 * 1000;
const CLIENT_CLOCK_FRAME_LATE_GRACE_SECONDS = 0.25; const CLIENT_CLOCK_FRAME_LATE_GRACE_SECONDS = 0.25;
@@ -48,6 +62,9 @@ const MAX_YT_DLP_OUTPUT_BYTES = 8 * 1024 * 1024;
const RELAY_BRANCH_PAUSE_BYTES = Math.floor(MAX_RELAY_BRANCH_QUEUE_BYTES / 2); const RELAY_BRANCH_PAUSE_BYTES = Math.floor(MAX_RELAY_BRANCH_QUEUE_BYTES / 2);
const JPEG_SOI = Buffer.from([0xff, 0xd8]); const JPEG_SOI = Buffer.from([0xff, 0xd8]);
const JPEG_EOI = Buffer.from([0xff, 0xd9]); const JPEG_EOI = Buffer.from([0xff, 0xd9]);
const WT_STREAM_CONTROL_TO_CLIENT = 1;
const WT_STREAM_FRAME = 2;
const WT_STREAM_CONTROL_TO_SERVER = 3;
const BEST_EFFORT_RESUME_MAX_SECONDS = 30 * 24 * 60 * 60; const BEST_EFFORT_RESUME_MAX_SECONDS = 30 * 24 * 60 * 60;
const RECORDED_MEDIA_EXTENSIONS = new Set([ const RECORDED_MEDIA_EXTENSIONS = new Set([
'.avi', '.avi',
@@ -81,13 +98,23 @@ let favorites = [];
let favoritesWrite = Promise.resolve(); let favoritesWrite = Promise.resolve();
let ffmpegSupportsReconnectMaxRetries = false; let ffmpegSupportsReconnectMaxRetries = false;
let localVideosRealRootPromise = null; let localVideosRealRootPromise = null;
let webTransportServer = null;
let webTransportClientConfig = null;
app.disable('x-powered-by'); app.disable('x-powered-by');
app.use(express.json({ limit: '256kb' })); app.use(express.json({ limit: '256kb' }));
app.use(express.static(publicDir)); app.use(express.static(publicDir));
app.get('/api/health', (_request, response) => { app.get('/api/health', (_request, response) => {
response.json({ ok: true, ffmpeg: FFMPEG_PATH, playbackConnectionMode: PLAYBACK_CONNECTION_MODE }); response.json({
ok: true,
ffmpeg: FFMPEG_PATH,
playbackConnectionMode: PLAYBACK_CONNECTION_MODE,
frameTransport: FRAME_TRANSPORT,
webTransport: webTransportClientConfig
? { enabled: true, port: webTransportClientConfig.port }
: { enabled: false },
});
}); });
app.get('/api/recent-urls', (_request, response) => { app.get('/api/recent-urls', (_request, response) => {
@@ -463,12 +490,205 @@ await Promise.all([
loadRecentUrls(), loadRecentUrls(),
loadFavorites(), loadFavorites(),
detectFfmpegHttpReconnectSupport(), detectFfmpegHttpReconnectSupport(),
initializeWebTransportServer(),
]); ]);
server.listen(PORT, () => { server.listen(PORT, () => {
console.log(`Frame stream app listening at http://localhost:${PORT} mode=${PLAYBACK_CONNECTION_MODE}`); const webTransportStatus = webTransportClientConfig ? ` webTransportPort=${webTransportClientConfig.port}` : '';
console.log(`Frame stream app listening at http://localhost:${PORT} mode=${PLAYBACK_CONNECTION_MODE} frameTransport=${FRAME_TRANSPORT}${webTransportStatus}`);
}); });
async function initializeWebTransportServer() {
if (!WEBTRANSPORT_ENABLED) {
return;
}
try {
const { certPem, keyPem, certificateHash } = await loadWebTransportCertificate();
const { NapiServer } = loadWebTransportNativeBinding();
const bindAddress = formatHostPort(WEBTRANSPORT_HOST, WEBTRANSPORT_PORT);
webTransportServer = NapiServer.bind(bindAddress, certPem, keyPem);
webTransportClientConfig = {
host: WEBTRANSPORT_PUBLIC_HOST || null,
port: WEBTRANSPORT_PUBLIC_PORT,
certificateHash,
certificateHashAlgorithm: 'sha-256',
};
logInfo(`webtransport listening address=${bindAddress} publicPort=${WEBTRANSPORT_PUBLIC_PORT}`);
void acceptWebTransportFrameSessions();
} catch (error) {
webTransportServer = null;
webTransportClientConfig = null;
logWarn(`webtransport disabled error=${oneLine(error.message)}`);
}
}
async function loadWebTransportCertificate() {
const certPathConfigured = Boolean(process.env.WEBTRANSPORT_CERT_PATH);
const keyPathConfigured = Boolean(process.env.WEBTRANSPORT_KEY_PATH);
if (certPathConfigured !== keyPathConfigured) {
throw new Error('WEBTRANSPORT_CERT_PATH and WEBTRANSPORT_KEY_PATH must be set together.');
}
if (!certPathConfigured && !(await isAutoWebTransportCertificateFresh())) {
await generateWebTransportCertificate();
}
const [certPem, keyPem] = await Promise.all([
fs.readFile(WEBTRANSPORT_CERT_PATH),
fs.readFile(WEBTRANSPORT_KEY_PATH),
]);
return {
certPem,
keyPem,
certificateHash: getCertificateSha256Base64(certPem),
};
}
async function isAutoWebTransportCertificateFresh() {
try {
const [certStat, keyStat, certPem] = await Promise.all([
fs.stat(WEBTRANSPORT_CERT_PATH),
fs.stat(WEBTRANSPORT_KEY_PATH),
fs.readFile(WEBTRANSPORT_CERT_PATH),
]);
const certificate = new X509Certificate(certPem);
const expiresAt = Date.parse(certificate.validTo);
const newestMtime = Math.max(certStat.mtimeMs, keyStat.mtimeMs);
const now = Date.now();
return Number.isFinite(expiresAt) && expiresAt - now > 24 * 60 * 60 * 1000 && now - newestMtime < WEBTRANSPORT_CERT_RENEW_MS;
} catch {
return false;
}
}
async function generateWebTransportCertificate() {
const directory = path.dirname(WEBTRANSPORT_CERT_PATH);
const configPath = path.join(directory, `webtransport-openssl-${process.pid}.cnf`);
const opensslConfig = [
'[req]',
'distinguished_name=req_distinguished_name',
'x509_extensions=v3_req',
'prompt=no',
'[req_distinguished_name]',
'CN=localhost',
'[v3_req]',
'basicConstraints=critical,CA:FALSE',
'keyUsage=critical,digitalSignature',
'extendedKeyUsage=serverAuth',
'subjectAltName=@alt_names',
'[alt_names]',
'DNS.1=localhost',
'IP.1=127.0.0.1',
'IP.2=::1',
'',
].join('\n');
await fs.mkdir(directory, { recursive: true });
await fs.writeFile(configPath, opensslConfig, { mode: 0o600 });
try {
await execFileAsync('openssl', [
'ecparam',
'-name',
'prime256v1',
'-genkey',
'-noout',
'-out',
WEBTRANSPORT_KEY_PATH,
]);
await execFileAsync('openssl', [
'req',
'-new',
'-x509',
'-key',
WEBTRANSPORT_KEY_PATH,
'-out',
WEBTRANSPORT_CERT_PATH,
'-days',
'13',
'-config',
configPath,
]);
await fs.chmod(WEBTRANSPORT_KEY_PATH, 0o600);
logInfo(`webtransport generated short-lived ECDSA certificate path=${WEBTRANSPORT_CERT_PATH}`);
} finally {
await fs.rm(configPath, { force: true });
}
}
function loadWebTransportNativeBinding() {
const packageEntry = requireFromHere.resolve('@moq/web-transport');
const packageRoot = path.resolve(path.dirname(packageEntry), '..');
return requireFromHere(path.join(packageRoot, 'napi.cjs'));
}
function getCertificateSha256Base64(certPem) {
const certificate = new X509Certificate(certPem);
return createHash('sha256').update(Buffer.from(certificate.raw)).digest('base64');
}
async function acceptWebTransportFrameSessions() {
while (webTransportServer) {
let request = null;
try {
request = await webTransportServer.accept();
} catch (error) {
logWarn(`webtransport accept failed error=${oneLine(error.message)}`);
continue;
}
if (!request) {
return;
}
void handleWebTransportFrameRequest(request);
}
}
async function handleWebTransportFrameRequest(request) {
let rawUrl = '';
try {
rawUrl = await request.url;
const url = new URL(rawUrl, `https://localhost:${WEBTRANSPORT_PORT}`);
const match = url.pathname.match(/^\/frames\/([0-9a-f-]+)$/i);
const session = match ? getSession(match[1]) : null;
if (!session) {
await request.reject(404);
return;
}
const wtSession = await request.ok();
const connection = new WebTransportFrameConnection(wtSession, `webtransport:${shortId(session.id)}`);
const playbackMode = getSessionPlaybackConnectionMode(session);
logInfo(`webtransport frames attached id=${shortId(session.id)} mode=${playbackMode}`);
if (playbackMode === 'single' || playbackMode === 'relay') {
getOrCreatePlayback(session, playbackMode).attachFrames(connection);
return;
}
stopSharedPlaybackIfNeeded(session, playbackMode);
streamSplitFrames(connection, session);
} catch (error) {
logWarn(`webtransport request failed url=${oneLine(rawUrl)} error=${oneLine(error.message)}`);
try {
await request.reject(500);
} catch {
// The request may already have been accepted.
}
}
}
function parseSessionSource(body) { function parseSessionSource(body) {
if (typeof body?.localPath === 'string' && body.localPath.trim()) { if (typeof body?.localPath === 'string' && body.localPath.trim()) {
return { return {
@@ -914,6 +1134,23 @@ function parsePlaybackConnectionMode(value) {
return 'split'; return 'split';
} }
function parseFrameTransport(value) {
if (value === 'webtransport' || value === 'quic') {
return 'webtransport';
}
if (value === 'auto') {
return 'auto';
}
if (value === 'websocket' || value === 'ws' || value === undefined || value === '') {
return 'websocket';
}
console.warn(`Unknown FRAME_TRANSPORT "${value}", using websocket.`);
return 'websocket';
}
function clampInteger(value, fallback, min, max) { function clampInteger(value, fallback, min, max) {
const parsed = Number(value); const parsed = Number(value);
@@ -961,6 +1198,10 @@ function formatSessionPayload(session) {
seekable: isSessionSeekable(session), seekable: isSessionSeekable(session),
seekSeconds: session.seekSeconds, seekSeconds: session.seekSeconds,
seekGeneration: session.seekGeneration, seekGeneration: session.seekGeneration,
frameTransport: {
preferred: FRAME_TRANSPORT,
webTransport: webTransportClientConfig,
},
}; };
} }
@@ -2273,6 +2514,192 @@ function isWebSocketOpen(websocket) {
return websocket?.readyState === WebSocket.OPEN; return websocket?.readyState === WebSocket.OPEN;
} }
class WebTransportFrameConnection extends EventEmitter {
constructor(session, label) {
super();
this.session = session;
this.label = label;
this.readyState = WebSocket.OPEN;
this._bufferedAmount = 0;
this._closedEmitted = false;
this._controlStreamPromise = this.openControlStream();
this._controlWrite = Promise.resolve();
this.session.closed().then((info) => {
this.markClosed(info?.closeCode ?? 1000, info?.reason ?? '');
}).catch((error) => {
this.emitError(error);
this.markClosed(1011, 'webtransport closed');
});
void this.acceptClientControlStreams();
}
get bufferedAmount() {
return this._bufferedAmount;
}
send(data, options, callback) {
const done = typeof options === 'function' ? options : callback;
if (typeof data === 'string') {
this._controlWrite = this._controlWrite
.then(() => this.writeControl(data))
.then(() => done?.())
.catch((error) => {
this.emitError(error);
done?.(error);
});
return;
}
const packet = Buffer.isBuffer(data) ? data : Buffer.from(data);
this.writeFrame(packet).then(() => {
done?.();
}).catch((error) => {
this.emitError(error);
done?.(error);
});
}
close(code = 1000, reason = '') {
if (this.readyState === WebSocket.CLOSED || this.readyState === WebSocket.CLOSING) {
return;
}
this.readyState = WebSocket.CLOSING;
try {
this.session.close(code, reason);
} catch (error) {
this.emitError(error);
this.markClosed(code, reason);
}
}
async openControlStream() {
const stream = await this.session.openUni();
await stream.write(Buffer.from([WT_STREAM_CONTROL_TO_CLIENT]));
return stream;
}
async writeControl(data) {
if (this.readyState !== WebSocket.OPEN) {
return;
}
const stream = await this._controlStreamPromise;
await stream.write(Buffer.from(`${data}\n`, 'utf8'));
}
async writeFrame(packet) {
if (this.readyState !== WebSocket.OPEN) {
throw new Error('WebTransport frame connection is closed.');
}
this._bufferedAmount += packet.length;
try {
const stream = await this.session.openUni();
const payload = Buffer.allocUnsafe(packet.length + 1);
payload[0] = WT_STREAM_FRAME;
packet.copy(payload, 1);
await stream.write(payload);
await stream.finish();
} finally {
this._bufferedAmount = Math.max(0, this._bufferedAmount - packet.length);
}
}
async acceptClientControlStreams() {
while (this.readyState === WebSocket.OPEN || this.readyState === WebSocket.CLOSING) {
let stream;
try {
stream = await this.session.acceptUni();
} catch (error) {
if (this.readyState === WebSocket.OPEN) {
this.emitError(error);
}
return;
}
void this.readClientControlStream(stream);
}
}
async readClientControlStream(stream) {
let buffer = '';
let sawHeader = false;
try {
while (this.readyState === WebSocket.OPEN || this.readyState === WebSocket.CLOSING) {
const chunk = await stream.read(64 * 1024);
if (!chunk) {
this.flushClientControlBuffer(buffer);
return;
}
let data = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
if (!sawHeader) {
sawHeader = true;
if (data[0] !== WT_STREAM_CONTROL_TO_SERVER) {
return;
}
data = data.subarray(1);
}
buffer = this.consumeClientControlText(buffer + data.toString('utf8'));
}
} catch (error) {
if (this.readyState === WebSocket.OPEN) {
this.emitError(error);
}
}
}
consumeClientControlText(text) {
const lines = text.split(/\n/);
const pending = lines.pop() ?? '';
for (const line of lines) {
if (line.trim()) {
this.emit('message', Buffer.from(line, 'utf8'), false);
}
}
return pending;
}
flushClientControlBuffer(text) {
if (text.trim()) {
this.emit('message', Buffer.from(text, 'utf8'), false);
}
}
emitError(error) {
if (this.listenerCount('error') > 0) {
this.emit('error', error);
} else {
logWarn(`webtransport frame connection error kind=${this.label} error=${oneLine(error.message)}`);
}
}
markClosed(code, reason) {
if (this._closedEmitted) {
return;
}
this._closedEmitted = true;
this.readyState = WebSocket.CLOSED;
this.emit('close', code, reason);
}
}
function createJpegFrameParser(onFrame) { function createJpegFrameParser(onFrame) {
let collecting = false; let collecting = false;
let pendingMarkerByte = false; let pendingMarkerByte = false;
@@ -3149,6 +3576,10 @@ function sendJson(websocket, payload) {
} }
} }
function formatHostPort(host, port) {
return host.includes(':') && !host.startsWith('[') ? `[${host}]:${port}` : `${host}:${port}`;
}
function appendTail(current, chunk) { function appendTail(current, chunk) {
const next = current + chunk.toString('utf8'); const next = current + chunk.toString('utf8');
return next.length > 4000 ? next.slice(-4000) : next; return next.length > 4000 ? next.slice(-4000) : next;