博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
node那点事(一) -- Readable streams(可读流)
阅读量:5826 次
发布时间:2019-06-18

本文共 13989 字,大约阅读时间需要 46 分钟。

流的简介

流(stream)在 Node.js 中是处理流数据的抽象接口(abstract interface)。 stream 模块提供了基础的 API 。使用这些 API 可以很容易地来构建实现流接口的对象。

Node.js 提供了多种流对象。 例如, HTTP 请求 和 process.stdout 就都是流的实例。

流可以是可读的、可写的,或是可读写的。所有的流都是 EventEmitter 的实例。

为什么要用流

这里我们举一个简单的例子:

我们打算读取一个文件,使用 fs.readFileSync 同步读取一个文件,程序会被阻塞,所有的数据都会被读取到内存中。

换用 fs.readFile 读取文件,程序不会被阻塞,但是所有的数据依旧会被一次性全部被读取到内存中。

当处理大文件压缩、归档、媒体文件和巨大的日志文件的时候,内存使用就成了问题,现在大家一般家用机内存大多数都是8G、16G,软件包还在日益增大,在这种情况下,流的优势就体现出来了。

流被设计为异步的方式,在内存中只开启一个固定的空间,将文件化整为零,以流动的方式进行传输操作,解决了以上问题。

流的类型

Node.js 中有四种基本的流类型:

Readable - 可读的流 (例如 fs.createReadStream()).

Writable - 可写的流 (例如 fs.createWriteStream()).

Duplex - 可读写的流 (例如 net.Socket).

Transform - 在读写过程中可以修改和变换数据的 Duplex 流 (例如 zlib.createDeflate()).

可读流(Readable Stream)

可读流有两种模式:

1、流动模式(flowing):可读流自动读取数据,通过EventEmitter接口的事件尽快将数据提供给应用。

2、暂停模式(paused):必须显式调用stream.read()方法来从流中读取数据片段。

可以通过三种途径切换到流动模式:

  • 监听 'data' 事件
  • 调用 stream.resume() 方法
  • 调用 stream.pipe() 方法将数据发送到 Writable

流动模式切换到暂停模式的api有:

  • 如果不存在管道目标,调用stream.pause()方法
  • 如果存在管道目标,调用 stream.unpipe()并取消'data'事件监听

可读流事件:'data','readable','error','close','end'

我们可以想象下家用热水器的模型,热水器的水箱(buffer缓存区)里面存着热水(数据),在我们用热水的时候,开启水龙头,自来水会不断的进入水箱,再从水箱由水龙头流出来供我们使用。这就是进入了“flowing”模式。当我们关闭水龙头时候,水箱则会暂停进水,水龙头也会暂停出水,这是就进入了“paused”模式。

flowing模式

const fs = require('fs')const path = require('path')const rs = fs.createReadStream(path.join(__dirname, './1.txt'))rs.setEncoding('utf8')rs.on('data', (data) => {    console.log(data)})

paused模式

const fs = require('fs')const path = require('path')const rs = fs.createReadStream(path.join(__dirname, './1.txt'))rs.setEncoding('utf8')rs.on('readable', () => {    let d = rs.read(1)    console.log(d)})

实现原理

流动模式原理

我们来实现一个简单的流动模式下的可读流介绍其原理,由NODEJS官方文档可知,流继承自EventEmitter模块,然后我们定义一些默认参数、缓存区、模式:

let EventEmitter = require('events');let fs = require('fs');class ReadStream extends EventEmitter {    constructor(path,options) {        super();        this.path = path;        this.flags = options.flags || 'r';        this.autoClose = options.autoClose || true;        this.highWaterMark = options.highWaterMark|| 64*1024;        this.start = options.start||0;        this.end = options.end;        this.encoding = options.encoding || null                this.buffer = Buffer.alloc(this.highWaterMark);//定义缓存区大小                this.pos = this.start; // pos 读取的位置 可变 start不变的                this.flowing = null; // null就是暂停模式    }}module.exports = ReadStream;

接着在我们需要定义一个打开文件的方法用于打开文件。还有一个一个destroy方法,用于在文件操作出错或者读完之后关闭文件。

open(){    fs.open(this.path,this.flags,(err,fd)=>{        if(err){            this.emit('error',err);            if(this.autoClose){ // 是否自动关闭                this.destroy();            }            return;        }        this.fd = fd; // 保存文件描述符        this.emit('open'); // 文件打开了    });} destroy(){    // 先判断有没有fd 有关闭文件 触发close事件    if(typeof this.fd ==='number'){        fs.close(this.fd,()=>{            this.emit('close');        });        return;    }    this.emit('close'); // 销毁}

接着要在构造函数中调用open方法,当用户绑定data监听时,修改可读流的模式:

constructor(path,options){    super();    this.path = path;    this.flags = options.flags || 'r';    this.autoClose = options.autoClose || true;    this.highWaterMark = options.highWaterMark|| 64*1024;    this.start = options.start||0;    this.end = options.end;    this.encoding = options.encoding || null    this.flowing = null;     this.buffer = Buffer.alloc(this.highWaterMark);    this.pos = this.start;        this.open();//打开文件 fd    this.on('newListener',(eventName,callback)=>{        if(eventName === 'data'){            // 相当于用户监听了data事件            this.flowing  = true;            // 监听了 就去读            this.read(); // 去读内容了        }    })}

接下来我们实现最总要的read方法,首先要保证文件已经打开,接着镀组文件进入缓存,触发data事件传入数据,如果处于流动模式,继续读取直到读完文件。

read(){    // 此时文件还没打开呢    if(typeof this.fd !== 'number'){        // 当文件真正打开的时候 会触发open事件,触发事件后再执行read,此时fd肯定有了        return this.once('open',()=>this.read())    }    // 此时有fd了    // 应该填highWaterMark?    // 想读4个 写的是3  每次读3个    // 123 4    let howMuchToRead = this.end?Math.min(this.highWaterMark,this.end-this.pos+1):this.highWaterMark;    fs.read(this.fd,this.buffer,0,howMuchToRead,this.pos,(err,bytesRead)=>{        // 读到了多少个 累加        if(bytesRead>0){            this.pos+= bytesRead;            let data = this.encoding?this.buffer.slice(0,bytesRead).toString(this.encoding):this.buffer.slice(0,bytesRead);            this.emit('data',data);            // 当读取的位置 大于了末尾 就是读取完毕了            if(this.pos > this.end){                this.emit('end');                this.destroy();            }            if(this.flowing) { // 流动模式继续触发                this.read();             }        }else{            this.emit('end');            this.destroy();        }    });}

剩下的pause和resume方法,很简单

resume() {    this.flowing = true;    this.read();}pause() {    this.flowing = false;}

简单的流实现完成了,看一下完整代码

let EventEmitter = require('events');let fs = require('fs');class ReadStream extends EventEmitter {    constructor(path, options) {        super();        this.path = path;        this.flags = options.flags || 'r';        this.autoClose = options.autoClose || true;        this.highWaterMark = options.highWaterMark|| 64*1024;        this.start = options.start||0;        this.end = options.end;        this.encoding = options.encoding || null        this.open();        this.flowing = null; // null就是暂停模式        this.buffer = Buffer.alloc(this.highWaterMark);        this.pos = this.start;         this.on('newListener', (eventName,callback) => {            if (eventName === 'data') {                this.flowing  = true;                this.read();             }        })    }        read(){        if (typeof this.fd !== 'number') {            return this.once('open', () => this.read())        }        let howMuchToRead = this.end ? Math.min(this.highWaterMark, this.end - this.pos+1) : this.highWaterMark;        fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (err,bytesRead) => {            if (bytesRead > 0) {                this.pos += bytesRead;                let data = this.encoding ? this.buffer.slice(0, bytesRead).toString(this.encoding) : this.buffer.slice(0, bytesRead);                this.emit('data', data);                if(this.pos > this.end){                    this.emit('end');                    this.destroy();                }                if(this.flowing) {                     this.read();                 }            }else{                this.emit('end');                this.destroy();            }        });    }        resume() {        this.flowing = true;        this.read();    }        pause() {        this.flowing = false;    }        destroy() {        if(typeof this.fd === 'number') {            fs.close(this.fd, () => {                this.emit('close');            });            return;        }        this.emit('close');     };        open() {        fs.open(this.path, this.flags, (err,fd) => {            if (err) {                this.emit('error', err);                if (this.autoClose) {                     this.destroy();                }                return;            }            this.fd = fd;             this.emit('open');         });    }}module.exports = ReadStream;

暂停模式原理

以上是流动模式的可读流实现原理,暂停模式的可读流原理与流动模式的主要区别在于监听readable事件的绑定与read方法,先实现监听绑定readable事件回调函数时,调用read方法读取数据到缓存区,定义一个读取方法_read

constructor(path, options) {    super();    this.path = path;    this.highWaterMark = options.highWaterMark || 64 * 1024;    this.autoClose = options.autoClose || true;    this.start = 0;    this.end = options.end;    this.flags = options.flags || 'r';    this.buffers = []; // 缓存区     this.pos = this.start;    this.length = 0; // 缓存区大小    this.emittedReadable = false;    this.reading = false; // 不是正在读取的    this.open();    this.on('newListener', (eventName) => {        if (eventName === 'readable') {            this.read();        }    })}read(n) {    if (this.length == 0) {        this.emittedReadable = true;    }    if (this.length < this.highWaterMark) {        if(!this.reading) {            this.reading = true;            this._read();         }    }}_read() {    if (typeof this.fd !== 'number') {        return this.once('open', () => this._read());    }    let buffer = Buffer.alloc(this.highWaterMark);    fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => {        if (bytesRead > 0) {            this.buffers.push(buffer.slice(0, bytesRead));            this.pos += bytesRead;            this.length += bytesRead;            this.reading = false;            if (this.emittedReadable) {                this.emittedReadable = false;                 this.emit('readable');            }        } else {            this.emit('end');            this.destroy();        }    })}

由api可知,暂停模式下的可读流手动调用read方法参数可以大于highWaterMark,为了处理这种情况,我们先写一个函数computeNewHighWaterMark,取到大于等于n的最小2的n次方的整数

function computeNewHighWaterMark(n) {      n--;      n |= n >>> 1;      n |= n >>> 2;      n |= n >>> 4;      n |= n >>> 8;      n |= n >>> 16;      n++;     return n;  }

然后写read方法,要考虑全n的各种情况,上代码

read(n) {     if(n>this.length){        // 更改缓存区大小  读取五个就找 2的几次放最近的        this.highWaterMark = computeNewHighWaterMark(n)        this.emittedReadable = true;        this._read();    }    // 如果n>0 去缓存区中取吧    let buffer=null;    let index = 0; // 维护buffer的索引的    let flag = true;    if (n > 0 && n <= this.length) { // 读的内容 缓存区中有这么多        // 在缓存区中取 [[2,3],[4,5,6]]        buffer = Buffer.alloc(n); // 这是要返回的buffer        let buf;        while (flag&&(buf = this.buffers.shift())) {            for (let i = 0; i < buf.length; i++) {                buffer[index++] = buf[i];                if(index === n){ // 拷贝够了 不需要拷贝了                    flag = false;                    this.length -= n;                    let bufferArr = buf.slice(i+1); // 取出留下的部分                    // 如果有剩下的内容 在放入到缓存中                    if(bufferArr.length > 0){                        this.buffers.unshift(bufferArr);                    }                    break;                }            }        }    }    // 当前缓存区 小于highWaterMark时在去读取    if (this.length == 0) {        this.emittedReadable = true;    }    if (this.length < this.highWaterMark) {        if(!this.reading){            this.reading = true;            this._read(); // 异步的        }    }    return buffer}

附上可读流暂停模式的完整实现原理代码

let fs = require('fs');let EventEmitter = require('events');function computeNewHighWaterMark(n) {      n--;      n |= n >>> 1;      n |= n >>> 2;      n |= n >>> 4;      n |= n >>> 8;      n |= n >>> 16;      n++;     return n;  }class ReadStream extends EventEmitter {    constructor(path, options) {        super();        this.path = path;        this.highWaterMark = options.highWaterMark || 64 * 1024;        this.autoClose = options.autoClose || true;        this.start = 0;        this.end = options.end;        this.flags = options.flags || 'r';        this.buffers = []; // 缓存区         this.pos = this.start;        this.length = 0; // 缓存区大小        this.emittedReadable = false;        this.reading = false; // 不是正在读取的        this.open();        this.on('newListener', (eventName) => {            if (eventName === 'readable') {                this.read();            }        })    }    read(n) {         if(n>this.length){            // 更改缓存区大小  读取五个就找 2的几次放最近的            this.highWaterMark = computeNewHighWaterMark(n)            this.emittedReadable = true;            this._read();        }        // 如果n>0 去缓存区中取吧        let buffer=null;        let index = 0; // 维护buffer的索引的        let flag = true;        if (n > 0 && n <= this.length) { // 读的内容 缓存区中有这么多            // 在缓存区中取 [[2,3],[4,5,6]]            buffer = Buffer.alloc(n); // 这是要返回的buffer            let buf;            while (flag&&(buf = this.buffers.shift())) {                for (let i = 0; i < buf.length; i++) {                    buffer[index++] = buf[i];                    if(index === n){ // 拷贝够了 不需要拷贝了                        flag = false;                        this.length -= n;                        let bufferArr = buf.slice(i+1); // 取出留下的部分                        // 如果有剩下的内容 在放入到缓存中                        if(bufferArr.length > 0){                            this.buffers.unshift(bufferArr);                        }                        break;                    }                }            }        }        // 当前缓存区 小于highWaterMark时在去读取        if (this.length == 0) {            this.emittedReadable = true;        }        if (this.length < this.highWaterMark) {            if(!this.reading){                this.reading = true;                this._read(); // 异步的            }        }        return buffer    }    // 封装的读取的方法    _read() {        // 当文件打开后在去读取        if (typeof this.fd !== 'number') {            return this.once('open', () => this._read());        }        // 上来我要喝水 先倒三升水 []        let buffer = Buffer.alloc(this.highWaterMark);        fs.read(this.fd, buffer, 0, buffer.length, this.pos, (err, bytesRead) => {            if (bytesRead > 0) {                // 默认读取的内容放到缓存区中                this.buffers.push(buffer.slice(0, bytesRead));                this.pos += bytesRead; // 维护读取的索引                this.length += bytesRead;// 维护缓存区的大小                this.reading = false;                // 是否需要触发readable事件                if (this.emittedReadable) {                    this.emittedReadable = false; // 下次默认不触发                    this.emit('readable');                }            } else {                this.emit('end');                this.destroy();            }        })    }    destroy() {        if (typeof this.fd !== 'number') {            return this.emit('close')        }        fs.close(this.fd, () => {            this.emit('close')        })    }    open() {        fs.open(this.path, this.flags, (err, fd) => {            if (err) {                this.emit('error', err);                if (this.autoClose) {                    this.destroy();                }                return            }            this.fd = fd;            this.emit('open');        });    }}module.exports = ReadStream;

转载地址:http://xosdx.baihongyu.com/

你可能感兴趣的文章
fadingEdge
查看>>
hello world
查看>>
php 文件操作
查看>>
Python开发(基础):列表List
查看>>
支付宝SDK官方下载-移动端、服务端各个平台通用
查看>>
Scala笔记整理(七):模式匹配和样例类
查看>>
DIV CSS遮罩层
查看>>
e1000 detected Tx Unit Hang
查看>>
Zabbix监控(十二):自动监控Linux端口
查看>>
Cubieboard自带的红外接收(IR)测试成功
查看>>
redis读取字符串类型的整数时会直接将其当做整数来处理
查看>>
JS_TAB鼠标点击切换特效
查看>>
MenuintroduceActivity
查看>>
Oracle Database 11g Release 2 (11.2) Installation On Oracle Linux 5
查看>>
rsync 实现同步
查看>>
网站提速-伪静态(3)
查看>>
在Excel表格中找出重复数据
查看>>
磁盘及文件系统管理(二)
查看>>
No compiler is provided in this environment. Perhaps you are running on a JRE rather than a JDK
查看>>
代理服务器与网络地址转换作用差异的解析
查看>>