Многопоточность в JS Зачем многопоточность в Node.js Node.js исторически опирается на однопоточный Event Loop
и неблокирующий ввод/вывод. Это идеально для сетевых приложений, но плохо для CPU-тяжёлых задач (парсинг больших файлов, криптография, сжатие, обработка изображений, ML-инференс). Многопоточность через модуль worker_threads
позволяет вынести такие расчёты в отдельные потоки, не блокируя главный цикл событий.
Ключевая идея: JavaScript-код в Node по умолчанию выполняется в одном потоке. Worker
создаёт ещё один поток с отдельным JS-движком и собственным Event Loop.
Архитектура: Event Loop, libuv и Worker Каждый изолят V8 исполняет JS и имеет свой Event Loop
. Node использует libuv
с внутренним пулом потоков
для некоторых операций (fs, crypto), однако это не исполняет ваш JS параллельно. Worker
создаёт новый изолят и ОС-поток, где ваш JS действительно идёт параллельно.
Один процесс Node может содержать множество воркеров , каждый со своей памятью и циклом событий.
Когда выбирать worker_threads, а когда нет CPU-bound задачи : изображения, видео, PDF, криптография, сложная сериализация — worker_threads
.
I/O-bound (базы, HTTP, очереди) — чаще достаточно встроенного неблокирующего I/O.
Изоляция памяти и надёжность : для недоверенного кода и «жёстких» перезапусков лучше child_process
(отдельный процесс).
Масштабирование по ядрам : распределение соединений по ядрам — cluster
или несколько процессов за балансировщиком.
Не используйте воркеры для каждой мелкой операции. Создание потока — дорогая операция. Объединяйте задачи в пул
и переиспользуйте.
API: базовые сущности и каналы связи Worker
— класс для создания потока.
isMainThread
— признак «мы в главном потоке?».
parentPort
— порт связи с родителем (внутри воркера).
workerData
— «снэпшот» входных данных при запуске воркера.
MessageChannel
, MessagePort
— двунаправленные порты.
transferList
— список «передаваемых» объектов (например, ArrayBuffer
).
SharedArrayBuffer
и Atomics
— разделяемая память и атомарные операции.
Передача сообщений использует Structured Clone
: большинство структур копируются прозрачно; Transferable
объекты можно передавать без копии, передавая «владение».
Пример 1: простой воркер (два файла) Рассчитаем тяжёлую функцию в отдельном потоке.
// main.js
const { Worker, isMainThread } = require("node:worker_threads");
if (isMainThread) {
const worker = new Worker(__filename, {workerData: {n: 45}});
worker.once("message", (result) => {
console.log("fib:", result); // fib: 1134903170
});
worker.once("error", (e) => console.error("error:", e)); // error: ...
worker.once("exit", (code) => console.log("exit:", code)); // exit: 0
} else {
const {parentPort, workerData} = require("node:worker_threads");
function fib(n) {
return n < 2 ? n : fib(n - 1) + fib(n - 2);
}
const result = fib(workerData.n);
parentPort.postMessage(result);
}
Рекурсивный fib
— демонстрационная «печка», он неэффективен. В реальности используйте алгоритмы с линейной сложностью или memoization.
Пример 2: воркер как отдельный модуль (многофайловый) Так чаще организуют код в реальных проектах.
// main.js const { Worker } = require("node:worker_threads"); const path = require("node:path");
function runTask(payload) {
return new Promise((resolve, reject) => {
const worker = new Worker(path.join(__dirname, "worker.js"), {workerData: payload});
worker.once("message", resolve);
worker.once("error", reject);
worker.once("exit", (code) => {
if (code !== 0) reject(new Error("exit " + code));
});
});
}
(async () => {
const res = await runTask({op: "sum", items: [1, 2, 3]});
console.log("sum:", res); // sum: 6
})();
// worker.js
const {workerData, parentPort} = require("node:worker_threads");
function compute(data) {
if (data.op === "sum") return data.items.reduce((a, b) => a + b, 0);
return null;
}
parentPort.postMessage(compute(workerData));
Пример 3: простой пул воркеров (переиспользование потоков) Создаём фиксированное число воркеров и очередь задач.
// pool.js
const { Worker } = require("node:worker_threads");
const path = require("node:path");
class WorkerPool {
constructor(size = Math.max(1, require("node:os").cpus().length - 1)) {
this.size = size;
this.free = [];
this.busy = new Set();
this.queue = [];
for (let i = 0; i < size; i++) this.free.push(this._create());
}
_create() {
const w = new Worker(path.join(__dirname, "worker.js"));
w.on("message", (msg) => {
w.currentResolve && w.currentResolve(msg);
w.currentResolve = w.currentReject = null;
this._release(w);
});
w.on("error", (err) => {
w.currentReject && w.currentReject(err);
this.busy.delete(w);
// Создаём новый на замену
const nw = this._create();
this.free.push(nw);
this._drain();
});
w.on("exit", (code) => {
this.busy.delete(w);
if (code !== 0) {
const nw = this._create();
this.free.push(nw);
}
this._drain();
});
return w;
}
exec(payload) {
return new Promise((resolve, reject) => {
this.queue.push({payload, resolve, reject});
this._drain();
});
}
_drain() {
while (this.free.length > 0 && this.queue.length > 0) {
const w = this.free.pop();
const job = this.queue.shift();
this.busy.add(w);
w.currentResolve = job.resolve;
w.currentReject = job.reject;
w.postMessage(job.payload);
}
}
_release(w) {
this.busy.delete(w);
this.free.push(w);
this._drain();
}
async destroy() {
const all = [...this.free, ...this.busy];
await Promise.all(all.map((w) => w.terminate()));
this.free = [];
this.busy.clear();
}
}
module.exports = {WorkerPool};
// worker.js
const {parentPort} = require("node:worker_threads");
parentPort.on("message", (task) => {
let result = null;
if (task.type === "mul") result = task.a * task.b;
if (task.type === "sleep") {
const end = Date.now() + task.ms;
while (Date.now() < end) {
}
result = task.ms;
}
parentPort.postMessage({id: task.id, result});
});
// main.js
const {WorkerPool} = require("./pool");
(async () => {
const pool = new WorkerPool(4);
const jobs = [];
for (let i = 0; i < 8; i++) jobs.push(pool.exec({type: "mul", a: i, b: i + 1, id: i}));
const res = await Promise.all(jobs);
console.log(res.length); // 8
await pool.destroy();
})();
Передача данных: Structured Clone, Transferable, SharedArrayBuffer Копирование и передача владения По умолчанию объекты копируются по Structured Clone . Для больших бинарных данных выгоднее передавать «владение» ArrayBuffer
через transferList
.
// transfer.js
const { Worker, isMainThread, parentPort } = require("node:worker_threads");
if (isMainThread) {
const worker = new Worker(__filename);
const buf = new ArrayBuffer(1024 * 1024);
const u8 = new Uint8Array(buf);
u8[0] = 7;
worker.postMessage({buf}, [buf]);
worker.once("message", (m) => console.log(m)); // { first: 7 }
} else {
parentPort.once("message", ({buf}) => {
const view = new Uint8Array(buf);
parentPort.postMessage({first: view[0]});
});
}
После передачи в transferList исходный буфер у отправителя становится «detached» (использование приводит к ошибкам).
Разделяемая память и Atomics SharedArrayBuffer
позволяет разделять память между потоками, а Atomics
— синхронизировать доступ.
// shared.js
const {Worker, isMainThread, parentPort, workerData} = require("node:worker_threads");
if (isMainThread) {
const sab = new SharedArrayBuffer(4);
const view = new Int32Array(sab);
const w = new Worker(__filename, {workerData: sab});
Atomics.store(view, 0, 0);
setTimeout(() => {
Atomics.store(view, 0, 42);
Atomics.notify(view, 0, 1);
}, 100);
w.once("message", (v) => console.log(v)); // 42
} else {
const view = new Int32Array(workerData);
Atomics.wait(view, 0, 0);
parentPort.postMessage(Atomics.load(view, 0));
}
Злоупотребление SAB усложняет код и повышает риск гонок. Используйте только для действительно «горячих» путей.
Двусторонние каналы: MessageChannel/MessagePort MessageChannel
создаёт пару портов. Их можно передать воркеру и использовать как выделенную шину.
// channel.js
const { Worker, MessageChannel, isMainThread, parentPort } = require("node:worker_threads");
if (isMainThread) {
const worker = new Worker(__filename, {workerData: null});
const {port1, port2} = new MessageChannel();
worker.postMessage({port: port2}, [port2]);
port1.on("message", (m) => console.log("got:", m)); // got: pong
port1.postMessage("ping");
} else {
parentPort.once("message", ({port}) => {
port.on("message", (m) => port.postMessage(m === "ping" ? "pong" : m));
});
}
Порты удобны для multiplexing и для передачи в «пул», когда воркер обслуживает несколько клиентов.
Отмена задач и таймауты Прямой «убийства» функции нет — есть worker.terminate()
, которое завершит поток целиком. Для «мягкой» отмены пошлите сигнал и регулярно проверяйте его в воркере.
// cancel.js
const {Worker, isMainThread, parentPort} = require("node:worker_threads");
if (isMainThread) {
const w = new Worker(__filename);
const timer = setTimeout(() => w.postMessage({type: "cancel"}), 50);
w.once("message", (m) => console.log(m)); // { status: "canceled" }
w.postMessage({type: "start"});
} else {
let canceled = false;
parentPort.on("message", (m) => {
if (m.type === "cancel") canceled = true;
if (m.type === "start") {
let s = 0;
for (let i = 0; i < 1e9; i++) {
s += i;
if (canceled) break;
}
parentPort.postMessage({status: canceled ? "canceled" : "done"});
}
});
}
ESM, TypeScript и бандлинг ESM : new Worker(new URL("worker.mjs", import.meta.url), { type: "module" })
.
Бандлеры : убедитесь, что воркер лежит как отдельный asset; относительные пути из __filename
/import.meta.url
надёжнее «магических» строк.
Пути до воркера часто «ломаются» при упаковке. Всегда тестируйте финальный артефакт (Docker/CI) с реальными путями.
Ошибки, выход, утечки worker.on("error")
— перехватывайте исключения.
worker.on("exit")
— код выхода ≠ 0 =— аварийное завершение.
worker.terminate()
— «жёсткое» завершение (освободит ресурсы).
resourceLimits
— ограничение heap/stack при создании воркера.
Не забывайте отписываться от событий и очищать очереди в пуле, иначе можно «нарастить» ссылки и получить утечку памяти.
Производительность: измеряем и считаем Замеряйте «до/после» и учитывайте расходы на сериализацию/копирование.
// perf.js
const { Worker } = require("node:worker_threads");
const { performance } = require("node:perf_hooks");
const path = require("node:path");
(async () => {
const start = performance.now();
const w = new Worker(path.join(__dirname, "worker.js"), {workerData: {n: 45}});
const t = await new Promise((res, rej) => {
w.once("message", res);
w.once("error", rej);
});
const dt = performance.now() - start;
console.log(Math.round(dt)); // 1234
})();
Расширенный многофайловый пример: обработка изображений в пуле Схема: главный поток читает список задач, пул раздаёт их воркерам, воркеры считают и возвращают результат. Здесь имитируем CPU-нагрузку.
// pool.js
const { Worker } = require("node:worker_threads");
const path = require("node:path");
const os = require("node:os");
class Pool {
constructor(file, size = Math.max(1, os.cpus().length - 1)) {
this.file = file;
this.size = size;
this.queue = [];
this.id = 0;
this.workers = Array.from({length: size}, () => this._spawn());
}
_spawn() {
const w = new Worker(this.file);
w.idle = true;
w.on("message", (msg) => {
w.current && w.current.resolve(msg);
w.current = null;
w.idle = true;
this._schedule();
});
w.on("error", (e) => {
w.current && w.current.reject(e);
Object.assign(w, this._spawn()); // замена
});
return w;
}
exec(payload, transfer = []) {
return new Promise((resolve, reject) => {
this.queue.push({payload, resolve, reject, transfer});
this._schedule();
});
}
_schedule() {
const w = this.workers.find(x => x.idle);
if (!w) return;
const job = this.queue.shift();
if (!job) return;
w.idle = false;
w.current = job;
w.postMessage(job.payload, job.transfer);
}
async destroy() {
await Promise.all(this.workers.map((w) => w.terminate()));
}
}
module.exports = {Pool};
// img.worker.js
const {parentPort} = require("node:worker_threads");
parentPort.on("message", ({buf, factor}) => {
const view = new Uint8Array(buf);
let sum = 0;
for (let i = 0; i < view.length; i++) sum += (view[i] * factor) | 0;
parentPort.postMessage({sum});
});
// main.js
const {Pool} = require("./pool");
const path = require("node:path");
(async () => {
const pool = new Pool(path.join(__dirname, "img.worker.js"), 4);
const buf = new ArrayBuffer(1024 * 1024);
const u8 = new Uint8Array(buf);
for (let i = 0; i < u8.length; i++) u8[i] = i & 255;
const res = await pool.exec({buf, factor: 3}, [buf]);
console.log(res.sum > 0); // true
await pool.destroy();
})();
Чеклист лучших практик Оценивайте тип задачи: CPU-bound → воркеры , I/O-bound → обычный асинхронный код .
Используйте пул , а не бесконечное создание новых воркеров.
Определите протокол сообщений (типы, версии, поля ошибок).
Минимизируйте копирование: передавайте буферы через transferList .
Ограничивайте ресурсы resourceLimits
для «жадных» задач.
Логируйте error
, отслеживайте exit
, пишите метрики времени и очередей.
Тестируйте пути к воркеру в собранном окружении (Docker/CI).
Типичные ошибки и подводные камни Блокировка Event Loop : запуск тяжёлого JS в главном потоке тормозит всё приложение.
Чрезмерная сериализация : частые большие сообщения без transfer — потеря времени.
Ломаются пути : относительные пути до воркера после бандлинга/обфускации.
Отсутствие стратегии отмены : «вечные» воркеры без terminate()
и протокола cancel
.
Утечки : забытые слушатели и ссылки в пуле.
Не запускайте недоверенный код в воркере «как есть»: поток разделяет права процесса. Для жёсткой изоляции — отдельный процесс/контейнер.
Сравнение: worker_threads vs child_process vs cluster worker_threads
: общая память процесса, низкие накладные расходы, быстрые сообщения, слабая изоляция.
child_process
: сильная изоляция, IPC медленнее, больше памяти.
cluster
: масштабирование входящих соединений по ядрам, каждый «воркер» — это процесс.
Если вам нужна и многопоточность, и масштабирование по ядрам — комбинируйте: несколько процессов, внутри каждого — пул воркеров .
Last modified: 01 October 2025