反应式编程

反应式编程是一种编程范例,用于处理数据流和变化的传播.这意味着当一个组件发出数据流时,更改将通过响应式编程库传播到其他组件.变化的传播将持续到达最终接收器.事件驱动和反应式编程之间的区别在于事件驱动编程围绕事件而反应式编程围绕数据.

ReactiveX或RX用于反应式编程

ReactiveX或Raective Extension是最着名的反应式编程实现. ReactiveX的工作取决于以下两个类 :

Observable类

此类是数据流或事件的来源,它包装传入数据,以便数据可以从一个线程传递到另一个线程.在某些观察者订阅数据之前,它不会提供数据.

Observer类

此类使用 observable发出的数据流.可以有多个具有可观察性的观察者,每个观察者将接收发射的每个数据项.观察者可以通过订阅observable : 来接收三种类型的事件;

  • on_next()事件 : 这意味着数据流中有一个元素.

  • on_completed()事件 : 它意味着排放结束,不再有物品到来.

  • on_error()事件 : 它还意味着发射结束,但是如果 observable 引发错误.

RxPY  -  Python用于反应式编程的模块

RxPY是一个可用于反应式编程的Python模块.我们需要确保安装该模块.以下命令可用于安装RxPY模块 :

 
 pip install RxPY

示例

以下是一个Python脚本,它使用 RxPY 模块及其类 Observable 观察用于反应式编程.基本上有两个类和减号;

  • get_strings() : 从观察者那里获取字符串.

  • PrintObserver() : 用于从观察者打印字符串.它使用观察者类的所有三个事件.它还使用subscribe()类.

from rx import Observable, Observer
def get_strings(observer):
   observer.on_next("Ram")
   observer.on_next("Mohan")
   observer.on_next("Shyam")
      observer.on_completed()
class PrintObserver(Observer):
   def on_next(self, value):
      print("Received {0}".format(value))
   def on_completed(self):
   print("Finished")
   def on_error(self, error):
      print("Error: {0}".format(error))
source = Observable.create(get_strings)
source.subscribe(PrintObserver())

输出

Received Ram
Received Mohan
Received Shyam
Finished

用于反应式编程的PyFunctional库

PyFunctional 是另一个可用于反应式编程的Python库.它使我们能够使用Python编程语言创建功能程序.它很有用,因为它允许我们使用链式函数运算符创建数据管道.

RxPY和PyFunctional之间的区别

这两个库都用于反应式编程并以类似的方式处理流,但两者之间的主要区别取决于数据的处理. RxPY 处理系统中的数据和事件,而 PyFunctional 专注于使用函数式编程范例转换数据.

安装PyFunctional Module

我们需要在使用之前安装此模块.它可以在pip命令的帮助下安装如下 :

 
 pip install pyfunctional

示例

以下示例使用 PyFunctional 模块及其 seq 类作为流对象我们可以迭代和操纵.在这个程序中,它使用lamda函数映射序列,该函数将每个值加倍,然后过滤x大于4的值,最后将序列缩减为所有剩余值的总和.

from functional import seq

result = seq(1,2,3).map(lambda x: x*2).filter(lambda x: x > 4).reduce(lambda x, y: x + y)

print ("Result: {}".format(result))

输出

Result: 6