Backend Typescript 1.0.0 Help

Многопоточность в JS

Зачем многопоточность в Node.js

Node.js исторически опирается на однопоточный Event Loop и неблокирующий ввод/вывод. Это идеально для сетевых приложений, но плохо для CPU-тяжёлых задач (парсинг больших файлов, криптография, сжатие, обработка изображений, ML-инференс). Многопоточность через модуль worker_threads позволяет вынести такие расчёты в отдельные потоки, не блокируя главный цикл событий.

Архитектура: Event Loop, libuv и Worker

Каждый изолят V8 исполняет JS и имеет свой Event Loop. Node использует libuv с внутренним пулом потоков для некоторых операций (fs, crypto), однако это не исполняет ваш JS параллельно. Worker создаёт новый изолят и ОС-поток, где ваш JS действительно идёт параллельно.

Когда выбирать worker_threads, а когда нет

  • CPU-bound задачи: изображения, видео, PDF, криптография, сложная сериализация — worker_threads.

  • I/O-bound (базы, HTTP, очереди) — чаще достаточно встроенного неблокирующего I/O.

  • Изоляция памяти и надёжность: для недоверенного кода и «жёстких» перезапусков лучше child_process (отдельный процесс).

  • Масштабирование по ядрам: распределение соединений по ядрам — cluster или несколько процессов за балансировщиком.

API: базовые сущности и каналы связи

  • Worker — класс для создания потока.

  • isMainThread — признак «мы в главном потоке?».

  • parentPort — порт связи с родителем (внутри воркера).

  • workerData — «снэпшот» входных данных при запуске воркера.

  • MessageChannel, MessagePort — двунаправленные порты.

  • transferList — список «передаваемых» объектов (например, ArrayBuffer).

  • SharedArrayBuffer и Atomics — разделяемая память и атомарные операции.

Пример 1: простой воркер (два файла)

Рассчитаем тяжёлую функцию в отдельном потоке.

// main.js const { Worker, isMainThread } = require("node:worker_threads"); if (isMainThread) { const worker = new Worker(__filename, {workerData: {n: 45}}); worker.once("message", (result) => { console.log("fib:", result); // fib: 1134903170 }); worker.once("error", (e) => console.error("error:", e)); // error: ... worker.once("exit", (code) => console.log("exit:", code)); // exit: 0 } else { const {parentPort, workerData} = require("node:worker_threads"); function fib(n) { return n < 2 ? n : fib(n - 1) + fib(n - 2); } const result = fib(workerData.n); parentPort.postMessage(result); }

Пример 2: воркер как отдельный модуль (многофайловый)

Так чаще организуют код в реальных проектах.

// main.js const { Worker } = require("node:worker_threads"); const path = require("node:path"); function runTask(payload) { return new Promise((resolve, reject) => { const worker = new Worker(path.join(__dirname, "worker.js"), {workerData: payload}); worker.once("message", resolve); worker.once("error", reject); worker.once("exit", (code) => { if (code !== 0) reject(new Error("exit " + code)); }); }); } (async () => { const res = await runTask({op: "sum", items: [1, 2, 3]}); console.log("sum:", res); // sum: 6 })();
// worker.js const {workerData, parentPort} = require("node:worker_threads"); function compute(data) { if (data.op === "sum") return data.items.reduce((a, b) => a + b, 0); return null; } parentPort.postMessage(compute(workerData));

Пример 3: простой пул воркеров (переиспользование потоков)

Создаём фиксированное число воркеров и очередь задач.

// pool.js const { Worker } = require("node:worker_threads"); const path = require("node:path"); class WorkerPool { constructor(size = Math.max(1, require("node:os").cpus().length - 1)) { this.size = size; this.free = []; this.busy = new Set(); this.queue = []; for (let i = 0; i < size; i++) this.free.push(this._create()); } _create() { const w = new Worker(path.join(__dirname, "worker.js")); w.on("message", (msg) => { w.currentResolve && w.currentResolve(msg); w.currentResolve = w.currentReject = null; this._release(w); }); w.on("error", (err) => { w.currentReject && w.currentReject(err); this.busy.delete(w); // Создаём новый на замену const nw = this._create(); this.free.push(nw); this._drain(); }); w.on("exit", (code) => { this.busy.delete(w); if (code !== 0) { const nw = this._create(); this.free.push(nw); } this._drain(); }); return w; } exec(payload) { return new Promise((resolve, reject) => { this.queue.push({payload, resolve, reject}); this._drain(); }); } _drain() { while (this.free.length > 0 && this.queue.length > 0) { const w = this.free.pop(); const job = this.queue.shift(); this.busy.add(w); w.currentResolve = job.resolve; w.currentReject = job.reject; w.postMessage(job.payload); } } _release(w) { this.busy.delete(w); this.free.push(w); this._drain(); } async destroy() { const all = [...this.free, ...this.busy]; await Promise.all(all.map((w) => w.terminate())); this.free = []; this.busy.clear(); } } module.exports = {WorkerPool};
// worker.js const {parentPort} = require("node:worker_threads"); parentPort.on("message", (task) => { let result = null; if (task.type === "mul") result = task.a * task.b; if (task.type === "sleep") { const end = Date.now() + task.ms; while (Date.now() < end) { } result = task.ms; } parentPort.postMessage({id: task.id, result}); });
// main.js const {WorkerPool} = require("./pool"); (async () => { const pool = new WorkerPool(4); const jobs = []; for (let i = 0; i < 8; i++) jobs.push(pool.exec({type: "mul", a: i, b: i + 1, id: i})); const res = await Promise.all(jobs); console.log(res.length); // 8 await pool.destroy(); })();

Передача данных: Structured Clone, Transferable, SharedArrayBuffer

Копирование и передача владения

По умолчанию объекты копируются по Structured Clone. Для больших бинарных данных выгоднее передавать «владение» ArrayBuffer через transferList.

// transfer.js const { Worker, isMainThread, parentPort } = require("node:worker_threads"); if (isMainThread) { const worker = new Worker(__filename); const buf = new ArrayBuffer(1024 * 1024); const u8 = new Uint8Array(buf); u8[0] = 7; worker.postMessage({buf}, [buf]); worker.once("message", (m) => console.log(m)); // { first: 7 } } else { parentPort.once("message", ({buf}) => { const view = new Uint8Array(buf); parentPort.postMessage({first: view[0]}); }); }

Разделяемая память и Atomics

SharedArrayBuffer позволяет разделять память между потоками, а Atomics — синхронизировать доступ.

// shared.js const {Worker, isMainThread, parentPort, workerData} = require("node:worker_threads"); if (isMainThread) { const sab = new SharedArrayBuffer(4); const view = new Int32Array(sab); const w = new Worker(__filename, {workerData: sab}); Atomics.store(view, 0, 0); setTimeout(() => { Atomics.store(view, 0, 42); Atomics.notify(view, 0, 1); }, 100); w.once("message", (v) => console.log(v)); // 42 } else { const view = new Int32Array(workerData); Atomics.wait(view, 0, 0); parentPort.postMessage(Atomics.load(view, 0)); }

Двусторонние каналы: MessageChannel/MessagePort

MessageChannel создаёт пару портов. Их можно передать воркеру и использовать как выделенную шину.

// channel.js const { Worker, MessageChannel, isMainThread, parentPort } = require("node:worker_threads"); if (isMainThread) { const worker = new Worker(__filename, {workerData: null}); const {port1, port2} = new MessageChannel(); worker.postMessage({port: port2}, [port2]); port1.on("message", (m) => console.log("got:", m)); // got: pong port1.postMessage("ping"); } else { parentPort.once("message", ({port}) => { port.on("message", (m) => port.postMessage(m === "ping" ? "pong" : m)); }); }

Отмена задач и таймауты

Прямой «убийства» функции нет — есть worker.terminate(), которое завершит поток целиком. Для «мягкой» отмены пошлите сигнал и регулярно проверяйте его в воркере.

// cancel.js const {Worker, isMainThread, parentPort} = require("node:worker_threads"); if (isMainThread) { const w = new Worker(__filename); const timer = setTimeout(() => w.postMessage({type: "cancel"}), 50); w.once("message", (m) => console.log(m)); // { status: "canceled" } w.postMessage({type: "start"}); } else { let canceled = false; parentPort.on("message", (m) => { if (m.type === "cancel") canceled = true; if (m.type === "start") { let s = 0; for (let i = 0; i < 1e9; i++) { s += i; if (canceled) break; } parentPort.postMessage({status: canceled ? "canceled" : "done"}); } }); }

ESM, TypeScript и бандлинг

  • ESM: new Worker(new URL("worker.mjs", import.meta.url), { type: "module" }).

  • Бандлеры: убедитесь, что воркер лежит как отдельный asset; относительные пути из __filename/import.meta.url надёжнее «магических» строк.

Ошибки, выход, утечки

  • worker.on("error") — перехватывайте исключения.

  • worker.on("exit") — код выхода ≠ 0 =— аварийное завершение.

  • worker.terminate() — «жёсткое» завершение (освободит ресурсы).

  • resourceLimits — ограничение heap/stack при создании воркера.

Производительность: измеряем и считаем

Замеряйте «до/после» и учитывайте расходы на сериализацию/копирование.

// perf.js const { Worker } = require("node:worker_threads"); const { performance } = require("node:perf_hooks"); const path = require("node:path"); (async () => { const start = performance.now(); const w = new Worker(path.join(__dirname, "worker.js"), {workerData: {n: 45}}); const t = await new Promise((res, rej) => { w.once("message", res); w.once("error", rej); }); const dt = performance.now() - start; console.log(Math.round(dt)); // 1234 })();

Расширенный многофайловый пример: обработка изображений в пуле

Схема: главный поток читает список задач, пул раздаёт их воркерам, воркеры считают и возвращают результат. Здесь имитируем CPU-нагрузку.

// pool.js const { Worker } = require("node:worker_threads"); const path = require("node:path"); const os = require("node:os"); class Pool { constructor(file, size = Math.max(1, os.cpus().length - 1)) { this.file = file; this.size = size; this.queue = []; this.id = 0; this.workers = Array.from({length: size}, () => this._spawn()); } _spawn() { const w = new Worker(this.file); w.idle = true; w.on("message", (msg) => { w.current && w.current.resolve(msg); w.current = null; w.idle = true; this._schedule(); }); w.on("error", (e) => { w.current && w.current.reject(e); Object.assign(w, this._spawn()); // замена }); return w; } exec(payload, transfer = []) { return new Promise((resolve, reject) => { this.queue.push({payload, resolve, reject, transfer}); this._schedule(); }); } _schedule() { const w = this.workers.find(x => x.idle); if (!w) return; const job = this.queue.shift(); if (!job) return; w.idle = false; w.current = job; w.postMessage(job.payload, job.transfer); } async destroy() { await Promise.all(this.workers.map((w) => w.terminate())); } } module.exports = {Pool};
// img.worker.js const {parentPort} = require("node:worker_threads"); parentPort.on("message", ({buf, factor}) => { const view = new Uint8Array(buf); let sum = 0; for (let i = 0; i < view.length; i++) sum += (view[i] * factor) | 0; parentPort.postMessage({sum}); });
// main.js const {Pool} = require("./pool"); const path = require("node:path"); (async () => { const pool = new Pool(path.join(__dirname, "img.worker.js"), 4); const buf = new ArrayBuffer(1024 * 1024); const u8 = new Uint8Array(buf); for (let i = 0; i < u8.length; i++) u8[i] = i & 255; const res = await pool.exec({buf, factor: 3}, [buf]); console.log(res.sum > 0); // true await pool.destroy(); })();

Чеклист лучших практик

  • Оценивайте тип задачи: CPU-bound → воркеры, I/O-bound → обычный асинхронный код.

  • Используйте пул, а не бесконечное создание новых воркеров.

  • Определите протокол сообщений (типы, версии, поля ошибок).

  • Минимизируйте копирование: передавайте буферы через transferList.

  • Ограничивайте ресурсы resourceLimits для «жадных» задач.

  • Логируйте error, отслеживайте exit, пишите метрики времени и очередей.

  • Тестируйте пути к воркеру в собранном окружении (Docker/CI).

Типичные ошибки и подводные камни

  • Блокировка Event Loop: запуск тяжёлого JS в главном потоке тормозит всё приложение.

  • Чрезмерная сериализация: частые большие сообщения без transfer — потеря времени.

  • Ломаются пути: относительные пути до воркера после бандлинга/обфускации.

  • Отсутствие стратегии отмены: «вечные» воркеры без terminate() и протокола cancel.

  • Утечки: забытые слушатели и ссылки в пуле.

Сравнение: worker_threads vs child_process vs cluster

  • worker_threads: общая память процесса, низкие накладные расходы, быстрые сообщения, слабая изоляция.

  • child_process: сильная изоляция, IPC медленнее, больше памяти.

  • cluster: масштабирование входящих соединений по ядрам, каждый «воркер» — это процесс.

Last modified: 01 October 2025