Вообще для работы со streams в последних редакциях ecmascript и в последних движках javascript(V8, Chakra) даже добавили async iterators and generators. Код для чтениях строк с использованием этих конструкций не приведу.
Но вообще, написать linereader, который будет промисы возвращать можно, если конечно нужно:
import * as fs from 'fs';
import * as util from 'util';
import * as os from 'os';
const promisifedOpen = util.promisify(fs.open);
const promisifedClose = util.promisify(fs.close);
const promisifedRead = util.promisify(fs.read);
class LineReader {
MAGIC_LENGTH = 10e3;
fd: number;
position: number = 0;
buf: Buffer = new Buffer(this.MAGIC_LENGTH);
lines: Buffer[] = [];
aggBuff: Buffer = new Buffer(0);
finished: boolean = false;
protected _finished: boolean = false;
async init(path): Promise<void> {
this.fd = await promisifedOpen(path, 'r');
}
protected readFile = async (): Promise<void> => {
const data = await promisifedRead(this.fd, this.buf, 0, this.MAGIC_LENGTH - 1, this.position);
this.position += this.MAGIC_LENGTH - 1;
let prevEOL = 0;
for (let i = 0; i < data.bytesRead; i++) {
if (data.buffer[i] === 10) {
const lineBuff = data.buffer.slice(prevEOL, i);
prevEOL = i + 1;
if (this.aggBuff.length > 0) {
this.lines.push(Buffer.concat([this.aggBuff, lineBuff], this.aggBuff.length + lineBuff.length));
this.aggBuff = new Buffer(0);
}
else
this.lines.push(lineBuff);
}
}
const newBuff = data.buffer.slice(prevEOL, data.bytesRead);
this.aggBuff = Buffer.concat([this.aggBuff, newBuff], this.aggBuff.length + newBuff.length);
if (data.bytesRead < this.MAGIC_LENGTH - 1) {
const newBuff = data.buffer.slice(prevEOL, data.bytesRead);
this.lines.push(Buffer.concat([this.aggBuff, newBuff], this.aggBuff.length + newBuff.length));
this._finished = true;
return;
}
if (prevEOL === 0 && !this._finished)
return this.readFile();
}
async readLine(): Promise<Buffer> {
if (this.finished)
return new Buffer(0);
if (this.lines.length) {
const data = this.lines.shift();
if (this._finished && this.lines.length === 0) this.finished = true;
return data;
}
if (!this._finished)
await this.readFile();
return this.readLine();
}
async close(): Promise<void> {
await promisifedClose(this.fd);
}
}
Извиняюсь конечно, за ts, но уже отвык на js писать. Использовать бы я этот код в продакшене конечно не стал, в виду того, что он написан на коленке и работает много медленнее кода, который использует createReadStream. Как минимум в 2-3 раза на больших файлах.
Для сравнения код, с использованием модуля linebyline
var readline = require('linebyline'),
rl = readline('access.log');
rl.on('line', function (line, lineCount, byteCount) {
// do something with the line of text
// console.log(line);
})
.on('error', function (e) {
// something went wrong
})
.on('close', () => {
console.timeEnd('goodCode');
})
и функциями
const test = async () => {
console.time('badCode');
const lineReader = new LineReader();
await lineReader.init('access.log');
while (!lineReader.finished) {
const line = await lineReader.readLine();
// console.log(line.toString());
}
await lineReader.close();
console.timeEnd('badCode');
}
test();
function test2() {
console.time('goodCode');
var readline = require('linebyline'),
rl = readline('access.log');
rl
.on('line', function (line, lineCount, byteCount) { /* console.log(line); */ })
.on('error', function (e) { /* handleError() */ })
.on('close', () => {
console.timeEnd('goodCode');
})
}
test2();
function test3() {
console.time('superbCode');
let lineReader = require('readline')
.createInterface({
input: require('fs').createReadStream('access.log')
});
let r = lineReader
.on('line', function (line) { /* console.log(line); */ })
.on('error', function (e) { /* handleError() */ })
.on('close', () => {
console.timeEnd('superbCode');
})
}
test3();
на файле весом 700mb показывает
superbCode: 1536.653ms
goodCode: 4545.884ms
badCode: 5023.794ms
Так что тут скорее в академических целях пример LineReader возвращающего Promise приведен.
На самом же деле, чтобы избежать лапшы в коде с linebyline надо обрабатывать данные не внутри Анонимной callback функции, а определять функцию обработки в другом месте, это также может быть метод, какого-то объекта.
const readline = require('linebyline');
const rl = readline(pathToFile);
function lineProcessing(line, lineCount, byteCount) {
// dosmth with line e.g. parse && pass to another func
const data = JSON.parse(line);
myModel.processNewData(data);
}
function handleError(err) {
//
}
rl
.on('line', lineProcessing)
.on('error', handleError)
.on('close', () => {
console.log('done');
});
Тут почитал на тему async generators, в node js 10, как оказалось createReadStream возвращает AsyncIterableIterator<Buffer>. Правда node js уведомляет, что это эксперементальная фича:
async function* bufferToLines(bufferAsync) {
let agg = '';
for await (const buffer of bufferAsync) {
agg += buffer;
let prevEOL;
while ((prevEOL = agg.indexOf('\n')) >= 0) {
const line = agg.slice(0, prevEOL);
yield line;
agg = agg.slice(prevEOL + 1);
}
}
if (agg.length > 0) {
yield agg;
}
}
async function test() {
console.time('asyncGeneratorCode');
const inputFilePath = 'access.log';
const stream = fs.createReadStream('access.log', { encoding: 'utf8' });
for await (const line of bufferToLines(stream)) {
// console.log(line);
}
console.timeEnd('asyncGeneratorCode');
}
test();
asyncGeneratorCode: 3273.566ms