fatsify核心功能示例测试!!!

This commit is contained in:
2025-09-21 14:50:41 +08:00
commit 9145aea047
1958 changed files with 230098 additions and 0 deletions

13
node_modules/thread-stream/.github/dependabot.yml generated vendored Normal file
View File

@@ -0,0 +1,13 @@
version: 2
updates:
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: "monthly"
open-pull-requests-limit: 10
- package-ecosystem: "npm"
directory: "/"
schedule:
interval: "weekly"
open-pull-requests-limit: 10

94
node_modules/thread-stream/.github/workflows/ci.yml generated vendored Normal file
View File

@@ -0,0 +1,94 @@
name: CI
on:
push:
paths-ignore:
- 'docs/**'
- '*.md'
pull_request:
paths-ignore:
- 'docs/**'
- '*.md'
# This allows a subsequently queued workflow run to interrupt previous runs
concurrency:
group: "${{ github.workflow }} @ ${{ github.event.pull_request.head.label || github.head_ref || github.ref }}"
cancel-in-progress: true
jobs:
dependency-review:
name: Dependency Review
if: github.event_name == 'pull_request'
runs-on: ubuntu-latest
permissions:
contents: read
steps:
- name: Check out repo
uses: actions/checkout@v3
with:
persist-credentials: false
- name: Dependency review
uses: actions/dependency-review-action@v4
test:
name: Test
runs-on: ${{ matrix.os }}
permissions:
contents: read
strategy:
matrix:
node-version: [18, 20, 22]
os: [macos-latest, ubuntu-latest, windows-latest]
exclude:
- os: windows-latest
node-version: 22
steps:
- name: Check out repo
uses: actions/checkout@v3
with:
persist-credentials: false
- name: Setup Node ${{ matrix.node-version }}
uses: actions/setup-node@v4
with:
node-version: ${{ matrix.node-version }}
- name: Install dependencies
run: npm i --ignore-scripts
- name: Run tests
run: npm run test:ci
- name: Coveralls Parallel
uses: coverallsapp/github-action@v2.3.0
with:
github-token: ${{ secrets.github_token }}
parallel: true
flag-name: run-${{ matrix.node-version }}-${{ matrix.os }}
coverage:
needs: test
runs-on: ubuntu-latest
steps:
- name: Coveralls Finished
uses: coverallsapp/github-action@v2.3.0
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
parallel-finished: true
automerge:
name: Automerge Dependabot PRs
if: >
github.event_name == 'pull_request' &&
github.event.pull_request.user.login == 'dependabot[bot]'
needs: test
permissions:
pull-requests: write
contents: write
runs-on: ubuntu-latest
steps:
- uses: fastify/github-action-merge-dependabot@v3
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

4
node_modules/thread-stream/.husky/pre-commit generated vendored Executable file
View File

@@ -0,0 +1,4 @@
#!/usr/bin/env sh
. "$(dirname -- "$0")/_/husky.sh"
npm test

4
node_modules/thread-stream/.taprc generated vendored Normal file
View File

@@ -0,0 +1,4 @@
jobs: 1
check-coverage: false
# in seconds
timeout: 60

21
node_modules/thread-stream/LICENSE generated vendored Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2021 Matteo Collina
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

135
node_modules/thread-stream/README.md generated vendored Normal file
View File

@@ -0,0 +1,135 @@
# thread-stream
[![npm version](https://img.shields.io/npm/v/thread-stream)](https://www.npmjs.com/package/thread-stream)
[![Build Status](https://img.shields.io/github/actions/workflow/status/pinojs/thread-stream/ci.yml?branch=main)](https://github.com/pinojs/thread-stream/actions)
[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat)](https://standardjs.com/)
A streaming way to send data to a Node.js Worker Thread.
## install
```sh
npm i thread-stream
```
## Usage
```js
'use strict'
const ThreadStream = require('thread-stream')
const { join } = require('path')
const stream = new ThreadStream({
filename: join(__dirname, 'worker.js'),
workerData: { dest },
workerOpts: {}, // Other options to be passed to Worker
sync: false, // default
})
stream.write('hello')
// Asynchronous flushing
stream.flush(function () {
stream.write(' ')
stream.write('world')
// Synchronous flushing
stream.flushSync()
stream.end()
})
```
In `worker.js`:
```js
'use strict'
const fs = require('fs')
const { once } = require('events')
async function run (opts) {
const stream = fs.createWriteStream(opts.dest)
await once(stream, 'open')
return stream
}
module.exports = run
```
Make sure that the stream emits `'close'` when the stream completes.
This can usually be achieved by passing the [`autoDestroy: true`](https://nodejs.org/api/stream.html#stream_new_stream_writable_options)
flag your stream classes.
The underlining worker is automatically closed if the stream is garbage collected.
### External modules
You may use this module within compatible external modules, that exports the `worker.js` interface.
```js
const ThreadStream = require('thread-stream')
const modulePath = require.resolve('pino-elasticsearch')
const stream = new ThreadStream({
filename: modulePath,
workerData: { node: 'http://localhost:9200' }
})
stream.write('log to elasticsearch!')
stream.flushSync()
stream.end()
```
This module works with `yarn` in PnP (plug'n play) mode too!
### Emit events
You can emit events on the ThreadStream from your worker using [`worker.parentPort.postMessage()`](https://nodejs.org/api/worker_threads.html#workerparentport).
The message (JSON object) must have the following data structure:
```js
parentPort.postMessage({
code: 'EVENT',
name: 'eventName',
args: ['list', 'of', 'args', 123, new Error('Boom')]
})
```
On your ThreadStream, you can add a listener function for this event name:
```js
const stream = new ThreadStream({
filename: join(__dirname, 'worker.js'),
workerData: {},
})
stream.on('eventName', function (a, b, c, n, err) {
console.log('received:', a, b, c, n, err) // received: list of args 123 Error: Boom
})
```
### Post Messages
You can post messages to the worker by emitting a `message` event on the ThreadStream.
```js
const stream = new ThreadStream({
filename: join(__dirname, 'worker.js'),
workerData: {},
})
stream.emit('message', message)
```
On your worker, you can listen for this message using [`worker.parentPort.on('message', cb)`](https://nodejs.org/api/worker_threads.html#event-message).
```js
const { parentPort } = require('worker_threads')
parentPort.on('message', function (message) {
console.log('received:', message)
})
```
## License
MIT

85
node_modules/thread-stream/bench.js generated vendored Normal file
View File

@@ -0,0 +1,85 @@
'use strict'
const bench = require('fastbench')
const SonicBoom = require('sonic-boom')
const ThreadStream = require('.')
const Console = require('console').Console
const fs = require('fs')
const { join } = require('path')
const core = fs.createWriteStream('/dev/null')
const fd = fs.openSync('/dev/null', 'w')
const sonic = new SonicBoom({ fd })
const sonicSync = new SonicBoom({ fd, sync: true })
const out = fs.createWriteStream('/dev/null')
const dummyConsole = new Console(out)
const threadStreamSync = new ThreadStream({
filename: join(__dirname, 'test', 'to-file.js'),
workerData: { dest: '/dev/null' },
bufferSize: 4 * 1024 * 1024,
sync: true
})
const threadStreamAsync = new ThreadStream({
filename: join(__dirname, 'test', 'to-file.js'),
workerData: { dest: '/dev/null' },
bufferSize: 4 * 1024 * 1024,
sync: false
})
const MAX = 10000
let str = ''
for (let i = 0; i < 100; i++) {
str += 'hello'
}
setTimeout(doBench, 100)
const run = bench([
function benchThreadStreamSync (cb) {
for (let i = 0; i < MAX; i++) {
threadStreamSync.write(str)
}
setImmediate(cb)
},
function benchThreadStreamAsync (cb) {
threadStreamAsync.once('drain', cb)
for (let i = 0; i < MAX; i++) {
threadStreamAsync.write(str)
}
},
function benchSonic (cb) {
sonic.once('drain', cb)
for (let i = 0; i < MAX; i++) {
sonic.write(str)
}
},
function benchSonicSync (cb) {
sonicSync.once('drain', cb)
for (let i = 0; i < MAX; i++) {
sonicSync.write(str)
}
},
function benchCore (cb) {
core.once('drain', cb)
for (let i = 0; i < MAX; i++) {
core.write(str)
}
},
function benchConsole (cb) {
for (let i = 0; i < MAX; i++) {
dummyConsole.log(str)
}
setImmediate(cb)
}
], 1000)
function doBench () {
run(function () {
run(function () {
// TODO figure out why it does not shut down
process.exit(0)
})
})
}

92
node_modules/thread-stream/index.d.ts generated vendored Normal file
View File

@@ -0,0 +1,92 @@
import { EventEmitter } from 'events'
import * as workerThreads from 'worker_threads'
interface ThreadStreamOptions {
/**
* The size (in bytes) of the buffer.
* Must be greater than 4 (i.e. it must at least fit a 4-byte utf-8 char).
*
* Default: `4 * 1024 * 1024` = `4194304`
*/
bufferSize?: number,
/**
* The path to the Worker's main script or module.
* Must be either an absolute path or a relative path (i.e. relative to the current working directory) starting with ./ or ../, or a WHATWG URL object using file: or data: protocol.
* When using a data: URL, the data is interpreted based on MIME type using the ECMAScript module loader.
*
* {@link workerThreads.Worker()}
*/
filename: string | URL,
/**
* If `true`, write data synchronously; otherwise write data asynchronously.
*
* Default: `false`.
*/
sync?: boolean,
/**
* {@link workerThreads.WorkerOptions.workerData}
*
* Default: `{}`
*/
workerData?: any,
/**
* {@link workerThreads.WorkerOptions}
*
* Default: `{}`
*/
workerOpts?: workerThreads.WorkerOptions
}
declare class ThreadStream extends EventEmitter {
/**
* @param {ThreadStreamOptions} opts
*/
constructor(opts: ThreadStreamOptions)
/**
* Write some data to the stream.
*
* **Please note that this method should not throw an {Error} if something goes wrong but emit an error event.**
* @param {string} data data to write.
* @returns {boolean} false if the stream wishes for the calling code to wait for the 'drain' event to be emitted before continuing to write additional data or if it fails to write; otherwise true.
*/
write(data: string): boolean
/**
* Signal that no more data will be written.
*
* **Please note that this method should not throw an {Error} if something goes wrong but emit an error event.**
*
* Calling the {@link write()} method after calling {@link end()} will emit an error.
*/
end(): void
/**
* Flush the stream synchronously.
* This method should be called in the shutdown phase to make sure that all data has been flushed.
*
* **Please note that this method will throw an {Error} if something goes wrong.**
*
* @throws {Error} if the stream is already flushing, if it fails to flush or if it takes more than 10 seconds to flush.
*/
flushSync(): void
/**
* Synchronously calls each of the listeners registered for the event named`eventName`, in the order they were registered, passing the supplied arguments
* to each.
*
* @param eventName the name of the event.
* @param args the arguments to be passed to the event handlers.
* @returns {boolean} `true` if the event had listeners, `false` otherwise.
*/
emit(eventName: string | symbol, ...args: any[]): boolean;
/**
* Post a message to the Worker with specified data and an optional list of transferable objects.
*
* @param eventName the name of the event, specifically 'message'.
* @param message message data to be sent to the Worker.
* @param transferList an optional list of transferable objects to be transferred to the Worker context.
* @returns {boolean} true if the event had listeners, false otherwise.
*/
emit(eventName: 'message', message: any, transferList?: workerThreads.TransferListItem[]): boolean
}
export = ThreadStream;

537
node_modules/thread-stream/index.js generated vendored Normal file
View File

@@ -0,0 +1,537 @@
'use strict'
const { version } = require('./package.json')
const { EventEmitter } = require('events')
const { Worker } = require('worker_threads')
const { join } = require('path')
const { pathToFileURL } = require('url')
const { wait } = require('./lib/wait')
const {
WRITE_INDEX,
READ_INDEX
} = require('./lib/indexes')
const buffer = require('buffer')
const assert = require('assert')
const kImpl = Symbol('kImpl')
// V8 limit for string size
const MAX_STRING = buffer.constants.MAX_STRING_LENGTH
class FakeWeakRef {
constructor (value) {
this._value = value
}
deref () {
return this._value
}
}
class FakeFinalizationRegistry {
register () {}
unregister () {}
}
// Currently using FinalizationRegistry with code coverage breaks the world
// Ref: https://github.com/nodejs/node/issues/49344
const FinalizationRegistry = process.env.NODE_V8_COVERAGE ? FakeFinalizationRegistry : global.FinalizationRegistry || FakeFinalizationRegistry
const WeakRef = process.env.NODE_V8_COVERAGE ? FakeWeakRef : global.WeakRef || FakeWeakRef
const registry = new FinalizationRegistry((worker) => {
if (worker.exited) {
return
}
worker.terminate()
})
function createWorker (stream, opts) {
const { filename, workerData } = opts
const bundlerOverrides = '__bundlerPathsOverrides' in globalThis ? globalThis.__bundlerPathsOverrides : {}
const toExecute = bundlerOverrides['thread-stream-worker'] || join(__dirname, 'lib', 'worker.js')
const worker = new Worker(toExecute, {
...opts.workerOpts,
trackUnmanagedFds: false,
workerData: {
filename: filename.indexOf('file://') === 0
? filename
: pathToFileURL(filename).href,
dataBuf: stream[kImpl].dataBuf,
stateBuf: stream[kImpl].stateBuf,
workerData: {
$context: {
threadStreamVersion: version
},
...workerData
}
}
})
// We keep a strong reference for now,
// we need to start writing first
worker.stream = new FakeWeakRef(stream)
worker.on('message', onWorkerMessage)
worker.on('exit', onWorkerExit)
registry.register(stream, worker)
return worker
}
function drain (stream) {
assert(!stream[kImpl].sync)
if (stream[kImpl].needDrain) {
stream[kImpl].needDrain = false
stream.emit('drain')
}
}
function nextFlush (stream) {
const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
let leftover = stream[kImpl].data.length - writeIndex
if (leftover > 0) {
if (stream[kImpl].buf.length === 0) {
stream[kImpl].flushing = false
if (stream[kImpl].ending) {
end(stream)
} else if (stream[kImpl].needDrain) {
process.nextTick(drain, stream)
}
return
}
let toWrite = stream[kImpl].buf.slice(0, leftover)
let toWriteBytes = Buffer.byteLength(toWrite)
if (toWriteBytes <= leftover) {
stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
// process._rawDebug('writing ' + toWrite.length)
write(stream, toWrite, nextFlush.bind(null, stream))
} else {
// multi-byte utf-8
stream.flush(() => {
// err is already handled in flush()
if (stream.destroyed) {
return
}
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
// Find a toWrite length that fits the buffer
// it must exists as the buffer is at least 4 bytes length
// and the max utf-8 length for a char is 4 bytes.
while (toWriteBytes > stream[kImpl].data.length) {
leftover = leftover / 2
toWrite = stream[kImpl].buf.slice(0, leftover)
toWriteBytes = Buffer.byteLength(toWrite)
}
stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
write(stream, toWrite, nextFlush.bind(null, stream))
})
}
} else if (leftover === 0) {
if (writeIndex === 0 && stream[kImpl].buf.length === 0) {
// we had a flushSync in the meanwhile
return
}
stream.flush(() => {
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
nextFlush(stream)
})
} else {
// This should never happen
destroy(stream, new Error('overwritten'))
}
}
function onWorkerMessage (msg) {
const stream = this.stream.deref()
if (stream === undefined) {
this.exited = true
// Terminate the worker.
this.terminate()
return
}
switch (msg.code) {
case 'READY':
// Replace the FakeWeakRef with a
// proper one.
this.stream = new WeakRef(stream)
stream.flush(() => {
stream[kImpl].ready = true
stream.emit('ready')
})
break
case 'ERROR':
destroy(stream, msg.err)
break
case 'EVENT':
if (Array.isArray(msg.args)) {
stream.emit(msg.name, ...msg.args)
} else {
stream.emit(msg.name, msg.args)
}
break
case 'WARNING':
process.emitWarning(msg.err)
break
default:
destroy(stream, new Error('this should not happen: ' + msg.code))
}
}
function onWorkerExit (code) {
const stream = this.stream.deref()
if (stream === undefined) {
// Nothing to do, the worker already exit
return
}
registry.unregister(stream)
stream.worker.exited = true
stream.worker.off('exit', onWorkerExit)
destroy(stream, code !== 0 ? new Error('the worker thread exited') : null)
}
class ThreadStream extends EventEmitter {
constructor (opts = {}) {
super()
if (opts.bufferSize < 4) {
throw new Error('bufferSize must at least fit a 4-byte utf-8 char')
}
this[kImpl] = {}
this[kImpl].stateBuf = new SharedArrayBuffer(128)
this[kImpl].state = new Int32Array(this[kImpl].stateBuf)
this[kImpl].dataBuf = new SharedArrayBuffer(opts.bufferSize || 4 * 1024 * 1024)
this[kImpl].data = Buffer.from(this[kImpl].dataBuf)
this[kImpl].sync = opts.sync || false
this[kImpl].ending = false
this[kImpl].ended = false
this[kImpl].needDrain = false
this[kImpl].destroyed = false
this[kImpl].flushing = false
this[kImpl].ready = false
this[kImpl].finished = false
this[kImpl].errored = null
this[kImpl].closed = false
this[kImpl].buf = ''
// TODO (fix): Make private?
this.worker = createWorker(this, opts) // TODO (fix): make private
this.on('message', (message, transferList) => {
this.worker.postMessage(message, transferList)
})
}
write (data) {
if (this[kImpl].destroyed) {
error(this, new Error('the worker has exited'))
return false
}
if (this[kImpl].ending) {
error(this, new Error('the worker is ending'))
return false
}
if (this[kImpl].flushing && this[kImpl].buf.length + data.length >= MAX_STRING) {
try {
writeSync(this)
this[kImpl].flushing = true
} catch (err) {
destroy(this, err)
return false
}
}
this[kImpl].buf += data
if (this[kImpl].sync) {
try {
writeSync(this)
return true
} catch (err) {
destroy(this, err)
return false
}
}
if (!this[kImpl].flushing) {
this[kImpl].flushing = true
setImmediate(nextFlush, this)
}
this[kImpl].needDrain = this[kImpl].data.length - this[kImpl].buf.length - Atomics.load(this[kImpl].state, WRITE_INDEX) <= 0
return !this[kImpl].needDrain
}
end () {
if (this[kImpl].destroyed) {
return
}
this[kImpl].ending = true
end(this)
}
flush (cb) {
if (this[kImpl].destroyed) {
if (typeof cb === 'function') {
process.nextTick(cb, new Error('the worker has exited'))
}
return
}
// TODO write all .buf
const writeIndex = Atomics.load(this[kImpl].state, WRITE_INDEX)
// process._rawDebug(`(flush) readIndex (${Atomics.load(this.state, READ_INDEX)}) writeIndex (${Atomics.load(this.state, WRITE_INDEX)})`)
wait(this[kImpl].state, READ_INDEX, writeIndex, Infinity, (err, res) => {
if (err) {
destroy(this, err)
process.nextTick(cb, err)
return
}
if (res === 'not-equal') {
// TODO handle deadlock
this.flush(cb)
return
}
process.nextTick(cb)
})
}
flushSync () {
if (this[kImpl].destroyed) {
return
}
writeSync(this)
flushSync(this)
}
unref () {
this.worker.unref()
}
ref () {
this.worker.ref()
}
get ready () {
return this[kImpl].ready
}
get destroyed () {
return this[kImpl].destroyed
}
get closed () {
return this[kImpl].closed
}
get writable () {
return !this[kImpl].destroyed && !this[kImpl].ending
}
get writableEnded () {
return this[kImpl].ending
}
get writableFinished () {
return this[kImpl].finished
}
get writableNeedDrain () {
return this[kImpl].needDrain
}
get writableObjectMode () {
return false
}
get writableErrored () {
return this[kImpl].errored
}
}
function error (stream, err) {
setImmediate(() => {
stream.emit('error', err)
})
}
function destroy (stream, err) {
if (stream[kImpl].destroyed) {
return
}
stream[kImpl].destroyed = true
if (err) {
stream[kImpl].errored = err
error(stream, err)
}
if (!stream.worker.exited) {
stream.worker.terminate()
.catch(() => {})
.then(() => {
stream[kImpl].closed = true
stream.emit('close')
})
} else {
setImmediate(() => {
stream[kImpl].closed = true
stream.emit('close')
})
}
}
function write (stream, data, cb) {
// data is smaller than the shared buffer length
const current = Atomics.load(stream[kImpl].state, WRITE_INDEX)
const length = Buffer.byteLength(data)
stream[kImpl].data.write(data, current)
Atomics.store(stream[kImpl].state, WRITE_INDEX, current + length)
Atomics.notify(stream[kImpl].state, WRITE_INDEX)
cb()
return true
}
function end (stream) {
if (stream[kImpl].ended || !stream[kImpl].ending || stream[kImpl].flushing) {
return
}
stream[kImpl].ended = true
try {
stream.flushSync()
let readIndex = Atomics.load(stream[kImpl].state, READ_INDEX)
// process._rawDebug('writing index')
Atomics.store(stream[kImpl].state, WRITE_INDEX, -1)
// process._rawDebug(`(end) readIndex (${Atomics.load(stream.state, READ_INDEX)}) writeIndex (${Atomics.load(stream.state, WRITE_INDEX)})`)
Atomics.notify(stream[kImpl].state, WRITE_INDEX)
// Wait for the process to complete
let spins = 0
while (readIndex !== -1) {
// process._rawDebug(`read = ${read}`)
Atomics.wait(stream[kImpl].state, READ_INDEX, readIndex, 1000)
readIndex = Atomics.load(stream[kImpl].state, READ_INDEX)
if (readIndex === -2) {
destroy(stream, new Error('end() failed'))
return
}
if (++spins === 10) {
destroy(stream, new Error('end() took too long (10s)'))
return
}
}
process.nextTick(() => {
stream[kImpl].finished = true
stream.emit('finish')
})
} catch (err) {
destroy(stream, err)
}
// process._rawDebug('end finished...')
}
function writeSync (stream) {
const cb = () => {
if (stream[kImpl].ending) {
end(stream)
} else if (stream[kImpl].needDrain) {
process.nextTick(drain, stream)
}
}
stream[kImpl].flushing = false
while (stream[kImpl].buf.length !== 0) {
const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
let leftover = stream[kImpl].data.length - writeIndex
if (leftover === 0) {
flushSync(stream)
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
continue
} else if (leftover < 0) {
// stream should never happen
throw new Error('overwritten')
}
let toWrite = stream[kImpl].buf.slice(0, leftover)
let toWriteBytes = Buffer.byteLength(toWrite)
if (toWriteBytes <= leftover) {
stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
// process._rawDebug('writing ' + toWrite.length)
write(stream, toWrite, cb)
} else {
// multi-byte utf-8
flushSync(stream)
Atomics.store(stream[kImpl].state, READ_INDEX, 0)
Atomics.store(stream[kImpl].state, WRITE_INDEX, 0)
// Find a toWrite length that fits the buffer
// it must exists as the buffer is at least 4 bytes length
// and the max utf-8 length for a char is 4 bytes.
while (toWriteBytes > stream[kImpl].buf.length) {
leftover = leftover / 2
toWrite = stream[kImpl].buf.slice(0, leftover)
toWriteBytes = Buffer.byteLength(toWrite)
}
stream[kImpl].buf = stream[kImpl].buf.slice(leftover)
write(stream, toWrite, cb)
}
}
}
function flushSync (stream) {
if (stream[kImpl].flushing) {
throw new Error('unable to flush while flushing')
}
// process._rawDebug('flushSync started')
const writeIndex = Atomics.load(stream[kImpl].state, WRITE_INDEX)
let spins = 0
// TODO handle deadlock
while (true) {
const readIndex = Atomics.load(stream[kImpl].state, READ_INDEX)
if (readIndex === -2) {
throw Error('_flushSync failed')
}
// process._rawDebug(`(flushSync) readIndex (${readIndex}) writeIndex (${writeIndex})`)
if (readIndex !== writeIndex) {
// TODO stream timeouts for some reason.
Atomics.wait(stream[kImpl].state, READ_INDEX, readIndex, 1000)
} else {
break
}
if (++spins === 10) {
throw new Error('_flushSync took too long (10s)')
}
}
// process._rawDebug('flushSync finished')
}
module.exports = ThreadStream

9
node_modules/thread-stream/lib/indexes.js generated vendored Normal file
View File

@@ -0,0 +1,9 @@
'use strict'
const WRITE_INDEX = 4
const READ_INDEX = 8
module.exports = {
WRITE_INDEX,
READ_INDEX
}

61
node_modules/thread-stream/lib/wait.js generated vendored Normal file
View File

@@ -0,0 +1,61 @@
'use strict'
const MAX_TIMEOUT = 1000
function wait (state, index, expected, timeout, done) {
const max = Date.now() + timeout
let current = Atomics.load(state, index)
if (current === expected) {
done(null, 'ok')
return
}
let prior = current
const check = (backoff) => {
if (Date.now() > max) {
done(null, 'timed-out')
} else {
setTimeout(() => {
prior = current
current = Atomics.load(state, index)
if (current === prior) {
check(backoff >= MAX_TIMEOUT ? MAX_TIMEOUT : backoff * 2)
} else {
if (current === expected) done(null, 'ok')
else done(null, 'not-equal')
}
}, backoff)
}
}
check(1)
}
// let waitDiffCount = 0
function waitDiff (state, index, expected, timeout, done) {
// const id = waitDiffCount++
// process._rawDebug(`>>> waitDiff ${id}`)
const max = Date.now() + timeout
let current = Atomics.load(state, index)
if (current !== expected) {
done(null, 'ok')
return
}
const check = (backoff) => {
// process._rawDebug(`${id} ${index} current ${current} expected ${expected}`)
// process._rawDebug('' + backoff)
if (Date.now() > max) {
done(null, 'timed-out')
} else {
setTimeout(() => {
current = Atomics.load(state, index)
if (current !== expected) {
done(null, 'ok')
} else {
check(backoff >= MAX_TIMEOUT ? MAX_TIMEOUT : backoff * 2)
}
}, backoff)
}
}
check(1)
}
module.exports = { wait, waitDiff }

174
node_modules/thread-stream/lib/worker.js generated vendored Normal file
View File

@@ -0,0 +1,174 @@
'use strict'
const { realImport, realRequire } = require('real-require')
const { workerData, parentPort } = require('worker_threads')
const { WRITE_INDEX, READ_INDEX } = require('./indexes')
const { waitDiff } = require('./wait')
const {
dataBuf,
filename,
stateBuf
} = workerData
let destination
const state = new Int32Array(stateBuf)
const data = Buffer.from(dataBuf)
async function start () {
let worker
try {
if (filename.endsWith('.ts') || filename.endsWith('.cts')) {
// TODO: add support for the TSM modules loader ( https://github.com/lukeed/tsm ).
if (!process[Symbol.for('ts-node.register.instance')]) {
realRequire('ts-node/register')
} else if (process.env.TS_NODE_DEV) {
realRequire('ts-node-dev')
}
// TODO: Support ES imports once tsc, tap & ts-node provide better compatibility guarantees.
// Remove extra forwardslash on Windows
worker = realRequire(decodeURIComponent(filename.replace(process.platform === 'win32' ? 'file:///' : 'file://', '')))
} else {
worker = (await realImport(filename))
}
} catch (error) {
// A yarn user that tries to start a ThreadStream for an external module
// provides a filename pointing to a zip file.
// eg. require.resolve('pino-elasticsearch') // returns /foo/pino-elasticsearch-npm-6.1.0-0c03079478-6915435172.zip/bar.js
// The `import` will fail to try to load it.
// This catch block executes the `require` fallback to load the module correctly.
// In fact, yarn modifies the `require` function to manage the zipped path.
// More details at https://github.com/pinojs/pino/pull/1113
// The error codes may change based on the node.js version (ENOTDIR > 12, ERR_MODULE_NOT_FOUND <= 12 )
if ((error.code === 'ENOTDIR' || error.code === 'ERR_MODULE_NOT_FOUND') &&
filename.startsWith('file://')) {
worker = realRequire(decodeURIComponent(filename.replace('file://', '')))
} else if (error.code === undefined || error.code === 'ERR_VM_DYNAMIC_IMPORT_CALLBACK_MISSING') {
// When bundled with pkg, an undefined error is thrown when called with realImport
// When bundled with pkg and using node v20, an ERR_VM_DYNAMIC_IMPORT_CALLBACK_MISSING error is thrown when called with realImport
// More info at: https://github.com/pinojs/thread-stream/issues/143
try {
worker = realRequire(decodeURIComponent(filename.replace(process.platform === 'win32' ? 'file:///' : 'file://', '')))
} catch {
throw error
}
} else {
throw error
}
}
// Depending on how the default export is performed, and on how the code is
// transpiled, we may find cases of two nested "default" objects.
// See https://github.com/pinojs/pino/issues/1243#issuecomment-982774762
if (typeof worker === 'object') worker = worker.default
if (typeof worker === 'object') worker = worker.default
destination = await worker(workerData.workerData)
destination.on('error', function (err) {
Atomics.store(state, WRITE_INDEX, -2)
Atomics.notify(state, WRITE_INDEX)
Atomics.store(state, READ_INDEX, -2)
Atomics.notify(state, READ_INDEX)
parentPort.postMessage({
code: 'ERROR',
err
})
})
destination.on('close', function () {
// process._rawDebug('worker close emitted')
const end = Atomics.load(state, WRITE_INDEX)
Atomics.store(state, READ_INDEX, end)
Atomics.notify(state, READ_INDEX)
setImmediate(() => {
process.exit(0)
})
})
}
// No .catch() handler,
// in case there is an error it goes
// to unhandledRejection
start().then(function () {
parentPort.postMessage({
code: 'READY'
})
process.nextTick(run)
})
function run () {
const current = Atomics.load(state, READ_INDEX)
const end = Atomics.load(state, WRITE_INDEX)
// process._rawDebug(`pre state ${current} ${end}`)
if (end === current) {
if (end === data.length) {
waitDiff(state, READ_INDEX, end, Infinity, run)
} else {
waitDiff(state, WRITE_INDEX, end, Infinity, run)
}
return
}
// process._rawDebug(`post state ${current} ${end}`)
if (end === -1) {
// process._rawDebug('end')
destination.end()
return
}
const toWrite = data.toString('utf8', current, end)
// process._rawDebug('worker writing: ' + toWrite)
const res = destination.write(toWrite)
if (res) {
Atomics.store(state, READ_INDEX, end)
Atomics.notify(state, READ_INDEX)
setImmediate(run)
} else {
destination.once('drain', function () {
Atomics.store(state, READ_INDEX, end)
Atomics.notify(state, READ_INDEX)
run()
})
}
}
process.on('unhandledRejection', function (err) {
parentPort.postMessage({
code: 'ERROR',
err
})
process.exit(1)
})
process.on('uncaughtException', function (err) {
parentPort.postMessage({
code: 'ERROR',
err
})
process.exit(1)
})
process.once('exit', exitCode => {
if (exitCode !== 0) {
process.exit(exitCode)
return
}
if (destination?.writableNeedDrain && !destination?.writableEnded) {
parentPort.postMessage({
code: 'WARNING',
err: new Error('ThreadStream: process exited before destination stream was drained. this may indicate that the destination stream try to write to a another missing stream')
})
}
process.exit(0)
})

57
node_modules/thread-stream/package.json generated vendored Normal file
View File

@@ -0,0 +1,57 @@
{
"name": "thread-stream",
"version": "3.1.0",
"description": "A streaming way to send data to a Node.js Worker Thread",
"main": "index.js",
"types": "index.d.ts",
"dependencies": {
"real-require": "^0.2.0"
},
"devDependencies": {
"@types/node": "^20.1.0",
"@types/tap": "^15.0.0",
"@yao-pkg/pkg": "^5.11.5",
"desm": "^1.3.0",
"fastbench": "^1.0.1",
"husky": "^9.0.6",
"pino-elasticsearch": "^8.0.0",
"sonic-boom": "^4.0.1",
"standard": "^17.0.0",
"tap": "^16.2.0",
"ts-node": "^10.8.0",
"typescript": "^5.3.2",
"why-is-node-running": "^2.2.2"
},
"scripts": {
"build": "tsc --noEmit",
"test": "standard && npm run build && npm run transpile && tap \"test/**/*.test.*js\" && tap --ts test/*.test.*ts",
"test:ci": "standard && npm run transpile && npm run test:ci:js && npm run test:ci:ts",
"test:ci:js": "tap --no-check-coverage --timeout=120 --coverage-report=lcovonly \"test/**/*.test.*js\"",
"test:ci:ts": "tap --ts --no-check-coverage --coverage-report=lcovonly \"test/**/*.test.*ts\"",
"test:yarn": "npm run transpile && tap \"test/**/*.test.js\" --no-check-coverage",
"transpile": "sh ./test/ts/transpile.sh",
"prepare": "husky install"
},
"standard": {
"ignore": [
"test/ts/**/*",
"test/syntax-error.mjs"
]
},
"repository": {
"type": "git",
"url": "git+https://github.com/mcollina/thread-stream.git"
},
"keywords": [
"worker",
"thread",
"threads",
"stream"
],
"author": "Matteo Collina <hello@matteocollina.com>",
"license": "MIT",
"bugs": {
"url": "https://github.com/mcollina/thread-stream/issues"
},
"homepage": "https://github.com/mcollina/thread-stream#readme"
}

285
node_modules/thread-stream/test/base.test.js generated vendored Normal file
View File

@@ -0,0 +1,285 @@
'use strict'
const { test } = require('tap')
const { join } = require('path')
const { readFile } = require('fs')
const { file } = require('./helper')
const ThreadStream = require('..')
const { MessageChannel } = require('worker_threads')
const { once } = require('events')
test('base sync=true', function (t) {
t.plan(15)
const dest = file()
const stream = new ThreadStream({
filename: join(__dirname, 'to-file.js'),
workerData: { dest },
sync: true
})
t.same(stream.writableObjectMode, false)
t.same(stream.writableFinished, false)
stream.on('finish', () => {
t.same(stream.writableFinished, true)
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data, 'hello world\nsomething else\n')
})
})
t.same(stream.closed, false)
stream.on('close', () => {
t.same(stream.closed, true)
t.notOk(stream.writable)
t.pass('close emitted')
})
t.same(stream.writableNeedDrain, false)
t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))
t.ok(stream.writable)
t.same(stream.writableEnded, false)
stream.end()
t.same(stream.writableEnded, true)
})
test('overflow sync=true', function (t) {
t.plan(3)
const dest = file()
const stream = new ThreadStream({
bufferSize: 128,
filename: join(__dirname, 'to-file.js'),
workerData: { dest },
sync: true
})
let count = 0
// Write 10 chars, 20 times
function write () {
if (count++ === 20) {
stream.end()
return
}
stream.write('aaaaaaaaaa')
// do not wait for drain event
setImmediate(write)
}
write()
stream.on('finish', () => {
t.pass('finish emitted')
})
stream.on('close', () => {
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data.length, 200)
})
})
})
test('overflow sync=false', function (t) {
const dest = file()
const stream = new ThreadStream({
bufferSize: 128,
filename: join(__dirname, 'to-file.js'),
workerData: { dest },
sync: false
})
let count = 0
t.same(stream.writableNeedDrain, false)
// Write 10 chars, 20 times
function write () {
if (count++ === 20) {
t.pass('end sent')
stream.end()
return
}
if (!stream.write('aaaaaaaaaa')) {
t.same(stream.writableNeedDrain, true)
}
// do not wait for drain event
setImmediate(write)
}
write()
stream.on('drain', () => {
t.same(stream.writableNeedDrain, false)
t.pass('drain')
})
stream.on('finish', () => {
t.pass('finish emitted')
})
stream.on('close', () => {
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data.length, 200)
t.end()
})
})
})
test('over the bufferSize at startup', function (t) {
t.plan(6)
const dest = file()
const stream = new ThreadStream({
bufferSize: 10,
filename: join(__dirname, 'to-file.js'),
workerData: { dest },
sync: true
})
stream.on('finish', () => {
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data, 'hello world\nsomething else\n')
})
})
stream.on('close', () => {
t.pass('close emitted')
})
t.ok(stream.write('hello'))
t.ok(stream.write(' world\n'))
t.ok(stream.write('something else\n'))
stream.end()
})
test('over the bufferSize at startup (async)', function (t) {
t.plan(6)
const dest = file()
const stream = new ThreadStream({
bufferSize: 10,
filename: join(__dirname, 'to-file.js'),
workerData: { dest },
sync: false
})
t.ok(stream.write('hello'))
t.notOk(stream.write(' world\n'))
t.notOk(stream.write('something else\n'))
stream.end()
stream.on('finish', () => {
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data, 'hello world\nsomething else\n')
})
})
stream.on('close', () => {
t.pass('close emitted')
})
})
test('flushSync sync=false', function (t) {
const dest = file()
const stream = new ThreadStream({
bufferSize: 128,
filename: join(__dirname, 'to-file.js'),
workerData: { dest },
sync: false
})
stream.on('drain', () => {
t.pass('drain')
stream.end()
})
stream.on('finish', () => {
t.pass('finish emitted')
})
stream.on('close', () => {
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data.length, 200)
t.end()
})
})
for (let count = 0; count < 20; count++) {
stream.write('aaaaaaaaaa')
}
stream.flushSync()
})
test('pass down MessagePorts', async function (t) {
t.plan(3)
const { port1, port2 } = new MessageChannel()
const stream = new ThreadStream({
filename: join(__dirname, 'port.js'),
workerData: { port: port1 },
workerOpts: {
transferList: [port1]
},
sync: false
})
t.teardown(() => {
stream.end()
})
t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))
const [strings] = await once(port2, 'message')
t.equal(strings, 'hello world\nsomething else\n')
})
test('destroy does not error', function (t) {
t.plan(5)
const dest = file()
const stream = new ThreadStream({
filename: join(__dirname, 'to-file.js'),
workerData: { dest },
sync: false
})
stream.on('ready', () => {
t.pass('ready emitted')
stream.worker.terminate()
})
stream.on('error', (err) => {
t.equal(err.message, 'the worker thread exited')
stream.flush((err) => {
t.equal(err.message, 'the worker has exited')
})
t.doesNotThrow(() => stream.flushSync())
t.doesNotThrow(() => stream.end())
})
})
test('syntax error', function (t) {
t.plan(1)
const stream = new ThreadStream({
filename: join(__dirname, 'syntax-error.mjs')
})
stream.on('error', (err) => {
t.equal(err.message, 'Unexpected end of input')
})
})

38
node_modules/thread-stream/test/bench.test.js generated vendored Normal file
View File

@@ -0,0 +1,38 @@
'use strict'
const { test } = require('tap')
const { join } = require('path')
const ThreadStream = require('..')
const { file } = require('./helper')
const MAX = 1000
let str = ''
for (let i = 0; i < 10; i++) {
str += 'hello'
}
test('base', function (t) {
const dest = file()
const stream = new ThreadStream({
filename: join(__dirname, 'to-file.js'),
workerData: { dest }
})
let runs = 0
function benchThreadStream () {
if (++runs === 1000) {
stream.end()
return
}
for (let i = 0; i < MAX; i++) {
stream.write(str)
}
setImmediate(benchThreadStream)
}
benchThreadStream()
stream.on('finish', function () {
t.end()
})
})

60
node_modules/thread-stream/test/bundlers.test.js generated vendored Normal file
View File

@@ -0,0 +1,60 @@
'use strict'
const { test } = require('tap')
const { join } = require('path')
const { file } = require('./helper')
const ThreadStream = require('..')
test('bundlers support with .js file', function (t) {
t.plan(1)
globalThis.__bundlerPathsOverrides = {
'thread-stream-worker': join(__dirname, 'custom-worker.js')
}
const dest = file()
process.on('uncaughtException', error => {
console.log(error)
})
const stream = new ThreadStream({
filename: join(__dirname, 'to-file.js'),
workerData: { dest },
sync: true
})
stream.worker.removeAllListeners('message')
stream.worker.once('message', message => {
t.equal(message.code, 'CUSTOM-WORKER-CALLED')
})
stream.end()
})
test('bundlers support with .mjs file', function (t) {
t.plan(1)
globalThis.__bundlerPathsOverrides = {
'thread-stream-worker': join(__dirname, 'custom-worker.js')
}
const dest = file()
process.on('uncaughtException', error => {
console.log(error)
})
const stream = new ThreadStream({
filename: join(__dirname, 'to-file.mjs'),
workerData: { dest },
sync: true
})
stream.worker.removeAllListeners('message')
stream.worker.once('message', message => {
t.equal(message.code, 'CUSTOM-WORKER-CALLED')
})
stream.end()
})

37
node_modules/thread-stream/test/close-on-gc.js generated vendored Normal file
View File

@@ -0,0 +1,37 @@
'use strict'
const { join } = require('path')
const ThreadStream = require('..')
const assert = require('assert')
let worker = null
function setup () {
const stream = new ThreadStream({
filename: join(__dirname, 'to-file.js'),
workerData: { dest: process.argv[2] },
sync: true
})
worker = stream.worker
stream.write('hello')
stream.write(' ')
stream.write('world\n')
stream.flushSync()
stream.unref()
// the stream object goes out of scope here
setImmediate(gc) // eslint-disable-line
}
setup()
let exitEmitted = false
worker.on('exit', function () {
exitEmitted = true
})
process.on('exit', function () {
assert.strictEqual(exitEmitted, true)
})

View File

@@ -0,0 +1,80 @@
'use strict'
const { test } = require('tap')
const { join } = require('path')
const { MessageChannel } = require('worker_threads')
const { once } = require('events')
const ThreadStream = require('..')
const isYarnPnp = process.versions.pnp !== undefined
test('yarn module resolution', { skip: !isYarnPnp }, t => {
t.plan(6)
const modulePath = require.resolve('pino-elasticsearch')
t.match(modulePath, /.*\.zip.*/)
const stream = new ThreadStream({
filename: modulePath,
workerData: { node: null },
sync: true
})
t.same(stream.writableErrored, null)
stream.on('error', (err) => {
t.same(stream.writableErrored, err)
t.pass('error emitted')
})
t.ok(stream.write('hello world\n'))
t.ok(stream.writable)
stream.end()
})
test('yarn module resolution for directories with special characters', { skip: !isYarnPnp }, async t => {
t.plan(3)
const { port1, port2 } = new MessageChannel()
const stream = new ThreadStream({
filename: join(__dirname, 'dir with spaces', 'test-package.zip', 'worker.js'),
workerData: { port: port1 },
workerOpts: {
transferList: [port1]
},
sync: false
})
t.teardown(() => {
stream.end()
})
t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))
const [strings] = await once(port2, 'message')
t.equal(strings, 'hello world\nsomething else\n')
})
test('yarn module resolution for typescript commonjs modules', { skip: !isYarnPnp }, async t => {
t.plan(3)
const { port1, port2 } = new MessageChannel()
const stream = new ThreadStream({
filename: join(__dirname, 'ts-commonjs-default-export.zip', 'worker.js'),
workerData: { port: port1 },
workerOpts: {
transferList: [port1]
},
sync: false
})
t.teardown(() => {
stream.end()
})
t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))
const [strings] = await once(port2, 'message')
t.equal(strings, 'hello world\nsomething else\n')
})

21
node_modules/thread-stream/test/context.test.js generated vendored Normal file
View File

@@ -0,0 +1,21 @@
'use strict'
const { test } = require('tap')
const { join } = require('path')
const ThreadStream = require('..')
const { version } = require('../package.json')
require('why-is-node-running')
test('get context', (t) => {
const stream = new ThreadStream({
filename: join(__dirname, 'get-context.js'),
workerData: {},
sync: true
})
t.on('end', () => stream.end())
stream.on('context', (ctx) => {
t.same(ctx.threadStreamVersion, version)
t.end()
})
stream.write('hello')
})

16
node_modules/thread-stream/test/create-and-exit.js generated vendored Normal file
View File

@@ -0,0 +1,16 @@
'use strict'
const { join } = require('path')
const ThreadStream = require('..')
const stream = new ThreadStream({
filename: join(__dirname, 'to-file.js'),
workerData: { dest: process.argv[2] },
sync: true
})
stream.write('hello')
stream.write(' ')
stream.write('world\n')
stream.flushSync()
stream.unref()

9
node_modules/thread-stream/test/custom-worker.js generated vendored Normal file
View File

@@ -0,0 +1,9 @@
'use strict'
const { parentPort } = require('worker_threads')
parentPort.postMessage({
code: 'CUSTOM-WORKER-CALLED'
})
require('../lib/worker')

Binary file not shown.

22
node_modules/thread-stream/test/emit-event.js generated vendored Normal file
View File

@@ -0,0 +1,22 @@
'use strict'
const { Writable } = require('stream')
const parentPort = require('worker_threads').parentPort
async function run () {
return new Writable({
autoDestroy: true,
write (chunk, enc, cb) {
if (parentPort) {
parentPort.postMessage({
code: 'EVENT',
name: 'socketError',
args: ['list', 'of', 'args', 123, new Error('unable to write data to the TCP socket')]
})
}
cb()
}
})
}
module.exports = run

61
node_modules/thread-stream/test/end.test.js generated vendored Normal file
View File

@@ -0,0 +1,61 @@
'use strict'
const { test } = require('tap')
const { join } = require('path')
const { readFile } = require('fs')
const { file } = require('./helper')
const ThreadStream = require('..')
test('destroy support', function (t) {
t.plan(7)
const dest = file()
const stream = new ThreadStream({
filename: join(__dirname, 'to-file-on-destroy.js'),
workerData: { dest },
sync: true
})
stream.on('close', () => {
t.notOk(stream.writable)
t.pass('close emitted')
})
t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))
t.ok(stream.writable)
stream.end()
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data, 'hello world\nsomething else\n')
})
})
test('synchronous _final support', function (t) {
t.plan(7)
const dest = file()
const stream = new ThreadStream({
filename: join(__dirname, 'to-file-on-final.js'),
workerData: { dest },
sync: true
})
stream.on('close', () => {
t.notOk(stream.writable)
t.pass('close emitted')
})
t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))
t.ok(stream.writable)
stream.end()
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data, 'hello world\nsomething else\n')
})
})

14
node_modules/thread-stream/test/error.js generated vendored Normal file
View File

@@ -0,0 +1,14 @@
'use strict'
const { Writable } = require('stream')
async function run (opts) {
const stream = new Writable({
write (chunk, enc, cb) {
cb(new Error('kaboom'))
}
})
return stream
}
module.exports = run

47
node_modules/thread-stream/test/esm.test.mjs generated vendored Normal file
View File

@@ -0,0 +1,47 @@
import { test } from 'tap'
import { readFile } from 'fs'
import ThreadStream from '../index.js'
import { join } from 'desm'
import { pathToFileURL } from 'url'
import { file } from './helper.js'
function basic (text, filename) {
test(text, function (t) {
t.plan(5)
const dest = file()
const stream = new ThreadStream({
filename,
workerData: { dest },
sync: true
})
stream.on('finish', () => {
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data, 'hello world\nsomething else\n')
})
})
stream.on('close', () => {
t.pass('close emitted')
})
t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))
stream.end()
})
}
basic('esm with path', join(import.meta.url, 'to-file.mjs'))
basic('esm with file URL', pathToFileURL(join(import.meta.url, 'to-file.mjs')).href)
basic('(ts -> es6) esm with path', join(import.meta.url, 'ts', 'to-file.es6.mjs'))
basic('(ts -> es6) esm with file URL', pathToFileURL(join(import.meta.url, 'ts', 'to-file.es6.mjs')).href)
basic('(ts -> es2017) esm with path', join(import.meta.url, 'ts', 'to-file.es2017.mjs'))
basic('(ts -> es2017) esm with file URL', pathToFileURL(join(import.meta.url, 'ts', 'to-file.es2017.mjs')).href)
basic('(ts -> esnext) esm with path', join(import.meta.url, 'ts', 'to-file.esnext.mjs'))
basic('(ts -> esnext) esm with file URL', pathToFileURL(join(import.meta.url, 'ts', 'to-file.esnext.mjs')).href)

23
node_modules/thread-stream/test/event.test.js generated vendored Normal file
View File

@@ -0,0 +1,23 @@
'use strict'
const { test } = require('tap')
const { join } = require('path')
const ThreadStream = require('..')
test('event propagate', t => {
const stream = new ThreadStream({
filename: join(__dirname, 'emit-event.js'),
workerData: {},
sync: true
})
t.on('end', () => stream.end())
stream.on('socketError', function (a, b, c, n, error) {
t.same(a, 'list')
t.same(b, 'of')
t.same(c, 'args')
t.same(n, 123)
t.same(error, new Error('unable to write data to the TCP socket'))
t.end()
})
stream.write('hello')
})

14
node_modules/thread-stream/test/exit.js generated vendored Normal file
View File

@@ -0,0 +1,14 @@
'use strict'
const { Writable } = require('stream')
async function run (opts) {
const stream = new Writable({
write (chunk, enc, cb) {
process.exit(1)
}
})
return stream
}
module.exports = run

22
node_modules/thread-stream/test/get-context.js generated vendored Normal file
View File

@@ -0,0 +1,22 @@
'use strict'
const { Writable } = require('stream')
const parentPort = require('worker_threads').parentPort
async function run (opts) {
return new Writable({
autoDestroy: true,
write (chunk, enc, cb) {
if (parentPort) {
parentPort.postMessage({
code: 'EVENT',
name: 'context',
args: opts.$context
})
}
cb()
}
})
}
module.exports = run

1
node_modules/thread-stream/test/helper.d.ts generated vendored Normal file
View File

@@ -0,0 +1 @@
export declare function file(): string

35
node_modules/thread-stream/test/helper.js generated vendored Normal file
View File

@@ -0,0 +1,35 @@
'use strict'
const { join } = require('path')
const { tmpdir } = require('os')
const { unlinkSync } = require('fs')
const t = require('tap')
const files = []
let count = 0
function file () {
const file = join(tmpdir(), `thread-stream-${process.pid}-${count++}`)
files.push(file)
return file
}
process.on('beforeExit', () => {
t.comment('unlink files')
for (const file of files) {
try {
t.comment(`unliking ${file}`)
unlinkSync(file)
} catch (e) {
console.log(e)
}
}
t.comment('unlink completed')
})
module.exports.file = file
if (process.env.SKIP_PROCESS_EXIT_CHECK !== 'true') {
const why = require('why-is-node-running')
setInterval(why, 10000).unref()
}

11
node_modules/thread-stream/test/indexes.test.js generated vendored Normal file
View File

@@ -0,0 +1,11 @@
'use strict'
const { test } = require('tap')
const indexes = require('../lib/indexes')
for (const index of Object.keys(indexes)) {
test(`${index} is lock free`, function (t) {
t.equal(Atomics.isLockFree(indexes[index]), true)
t.end()
})
}

View File

@@ -0,0 +1,74 @@
import { test } from 'tap'
import { readFile } from 'fs'
import ThreadStream from '../index.js'
import { join } from 'desm'
import { file } from './helper.js'
test('break up utf8 multibyte (sync)', (t) => {
t.plan(2)
const longString = '\u03A3'.repeat(16)
const dest = file()
const stream = new ThreadStream({
bufferSize: 15, // this must be odd
filename: join(import.meta.url, 'to-file.js'),
workerData: { dest },
sync: true
})
stream.on('finish', () => {
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data, longString)
})
})
stream.write(longString)
stream.end()
})
test('break up utf8 multibyte (async)', (t) => {
t.plan(2)
const longString = '\u03A3'.repeat(16)
const dest = file()
const stream = new ThreadStream({
bufferSize: 15, // this must be odd
filename: join(import.meta.url, 'to-file.js'),
workerData: { dest },
sync: false
})
stream.on('finish', () => {
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data, longString)
})
})
stream.write(longString)
stream.end()
})
test('break up utf8 multibyte several times bigger than write buffer', (t) => {
t.plan(2)
const longString = '\u03A3'.repeat(32)
const dest = file()
const stream = new ThreadStream({
bufferSize: 15, // this must be odd
filename: join(import.meta.url, 'to-file.js'),
workerData: { dest },
sync: false
})
stream.on('finish', () => {
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data, longString)
})
})
stream.write(longString)
stream.end()
})

57
node_modules/thread-stream/test/never-drain.test.js generated vendored Normal file
View File

@@ -0,0 +1,57 @@
const { test } = require('tap')
const ThreadStream = require('../index')
const { join } = require('path')
function retryUntilTimeout (fn, timeout) {
const start = Date.now()
return new Promise((resolve, reject) => {
async function run () {
if (fn()) {
resolve()
return
}
if (Date.now() - start >= timeout) {
reject(new Error('timeout'))
return
}
setTimeout(run, 10)
}
run()
})
}
const isNode18 = process.version.indexOf('v18') === 0
test('emit warning when the worker gracefully exit without the stream ended', { skip: !isNode18 }, async function (t) {
const expectedWarning = 'ThreadStream: process exited before destination stream was drained. this may indicate that the destination stream try to write to a another missing stream'
const stream = new ThreadStream({
filename: join(__dirname, 'to-next.js')
})
stream.unref()
let streamWarning
function saveWarning (e) {
if (e.message === expectedWarning) {
streamWarning = e
}
}
process.on('warning', saveWarning)
const data = 'hello'.repeat(10)
for (let i = 0; i < 1000; i++) {
if (streamWarning?.message === expectedWarning) {
break
}
stream.write(data)
await new Promise((resolve) => {
setTimeout(resolve, 1)
})
}
process.off('warning', saveWarning)
t.equal(streamWarning?.message, expectedWarning)
await retryUntilTimeout(() => stream.worker.exited === true, 3000)
})

18
node_modules/thread-stream/test/on-message.js generated vendored Normal file
View File

@@ -0,0 +1,18 @@
'use strict'
const { parentPort } = require('worker_threads')
const { Writable } = require('stream')
function run () {
parentPort.once('message', function ({ text, takeThisPortPlease }) {
takeThisPortPlease.postMessage(`received: ${text}`)
})
return new Writable({
autoDestroy: true,
write (chunk, enc, cb) {
cb()
}
})
}
module.exports = run

37
node_modules/thread-stream/test/pkg/index.js generated vendored Normal file
View File

@@ -0,0 +1,37 @@
'use strict'
/**
* This file is packaged using pkg in order to test if worker.js works in that context
*/
const { test } = require('tap')
const { join } = require('path')
const { file } = require('../helper')
const ThreadStream = require('../..')
test('bundlers support with .js file', function (t) {
t.plan(1)
globalThis.__bundlerPathsOverrides = {
'thread-stream-worker': join(__dirname, '..', 'custom-worker.js')
}
const dest = file()
process.on('uncaughtException', (error) => {
console.log(error)
})
const stream = new ThreadStream({
filename: join(__dirname, '..', 'to-file.js'),
workerData: { dest },
sync: true
})
stream.worker.removeAllListeners('message')
stream.worker.once('message', (message) => {
t.equal(message.code, 'CUSTOM-WORKER-CALLED')
})
stream.end()
})

15
node_modules/thread-stream/test/pkg/pkg.config.json generated vendored Normal file
View File

@@ -0,0 +1,15 @@
{
"pkg": {
"assets": [
"../custom-worker.js",
"../to-file.js"
],
"targets": [
"node14",
"node16",
"node18",
"node20"
],
"outputPath": "test/pkg"
}
}

46
node_modules/thread-stream/test/pkg/pkg.test.js generated vendored Normal file
View File

@@ -0,0 +1,46 @@
'use strict'
const { test } = require('tap')
const config = require('./pkg.config.json')
const { promisify } = require('util')
const { unlink } = require('fs/promises')
const { join } = require('path')
const { platform } = require('process')
const exec = promisify(require('child_process').exec)
test('worker test when packaged into executable using pkg', async (t) => {
const packageName = 'index'
// package the app into several node versions, check config for more info
const filePath = `${join(__dirname, packageName)}.js`
const configPath = join(__dirname, 'pkg.config.json')
process.env.NODE_OPTIONS ||= ''
process.env.NODE_OPTIONS = '--no-warnings'
const { stderr } = await exec(`npx pkg ${filePath} --config ${configPath}`)
// there should be no error when packaging
t.equal(stderr, '')
// pkg outputs files in the following format by default: {filename}-{node version}
for (const target of config.pkg.targets) {
// execute the packaged test
let executablePath = `${join(config.pkg.outputPath, packageName)}-${target}`
// when on windows, we need the .exe extension
if (platform === 'win32') {
executablePath = `${executablePath}.exe`
} else {
executablePath = `./${executablePath}`
}
const { stderr } = await exec(executablePath)
// check if there were no errors
t.equal(stderr, '')
// clean up afterwards
await unlink(executablePath)
}
t.end()
})

16
node_modules/thread-stream/test/port.js generated vendored Normal file
View File

@@ -0,0 +1,16 @@
'use strict'
const { Writable } = require('stream')
function run (opts) {
const { port } = opts
return new Writable({
autoDestroy: true,
write (chunk, enc, cb) {
port.postMessage(chunk.toString())
cb()
}
})
}
module.exports = run

24
node_modules/thread-stream/test/post-message.test.js generated vendored Normal file
View File

@@ -0,0 +1,24 @@
'use strict'
const { test } = require('tap')
const { join } = require('path')
const { once } = require('events')
const { MessageChannel } = require('worker_threads')
const ThreadStream = require('..')
test('message events emitted on the stream are posted to the worker', async function (t) {
t.plan(1)
const { port1, port2 } = new MessageChannel()
const stream = new ThreadStream({
filename: join(__dirname, 'on-message.js'),
sync: false
})
t.teardown(() => {
stream.end()
})
stream.emit('message', { text: 'hello', takeThisPortPlease: port1 }, [port1])
const [confirmation] = await once(port2, 'message')
t.equal(confirmation, 'received: hello')
})

41
node_modules/thread-stream/test/string-limit-2.test.js generated vendored Normal file
View File

@@ -0,0 +1,41 @@
'use strict'
const t = require('tap')
if (process.env.CI) {
t.skip('skip on CI')
process.exit(0)
}
const { join } = require('path')
const { file } = require('./helper')
const { createReadStream } = require('fs')
const ThreadStream = require('..')
const buffer = require('buffer')
const MAX_STRING = buffer.constants.MAX_STRING_LENGTH
t.plan(1)
const dest = file()
const stream = new ThreadStream({
filename: join(__dirname, 'to-file.js'),
workerData: { dest },
sync: false
})
stream.on('close', async () => {
t.comment('close emitted')
let buf
for await (const chunk of createReadStream(dest)) {
buf = chunk
}
t.equal('asd', buf.toString().slice(-3))
})
stream.on('ready', () => {
t.comment('open emitted')
stream.write('a'.repeat(MAX_STRING - 2))
stream.write('asd')
stream.end()
})

42
node_modules/thread-stream/test/string-limit.test.js generated vendored Normal file
View File

@@ -0,0 +1,42 @@
'use strict'
const t = require('tap')
if (process.env.CI) {
t.skip('skip on CI')
process.exit(0)
}
const { join } = require('path')
const { file } = require('./helper')
const { stat } = require('fs')
const ThreadStream = require('..')
t.setTimeout(30000)
const dest = file()
const stream = new ThreadStream({
filename: join(__dirname, 'to-file.js'),
workerData: { dest },
sync: false
})
let length = 0
stream.on('close', () => {
stat(dest, (err, f) => {
t.error(err)
t.equal(f.size, length)
t.end()
})
})
const buf = Buffer.alloc(1024).fill('x').toString() // 1 KB
// This writes 1 GB of data
for (let i = 0; i < 1024 * 1024; i++) {
length += buf.length
stream.write(buf)
}
stream.end()

2
node_modules/thread-stream/test/syntax-error.mjs generated vendored Normal file
View File

@@ -0,0 +1,2 @@
// this is a syntax error
import

View File

@@ -0,0 +1,121 @@
'use strict'
const { test } = require('tap')
const { fork } = require('child_process')
const { join } = require('path')
const { readFile } = require('fs').promises
const { file } = require('./helper')
const { once } = require('events')
const ThreadStream = require('..')
test('exits with 0', async function (t) {
const dest = file()
const child = fork(join(__dirname, 'create-and-exit.js'), [dest])
const [code] = await once(child, 'exit')
t.equal(code, 0)
const data = await readFile(dest, 'utf8')
t.equal(data, 'hello world\n')
})
test('emit error if thread exits', async function (t) {
const stream = new ThreadStream({
filename: join(__dirname, 'exit.js'),
sync: true
})
stream.on('ready', () => {
stream.write('hello world\n')
})
let [err] = await once(stream, 'error')
t.equal(err.message, 'the worker thread exited')
stream.write('noop');
[err] = await once(stream, 'error')
t.equal(err.message, 'the worker has exited')
stream.write('noop');
[err] = await once(stream, 'error')
t.equal(err.message, 'the worker has exited')
})
test('emit error if thread have unhandledRejection', async function (t) {
const stream = new ThreadStream({
filename: join(__dirname, 'unhandledRejection.js'),
sync: true
})
stream.on('ready', () => {
stream.write('hello world\n')
})
let [err] = await once(stream, 'error')
t.equal(err.message, 'kaboom')
stream.write('noop');
[err] = await once(stream, 'error')
t.equal(err.message, 'the worker has exited')
stream.write('noop');
[err] = await once(stream, 'error')
t.equal(err.message, 'the worker has exited')
})
test('emit error if worker stream emit error', async function (t) {
const stream = new ThreadStream({
filename: join(__dirname, 'error.js'),
sync: true
})
stream.on('ready', () => {
stream.write('hello world\n')
})
let [err] = await once(stream, 'error')
t.equal(err.message, 'kaboom')
stream.write('noop');
[err] = await once(stream, 'error')
t.equal(err.message, 'the worker has exited')
stream.write('noop');
[err] = await once(stream, 'error')
t.equal(err.message, 'the worker has exited')
})
test('emit error if thread have uncaughtException', async function (t) {
const stream = new ThreadStream({
filename: join(__dirname, 'uncaughtException.js'),
sync: true
})
stream.on('ready', () => {
stream.write('hello world\n')
})
let [err] = await once(stream, 'error')
t.equal(err.message, 'kaboom')
stream.write('noop');
[err] = await once(stream, 'error')
t.equal(err.message, 'the worker has exited')
stream.write('noop');
[err] = await once(stream, 'error')
t.equal(err.message, 'the worker has exited')
})
test('close the work if out of scope on gc', { skip: !global.WeakRef }, async function (t) {
const dest = file()
const child = fork(join(__dirname, 'close-on-gc.js'), [dest], {
execArgv: ['--expose-gc']
})
const [code] = await once(child, 'exit')
t.equal(code, 0)
const data = await readFile(dest, 'utf8')
t.equal(data, 'hello world\n')
})

23
node_modules/thread-stream/test/to-file-on-destroy.js generated vendored Normal file
View File

@@ -0,0 +1,23 @@
'use strict'
const fs = require('fs')
const { Writable } = require('stream')
function run (opts) {
let data = ''
return new Writable({
autoDestroy: true,
write (chunk, enc, cb) {
data += chunk.toString()
cb()
},
destroy (err, cb) {
// process._rawDebug('destroy called')
fs.writeFile(opts.dest, data, function (err2) {
cb(err2 || err)
})
}
})
}
module.exports = run

24
node_modules/thread-stream/test/to-file-on-final.js generated vendored Normal file
View File

@@ -0,0 +1,24 @@
'use strict'
const fs = require('fs')
const { Writable } = require('stream')
function run (opts) {
let data = ''
return new Writable({
autoDestroy: true,
write (chunk, enc, cb) {
data += chunk.toString()
cb()
},
final (cb) {
setTimeout(function () {
fs.writeFile(opts.dest, data, function (err) {
cb(err)
})
}, 100)
}
})
}
module.exports = run

12
node_modules/thread-stream/test/to-file.js generated vendored Normal file
View File

@@ -0,0 +1,12 @@
'use strict'
const fs = require('fs')
const { once } = require('events')
async function run (opts) {
const stream = fs.createWriteStream(opts.dest)
await once(stream, 'open')
return stream
}
module.exports = run

8
node_modules/thread-stream/test/to-file.mjs generated vendored Normal file
View File

@@ -0,0 +1,8 @@
import { createWriteStream } from 'fs'
import { once } from 'events'
export default async function run (opts) {
const stream = createWriteStream(opts.dest)
await once(stream, 'open')
return stream
}

9
node_modules/thread-stream/test/to-next.js generated vendored Normal file
View File

@@ -0,0 +1,9 @@
'use strict'
const { PassThrough } = require('stream')
async function run (opts) {
return new PassThrough({})
}
module.exports = run

30
node_modules/thread-stream/test/transpiled.test.js generated vendored Normal file
View File

@@ -0,0 +1,30 @@
'use strict'
const { test } = require('tap')
const { join } = require('path')
const { file } = require('./helper')
const ThreadStream = require('..')
function basic (esVersion) {
test(`transpiled-ts-to-${esVersion}`, function (t) {
t.plan(2)
const dest = file()
const stream = new ThreadStream({
filename: join(__dirname, 'ts', `to-file.${esVersion}.cjs`),
workerData: { dest },
sync: true
})
// There are arbitrary checks, the important aspect of this test is to ensure
// that we can properly load the transpiled file into our worker thread.
t.same(stream.writableEnded, false)
stream.end()
t.same(stream.writableEnded, true)
})
}
basic('es5')
basic('es6')
basic('es2017')
basic('esnext')

Binary file not shown.

33
node_modules/thread-stream/test/ts.test.ts generated vendored Normal file
View File

@@ -0,0 +1,33 @@
import { test } from 'tap'
import { readFile } from 'fs'
import ThreadStream from '../index.js'
import { join } from 'path'
import { file } from './helper.js'
test('typescript module', function (t) {
t.plan(5)
const dest = file()
const stream = new ThreadStream({
filename: join(__dirname, 'ts', 'to-file.ts'),
workerData: { dest },
sync: true
})
stream.on('finish', () => {
readFile(dest, 'utf8', (err, data) => {
t.error(err)
t.equal(data, 'hello world\nsomething else\n')
})
})
stream.on('close', () => {
t.pass('close emitted')
})
t.ok(stream.write('hello world\n'))
t.ok(stream.write('something else\n'))
stream.end()
})

10
node_modules/thread-stream/test/ts/to-file.ts generated vendored Normal file
View File

@@ -0,0 +1,10 @@
import { type PathLike, type WriteStream, createWriteStream } from 'fs'
import { once } from 'events'
export default async function run (
opts: { dest: PathLike },
): Promise<WriteStream> {
const stream = createWriteStream(opts.dest)
await once(stream, 'open')
return stream
}

19
node_modules/thread-stream/test/ts/transpile.sh generated vendored Executable file
View File

@@ -0,0 +1,19 @@
#!/bin/sh
set -e
cd ./test/ts;
if (echo "${npm_config_user_agent}" | grep "yarn"); then
export RUNNER="yarn";
else
export RUNNER="npx";
fi
test ./to-file.ts -ot ./to-file.es5.cjs || ("${RUNNER}" tsc --skipLibCheck --target es5 ./to-file.ts && mv ./to-file.js ./to-file.es5.cjs);
test ./to-file.ts -ot ./to-file.es6.mjs || ("${RUNNER}" tsc --skipLibCheck --target es6 ./to-file.ts && mv ./to-file.js ./to-file.es6.mjs);
test ./to-file.ts -ot ./to-file.es6.cjs || ("${RUNNER}" tsc --skipLibCheck --target es6 --module commonjs ./to-file.ts && mv ./to-file.js ./to-file.es6.cjs);
test ./to-file.ts -ot ./to-file.es2017.mjs || ("${RUNNER}" tsc --skipLibCheck --target es2017 ./to-file.ts && mv ./to-file.js ./to-file.es2017.mjs);
test ./to-file.ts -ot ./to-file.es2017.cjs || ("${RUNNER}" tsc --skipLibCheck --target es2017 --module commonjs ./to-file.ts && mv ./to-file.js ./to-file.es2017.cjs);
test ./to-file.ts -ot ./to-file.esnext.mjs || ("${RUNNER}" tsc --skipLibCheck --target esnext --module esnext ./to-file.ts && mv ./to-file.js ./to-file.esnext.mjs);
test ./to-file.ts -ot ./to-file.esnext.cjs || ("${RUNNER}" tsc --skipLibCheck --target esnext --module commonjs ./to-file.ts && mv ./to-file.js ./to-file.esnext.cjs);

21
node_modules/thread-stream/test/uncaughtException.js generated vendored Normal file
View File

@@ -0,0 +1,21 @@
'use strict'
const { Writable } = require('stream')
// Nop console.error to avoid printing things out
console.error = () => {}
setImmediate(function () {
throw new Error('kaboom')
})
async function run (opts) {
const stream = new Writable({
write (chunk, enc, cb) {
cb()
}
})
return stream
}
module.exports = run

21
node_modules/thread-stream/test/unhandledRejection.js generated vendored Normal file
View File

@@ -0,0 +1,21 @@
'use strict'
const { Writable } = require('stream')
// Nop console.error to avoid printing things out
console.error = () => {}
setImmediate(function () {
Promise.reject(new Error('kaboom'))
})
async function run (opts) {
const stream = new Writable({
write (chunk, enc, cb) {
cb()
}
})
return stream
}
module.exports = run

7
node_modules/thread-stream/test/yarnrc.yml generated vendored Normal file
View File

@@ -0,0 +1,7 @@
nodeLinker: pnp
pnpMode: loose
pnpEnableEsmLoader: false
packageExtensions:
debug@*:
dependencies:
supports-color: '*'

8
node_modules/thread-stream/tsconfig.json generated vendored Normal file
View File

@@ -0,0 +1,8 @@
{
"compilerOptions": {
"esModuleInterop": true
},
"files": [
"index.d.ts"
],
}