RxJS
定义: 通过使用observable序列来编写异步和基于事件的程序
核心思想: 函数式编程 + 响应式编程
原因
Reactive Programming的兴起
Observable标准化
多语言支持
解决的问题(异步带来的问题)
竞态条件
内存泄露
复杂的状态
异常处理
ps: 统一API
函数式编程
1 2 3 4 5 6 7 8 9 10 11 12
| var sayHello = function () { console.log('hello') }
document.getElementById('a').click(sayHello)
function getFunc () { return function (content) { console.log(content) } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| function addArray (array, item) { array.push(item) return array }
let array = [1, 2, 3] function addArray(item) { let result = [] result.push(...array) result.push(item) return result }
function addArray (array, item) { let result = [] result.push(...array) result.push(item) return result }
|
观察者模式(生产者推送数据)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| class Producer {
constructor() { this.listeners = [] }
addListener(listener) { if(typeof listener === 'function') { this.listeners.push(listener) } else { throw new Error('listener必须是function') } }
removeListener(listener) { this.listeners.splice(this.listeners.indexOf(listener), 1) }
notify(message) { this.listeners.forEach(listener => { listener(message) }) } }
|
迭代器模式(消费者拉取数据)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| class IteratorFromArray { constructor(arr) { this._array = arr this._cursor = 0 } next() { return this._cursor < this._array.length ? { value: this._array[this._cursor++], done: false } : { done: true } } }
|
ps:
|
单个值 |
多个值 |
拉取 |
Function |
Iterator |
推送 |
Promise |
Observable |
Observable具备生产者推送数据的能力,又拥有序列处理数据的方法
Observable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| var observable = Rx.Observable .create(function(observer) { observer.next('Hello') observer.next('World') observer.complete() observer.next('not work') })
var observer = { next: function(value) { console.log(value) }, error: function(error) { console.log(error) }, complete: function() { console.log('complete') } }
observable.subscribe(observer)
|
Observable实现细节
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| function subscribe(observer) { observer.next('Hello') observer.next('World') }
subscribe({ next: function(value) { console.log(value) }, error: function(error) { console.log(error) }, complete: function() { console.log('complete') } })
|
操作符
- 创建操作符、转换操作符、过滤操作符、组合操作符、多播操作符、错误处理操作符、工具操作符、条件和布尔操作符、数学和聚合操作符
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| var people = Rx.Observable.of('World', 'China')
function map(source, callback) { return Rx.Observable.create((observer) => { return source.subscribe( (value) => { try{ observer.next(callback(value)) } catch(e) { observer.error(e) } }, (err) => { observer.error(err) }, () => { observer.complete() } ) }) }
var helloPeople = map(people, (item) => ' Hello~' + item)
helloPeople.subscribe(console.log)
|
Subject
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| var source = Rx.Observable.interval(1000).take(3)
var observerA = { next: value => console.log('A next: ' + value),
error: error => console.log('A error: ' + error),
complete: () => console.log('A complete!') }
var observerB = { next: value => console.log('B next: ' + value),
error: error => console.log('B error: ' + error),
complete: () => console.log('B complete!') }
var subject = { observers: [],
addObserver: function(observer) { this.observers.push(observer) },
next: function(value) { this.observers.forEach(o => o.next(value)) },
error: function(error){ this.observers.forEach(o => o.error(error)) },
complete: function() { this.observers.forEach(o => o.complete()) } }
subject.addObserver(observerA)
source.subscribe(subject)
subject.addObserver(observerB)
|