reactor pattern time server example

下面是Reactor Pattern多Reactor多线程模型的简单示例,可以简单响应客户端的请求,但是没有处理TCP/IP传输数据包的粘包/分包相关问题。

直接运行此类的main()方法,里面有一个简单的同步阻塞的客户端实现。

多Reactor多线程模型示例代码

package com.example.nio.reactor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Time server built on the "multiple reactors, multiple worker threads" variant of the Reactor pattern.
 *
 * <pre>
 * Compared with the single-reactor model the reactor is split in two:
 * 1. The MainReactor owns the server socket, accepts new connections and hands each accepted
 *    SocketChannel to one SubReactor (round-robin) through a per-SubReactor queue.
 * 2. Each SubReactor owns its own Selector, multiplexes read/write events of the channels assigned
 *    to it, forwards received data to the worker pool and writes the workers' results back.
 * 3. Business logic is usually slow, so it never runs on a reactor thread on purpose.
 * </pre>
 *
 * <p>Threading rule used throughout: a SelectionKey (interest set, attachment) is only ever touched by
 * the SubReactor thread that owns its selector. Worker threads communicate with the reactor exclusively
 * through thread-safe queues followed by {@link Selector#wakeup()}.
 *
 * <p>This example does not frame messages, i.e. TCP packet coalescing/splitting is not handled.
 */
public class TimeServerMultipleReactor {

    private static final Logger LOGGER = LoggerFactory.getLogger(TimeServerMultipleReactor.class);

    private static final int PORT = 8080;
    private static final int POOL_SIZE = 4;
    private static final int READ_BUFFER_SIZE = 1024;

    private static final AtomicInteger WORKER_NUMBER = new AtomicInteger(1);
    private static final AtomicInteger SUB_REACTOR_NUMBER = new AtomicInteger(1);
    private static final AtomicInteger MAIN_REACTOR_LOOPS = new AtomicInteger();

    /** One hand-over queue per SubReactor; filled by the MainReactor, drained by the SubReactor. */
    private final List<Queue<SocketChannel>> socketChannelQueues = new ArrayList<>(POOL_SIZE);

    /** One selector per SubReactor; opened in {@link #start()} before any thread is started. */
    private final Selector[] selectors = new Selector[POOL_SIZE];

    private final ExecutorService workerPool =
            new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE * 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000),
                    namedThreadFactory("WORKER-", WORKER_NUMBER), new ThreadPoolExecutor.CallerRunsPolicy());

    private final ExecutorService subReactorPool =
            new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
                    namedThreadFactory("SUB-REACTOR-", SUB_REACTOR_NUMBER), new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * Starts the server and then fires a handful of blocking test clients against it.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        TimeServerMultipleReactor server = new TimeServerMultipleReactor();
        server.start();
        // Test client requests
        ExecutorService testExecutorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 6; i++) {
            testExecutorService.submit(new TimeServerMultipleReactorClient());
        }
        testExecutorService.shutdown();
    }

    /**
     * Opens all selectors, binds the server socket and starts the reactor threads.
     *
     * <p>All shared state (queues, selectors, bound server channel) is created before the first thread
     * is started, so no reactor can observe a half-initialised server, and a client may connect as soon
     * as this method returns.
     *
     * @throws UncheckedIOException if a selector cannot be opened or the port cannot be bound
     */
    public void start() {
        ServerSocketChannel serverSocketChannel;
        try {
            for (int i = 0; i < POOL_SIZE; i++) {
                socketChannelQueues.add(new ConcurrentLinkedQueue<>());
                selectors[i] = Selector.open();
            }
            serverSocketChannel = openServerChannel();
        } catch (IOException e) {
            for (Selector selector : selectors) {
                closeQuietly(selector);
            }
            throw new UncheckedIOException("failed to start time server on port " + PORT, e);
        }

        // Sub reactors first, so that they are ready when the first connection is handed over.
        for (int i = 0; i < POOL_SIZE; i++) {
            subReactorPool.submit(new SubReactor(i));
        }
        // No further tasks will be submitted; the already running sub reactors keep running.
        subReactorPool.shutdown();

        // Main thread accepting TCP connections
        Thread mainReactorThread = new Thread(new MainReactor(serverSocketChannel));
        mainReactorThread.setName("MAIN-REACTOR");
        mainReactorThread.start();
    }

    /** Opens a non-blocking server channel bound to {@link #PORT}; closes it again if binding fails. */
    private static ServerSocketChannel openServerChannel() throws IOException {
        ServerSocketChannel channel = ServerSocketChannel.open();
        try {
            channel.bind(new InetSocketAddress(PORT));
            channel.configureBlocking(false);
            return channel;
        } catch (IOException e) {
            closeQuietly(channel);
            throw e;
        }
    }

    /** Creates a factory producing threads named {@code prefix + n}, n taken from {@code sequence}. */
    private static ThreadFactory namedThreadFactory(String prefix, AtomicInteger sequence) {
        return runnable -> {
            Thread thread = new Thread(runnable);
            thread.setName(prefix + sequence.getAndIncrement());
            return thread;
        };
    }

    /** Closes {@code resource} (may be {@code null}); a failure is logged, never propagated. */
    private static void closeQuietly(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            LOGGER.warn("failed to close : {}", resource, e);
        }
    }

    /** Accept loop: waits for OP_ACCEPT on the server socket and distributes connections round-robin. */
    private class MainReactor implements Runnable {

        private final ServerSocketChannel serverSocketChannel;

        MainReactor(ServerSocketChannel serverSocketChannel) {
            this.serverSocketChannel = serverSocketChannel;
        }

        @Override
        public void run() {
            try (ServerSocketChannel server = serverSocketChannel; Selector selector = Selector.open()) {
                SelectionKey acceptKey = server.register(selector, SelectionKey.OP_ACCEPT);
                acceptKey.attach(new Acceptor());
                LOGGER.info("server start at port : {}", PORT);
                // Index of the SubReactor that receives the next accepted socketChannel
                int index = 0;
                while (!Thread.currentThread().isInterrupted()) {
                    int counter = MAIN_REACTOR_LOOPS.incrementAndGet();
                    int select = selector.select(1000);
                    if ((counter & 0x3F) == 0) {
                        LOGGER.info("MAIN_REACTOR_LOOPS : {} for main selector {}", counter, select);
                    }
                    if (select == 0) {
                        continue;
                    }
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey sk = iterator.next();
                        iterator.remove();
                        if (sk.isValid() && sk.isAcceptable()) {
                            Acceptor acceptor = (Acceptor) sk.attachment();
                            acceptor.accept(index, sk);
                            index = (index + 1) % POOL_SIZE;
                        }
                    }
                }
            } catch (Exception e) {
                // Thread boundary: nothing above us can handle the failure.
                LOGGER.error("main reactor terminated", e);
            }
        }
    }

    /** Accepts one pending connection and hands it to a SubReactor. Used by the MainReactor thread only. */
    private class Acceptor {

        /**
         * Accepts a connection from the server channel behind {@code selectionKey}.
         *
         * @param index        index of the SubReactor that should own the new connection
         * @param selectionKey key of the server socket channel that is ready to accept
         */
        void accept(int index, SelectionKey selectionKey) {
            SocketChannel socketChannel = null;
            try {
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
                socketChannel = serverSocketChannel.accept();
                if (socketChannel == null) {
                    return;
                }
                socketChannel.configureBlocking(false);
                // Hand over through a queue instead of registering on the sub selector from this thread,
                // which would contend for the locks held by the SubReactor while it is inside select().
                if (socketChannelQueues.get(index).offer(socketChannel)) {
                    LOGGER.info("offer socketChannel success : {}", socketChannel);
                    // Wake up the owning multiplexer so that it registers the channel promptly
                    selectors[index].wakeup();
                } else {
                    LOGGER.warn("offer socketChannel failure : {}", socketChannel);
                    closeQuietly(socketChannel);
                }
            } catch (IOException e) {
                LOGGER.error("acceptor exception", e);
                closeQuietly(socketChannel);
            }
        }
    }

    /** A finished business result waiting to be written to the connection identified by {@code key}. */
    private static final class Response {
        private final SelectionKey key;
        private final String body;

        Response(SelectionKey key, String body) {
            this.key = key;
            this.body = body;
        }
    }

    /** Per-connection write backlog, attached to the SelectionKey. Accessed by the owning SubReactor only. */
    private static final class Outbox {
        private final Deque<ByteBuffer> buffers = new ArrayDeque<>();
    }

    /**
     * I/O loop for the connections assigned to one selector. Every operation on a SelectionKey of this
     * selector happens on the thread running this object.
     */
    private class SubReactor implements Runnable {

        private final int index;
        private final Selector selector;
        /** Connections accepted by the MainReactor that still have to be registered here. */
        private final Queue<SocketChannel> newChannels;
        /** Results produced by worker threads that still have to be scheduled for writing. */
        private final Queue<Response> completed = new ConcurrentLinkedQueue<>();

        SubReactor(int index) {
            this.index = index;
            this.selector = selectors[index];
            this.newChannels = socketChannelQueues.get(index);
        }

        @Override
        public void run() {
            LOGGER.info("add sub reactor : {}", selector);
            int counter = 0;
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    counter++;
                    int select = selector.select(10_000);
                    if (select != 0) {
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectionKey = iterator.next();
                            iterator.remove();
                            dispatch(selectionKey);
                        }
                    }
                    if ((counter & 0x7F) == 0) {
                        LOGGER.info("sub reactor select counter : {} and select : {}", counter, select);
                    }
                    scheduleCompletedResponses();
                    registerNewChannels();
                }
            } catch (IOException e) {
                LOGGER.error("sub reactor {} terminated", index, e);
            } finally {
                closeQuietly(selector);
            }
        }

        /** Registers every channel handed over by the MainReactor for OP_READ with a fresh outbox. */
        private void registerNewChannels() {
            SocketChannel socketChannel;
            while ((socketChannel = newChannels.poll()) != null) {
                try {
                    LOGGER.info("register socketChannel to selector : {}", selector);
                    socketChannel.register(selector, SelectionKey.OP_READ, new Outbox());
                } catch (ClosedChannelException e) {
                    LOGGER.warn("socketChannel closed before registration : {}", socketChannel, e);
                }
            }
        }

        /** Moves worker results into the outbox of their connection and enables OP_WRITE for it. */
        private void scheduleCompletedResponses() {
            Response response;
            while ((response = completed.poll()) != null) {
                SelectionKey selectionKey = response.key;
                if (!selectionKey.isValid()) {
                    LOGGER.info("channel is closed : {}", selectionKey.channel());
                    continue;
                }
                LOGGER.info("send response : {}", response.body);
                Outbox outbox = (Outbox) selectionKey.attachment();
                outbox.buffers.add(ByteBuffer.wrap(response.body.getBytes(StandardCharsets.UTF_8)));
                selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);
            }
        }

        /** Routes a ready key to {@link #read} and/or {@link #send}. */
        private void dispatch(SelectionKey selectionKey) {
            try {
                if (!selectionKey.isValid()) {
                    return;
                }
                if (selectionKey.isReadable()) {
                    read(selectionKey);
                }
                // read() may have closed the channel, which cancels the key.
                if (selectionKey.isValid() && selectionKey.isWritable()) {
                    send(selectionKey);
                }
            } catch (CancelledKeyException e) {
                LOGGER.info("channel is closed : {}", selectionKey.channel());
            }
        }

        /** Reads whatever is available (at most one buffer) and submits it as a request to the workers. */
        private void read(SelectionKey selectionKey) {
            SocketChannel channel = (SocketChannel) selectionKey.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
            try {
                int read = channel.read(byteBuffer);
                if (read == -1) {
                    // Peer closed its sending side
                    LOGGER.info("close channel");
                    closeQuietly(channel);
                    return;
                }
                if (read == 0) {
                    LOGGER.info("read 0 bytes for channel : {}", channel);
                    return;
                }
                byteBuffer.flip();
                String request = StandardCharsets.UTF_8.decode(byteBuffer).toString();
                // Business processing happens on the worker pool, not on this reactor thread.
                // execute() rather than submit() so that an unexpected worker failure is not hidden in a Future.
                workerPool.execute(new Processor(request, selectionKey, completed));
            } catch (IOException e) {
                LOGGER.error("close channel for selectionKey : {}", selectionKey, e);
                closeQuietly(channel);
            }
        }

        /**
         * Writes as much of the connection's backlog as the socket accepts. OP_WRITE stays enabled while
         * data is left over (short write) and is removed once the backlog is empty.
         */
        private void send(SelectionKey selectionKey) {
            SocketChannel channel = (SocketChannel) selectionKey.channel();
            Outbox outbox = (Outbox) selectionKey.attachment();
            try {
                ByteBuffer head;
                while ((head = outbox.buffers.peek()) != null) {
                    channel.write(head);
                    if (head.hasRemaining()) {
                        // Socket send buffer is full; continue on the next OP_WRITE event.
                        return;
                    }
                    outbox.buffers.poll();
                }
                selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE);
            } catch (IOException e) {
                LOGGER.error("byteBuffer or channel write error : {}", selectionKey, e);
                closeQuietly(channel);
            }
        }
    }

    /**
     * Business task run on the worker pool: builds the response for one request and hands it back to
     * the owning SubReactor. It never modifies the SelectionKey itself.
     */
    private static class Processor implements Runnable {

        private final String request;
        private final SelectionKey selectionKey;
        private final Queue<Response> queue;

        Processor(String request, SelectionKey selectionKey, Queue<Response> queue) {
            this.request = request;
            this.selectionKey = selectionKey;
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                // Simulated business processing time
                TimeUnit.MILLISECONDS.sleep(10L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOGGER.warn("interrupted while processing request : {}", request);
                return;
            }
            LOGGER.info("request : {}", request);
            String body = request + " : " + LocalDateTime.now();
            if (!queue.offer(new Response(selectionKey, body))) {
                LOGGER.info("add queue failure for selectionKey : {}", selectionKey);
                return;
            }
            // Wake up the selector blocked in select() so that it picks the response up
            selectionKey.selector().wakeup();
        }
    }

    /** Simple synchronous, blocking test client: sends 20 requests and logs each response. */
    private static class TimeServerMultipleReactorClient implements Callable<Boolean> {

        private static final Logger LOGGER = LoggerFactory.getLogger(TimeServerMultipleReactorClient.class);

        /**
         * Runs the request/response conversation.
         *
         * @return always {@code true} when the conversation completed
         * @throws IOException          if connecting, writing or reading fails
         * @throws InterruptedException if interrupted while pausing between requests
         */
        @Override
        public Boolean call() throws IOException, InterruptedException {
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(64);
            // Blocking mode: open(address) returns only after the connection is established.
            try (SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(PORT))) {
                socketChannel.socket().setTcpNoDelay(true);
                for (int i = 1; i <= 20; i++) {
                    byteBuffer.clear();
                    byteBuffer.put(String.format("%d : TIME", i).getBytes(StandardCharsets.UTF_8));
                    byteBuffer.flip();
                    while (byteBuffer.hasRemaining()) {
                        socketChannel.write(byteBuffer);
                    }
                    TimeUnit.MILLISECONDS.sleep(300L);
                    byteBuffer.clear();
                    if (socketChannel.read(byteBuffer) > 0) {
                        byteBuffer.flip();
                        String response = StandardCharsets.UTF_8.decode(byteBuffer).toString();
                        LOGGER.info("{} : receive response : {}", i, response);
                    }
                }
                socketChannel.shutdownOutput(); // the server receives a FIN
                TimeUnit.MILLISECONDS.sleep(300L);
            }
            return true;
        }
    }
}

logback.xml

为了方便查看线程运行情况,加入logback依赖及其配置文件,内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<!-- Logback configuration: everything is logged to the console at DEBUG level and above. -->
<configuration>
    <!-- Console line layout: timestamp, highlighted level, thread name (fixed to 15 characters),
         logger name with source line number, message, newline, then the exception (if any). -->
    <property name="CONSOLE_LOG_PATTERN"
              value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(-%5p) --- [%15.15thread] %cyan(%-50.50logger{49} [%5line] :) %m%n%ex"/>

    <!-- Console appender using the pattern defined above. -->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>utf8</charset>
        </encoder>
    </appender>

    <!-- The root logger sends all events to the console appender. -->
    <root level="DEBUG">
        <appender-ref ref="CONSOLE"/>
    </root>
</configuration>

Selector 在不同线程中对象锁竞争问题

多路复用器 selector 的 register(...) 与 select(...) 操作之间会有锁冲突。在 reactor pattern skeleton example 文章中已经简单说明了:多路复用器上有很多同步操作,锁竞争很严重,并可能阻塞线程。为了避免在 selector 及其属性对象上产生同步竞争,本示例将 MainReactor 新接入的 socketChannel 加入到对应 SubReactor 线程各自的队列里;另外,IO 请求处理完的结果也用队列来维护,以减少并发问题,具体可以看源码注释说明。

References

  1. Scalable IO in Java