博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
node中的流(stream)
阅读量:6389 次
发布时间:2019-06-23

本文共 10302 字,大约阅读时间需要 34 分钟。

什么是流

流在node中是非常重要的,gulp的task任务,文件压缩,和http中的请求和响应等功能的实现都是基于流来实现的。为什么会有流的出现呢,因为我们一开始有文件操作之后,包括写文件和读文件都会有一个问题,就是会把内容不停的读到内存中,都读取完之后再往外写,这样就会导致内存被大量占用。为了解决这个问题流就诞生了,通过流,我们可以读一点内容就往文件中写一点内容,并且可以控制读取的速度。

流的种类有很多,最常用的有:

  • ReadStream 可读流
  • WriteStream 可写流
  • 双工流
  • 转换流
  • 对象流(gulp)

流的特点:

  • 有序的有方向的
  • 流可以自己控制速率

什么是读和写呢?

  • 读是将内容读取到内存中
  • 写是将内存或者文件的内容写入到文件内

流都是基于原生的fs操作文件的方法来实现的,通过fs创建流。流是异步方法,都有回调函数,所有的 Stream 对象都是 EventEmitter 的实例。常用的事件有:

  • open - 打开文件
  • data - 当有数据可读时触发。
  • error - 在接收和写入过程中发生错误时触发。
  • close - 关闭文件
  • end - 没有更多的数据可读时触发。
  • drain - 当缓存区也执行完了触发

可读流

let fs = require('fs');let rs = fs.createReadStream('./2.txt', {  highWaterMark: 3,   flags:'r',  autoClose:true,  start:0,  end:3,  encoding:'utf8'});复制代码

主要参数说明:

  • highWaterMark 文件一次读多少字节,默认是64x1024
  • flags 类型,默认是r
  • autoClose 默认是true ,读取完毕后自动关闭
  • start 读取开始位置
  • end 读取结束位置,star和end都是包前包后的。
  • endencoding 默认读取的是buffer 一般读取可以使用默认参数。默认创建一个流是非流动模式,默认不会读取数据, 我们需要接收数据是基于事件的,我们要监听一个data事件,数据会自动的流出来,数据从非流动模式变为流动模式。 读取之前先把文件打开:
rs.on('open',function () {  console.log('文件打开了');});复制代码

内部会自动的触发这个事件rs.emit('data'), 不停的触发data方法,直到数据读完为止。

rs.on('data',function (data) {  console.log(data);  rs.pause(); // 暂停触发on('data')事件,将流动模式又转化成了非流动模式,可以用setTimeout(()=>{rs.resume()},5000)恢复});复制代码

文件读完后触发end方法:

rs.on('end',function () {  console.log('读取完毕了');});复制代码

最后关闭文件:

rs.on('close',function () {  console.log('关闭')});复制代码

监控错误:

rs.on('error',function (err) {  console.log(err)});复制代码

可读流实现原理解析

let fs = require('fs');let ReadStream = require('./ReadStream');let rs = new ReadStream('./2.txt', {  highWaterMark: 3,   flags:'r',  autoClose:true,   start:0,  end:3,  encoding:'utf8'});复制代码

ReadStream.js

let fs = require('fs');let EventEmitter = require('events');class ReadStream extends EventEmitter {  constructor(path, options = {}) {    super();    this.path = path;    this.highWaterMark = options.highWaterMark || 64 * 1024;    this.autoClose = options.autoClose || true;    this.start = options.start || 0;     this.pos = this.start; // pos会随着读取的位置改变    this.end = options.end || null; // null表示没传递    this.encoding = options.encoding || null;    this.flags = options.flags || 'r';    this.flowing = null; // 非流动模式    // 弄一个buffer读出来的数    this.buffer = Buffer.alloc(this.highWaterMark);    this.open();     // 次方法默认同步调用的    this.on('newListener', (type) => { // 等待着 它监听data事件      if (type === 'data') {        this.flowing = true;        this.read();// 开始读取 客户已经监听了data事件      }    })  }  pause(){    this.flowing = false;  }  resume(){    this.flowing =true;    this.read();  }  read(){ // 默认第一次调用read方法时还没有获取fd,所以不能直接读    if(typeof this.fd !== 'number'){       return this.once('open',() => this.read()); // 等待着触发open事件后fd肯定拿到了,拿到以后再去执行read方法    }    let howMuchToRead = this.end?Math.min(this.end-this.pos+1,this.highWaterMark): this.highWaterMark;    fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (error, byteRead) => { // byteRead真实的读到了几个      // 读取完毕      this.pos += byteRead; // 都出来两个位置就往后搓两位      // this.buffer默认就是三个      let b = this.encoding ? this.buffer.slice(0, byteRead).toString(this.encoding) : this.buffer.slice(0, byteRead);      this.emit('data', b);      if ((byteRead === this.highWaterMark)&&this.flowing){        return this.read(); // 继续读      }      // 这里就是没有更多的逻辑了      if (byteRead < this.highWaterMark){        // 没有更多了        this.emit('end'); // 读取完毕        this.destroy(); // 销毁即可      }    });  }  // 打开文件用的  destroy() {    if (typeof this.fd != 'number') { return this.emit('close'); }    fs.close(this.fd, () => {      // 如果文件打开过了 那就关闭文件并且触发close事件      this.emit('close');    });  }  open() {    fs.open(this.path, this.flags, (err, fd) => { //fd标识的就是当前this.path这个文件,从3开始(number类型)      if (err) {        if (this.autoClose) { // 如果需要自动关闭我在去销毁fd          this.destroy(); // 销毁(关闭文件,触发关闭事件)        }        this.emit('error', err); // 如果有错误触发error事件        return;      }      this.fd = fd; // 保存文件描述符      this.emit('open', this.fd); // 触发文件的打开的方法    });  }}module.exports = ReadStream;复制代码

可写流

可写流有缓存区的概念,

  • 第一次写入是真的向文件里写,第二次再写入的时候是放到了缓存区里
  • 写入时会返回一个boolean类型,返回为false时表示不要再写入了,
  • 当内存和正在写入的内容消耗完后,会触发一个事件 drain,
let fs = require('fs');let ws = fs.createWriteStream('2.txt',{  flags: 'w',   highWaterMark: 3,   encoding: 'utf8',  start: 0,  autoClose: true,   mode: 0o666, });复制代码

参数说明:

  • flags: 默认是w (写)默认文件不存在会创建,a 追加
  • highWaterMark:设置当前缓存区的大小
  • encoding:文件里存放的都是二进制
  • start: 从哪开始写
  • autoClose: 默认为true,自动关闭(写完之后销毁)
  • mode: 写的模式,默认0o666,可读可写
let i = 9;function write() {  let flag = true; // 表示是否能写入  while (flag&&i>=0) { // 9 - 0     flag = ws.write(i--+'');  }}复制代码

drain只有嘴塞满了吃完了才会触发,不是消耗完就触发

ws.on('drain',()=>{  console.log('干了');  write();})write();复制代码

可写流实现原理

let fs = require('fs');let WS = require('./WriteStream')let ws = new WS('./2.txt', {  flags: 'w',   highWaterMark: 1,   encoding: 'utf8',  start: 0,  autoClose: true,   mode: 0o666, });let i = 9;function write() {  let flag = true;  while (flag && i >= 0) {    i--;    flag = ws.write('111'); // 987 // 654 // 321 // 0    console.log(flag)  }}write();ws.on('drain', function () {  console.log('干了');  write();});复制代码

WriteStream.js

let fs = require('fs');let EventEmitter = require('events');class WriteStream extends EventEmitter {  constructor(path, options = {}) {    super();    this.path = path;    this.flags = options.flags || 'w';    this.encoding = options.encoding || 'utf8';    this.start = options.start || 0;    this.pos = this.start;    this.mode = options.mode || 0o666;    this.autoClose = options.autoClose || true;    this.highWaterMark = options.highWaterMark || 16 * 1024;    this.open(); // fd 异步的  触发一个open事件当触发open事件后fd肯定就存在了    // 写文件的时候 需要的参数有哪些    // 第一次写入是真的往文件里写    this.writing = false; // 默认第一次就不是正在写入    // 缓存我用简单的数组来模拟一下    this.cache = [];    // 维护一个变量 表示缓存的长度    this.len = 0;    // 是否触发drain事件    this.needDrain = false;  }  clearBuffer() {    let buffer = this.cache.shift();    if (buffer) { // 缓存里有      this._write(buffer.chunk, buffer.encoding, () => this.clearBuffer());    } else {// 缓存里没有了      if (this.needDrain) { // 需要触发drain事件        this.writing = false; // 告诉下次直接写就可以了 不需要写到内存中了        this.needDrain = false;        this.emit('drain');      }    }  }  _write(chunk, encoding, clearBuffer) { // 因为write方法是同步调用的此时fd还没有获取到,所以等待获取到再执行write操作    if (typeof this.fd != 'number') {      return this.once('open', () => this._write(chunk, encoding, clearBuffer));    }    fs.write(this.fd, chunk, 0, chunk.length, this.pos, (err, byteWritten) => {      this.pos += byteWritten;      this.len -= byteWritten; // 每次写入后就要再内存中减少一下      clearBuffer(); // 第一次就写完了    })  }  write(chunk, encoding = this.encoding) { // 客户调用的是write方法去写入内容    // 要判断 chunk必须是buffer或者字符串 为了统一,如果传递的是字符串也要转成buffer    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding);    this.len += chunk.length; // 维护缓存的长度 3    let ret = this.len < this.highWaterMark;    if (!ret) {      this.needDrain = true; // 表示需要触发drain事件    }    if (this.writing) { // 正在写入应该放到内存中      this.cache.push({        chunk,        encoding,      });    } else { // 第一次      this.writing = true;      this._write(chunk, encoding, () => this.clearBuffer()); // 专门实现写的方法    }    return ret; // 能不能继续写了,false表示下次的写的时候就要占用更多内存了  }  destroy() {    if (typeof this.fd != 'number') {      this.emit('close');    } else {      fs.close(this.fd, () => {        this.emit('close');      });    }  }  open() {    fs.open(this.path, this.flags, this.mode, (err, fd) => {      if (err) {        this.emit('error', err);        if (this.autoClose) {          this.destroy(); // 如果自动关闭就销毁文件描述符        }        return;      }      this.fd = fd;      this.emit('open', this.fd);    });  }}module.exports = WriteStream;复制代码

管道流

let fs = require('fs');let rs = fs.createReadStream('./2.txt',{  highWaterMark:1});let ws = fs.createWriteStream('./1.txt',{  highWaterMark:3});rs.pipe(ws); // 会控制速率(防止淹没可用内存)复制代码
  • pipe方法 叫管道,可以控制速率,pipe会监听rs的on('data'),将读取到的内容调用ws.write方法
  • 调用写的方法会返回一个boolean类型
  • 如果返回了false就调用rs.pause()暂停读取
  • 等待可写流写入完毕后 on('drain')在恢复读取

pip实现原理

let RS = require('./ReadStream');let WS = require('./WriteStream');let rs = new RS('./1.txt',{  highWaterMark:4})let ws = new WS('./2.txt', {  highWaterMark: 1});rs.pipe(ws);复制代码

ReadStream.js

let fs = require('fs');let EventEmitter = require('events');class ReadStream extends EventEmitter {  constructor(path, options = {}) {    super();    this.path = path;    this.highWaterMark = options.highWaterMark || 64 * 1024;    this.autoClose = options.autoClose || true;    this.start = options.start || 0;     this.pos = this.start;    this.end = options.end || null;    this.encoding = options.encoding || null;    this.flags = options.flags || 'r';        this.flowing = null;    this.buffer = Buffer.alloc(this.highWaterMark);    this.open();     // {newListener:[fn]}    this.on('newListener', (type) => { // 等待着 它监听data事件      if (type === 'data') {        this.flowing = true;        this.read();// 开始读取 客户已经监听了data事件      }    })  }  pause(){    this.flowing = false;  }  resume(){    this.flowing =true;    this.read();  }  read(){    if(typeof this.fd !== 'number'){       return this.once('open',() => this.read());    }    let howMuchToRead = this.end?Math.min(this.end-this.pos+1,this.highWaterMark): this.highWaterMark;    fs.read(this.fd, this.buffer, 0, howMuchToRead, this.pos, (error, byteRead) => {       this.pos += byteRead;       // this.buffer默认就是三个      let b = this.encoding ? this.buffer.slice(0, byteRead).toString(this.encoding) : this.buffer.slice(0, byteRead);      this.emit('data', b);      if ((byteRead === this.highWaterMark)&&this.flowing){        return this.read();       }      if (byteRead < this.highWaterMark){        this.emit('end');         this.destroy();       }    });  }  destroy() {    if (typeof this.fd != 'number') { return this.emit('close'); }    fs.close(this.fd, () => {      this.emit('close');    });  }  open() {    fs.open(this.path, this.flags, (err, fd) => {      if (err) {        if (this.autoClose) {          this.destroy();        }        this.emit('error', err);         return;      }      this.fd = fd;       this.emit('open', this.fd);    });  }  pipe(dest){    this.on('data',(data)=>{      let flag = dest.write(data);      if(!flag){        this.pause();      }    });    dest.on('drain',()=>{      console.log('写一下听一下')      this.resume();    });  }}module.exports = ReadStream;复制代码

转载地址:http://vpcha.baihongyu.com/

你可能感兴趣的文章
安装win2008r2、域控、IIS、证书服务器、部署exchange2010
查看>>
centos6.2安装tomcat
查看>>
利用ansible实现一键化部署 rsync服务
查看>>
nginx根据条件跳转+跳转规则
查看>>
(转载)Javascript异步编程的4种方法
查看>>
ACM suvey
查看>>
Oracle的case 用法
查看>>
Python之路【第二十七篇】:反射
查看>>
敌兵布阵
查看>>
Web.config详解 [转]
查看>>
PHP杂记
查看>>
面试题整理10
查看>>
POP跳转页面,从3号跳回1号,
查看>>
[Android] keytools生成jsk文件以及获取sha1码
查看>>
一道算法题
查看>>
qt.network.ssl: QSslSocket: cannot call unresolved function SSLv23_client_method
查看>>
WM-结汇
查看>>
概述--Nginx集成Vcenter 6.X HTML Console系列之 1--(共4)
查看>>
mysql查询重复
查看>>
ORACLE触发器的管理与实际应用【weber出品】
查看>>