Показать сообщение отдельно
  #4 (permalink)  
Старый 02.06.2018, 18:38
Аватар для EmperioAf
Профессор
Отправить личное сообщение для EmperioAf Посмотреть профиль Найти все сообщения от EmperioAf
 
Регистрация: 15.01.2015
Сообщений: 622

Вообще для работы со streams в последних редакциях ecmascript и в последних движках javascript(V8, Chakra) даже добавили async iterators and generators. Код для чтениях строк с использованием этих конструкций не приведу.
Но вообще, написать linereader, который будет промисы возвращать можно, если конечно нужно:

import * as fs from 'fs';
import * as util from 'util';
import * as os from 'os';

const promisifedOpen = util.promisify(fs.open);
const promisifedClose = util.promisify(fs.close);
const promisifedRead = util.promisify(fs.read);

class LineReader {
    MAGIC_LENGTH = 10e3;
    fd: number;
    position: number = 0;
    buf: Buffer = new Buffer(this.MAGIC_LENGTH);
    lines: Buffer[] = [];
    aggBuff: Buffer = new Buffer(0);
    finished: boolean = false;
    protected _finished: boolean = false;
    async init(path): Promise<void> {
        this.fd = await promisifedOpen(path, 'r');
    }
    protected readFile = async (): Promise<void> => {
        const data = await promisifedRead(this.fd, this.buf, 0, this.MAGIC_LENGTH - 1, this.position);
        this.position += this.MAGIC_LENGTH - 1;
        let prevEOL = 0;
        for (let i = 0; i < data.bytesRead; i++) {
            if (data.buffer[i] === 10) {
                const lineBuff = data.buffer.slice(prevEOL, i);
                prevEOL = i + 1;
                if (this.aggBuff.length > 0) {
                    this.lines.push(Buffer.concat([this.aggBuff, lineBuff], this.aggBuff.length + lineBuff.length));
                    this.aggBuff = new Buffer(0);
                }
                else
                    this.lines.push(lineBuff);
            }
        }
        const newBuff = data.buffer.slice(prevEOL, data.bytesRead);
        this.aggBuff = Buffer.concat([this.aggBuff, newBuff], this.aggBuff.length + newBuff.length);
        if (data.bytesRead < this.MAGIC_LENGTH - 1) {
            const newBuff = data.buffer.slice(prevEOL, data.bytesRead);
            this.lines.push(Buffer.concat([this.aggBuff, newBuff], this.aggBuff.length + newBuff.length));
            this._finished = true;
            return;
        }
        if (prevEOL === 0 && !this._finished)
            return this.readFile();
    }
    async readLine(): Promise<Buffer> {
        if (this.finished)
            return new Buffer(0);
        if (this.lines.length) {
            const data = this.lines.shift();
            if (this._finished && this.lines.length === 0) this.finished = true;
            return data;
        }
        if (!this._finished)
            await this.readFile();
        return this.readLine();
    }
    async close(): Promise<void> {
        await promisifedClose(this.fd);
    }
}

Извиняюсь конечно, за ts, но уже отвык на js писать. Использовать бы я этот код в продакшене конечно не стал, в виду того, что он написан на коленке и работает много медленнее кода, который использует createReadStream. Как минимум в 2-3 раза на больших файлах.
Для сравнения код, с использованием модуля linebyline
var readline = require('linebyline'),
    rl = readline('access.log');
  rl.on('line', function (line, lineCount, byteCount) {
    // do something with the line of text
    // console.log(line);
  })
    .on('error', function (e) {
      // something went wrong
    })
    .on('close', () => {
      console.timeEnd('goodCode');
    })

и функциями
const test = async () => {
    console.time('badCode');
    const lineReader = new LineReader();
    await lineReader.init('access.log');
    while (!lineReader.finished) {
        const line = await lineReader.readLine();
        // console.log(line.toString());
    }
    await lineReader.close();
    console.timeEnd('badCode');
}
test();
function test2() {
    console.time('goodCode');
    var readline = require('linebyline'),
        rl = readline('access.log');
    rl
        .on('line', function (line, lineCount, byteCount) { /* console.log(line); */ })
        .on('error', function (e) { /* handleError() */ })
        .on('close', () => {
            console.timeEnd('goodCode');
        })
}
test2();
function test3() {
    console.time('superbCode');
    let lineReader = require('readline')
        .createInterface({
            input: require('fs').createReadStream('access.log')
        });
    let r = lineReader
        .on('line', function (line) { /* console.log(line); */ })
        .on('error', function (e) { /* handleError() */ })
        .on('close', () => {
            console.timeEnd('superbCode');
        })
}
test3();

на файле весом 700mb показывает
superbCode: 1536.653ms
goodCode: 4545.884ms
badCode: 5023.794ms

Так что тут скорее в академических целях пример LineReader возвращающего Promise приведен.

На самом же деле, чтобы избежать лапшы в коде с linebyline надо обрабатывать данные не внутри Анонимной callback функции, а определять функцию обработки в другом месте, это также может быть метод, какого-то объекта.
const readline = require('linebyline');
const rl = readline(pathToFile);
function lineProcessing(line, lineCount, byteCount) {
 // dosmth with line e.g. parse && pass to another func
 const data = JSON.parse(line);
 myModel.processNewData(data);
}
function handleError(err) {
  // 
}
rl
  .on('line', lineProcessing)
  .on('error', handleError)
  .on('close', () => {
    console.log('done');
  });


Тут почитал на тему async generators, в node js 10, как оказалось createReadStream возвращает AsyncIterableIterator<Buffer>. Правда node js уведомляет, что это эксперементальная фича:
async function* bufferToLines(bufferAsync) {
    let agg = '';
    for await (const buffer of bufferAsync) {
        agg += buffer;
        let prevEOL;
        while ((prevEOL = agg.indexOf('\n')) >= 0) {
            const line = agg.slice(0, prevEOL);
            yield line;
            agg = agg.slice(prevEOL + 1);
        }
    }
    if (agg.length > 0) {
        yield agg;
    }
}
async function test() {
    console.time('asyncGeneratorCode');
    const inputFilePath = 'access.log';
    const stream = fs.createReadStream('access.log', { encoding: 'utf8' });
    for await (const line of bufferToLines(stream)) {
        // console.log(line);
    }
    console.timeEnd('asyncGeneratorCode');
}
test();

asyncGeneratorCode: 3273.566ms

Последний раз редактировалось EmperioAf, 03.06.2018 в 02:33.
Ответить с цитированием