reactor pattern skeleton example

下面这个示例代码基本都是从Doug Lea的这个 PPT,Scalable IO in Java 中复制的,只是简单做了修改,以便这个Server能够初步可以运行起来。

以下为这个示例的源代码:

ReactorMain.java

package com.example.nio.reactor.pattern;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;public class ReactorMain {    private static final Logger LOGGER = LoggerFactory.getLogger(ReactorMain.class);    public static void main(String[] args) throws IOException {        Reactor reactor = new Reactor(8080);        Thread thread = new Thread(reactor);        thread.setName("MAIN-REACTOR");        thread.start();        LOGGER.info("reactor start on port : 8080");    }}

Handler.java

package com.example.nio.reactor.pattern;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.concurrent.*;import java.util.concurrent.atomic.AtomicInteger;public final class Handler implements Runnable {    private static final Logger LOGGER = LoggerFactory.getLogger(Handler.class);    private static final AtomicInteger IO_WORKER_NUMBER = new AtomicInteger(1);    private static final ExecutorService IO_WORKER_POOL =            new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS,                    new LinkedBlockingQueue<>(), new ThreadFactory() {                @Override                public Thread newThread(Runnable r) {                    Thread thread = new Thread(r);                    thread.setName("IO-WORKER-" + IO_WORKER_NUMBER.getAndIncrement());                    return thread;                }            }, new ThreadPoolExecutor.CallerRunsPolicy());    private static final int MAX_IN = 1024;    private static final int MAX_OUT = 1024;    private static final int READING = 0;    private static final int SENDING = 1;    private static final int PROCESSING = 3;    private ByteBuffer input = ByteBuffer.allocate(MAX_IN);    private ByteBuffer output = ByteBuffer.allocate(MAX_OUT);    private int state = READING;    private final SocketChannel socketChannel;    private final SelectionKey selectionKey;    public Handler(Selector selector, SocketChannel socketChannel) throws IOException {        this.socketChannel = socketChannel;        socketChannel.configureBlocking(false);        // Optionally try first read now        selectionKey = this.socketChannel.register(selector, 0);        selectionKey.attach(this);        selectionKey.interestOps(SelectionKey.OP_READ);        selector.wakeup();    }    boolean inputIsComplete() {        return true;    }    boolean outputIsComplete() {        return true;    }    private void process() {        LOGGER.info("process ...");    }    @Override    public void run() {        try {            if (state == READING) {                read();            } else if (state == SENDING) {                send();            }        } catch (IOException ex) {            ex.printStackTrace();        }    }    synchronized void read() throws IOException {        socketChannel.read(input);        if (inputIsComplete()) {            // for single IO process thread            // process();            // state = SENDING;            // Normally also do first write now            // selectionKey.interestOps(SelectionKey.OP_WRITE);            state = PROCESSING;            IO_WORKER_POOL.execute(new Processor());        }    }    synchronized void processAndHandOff() {        process();        state = SENDING; // or rebind attachment including data for response        selectionKey.interestOps(SelectionKey.OP_WRITE);    }    class Processor implements Runnable {        @Override        public void run() {            processAndHandOff();        }    }    void send() throws IOException {        socketChannel.write(output);        if (outputIsComplete()) selectionKey.cancel();    }}

Reactor.java

package com.example.nio.reactor.pattern;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;import java.util.concurrent.*;import java.util.concurrent.atomic.AtomicInteger;public class Reactor implements Runnable {    private static final Logger LOGGER = LoggerFactory.getLogger(Reactor.class);    private static final int SIZE = 2;    private final Selector[] selectors = new Selector[SIZE];    private int next = 0;    private final ServerSocketChannel serverSocketChannel;    private static final AtomicInteger SUB_REACTOR_THREAD_NUMBER = new AtomicInteger(1);    private static final ExecutorService SUB_REACTOR_THREAD_POOL =            new ThreadPoolExecutor(SIZE, SIZE, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadFactory() {                @Override                public Thread newThread(Runnable r) {                    Thread thread = new Thread(r);                    thread.setName("SUB-REACTOR-" + SUB_REACTOR_THREAD_NUMBER.getAndIncrement());                    return thread;                }            }, new ThreadPoolExecutor.CallerRunsPolicy());    private static final AtomicInteger HANDLER_THREAD_NUMBER = new AtomicInteger(1);    private static final ExecutorService HANDLER_THREAD_POOL =            new ThreadPoolExecutor(SIZE, SIZE, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadFactory() {                @Override                public Thread newThread(Runnable r) {                    Thread thread = new Thread(r);                    thread.setName("HANDLER-" + HANDLER_THREAD_NUMBER.getAndIncrement());                    return thread;                }            }, new ThreadPoolExecutor.CallerRunsPolicy());    private final Selector selector;    public Reactor(int port) throws IOException {        selector = Selector.open();        serverSocketChannel = ServerSocketChannel.open();        serverSocketChannel.socket().bind(new InetSocketAddress(port));        serverSocketChannel.configureBlocking(false);        SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);        selectionKey.attach(new Acceptor());        startSubReactors();    }    private void startSubReactors() throws IOException {        for (int i = 0; i < selectors.length; i++) {            selectors[i] = Selector.open();            final Selector subSelector = selectors[i];            SUB_REACTOR_THREAD_POOL.submit(() -> {                while (!Thread.interrupted()) {                    try {                        int select = subSelector.select(1000L);                        if (select == 0) {                            synchronized (subSelector) {                                // release selector.publicKeys object monitor                                subSelector.wait(10L);                            }                        } else {                            Set<SelectionKey> selectionKeys = subSelector.selectedKeys();                            Iterator<SelectionKey> iterator = selectionKeys.iterator();                            while (iterator.hasNext()) {                                SelectionKey selectionKey = iterator.next();                                handle(selectionKey);                            }                            selectionKeys.clear();                        }                    } catch (IOException e) {                        e.printStackTrace();                    } catch (InterruptedException e) {                        Thread.currentThread().interrupt();                        LOGGER.info("sub reactor selector wait interrupted", e);                    }                }            });        }    }    private void handle(SelectionKey selectionKey) {        Runnable attachment = (Runnable) selectionKey.attachment();        if (attachment != null) {            // attachment is Handler            HANDLER_THREAD_POOL.submit(attachment);        }    }    @Override    public void run() {        try {            while (!Thread.interrupted()) {                selector.select();                Set selected = selector.selectedKeys();                Iterator it = selected.iterator();                while (it.hasNext()) {                    dispatch((SelectionKey) it.next());                }                selected.clear();            }        } catch (IOException e) {            LOGGER.error("main reactor exception", e);        }    }    public void dispatch(SelectionKey selectionKey) {        Runnable attachment = (Runnable) (selectionKey.attachment());        if (attachment != null) {            // attachment is Reactor$Acceptor            LOGGER.info("{} accept {} new socket connection", selectionKey, selectionKey.channel());            attachment.run();        }    }    /**     * run in main reactor thread     */    class Acceptor implements Runnable {        @Override        public synchronized void run() {            try {                SocketChannel socketChannel = serverSocketChannel.accept();                if (socketChannel != null) {                    new Handler(selectors[next], socketChannel);                }                if (++next == selectors.length) {                    next = 0;                }            } catch (IOException e) {                LOGGER.error("acceptor exception", e);            }        }    }}

logback.xml

为了方便查看线程运行情况,加入logback依赖及其配置文件,内容如下:

<?xml version="1.0" encoding="UTF-8"?><configuration>    <property name="CONSOLE_LOG_PATTERN"              value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(-%5p) --- [%15.15thread] %cyan(%-40.40logger{39} [%5line] :) %m%n%ex"/>    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">        <encoder>            <pattern>${CONSOLE_LOG_PATTERN}</pattern>            <charset>utf8</charset>        </encoder>    </appender>    <root level="DEBUG">        <appender-ref ref="CONSOLE"/>    </root></configuration>

selector.publicKeys 对象锁竞争问题

线程在运行selector.select(1000)时此会一直执有selector.publicKeys对象锁,Handler中注册socketChannel时,获取不到selector.publicKeys对象锁而被一直阻塞,因此需要在运行selector.select(1000)多路复用器阻塞的线程中释放一下执有的selector.publicKeys对象锁才能让Handler构造方法中socketChannel注册完成。

/** * <pre> * /////////////////////////////////////////////////////////////////////////////////////////////////////////// * // SelectorImpl 类的 register() 这个方法中会同步这个 selector.publicKeys 属性,就是 selector.keys() 返回的 * // 对象,如果在 Acceptor 中注册 socketChannel 到 SubReactor 中时,SubReactor 线程因为执有 publicKeys 对象锁并一直 * // while(true) 循环,从而 Acceptor 中的 socketChannel.register() 就被 BLOCK 了,线程栈如下: * /////////////////////////////////////////////////////////////////////////////////////////////////////////// * // java.lang.Thread.State: BLOCKED (on object monitor) * //     at sun.nio.ch.SelectorImpl.register(SelectorImpl.java:132) * //     - waiting to lock <0x000000076e8c7390> (a java.util.Collections$UnmodifiableSet) * /////////////////////////////////////////////////////////////////////////////////////////////////////////// * // SelectorImpl 中关于同步 publicKeys 对象的代码如下 * /////////////////////////////////////////////////////////////////////////////////////////////////////////// * // protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) { * //     if (!(var1 instanceof SelChImpl)) { * //         throw new IllegalSelectorException(); * //     } else { * //         SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this); * //         var4.attach(var3); * //         synchronized(this.publicKeys) { * //             this.implRegister(var4); * //         } * // * //         var4.interestOps(var2); * //         return var4; * //     } * // } * /////////////////////////////////////////////////////////////////////////////////////////////////////////// * </pre> */int select = subSelector.select(1000L);if (select == 0) {    synchronized (subSelector) {        // release selector.publicKeys object monitor        subSelector.wait(10L);    }}

References

  1. Scalable IO in Java