








    同时整个服务端的流程处理,建立于事件机制上。在 [接受连接 -> 读 -> 业务处理 -> 写 -> 关闭连接] 这个过程中,每个阶段都会触发相应的事件,由事件处理器分别响应。

    相关事件定义 在这个模型中,我们定义了一些基本的事件:








    当客户端可以开始接收服务端发送的数据时触发该事件,通过该事件,我们可以向客户端发送回应数据。在本模型中,事件处理器只需要在该事件中通过 Response 对象设置要发送的回应数据。












    1. public interface Serverlistener
    2. {
    3. public void onError(String error);
    4. public void onAccept() throws Exception;
    5. public void onAccepted(Request request) throws Exception;
    6. public void onRead(Request request) throws Exception;
    7. public void onWrite(Request request, Response response) throws Exception;
    8. public void onClosed(Request request) throws Exception;
    9. }

    2. 事件适配器(EventAdapter):


    1. public abstract class EventAdapter
    2. implements Serverlistener
    3. {
    4. public EventAdapter() {}
    5. public void onError(String error) {}
    6. public void onAccept() throws Exception {}
    7. public void onAccepted(Request request) throws Exception {}
    8. public void onRead(Request request) throws Exception {}
    9. public void onWrite(Request request, Response response) throws Exception {}
    10. public void onClosed(Request request) throws Exception {}
    11. }

    3. 事件触发器(Notifier):


    1. public class Notifier
    2. {
    3. private static Arraylist listeners = null;
    4. private static Notifier instance = null;
    5. private Notifier()
    6. {
    7. listeners = new Arraylist();
    8. }
    9. /**
    10. * 获取事件触发器
    11. * @return 返回事件触发器
    12. */
    13. public static synchronized Notifier
    14. getNotifier()
    15. {
    16. if (instance == null)
    17. {
    18. instance = new Notifier();
    19. return instance;
    20. }
    21. else
    22. {
    23. return instance;
    24. }
    25. }
    26. /**
    27. * 添加事件监听器
    28. * @param l 监听器
    29. */
    30. public void addlistener(Serverlistener l)
    31. {
    32. synchronized (listeners)
    33. {
    34. if (!listeners.contains(l))
    35. {
    36. listeners.add(l);
    37. }
    38. }
    39. }
    40. public void fireOnAccept()
    41. throws Exception
    42. {
    43. for (int i = listeners.size() – 1;
    44. i >= 0; i–)
    45. {
    46. ( (Serverlistener) listeners.
    47. get(i)).onAccept();
    48. }
    49. }
    50. // other fire method
    51. }

    4. 事件处理器(Handler):


    1. public class ServerHandler
    2. extends EventAdapter
    3. {
    4. public ServerHandler() {}
    5. public void onRead(Request request)
    6. throws Exception
    7. {
    8. System.out.println(“Received: “ +
    9. new String(data));
    10. }
    11. }

    5. 事件处理器的注册。


    1. ServerHandler handler = new ServerHandler();
    2. Notifier.addlistener(handler);




    1. 主控服务线程(Server):


    1. public class Server implements Runnable
    2. {
    3. private static List wpool = new LinkedList();
    4. private static Selector selector;
    5. private ServerSocketChannel sschannel;
    6. private InetSocketAddress address;
    7. protected Notifier notifier;
    8. private int port;
    9. private static int MAX_THREADS = 4;
    10. /**
    11. * Creat the main thread
    12. * @param port server port
    13. * @throws java.lang.Exception
    14. */
    15. public Server(int port) throws Exception
    16. {
    17. this.port = port;
    18. // event dispatcher
    19. notifier = Notifier.getNotifier();
    20. // create the thread pool for reading and writing
    21. for (int i = 0; i < MAX_THREADS; i++)
    22. {
    23. Thread r = new Reader();
    24. Thread w = new Writer();
    25. r.start();
    26. w.start();
    27. }
    28. // create nonblocking socket
    29. selector = Selector.open();
    30. sschannel = ServerSocketChannel.open();
    31. sschannel.configureBlocking(false);
    32. address = new InetSocketAddress(port);
    33. ServerSocket ss = sschannel.socket();
    34. ss.bind(address);
    35. sschannel.register(selector, SelectionKey.OP_ACCEPT);
    36. }
    37. public void run()
    38. {
    39. System.out.println(“Server started “);
    40. System.out.println(“Server listening on port: “ + port);
    41. while (true)
    42. {
    43. try
    44. {
    45. int num = 0;
    46. num = selector.select();
    47. if (num > 0)
    48. {
    49. Set selectedKeys = selector.selectedKeys();
    50. Iterator it = selectedKeys.iterator();
    51. while (it.hasNext())
    52. {
    53. SelectionKey key = (SelectionKey) it.next();
    54. it.remove();
    55. if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT)
    56. {
    57. // Accept the new connection
    58. ServerSocketChannel ssc =
    59. (ServerSocketChannel) key.channel();
    60. notifier.fireOnAccept();
    61. SocketChannel sc = ssc.accept();
    62. sc.configureBlocking(false);
    63. Request request = new Request(sc);
    64. notifier.fireOnAccepted(request);
    65. sc.register(selector, SelectionKey.OP_READ,request);
    66. }
    67. else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ)
    68. {
    69. Reader.processRequest(key);
    70. key.cancel();
    71. }
    72. else if ((key.readyOps() & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE)
    73. {
    74. Writer.processRequest(key);
    75. key.cancel();
    76. }
    77. }
    78. }
    79. //this selector’s wakeup method is invoked
    80. else
    81. {
    82. //register new channel for writing to selector
    83. addRegister();
    84. }
    85. }
    86. catch (Exception e)
    87. {
    88. notifier.fireOnError(“Error occured in Server: “
    89. + e.getMessage());
    90. continue;
    91. }
    92. }
    93. }
    94. private void addRegister()
    95. {
    96. synchronized (wpool)
    97. {
    98. while (!wpool.isEmpty())
    99. {
    100. SelectionKey key = (SelectionKey) wpool.remove(0);
    101. SocketChannel schannel = (SocketChannel) key.channel();
    102. try
    103. {
    104. schannel.register(selector, SelectionKey.OP_WRITE, key
    105. .attachment());
    106. }
    107. catch (Exception e)
    108. {
    109. try
    110. {
    111. schannel.finishConnect();
    112. schannel.close();
    113. schannel.socket().close();
    114. notifier.fireOnClosed((Request) key.attachment());
    115. }
    116. catch (Exception e1)
    117. {
    118. }
    119. notifier.fireOnError(“Error occured in addRegister: “
    120. + e.getMessage());
    121. }
    122. }
    123. }
    124. }
    125. public static void processWriteRequest(SelectionKey key)
    126. {
    127. synchronized (wpool)
    128. {
    129. wpool.add(wpool.size(), key);
    130. wpool.notifyAll();
    131. }
    132. selector.wakeup();
    133. }
    134. }

    2. 读线程(Reader):


    1. public class Reader extends Thread
    2. {
    3. public void run()
    4. {
    5. while (true)
    6. {
    7. try
    8. {
    9. SelectionKey key;
    10. synchronized (pool)
    11. {
    12. while (pool.isEmpty())
    13. {
    14. pool.wait();
    15. }
    16. key = (SelectionKey) pool.remove(0);
    17. }
    18. // 读取客户端数据,并触发onRead事件
    19. read(key);
    20. }
    21. catch (Exception e)
    22. {
    23. continue;
    24. }
    25. }
    26. }
    27. }

    3. 写线程(Writer):


    1. public final class Writer extends Thread
    2. {
    3. public void run()
    4. {
    5. while (true)
    6. {
    7. try
    8. {
    9. SelectionKey key;
    10. synchronized (pool)
    11. {
    12. while (pool.isEmpty())
    13. {
    14. pool.wait();
    15. }
    16. key = (SelectionKey) pool.remove(0);
    17. }
    18. write(key);
    19. }
    20. catch (Exception e)
    21. {
    22. continue;
    23. }
    24. }
    25. }
    26. }





    1. 实现时间查询服务的事件处理器(TimeHandler):

    1. public class TimeHandler extends EventAdapter
    2. {
    3. public TimeHandler() {}
    4. public void onWrite(Request request, Response response) throws Exception
    5. {
    6. String command = new String(request.getDataInput());
    7. String time = null;
    8. Date date = new Date();
    9. if (command.equals(“GB”))
    10. {
    11. DateFormat cnDate = DateFormat.getDateTimeInstance(DateFormat.FulL,
    12. DateFormat.FulL, Locale.CHINA);
    13. time = cnDate.format(date);
    14. }
    15. else
    16. {
    17. DateFormat enDate = DateFormat.getDateTimeInstance(DateFormat.FulL,
    18. DateFormat.FulL, Locale.US);
    19. time = enDate.format(date);
    20. }
    21. response.send(time.getBytes());
    22. }
    23. }

    2. 实现日志记录服务的事件处理器(LogHandler):

    1. public class LogHandler extends EventAdapter
    2. {
    3. public LogHandler() {}
    4. public void onClosed(Request request)
    5. throws Exception
    6. {
    7. String log = new Date().toString() + ” from “ + request.getAddress().toString();
    8. System.out.println(log);
    9. }
    10. public void onError(String error)
    11. {
    12. System.out.println(“Error: “ + error);
    13. }
    14. }

    3. 启动程序:

    1. public class Start
    2. {
    3. public static void main(String[] args)
    4. {
    5. try
    6. {
    7. LogHandler loger = new LogHandler();
    8. TimeHandler timer = new TimeHandler();
    9. Notifier notifier = Notifier.getNotifier();
    10. notifier.addlistener(loger);
    11. notifier.addlistener(timer);
    12. System.out.println(“Server starting “);
    13. Server server = new Server(5100);
    14. Thread tServer = new Thread(server);
    15. tServer.start();
    16. }
    17. catch (Exception e)
    18. {
    19. System.out.println(“Server error: “ + e.getMessage());
    20. System.exit(-1);
    21. }
    22. }
    23. }

    小  结


