反应式编程是处理数据流和变化传播的编程范例。 这意味着,当数据流由一个组件发出时,更改将通过反应式编程库传播到其他组件。变化的传播将持续到最终的接收者。 事件驱动和反应式编程的区别在于,事件驱动式编程围绕事件展开,反应式编程围绕着数据展开。
ReactiveX或RX用于反应式编程
ReactiveX或者Raective Extension是反应式编程最着名的实现。 ReactiveX的工作取决于以下两个类 -
可观察的类
这个类是数据流或事件的来源,它打包传入的数据,以便数据可以从一个线程传递到另一个线程。 在某些观察者订阅它之前,它不会提供数据。
观察员类
该类使用observable
发出的数据流。 可以有多个可观察的观察者,每个观察者将接收每个发射的数据项。 观察者可以通过订阅可观察到的三种类型的事件 -
on_next()
事件 - 它意味着数据流中有一个元素。on_completed()
事件 - 它意味着排放已经结束,没有更多数据项到来。on_error()
事件 - 它也意味着排放的结束,但在可观察到抛出错误的情况下。
RxPY - 用于反应式编程的Python模块
RxPY是一个Python模块,可用于反应式编程。 我们需要确保模块已安装。 以下命令可用于安装RxPY模块 -
pip install RxPY
例子
以下是一个Python脚本,它使用RxPY模块及Observable
类和Observe
类来进行反应式编程。 基本上有两类 -
get_strings()
- 用于从观察者获取字符串。PrintObserver()
- 用于从观察者打印字符串。 它使用观察员班的所有三个事件。 它也使用subscribe()
类。
参考以下实现代码 -
from rx import Observable, Observer
def get_strings(observer):
observer.on_next("Ram")
observer.on_next("Mohan")
observer.on_next("Shyam")
observer.on_completed()
class PrintObserver(Observer):
def on_next(self, value):
print("Received {0}".format(value))
def on_completed(self):
print("Finished")
def on_error(self, error):
print("Error: {0}".format(error))
source = Observable.create(get_strings)
source.subscribe(PrintObserver())
执行上面示例代码,得到以下结果 -
Received Ram
Received Mohan
Received Shyam
Finished
用于反应式编程的PyFunctional库
PyFunctionalis
是另一个可用于响应式编程的Python库。 它使我们能够使用Python编程语言创建功能程序。 这很有用,因为它允许我们通过使用链式函数操作符来创建数据管道。
RxPY和PyFunctional之间的区别
这两个库都用于响应式编程,并以类似的方式处理流,但两者的主要区别取决于数据的处理。 RxPY处理系统中的数据和事件,而PyFunctional专注于使用函数式编程范例转换数据。
安装PyFunctional模块
需要在使用之前安装这个模块。可以通过以下pip命令来安装 -
pip install pyfunctional
例子
以下示例使用PyFunctional
模块及其seq
类,它们充当可以迭代和操作的流对象。 在这个程序中,它使用将每个值加倍的lamda
函数映射序列,然后过滤x
大于4
的值,最后将序列减少为所有剩余值的和。
from functional import seq
result = seq(1,2,3).map(lambda x: x*2).filter(lambda x: x > 4).reduce(lambda x, y: x + y)
print ("Result: {}".format(result))
执行上面示例代码,得到以下结果 -
Result: 6