百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

Reactor 高性能设计模式

lipiwang 2024-11-26 06:04 7 浏览 0 评论

Reactor 模式

Rreactor 模型是 I/O 多路复用的升级版,底层依赖于 Java NIO。不熟悉 Java NIO 技术的可以看下:《Java NIO:从 Buffer、Channel、Selector 到 Zero-copy、I/O 多路复用》。

Reactor是高性能网络编程中非常经典且重要的一个设计模式。在很多软件设计实现中应用很广泛,像Netty 的设计中很重要的组成部分就是对 Reactor 模型的一种实现;还有 Redis 的实现中也用到了 Reactor 模型,这也是 Redis 之所以底层单线程但是速度却非常快的原因之一。

这里提供两个非常官方且经典的参考资料:

Scalable IO in Java》由 java.util.concurrent 包的作者 Doug Lea 编写的一个关于 Reactor 模型的介绍;

reactor-siemens》 也是由国外作者编写的一篇研究 Reactor 模型的论文;

需要的小伙伴可以关注公号:“程序员个人修养”,号内回复:Reactor 获取下载地址!

Reactor 模式介绍

什么是 Reactor 模式

Reactor 模式一般翻译成反应器模式,也有人称为分发者模式。是基于事件驱动的设计模式,拥有一个或多个并发输入源,有一个服务处理器和多个请求处理器,服务处理器会同步地将输入的请求事件以多路复用的方式分发给相应的请求处理器。简单来说就是 由一个线程来接收所有的请求,然后派发这些请求到相关的工作线程中。

为什么使用 Reactor 模式

java 中,没有 NIO 出现之前都是使用 Socket 编程。Socket 接收请求是阻塞的,需要处理完一个请求才能处理下一个请求,所以在面对高并发的服务请求时,性能就会很差。

那有人就会说使用多线程(如下图所示)。接收到一个请求,就创建一个线程处理,这样就不会阻塞了。实际上这样的确是可以在提升性能上起到一定的作用,但是当请求很多的时候,就会创建大量的线程,维护线程需要资源的消耗,线程之间的切换也需要消耗性能。而且系统创建线程的数量也是有限的,所以当高并发时,会直接把系统拖垮。

因此,基于 JavaDoug Lea 提出了三种形式的 Reactor 模式:单 Reactor 单线程、单 Reactor 多线程和多 Reactor 多线程。

Reactor 模式中有三个重要的角色:

  • Reactor:负责响应事件,将事件分发到绑定了对应事件的 Handler,如果是连接事件,则分发到 Acceptor
  • Handler:事件处理器。负责执行对应事件对应的业务逻辑;
  • Acceptor:绑定了 connect 事件,当客户端发起 connect 请求时,Reactor 会将 accept 事件分发给 Acceptor 处理;

单 Reactor 单线程版本

只有一个 Selector 循环接受请求,客户端注册进来由 Reactor 接收注册事件,然后再由 Reactor 分发出去,由对应的 Handler 进行业务逻辑处理。

伪代码实例

class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocket;
    
    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        SelectionKey sk = serverSocket.register(selector,SelectionKey.OP_ACCEPT);
        sk.attach(new Acceptor());
    }
    
    
    public void run() {
        try {
            while(!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while(it.hasNext()) {
                    dispatch((SelectionKey)(it.next()))
                }
                selected.clear();
            }
        }catch(IOException e){
            
        }
    }
    
    void dispatch(SelectionKey k) {
        Runnable r = (Runnable)(k.attachment());
        if (r != null){
            r.run();
        }
    }
    
    class Acceptor implements Runnable {
        public void run() {
            try{
                SocketChannel c =  serverSocket.accept();
                if (c != null){
                    new Handler(selector, c);
                }
            }catch(IOException e) {
                
            }
        }
    }

}



final class Handler implements Runnable {
    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(MAXIN);
    ByteBuffer output = ByteBuffer.allocate(MAXOUT);
    static final int READING = 0, SENDING = 1;
    int state = READING;
    
    Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c;
        c.configureBlocking(false);
        // optionally try first read now
        sk = socket.register(sel,0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        /**
         * selector.wakeup(); 唤醒阻塞在select方法上的线程,使其立即返回
         */
        sel.wakeup();
    }
    
    boolean inputIsComplete(){/*……*/}
    boolean outputIsComplete(){/*……*/}
    void process(){/*……*/}
        
    public void run() {
        try{
            if (state == READING){
                read();
            }else if(state == SENDING){
                send();
            }
        }catch(IOException e){
            
        }
    }
    
    void read() throws IOException{
        socket.read(input);
        if(inputIsComplete()){
            process();
            state = SENDING;
            // Normally also do first write now
            sk.interestOps(SelectionKey.OP_WRITE);
        }
    } 
    
    void send() throws IOException{
        socket.write(output);
        if(outputIsComplete()){
            sk.cacel();
        }
    }
}

这里需要注意的两点是

  • Selector.wakeup() 方法的作用是:唤醒阻塞在select方法上的线程,使其立即返回。
  • Reactor.dispatch() 方法中,调用的是任务的 run 方法,同步执行。

单线程的问题实际上是很明显的。只要其中一个 Handler 方法阻塞了,那就会导致所有的 clientHandler 都被阻塞了,也会导致注册事件无法处理,无法接收新的请求。所以这种模式用的比较少,因为不能充分利用到多核的资源。因此,这种模式仅仅只能处理 Handler 比较快速完成的场景。

单 Reactor 多线程版本

在多线程 Reactor 中,注册接收事件都是由 Reactor 来做,其它的计算,编解码由一个线程池来做。从图中可以看出工作线程是多线程的,监听注册事件的 Reactor 还是单线程。

伪代码示例

public class Handler implements Runnable{

    final SocketChannel socket;
    final SelectionKey sk;
    ByteBuffer input = ByteBuffer.allocate(Integer.MAX_VALUE);
    ByteBuffer output = ByteBuffer.allocate(Integer.MAX_VALUE);
    static final int READING = 0, SENDING = 1;
    int state = READING;

    static ExecutorService pool = Executors.newCachedThreadPool();

    Handler(Selector sel, SocketChannel c) throws IOException {
        socket = c;
        c.configureBlocking(false);
        // optionally try first read now
        sk = socket.register(sel,0);
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
    }

    boolean inputIsComplete(){return true;}
    boolean outputIsComplete(){return true;}
    void process(){}

    public void run() {
        try{
            if (state == READING){
                read();
            }else if(state == SENDING){
                send();
            }
        }catch(IOException e){

        }
    }

    void send() throws IOException {
        socket.write(output);
        if(outputIsComplete()){
            sk.cancel();
        }
    }

    synchronized void read()  throws IOException{
        socket.read(input);
        if(inputIsComplete()){
            pool.execute(new Processer());
        }
    }

    synchronized void processAndHandOff() {
        process();
        sk.attach(this);
        sk.interestOps(SelectionKey.OP_WRITE);
    }

    class Processer implements Runnable {
        @Override
        public void run() {
            processAndHandOff();
        }
    }
}

对于 Reactor 部分,代码不需要调整,因为也是单 ReactorHandler 部分增加了线程池的支持。

对比单 Reactor 单线程模型,多线程 Reactor 模式在 Handler 读写处理时,交给工作线程池处理,可以充分利用多核cpu的处理能力,因为 Reactor 分发和 Handler 处理是分开的,不会导致 Reactor 无法执行。从而提升应用的性能。缺点是 Reactor 只在主线程中运行,承担所有事件的监听和响应,如果短时间的高并发场景下,依然会造成性能瓶颈。

多 Reactor 多线程版本

也称为主从 Reactor 模式,在这种模式下,一般会有两个 ReactormainReactorsubReactormainReactor 负责监听客户端请求,专门处理新连接的建立,再将建立好的连接注册到 subReactorsubReactor 将分配的连接加入到队列进行监听,当有新的事件发生时,会调用连接相对应的 Handler 进行业务处理。

这样的模型使得每个模块更加专一,耦合度更低,能支持更高的并发量。许多框架也使用这种模式。

Reactor 模式的优点

  1. 响应快,不必为单个同步时间所阻塞,虽然 Reactor 本身依然是同步的。
  2. 可以最大程度地避免复杂的多线程及同步问题,并且避免多线程/进程的切换开销。
  3. 扩展性好,可以方便地通过增加 Reactor 实例个数来充分利用 CPU 资源。
  4. 复用性好,Reactor 模式本身与具体事件处理逻辑无关,具有很高的复用性。

相关推荐

linux实例之设置时区的方式有哪些

linux系统下的时间管理是一个复杂但精细的功能,而时区又是时间管理非常重要的一个辅助功能。时区解决了本地时间和UTC时间的差异,从而确保了linux系统下时间戳和时间的准确性和一致性。比如文件的时间...

Linux set命令用法(linux cp命令的用法)

Linux中的set命令用于设置或显示系统环境变量。1.设置环境变量:-setVAR=value:设置环境变量VAR的值为value。-exportVAR:将已设置的环境变量VAR导出,使其...

python环境怎么搭建?小白看完就会!简简单单

很多小伙伴安装了python不会搭建环境,看完这个你就会了Python可应用于多平台包括Linux和MacOSX。你可以通过终端窗口输入"python"命令来查看本地是否...

Linux环境下如何设置多个交叉编译工具链?

常见的Linux操作系统都可以通过包管理器安装交叉编译工具链,比如Ubuntu环境下使用如下命令安装gcc交叉编译器:sudoapt-getinstallgcc-arm-linux-gnueab...

JMeter环境变量配置技巧与注意事项

通过给JMeter配置环境变量,可以快捷的打开JMeter:打开终端。执行jmeter。配置环境变量的方法如下。Mac和Linux系统在~/.bashrc中加如下内容:export...

C/C++|头文件、源文件分开写的源起及作用

1C/C++编译模式通常,在一个C++程序中,只包含两类文件——.cpp文件和.h文件。其中,.cpp文件被称作C++源文件,里面放的都是C++的源代码;而.h文件则被称...

linux中内部变量,环境变量,用户变量的区别

unixshell的变量分类在Shell中有三种变量:内部变量,环境变量,用户变量。内部变量:系统提供,不用定义,不能修改环境变量:系统提供,不用定义,可以修改,可以利用export将用户变量转为环...

在Linux中输入一行命令后究竟发生了什么?

Linux,这个开源的操作系统巨人,以其强大的命令行界面而闻名。无论你是初学者还是经验丰富的系统管理员,理解在Linux终端输入一条命令并按下回车后发生的事情,都是掌握Linux核心的关键。从表面上看...

Nodejs安装、配置与快速入门(node. js安装)

Nodejs是现代JavaScript语言产生革命性变化的一个主要框架,它使得JavaScript从一门浏览器语言成为可以在服务器端运行、开发各种各样应用的通用语言。在不同的平台下,Nodejs的安装...

Ollama使用指南【超全版】(olaplex使用方法图解)

一、Ollama快速入门Ollama是一个用于在本地运行大型语言模型的工具,下面将介绍如何在不同操作系统上安装和使用Ollama。官网:https://ollama.comGithub:http...

linux移植(linux移植lvgl)

1uboot移植l移植linux之前需要先移植一个bootlader代码,主要用于启动linux内核,lLinux系统包括u-boot、内核、根文件系统(rootfs)l引导程序的主要作用将...

Linux日常小技巧参数优化(linux参数调优)

Linux系统参数优化可以让系统更加稳定、高效、安全,提高系统的性能和使用体验。下面列出一些常见的Linux系统参数优化示例,包括修改默认配置、网络等多方面。1.修改默认配置1.1修改默认编辑器默...

Linux系统编程—条件变量(linux 条件变量开销)

条件变量是用来等待线程而不是上锁的,条件变量通常和互斥锁一起使用。条件变量之所以要和互斥锁一起使用,主要是因为互斥锁的一个明显的特点就是它只有两种状态:锁定和非锁定,而条件变量可以通过允许线程阻塞和等...

面试题-Linux系统优化进阶学习(linux系统的优化)

一.基础必备优化:1.关闭SElinux2.FirewalldCenetOS7Iptables(C6)安全组(阿里云)3.网络管理服务||NetworkManager|network...

嵌入式Linux开发教程:Linux Shell

本章重点介绍Linux的常用操作和命令。在介绍命令之前,先对Linux的Shell进行了简单介绍,然后按照大多数用户的使用习惯,对各种操作和相关命令进行了分类介绍。对相关命令的介绍都力求通俗易懂,都给...

取消回复欢迎 发表评论: