Revert "Add optional WebTransport frame transport"
This reverts commit 3e2ca8057b.
This commit is contained in:
439
server/index.js
439
server/index.js
@@ -1,21 +1,16 @@
|
||||
import { execFile, spawn } from 'node:child_process';
|
||||
import { createHash, randomUUID, X509Certificate } from 'node:crypto';
|
||||
import { EventEmitter } from 'node:events';
|
||||
import { spawn } from 'node:child_process';
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import fs from 'node:fs/promises';
|
||||
import { createServer } from 'node:http';
|
||||
import { createRequire } from 'node:module';
|
||||
import path from 'node:path';
|
||||
import { Readable } from 'node:stream';
|
||||
import { fileURLToPath } from 'node:url';
|
||||
import { promisify } from 'node:util';
|
||||
|
||||
import express from 'express';
|
||||
import { WebSocket, WebSocketServer } from 'ws';
|
||||
|
||||
const __dirname = path.dirname(fileURLToPath(import.meta.url));
|
||||
const publicDir = path.join(__dirname, '..', 'public');
|
||||
const requireFromHere = createRequire(import.meta.url);
|
||||
const execFileAsync = promisify(execFile);
|
||||
|
||||
const app = express();
|
||||
const server = createServer(app);
|
||||
@@ -35,7 +30,6 @@ const FFMPEG_HTTP_RECONNECT_MAX_RETRIES = clampInteger(process.env.FFMPEG_HTTP_R
|
||||
const FFMPEG_HTTP_RECONNECT_ON_HTTP_ERROR = process.env.FFMPEG_HTTP_RECONNECT_ON_HTTP_ERROR ?? '5xx';
|
||||
const FFMPEG_CAPABILITY_TIMEOUT_MS = 3000;
|
||||
const PLAYBACK_CONNECTION_MODE = parsePlaybackConnectionMode(process.env.PLAYBACK_CONNECTION_MODE ?? process.env.PLAYBACK_MODE);
|
||||
const FRAME_TRANSPORT = parseFrameTransport(process.env.FRAME_TRANSPORT);
|
||||
const METADATA_PROBE_ENABLED = parseBoolean(process.env.METADATA_PROBE_ENABLED, PLAYBACK_CONNECTION_MODE !== 'relay');
|
||||
const METADATA_PROBE_TIMEOUT_MS = clampInteger(process.env.METADATA_PROBE_TIMEOUT_MS, 4 * 1000, 1000, 30 * 1000);
|
||||
const RECENT_URLS_PATH = process.env.RECENT_URLS_PATH ?? path.join(__dirname, '..', 'data', 'recent-urls.json');
|
||||
@@ -43,14 +37,6 @@ const RECENT_URL_LIMIT = clampInteger(process.env.RECENT_URL_LIMIT, 12, 1, 50);
|
||||
const FAVORITES_PATH = process.env.FAVORITES_PATH ?? path.join(__dirname, '..', 'data', 'favorites.json');
|
||||
const FAVORITES_LIMIT = clampInteger(process.env.FAVORITES_LIMIT, 50, 1, 200);
|
||||
const LOCAL_VIDEOS_ROOT = parseLocalVideosRoot(process.env.LOCAL_VIDEOS);
|
||||
const WEBTRANSPORT_ENABLED = parseBoolean(process.env.WEBTRANSPORT_ENABLED, FRAME_TRANSPORT === 'webtransport' || FRAME_TRANSPORT === 'auto');
|
||||
const WEBTRANSPORT_HOST = process.env.WEBTRANSPORT_HOST || '0.0.0.0';
|
||||
const WEBTRANSPORT_PORT = clampInteger(process.env.WEBTRANSPORT_PORT, Math.min(PORT + 1, 65535), 1, 65535);
|
||||
const WEBTRANSPORT_PUBLIC_HOST = process.env.WEBTRANSPORT_PUBLIC_HOST || '';
|
||||
const WEBTRANSPORT_PUBLIC_PORT = clampInteger(process.env.WEBTRANSPORT_PUBLIC_PORT, WEBTRANSPORT_PORT, 1, 65535);
|
||||
const WEBTRANSPORT_CERT_PATH = process.env.WEBTRANSPORT_CERT_PATH ?? path.join(__dirname, '..', 'data', 'webtransport-cert.pem');
|
||||
const WEBTRANSPORT_KEY_PATH = process.env.WEBTRANSPORT_KEY_PATH ?? path.join(__dirname, '..', 'data', 'webtransport-key.pem');
|
||||
const WEBTRANSPORT_CERT_RENEW_MS = 12 * 24 * 60 * 60 * 1000;
|
||||
const SESSION_TTL_MS = 60 * 60 * 1000;
|
||||
const PLAYBACK_READY_TIMEOUT_MS = 15 * 1000;
|
||||
const CLIENT_CLOCK_FRAME_LATE_GRACE_SECONDS = 0.25;
|
||||
@@ -62,9 +48,6 @@ const MAX_YT_DLP_OUTPUT_BYTES = 8 * 1024 * 1024;
|
||||
const RELAY_BRANCH_PAUSE_BYTES = Math.floor(MAX_RELAY_BRANCH_QUEUE_BYTES / 2);
|
||||
const JPEG_SOI = Buffer.from([0xff, 0xd8]);
|
||||
const JPEG_EOI = Buffer.from([0xff, 0xd9]);
|
||||
const WT_STREAM_CONTROL_TO_CLIENT = 1;
|
||||
const WT_STREAM_FRAME = 2;
|
||||
const WT_STREAM_CONTROL_TO_SERVER = 3;
|
||||
const BEST_EFFORT_RESUME_MAX_SECONDS = 30 * 24 * 60 * 60;
|
||||
const RECORDED_MEDIA_EXTENSIONS = new Set([
|
||||
'.avi',
|
||||
@@ -98,23 +81,13 @@ let favorites = [];
|
||||
let favoritesWrite = Promise.resolve();
|
||||
let ffmpegSupportsReconnectMaxRetries = false;
|
||||
let localVideosRealRootPromise = null;
|
||||
let webTransportServer = null;
|
||||
let webTransportClientConfig = null;
|
||||
|
||||
app.disable('x-powered-by');
|
||||
app.use(express.json({ limit: '256kb' }));
|
||||
app.use(express.static(publicDir));
|
||||
|
||||
app.get('/api/health', (_request, response) => {
|
||||
response.json({
|
||||
ok: true,
|
||||
ffmpeg: FFMPEG_PATH,
|
||||
playbackConnectionMode: PLAYBACK_CONNECTION_MODE,
|
||||
frameTransport: FRAME_TRANSPORT,
|
||||
webTransport: webTransportClientConfig
|
||||
? { enabled: true, port: webTransportClientConfig.port }
|
||||
: { enabled: false },
|
||||
});
|
||||
response.json({ ok: true, ffmpeg: FFMPEG_PATH, playbackConnectionMode: PLAYBACK_CONNECTION_MODE });
|
||||
});
|
||||
|
||||
app.get('/api/recent-urls', (_request, response) => {
|
||||
@@ -490,205 +463,12 @@ await Promise.all([
|
||||
loadRecentUrls(),
|
||||
loadFavorites(),
|
||||
detectFfmpegHttpReconnectSupport(),
|
||||
initializeWebTransportServer(),
|
||||
]);
|
||||
|
||||
server.listen(PORT, () => {
|
||||
const webTransportStatus = webTransportClientConfig ? ` webTransportPort=${webTransportClientConfig.port}` : '';
|
||||
console.log(`Frame stream app listening at http://localhost:${PORT} mode=${PLAYBACK_CONNECTION_MODE} frameTransport=${FRAME_TRANSPORT}${webTransportStatus}`);
|
||||
console.log(`Frame stream app listening at http://localhost:${PORT} mode=${PLAYBACK_CONNECTION_MODE}`);
|
||||
});
|
||||
|
||||
async function initializeWebTransportServer() {
|
||||
if (!WEBTRANSPORT_ENABLED) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const { certPem, keyPem, certificateHash } = await loadWebTransportCertificate();
|
||||
const { NapiServer } = loadWebTransportNativeBinding();
|
||||
const bindAddress = formatHostPort(WEBTRANSPORT_HOST, WEBTRANSPORT_PORT);
|
||||
|
||||
webTransportServer = NapiServer.bind(bindAddress, certPem, keyPem);
|
||||
webTransportClientConfig = {
|
||||
host: WEBTRANSPORT_PUBLIC_HOST || null,
|
||||
port: WEBTRANSPORT_PUBLIC_PORT,
|
||||
certificateHash,
|
||||
certificateHashAlgorithm: 'sha-256',
|
||||
};
|
||||
|
||||
logInfo(`webtransport listening address=${bindAddress} publicPort=${WEBTRANSPORT_PUBLIC_PORT}`);
|
||||
void acceptWebTransportFrameSessions();
|
||||
} catch (error) {
|
||||
webTransportServer = null;
|
||||
webTransportClientConfig = null;
|
||||
logWarn(`webtransport disabled error=${oneLine(error.message)}`);
|
||||
}
|
||||
}
|
||||
|
||||
async function loadWebTransportCertificate() {
|
||||
const certPathConfigured = Boolean(process.env.WEBTRANSPORT_CERT_PATH);
|
||||
const keyPathConfigured = Boolean(process.env.WEBTRANSPORT_KEY_PATH);
|
||||
|
||||
if (certPathConfigured !== keyPathConfigured) {
|
||||
throw new Error('WEBTRANSPORT_CERT_PATH and WEBTRANSPORT_KEY_PATH must be set together.');
|
||||
}
|
||||
|
||||
if (!certPathConfigured && !(await isAutoWebTransportCertificateFresh())) {
|
||||
await generateWebTransportCertificate();
|
||||
}
|
||||
|
||||
const [certPem, keyPem] = await Promise.all([
|
||||
fs.readFile(WEBTRANSPORT_CERT_PATH),
|
||||
fs.readFile(WEBTRANSPORT_KEY_PATH),
|
||||
]);
|
||||
|
||||
return {
|
||||
certPem,
|
||||
keyPem,
|
||||
certificateHash: getCertificateSha256Base64(certPem),
|
||||
};
|
||||
}
|
||||
|
||||
async function isAutoWebTransportCertificateFresh() {
|
||||
try {
|
||||
const [certStat, keyStat, certPem] = await Promise.all([
|
||||
fs.stat(WEBTRANSPORT_CERT_PATH),
|
||||
fs.stat(WEBTRANSPORT_KEY_PATH),
|
||||
fs.readFile(WEBTRANSPORT_CERT_PATH),
|
||||
]);
|
||||
const certificate = new X509Certificate(certPem);
|
||||
const expiresAt = Date.parse(certificate.validTo);
|
||||
const newestMtime = Math.max(certStat.mtimeMs, keyStat.mtimeMs);
|
||||
const now = Date.now();
|
||||
|
||||
return Number.isFinite(expiresAt) && expiresAt - now > 24 * 60 * 60 * 1000 && now - newestMtime < WEBTRANSPORT_CERT_RENEW_MS;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
async function generateWebTransportCertificate() {
|
||||
const directory = path.dirname(WEBTRANSPORT_CERT_PATH);
|
||||
const configPath = path.join(directory, `webtransport-openssl-${process.pid}.cnf`);
|
||||
const opensslConfig = [
|
||||
'[req]',
|
||||
'distinguished_name=req_distinguished_name',
|
||||
'x509_extensions=v3_req',
|
||||
'prompt=no',
|
||||
'[req_distinguished_name]',
|
||||
'CN=localhost',
|
||||
'[v3_req]',
|
||||
'basicConstraints=critical,CA:FALSE',
|
||||
'keyUsage=critical,digitalSignature',
|
||||
'extendedKeyUsage=serverAuth',
|
||||
'subjectAltName=@alt_names',
|
||||
'[alt_names]',
|
||||
'DNS.1=localhost',
|
||||
'IP.1=127.0.0.1',
|
||||
'IP.2=::1',
|
||||
'',
|
||||
].join('\n');
|
||||
|
||||
await fs.mkdir(directory, { recursive: true });
|
||||
await fs.writeFile(configPath, opensslConfig, { mode: 0o600 });
|
||||
|
||||
try {
|
||||
await execFileAsync('openssl', [
|
||||
'ecparam',
|
||||
'-name',
|
||||
'prime256v1',
|
||||
'-genkey',
|
||||
'-noout',
|
||||
'-out',
|
||||
WEBTRANSPORT_KEY_PATH,
|
||||
]);
|
||||
await execFileAsync('openssl', [
|
||||
'req',
|
||||
'-new',
|
||||
'-x509',
|
||||
'-key',
|
||||
WEBTRANSPORT_KEY_PATH,
|
||||
'-out',
|
||||
WEBTRANSPORT_CERT_PATH,
|
||||
'-days',
|
||||
'13',
|
||||
'-config',
|
||||
configPath,
|
||||
]);
|
||||
await fs.chmod(WEBTRANSPORT_KEY_PATH, 0o600);
|
||||
logInfo(`webtransport generated short-lived ECDSA certificate path=${WEBTRANSPORT_CERT_PATH}`);
|
||||
} finally {
|
||||
await fs.rm(configPath, { force: true });
|
||||
}
|
||||
}
|
||||
|
||||
function loadWebTransportNativeBinding() {
|
||||
const packageEntry = requireFromHere.resolve('@moq/web-transport');
|
||||
const packageRoot = path.resolve(path.dirname(packageEntry), '..');
|
||||
return requireFromHere(path.join(packageRoot, 'napi.cjs'));
|
||||
}
|
||||
|
||||
function getCertificateSha256Base64(certPem) {
|
||||
const certificate = new X509Certificate(certPem);
|
||||
return createHash('sha256').update(Buffer.from(certificate.raw)).digest('base64');
|
||||
}
|
||||
|
||||
async function acceptWebTransportFrameSessions() {
|
||||
while (webTransportServer) {
|
||||
let request = null;
|
||||
|
||||
try {
|
||||
request = await webTransportServer.accept();
|
||||
} catch (error) {
|
||||
logWarn(`webtransport accept failed error=${oneLine(error.message)}`);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!request) {
|
||||
return;
|
||||
}
|
||||
|
||||
void handleWebTransportFrameRequest(request);
|
||||
}
|
||||
}
|
||||
|
||||
async function handleWebTransportFrameRequest(request) {
|
||||
let rawUrl = '';
|
||||
|
||||
try {
|
||||
rawUrl = await request.url;
|
||||
const url = new URL(rawUrl, `https://localhost:${WEBTRANSPORT_PORT}`);
|
||||
const match = url.pathname.match(/^\/frames\/([0-9a-f-]+)$/i);
|
||||
const session = match ? getSession(match[1]) : null;
|
||||
|
||||
if (!session) {
|
||||
await request.reject(404);
|
||||
return;
|
||||
}
|
||||
|
||||
const wtSession = await request.ok();
|
||||
const connection = new WebTransportFrameConnection(wtSession, `webtransport:${shortId(session.id)}`);
|
||||
const playbackMode = getSessionPlaybackConnectionMode(session);
|
||||
logInfo(`webtransport frames attached id=${shortId(session.id)} mode=${playbackMode}`);
|
||||
|
||||
if (playbackMode === 'single' || playbackMode === 'relay') {
|
||||
getOrCreatePlayback(session, playbackMode).attachFrames(connection);
|
||||
return;
|
||||
}
|
||||
|
||||
stopSharedPlaybackIfNeeded(session, playbackMode);
|
||||
streamSplitFrames(connection, session);
|
||||
} catch (error) {
|
||||
logWarn(`webtransport request failed url=${oneLine(rawUrl)} error=${oneLine(error.message)}`);
|
||||
|
||||
try {
|
||||
await request.reject(500);
|
||||
} catch {
|
||||
// The request may already have been accepted.
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function parseSessionSource(body) {
|
||||
if (typeof body?.localPath === 'string' && body.localPath.trim()) {
|
||||
return {
|
||||
@@ -1134,23 +914,6 @@ function parsePlaybackConnectionMode(value) {
|
||||
return 'split';
|
||||
}
|
||||
|
||||
function parseFrameTransport(value) {
|
||||
if (value === 'webtransport' || value === 'quic') {
|
||||
return 'webtransport';
|
||||
}
|
||||
|
||||
if (value === 'auto') {
|
||||
return 'auto';
|
||||
}
|
||||
|
||||
if (value === 'websocket' || value === 'ws' || value === undefined || value === '') {
|
||||
return 'websocket';
|
||||
}
|
||||
|
||||
console.warn(`Unknown FRAME_TRANSPORT "${value}", using websocket.`);
|
||||
return 'websocket';
|
||||
}
|
||||
|
||||
function clampInteger(value, fallback, min, max) {
|
||||
const parsed = Number(value);
|
||||
|
||||
@@ -1198,10 +961,6 @@ function formatSessionPayload(session) {
|
||||
seekable: isSessionSeekable(session),
|
||||
seekSeconds: session.seekSeconds,
|
||||
seekGeneration: session.seekGeneration,
|
||||
frameTransport: {
|
||||
preferred: FRAME_TRANSPORT,
|
||||
webTransport: webTransportClientConfig,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@@ -2514,192 +2273,6 @@ function isWebSocketOpen(websocket) {
|
||||
return websocket?.readyState === WebSocket.OPEN;
|
||||
}
|
||||
|
||||
class WebTransportFrameConnection extends EventEmitter {
|
||||
constructor(session, label) {
|
||||
super();
|
||||
this.session = session;
|
||||
this.label = label;
|
||||
this.readyState = WebSocket.OPEN;
|
||||
this._bufferedAmount = 0;
|
||||
this._closedEmitted = false;
|
||||
this._controlStreamPromise = this.openControlStream();
|
||||
this._controlWrite = Promise.resolve();
|
||||
|
||||
this.session.closed().then((info) => {
|
||||
this.markClosed(info?.closeCode ?? 1000, info?.reason ?? '');
|
||||
}).catch((error) => {
|
||||
this.emitError(error);
|
||||
this.markClosed(1011, 'webtransport closed');
|
||||
});
|
||||
|
||||
void this.acceptClientControlStreams();
|
||||
}
|
||||
|
||||
get bufferedAmount() {
|
||||
return this._bufferedAmount;
|
||||
}
|
||||
|
||||
send(data, options, callback) {
|
||||
const done = typeof options === 'function' ? options : callback;
|
||||
|
||||
if (typeof data === 'string') {
|
||||
this._controlWrite = this._controlWrite
|
||||
.then(() => this.writeControl(data))
|
||||
.then(() => done?.())
|
||||
.catch((error) => {
|
||||
this.emitError(error);
|
||||
done?.(error);
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const packet = Buffer.isBuffer(data) ? data : Buffer.from(data);
|
||||
this.writeFrame(packet).then(() => {
|
||||
done?.();
|
||||
}).catch((error) => {
|
||||
this.emitError(error);
|
||||
done?.(error);
|
||||
});
|
||||
}
|
||||
|
||||
close(code = 1000, reason = '') {
|
||||
if (this.readyState === WebSocket.CLOSED || this.readyState === WebSocket.CLOSING) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.readyState = WebSocket.CLOSING;
|
||||
|
||||
try {
|
||||
this.session.close(code, reason);
|
||||
} catch (error) {
|
||||
this.emitError(error);
|
||||
this.markClosed(code, reason);
|
||||
}
|
||||
}
|
||||
|
||||
async openControlStream() {
|
||||
const stream = await this.session.openUni();
|
||||
await stream.write(Buffer.from([WT_STREAM_CONTROL_TO_CLIENT]));
|
||||
return stream;
|
||||
}
|
||||
|
||||
async writeControl(data) {
|
||||
if (this.readyState !== WebSocket.OPEN) {
|
||||
return;
|
||||
}
|
||||
|
||||
const stream = await this._controlStreamPromise;
|
||||
await stream.write(Buffer.from(`${data}\n`, 'utf8'));
|
||||
}
|
||||
|
||||
async writeFrame(packet) {
|
||||
if (this.readyState !== WebSocket.OPEN) {
|
||||
throw new Error('WebTransport frame connection is closed.');
|
||||
}
|
||||
|
||||
this._bufferedAmount += packet.length;
|
||||
|
||||
try {
|
||||
const stream = await this.session.openUni();
|
||||
const payload = Buffer.allocUnsafe(packet.length + 1);
|
||||
payload[0] = WT_STREAM_FRAME;
|
||||
packet.copy(payload, 1);
|
||||
await stream.write(payload);
|
||||
await stream.finish();
|
||||
} finally {
|
||||
this._bufferedAmount = Math.max(0, this._bufferedAmount - packet.length);
|
||||
}
|
||||
}
|
||||
|
||||
async acceptClientControlStreams() {
|
||||
while (this.readyState === WebSocket.OPEN || this.readyState === WebSocket.CLOSING) {
|
||||
let stream;
|
||||
|
||||
try {
|
||||
stream = await this.session.acceptUni();
|
||||
} catch (error) {
|
||||
if (this.readyState === WebSocket.OPEN) {
|
||||
this.emitError(error);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
void this.readClientControlStream(stream);
|
||||
}
|
||||
}
|
||||
|
||||
async readClientControlStream(stream) {
|
||||
let buffer = '';
|
||||
let sawHeader = false;
|
||||
|
||||
try {
|
||||
while (this.readyState === WebSocket.OPEN || this.readyState === WebSocket.CLOSING) {
|
||||
const chunk = await stream.read(64 * 1024);
|
||||
|
||||
if (!chunk) {
|
||||
this.flushClientControlBuffer(buffer);
|
||||
return;
|
||||
}
|
||||
|
||||
let data = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
|
||||
|
||||
if (!sawHeader) {
|
||||
sawHeader = true;
|
||||
|
||||
if (data[0] !== WT_STREAM_CONTROL_TO_SERVER) {
|
||||
return;
|
||||
}
|
||||
|
||||
data = data.subarray(1);
|
||||
}
|
||||
|
||||
buffer = this.consumeClientControlText(buffer + data.toString('utf8'));
|
||||
}
|
||||
} catch (error) {
|
||||
if (this.readyState === WebSocket.OPEN) {
|
||||
this.emitError(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
consumeClientControlText(text) {
|
||||
const lines = text.split(/\n/);
|
||||
const pending = lines.pop() ?? '';
|
||||
|
||||
for (const line of lines) {
|
||||
if (line.trim()) {
|
||||
this.emit('message', Buffer.from(line, 'utf8'), false);
|
||||
}
|
||||
}
|
||||
|
||||
return pending;
|
||||
}
|
||||
|
||||
flushClientControlBuffer(text) {
|
||||
if (text.trim()) {
|
||||
this.emit('message', Buffer.from(text, 'utf8'), false);
|
||||
}
|
||||
}
|
||||
|
||||
emitError(error) {
|
||||
if (this.listenerCount('error') > 0) {
|
||||
this.emit('error', error);
|
||||
} else {
|
||||
logWarn(`webtransport frame connection error kind=${this.label} error=${oneLine(error.message)}`);
|
||||
}
|
||||
}
|
||||
|
||||
markClosed(code, reason) {
|
||||
if (this._closedEmitted) {
|
||||
return;
|
||||
}
|
||||
|
||||
this._closedEmitted = true;
|
||||
this.readyState = WebSocket.CLOSED;
|
||||
this.emit('close', code, reason);
|
||||
}
|
||||
}
|
||||
|
||||
function createJpegFrameParser(onFrame) {
|
||||
let collecting = false;
|
||||
let pendingMarkerByte = false;
|
||||
@@ -3576,10 +3149,6 @@ function sendJson(websocket, payload) {
|
||||
}
|
||||
}
|
||||
|
||||
function formatHostPort(host, port) {
|
||||
return host.includes(':') && !host.startsWith('[') ? `[${host}]:${port}` : `${host}:${port}`;
|
||||
}
|
||||
|
||||
function appendTail(current, chunk) {
|
||||
const next = current + chunk.toString('utf8');
|
||||
return next.length > 4000 ? next.slice(-4000) : next;
|
||||
|
||||
Reference in New Issue
Block a user