"use strict";
// Helper emitted by the TypeScript compiler: runs a generator function as an
// asynchronous function, feeding the settled value of every yielded promise
// back into the generator and returning a promise for its final result.
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
    return new (P || (P = Promise))(function (resolve, reject) {
        function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
        function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
        function step(result) { result.done ? resolve(result.value) : new P(function (resolve) { resolve(result.value); }).then(fulfilled, rejected); }
        step((generator = generator.apply(thisArg, _arguments || [])).next());
    });
};
Object.defineProperty(exports, "__esModule", { value: true });
const assert = require("assert");
/**
 * A promise bundled with the functions that settle it, so that a pending
 * read can be resolved or rejected from a stream event handler.
 */
class Deferred {
    constructor() {
        this.promise = new Promise((resolve, reject) => {
            this.reject = reject;
            this.resolve = resolve;
        });
    }
}
/**
 * Error message used when reading beyond the end of the stream
 */
exports.endOfStream = 'End-Of-Stream';
/**
 * Promise based reader on top of a readable stream, supporting
 * read-ahead (peek) of data which is returned again by subsequent reads.
 */
class StreamReader {
    /**
     * @param s Stream to read from; must provide `read` and `once`
     * @throws {Error} If `s` is missing or does not provide `read` and `once`
     */
    constructor(s) {
        this.s = s;
        this.endOfStream = false;
        /**
         * Store peeked data; used as a stack: the last pushed chunk is consumed first
         * @type {Array}
         */
        this.peekQueue = [];
        // Check `s` itself first, otherwise null/undefined fails with an
        // unrelated TypeError instead of the descriptive error below.
        if (!s || !s.read || !s.once) {
            throw new Error('Expected an instance of stream.Readable');
        }
        // Any of these events marks the stream as ended and fails a pending read.
        this.s.once('end', () => this.reject(new Error(exports.endOfStream)));
        this.s.once('error', err => this.reject(err));
        this.s.once('close', () => this.reject(new Error('Stream closed')));
    }
    /**
     * Read ahead (peek) from stream. Subsequent read or peeks will return the same data
     * @param buffer Buffer to store data read from stream in
     * @param offset Offset buffer
     * @param length Number of bytes to read
     * @returns {Promise<number>} Number of bytes peeked
     */
    peek(buffer, offset, length) {
        return __awaiter(this, void 0, void 0, function* () {
            const bytesRead = yield this.read(buffer, offset, length);
            this.peekQueue.push(buffer.slice(offset, offset + bytesRead)); // Put read data back to peek buffer
            return bytesRead;
        });
    }
    /**
     * Read chunk from stream; peeked data is consumed before the stream itself
     * @param buffer Target buffer to store data read from stream in
     * @param offset Offset of target buffer
     * @param length Number of bytes to read
     * @returns {Promise<number>} Number of bytes copied into the target buffer
     * @throws {Error} 'End-Of-Stream' when no peeked data is left and the stream has ended
     */
    read(buffer, offset, length) {
        return __awaiter(this, void 0, void 0, function* () {
            if (length === 0) {
                return 0;
            }
            if (this.peekQueue.length === 0 && this.endOfStream) {
                throw new Error(exports.endOfStream);
            }
            let remaining = length;
            let bytesRead = 0;
            // consume peeked data first
            while (this.peekQueue.length > 0 && remaining > 0) {
                const peekData = this.peekQueue.pop(); // Top of the stack: the chunk pushed last
                const lenCopy = Math.min(peekData.length, remaining);
                peekData.copy(buffer, offset + bytesRead, 0, lenCopy);
                bytesRead += lenCopy;
                remaining -= lenCopy;
                if (lenCopy < peekData.length) {
                    // remainder back to queue
                    this.peekQueue.push(peekData.slice(lenCopy));
                }
            }
            // continue reading from stream if required
            if (remaining > 0 && !this.endOfStream) {
                bytesRead += yield this._read(buffer, offset + bytesRead, remaining);
            }
            return bytesRead;
        });
    }
    /**
     * Read chunk directly from the stream, bypassing the peeked data.
     * Only one such read may be pending at a time.
     * @param buffer Buffer to store data read from stream in
     * @param offset Offset buffer
     * @param length Number of bytes to read
     * @returns {Promise<number>} Number of bytes copied into the buffer
     */
    _read(buffer, offset, length) {
        return __awaiter(this, void 0, void 0, function* () {
            assert.ok(!this.request, 'Concurrent read operation?');
            const readBuffer = this.s.read(length);
            if (readBuffer) {
                readBuffer.copy(buffer, offset);
                return readBuffer.length;
            }
            else {
                // Nothing returned by the stream: park the request until the
                // stream signals 'readable', or until it is rejected.
                this.request = {
                    buffer,
                    offset,
                    length,
                    deferred: new Deferred()
                };
                this.s.once('readable', () => {
                    this.tryRead();
                });
                return this.request.deferred.promise.then(n => {
                    this.request = null;
                    return n;
                }).catch(err => {
                    this.request = null;
                    throw err;
                });
            }
        });
    }
    /**
     * Attempt to complete the pending request; if the stream still returns
     * nothing, wait for the next 'readable' event and try again.
     */
    tryRead() {
        const request = this.request;
        if (!request) {
            // The request was already settled (e.g. rejected on 'end', 'error'
            // or 'close') while this 'readable' listener was still registered.
            return;
        }
        const readBuffer = this.s.read(request.length);
        if (readBuffer) {
            readBuffer.copy(request.buffer, request.offset);
            request.deferred.resolve(readBuffer.length);
        }
        else {
            this.s.once('readable', () => {
                this.tryRead();
            });
        }
    }
    /**
     * Mark the stream as ended and fail the pending request, if any
     * @param err Error to reject the pending request with
     */
    reject(err) {
        this.endOfStream = true;
        if (this.request) {
            this.request.deferred.reject(err);
            this.request = null;
        }
    }
}
exports.StreamReader = StreamReader;