折腾了一个周,终于搞出来了一个雏形,相比于xmpp的xml,本人更喜欢json的简洁,为了防止客户端异常断开等,准备采用心跳检测的机制来判断用户是否在线,另外还有一种方法是学习例如Tomcat等Servlet中间件的方式,设置Session周期,定时清除过期Session。
前不久自己动手写了一个Android的聊天工具,跟服务器的交互还是基于HTTP方式的,在一般通讯上还算凑活,但是在即时聊天的时候就有点恶心了,客户端开启Service每隔3秒去询问服务器是否有自己的新消息(当然3秒有点太快了),在心疼性能和流量的前提下,只能自己动手写个服务器,传统的Socket是阻塞的,这样的话服务器对每个Socket都需要建立一个线程来操作,资源开销很大,而且线程多了直接会影响服务端的性能(曾经测试开了3000多个线程就不让创建了,所以并发数目也是有限制的),听说从JDK1.5就多了个New
IO,灰常不错的样子,找了找相关的资料,网上竟然全都是最最最简单的一个demo,然后去CSDN发帖,基本上都是建议直接使用MINA框架的,这样一来根本达不到学习NIO的目的,而且现在的技术也太快餐了,只知道使用前辈留下的东西,知其然不知其所以然。
折腾了一个周,终于搞出来了一个雏形,相比于xmpp的xml,本人更喜欢json的简洁,为了防止客户端异常断开等,准备采用心跳检测的机制来判断用户是否在线,另外还有一种方法是学习例如Tomcat等Servlet中间件的方式,设置Session周期,定时清除过期Session。本Demo暂时实现了Session过期检测,心跳检测有空再搞,如果本例子在使用过程中有性能漏洞或者什么bug请及时通知我,谢谢。
废话不多说,关于NIO的SelectionKey、Selector、Channel网上的介绍例子都很多,直接上代码:
JsonParser
Json的解析类,随便封装了下,使用的最近比较火的fastjson
- public class JsonParser {
- private static JSONObject mJson;
- public synchronized static String get(String json,String key) {
- mJson = JSON.parseObject(json);
- return mJson.getString(key);
- }
- }
Main
入口,不解释
- public class Main {
- public static void main(String… args) {
- new SeekServer().start();
- }
- }
Log
- public class Log {
- public static void i(Object obj) {
- System.out.println(obj);
- }
- public static void e(Object e) {
- System.err.println(e);
- }
- }
SeekServer:
服务器端的入口,请求的封装和接收都在此类,端口暂时写死在了代码里,mSelector.select(TIME_OUT) > 0
目的是为了当服务器空闲的时候(没有任何读写甚至请求断开事件),循环时有个间隔时间,不然基本上相当于while(true){//nothing}了,你懂的。
- public class SeekServer extends Thread{
- private final int ACCPET_PORT = 55555;
- private final int TIME_OUT = 1000;
- private Selector mSelector = null;
- private ServerSocketChannel mSocketChannel = null;
- private ServerSocket mServerSocket = null;
- private InetSocketAddress mAddress = null;
- public SeekServer() {
- long sign = System.currentTimeMillis();
- try {
- mSocketChannel = ServerSocketChannel.open();
- if(mSocketChannel == null) {
- System.out.println(“can’t open server socket channel”);
- }
- mServerSocket = mSocketChannel.socket();
- mAddress = new InetSocketAddress(ACCPET_PORT);
- mServerSocket.bind(mAddress);
- Log.i(“server bind port is “ + ACCPET_PORT);
- mSelector = Selector.open();
- mSocketChannel.configureBlocking(false);
- SelectionKey key = mSocketChannel.register(mSelector, SelectionKey.OP_ACCEPT);
- key.attach(new Acceptor());
- //检测Session状态
- Looper.getInstance().loop();
- //开始处理Session
- SessionProcessor.start();
- Log.i(“Seek server startup in “ + (System.currentTimeMillis() – sign) + “ms!”);
- } catch (ClosedChannelException e) {
- Log.e(e.getMessage());
- } catch (IOException e) {
- Log.e(e.getMessage());
- }
- }
- public void run() {
- Log.i(“server is listening…”);
- while(!Thread.interrupted()) {
- try {
- if(mSelector.select(TIME_OUT) > 0) {
- Set<SelectionKey> keys = mSelector.selectedKeys();
- Iterator<SelectionKey> iterator = keys.iterator();
- SelectionKey key = null;
- while(iterator.hasNext()) {
- key = iterator.next();
- Handler at = (Handler) key.attachment();
- if(at != null) {
- at.exec();
- }
- iterator.remove();
- }
- }
- } catch (IOException e) {
- Log.e(e.getMessage());
- }
- }
- }
- class Acceptor extends Handler{
- public void exec(){
- try {
- SocketChannel sc = mSocketChannel.accept();
- new Session(sc, mSelector);
- } catch (ClosedChannelException e) {
- Log.e(e);
- } catch (IOException e) {
- Log.e(e);
- }
- }
- }
- }
Handler:
只有一个抽象方法exec,Session将会继承它。
- public abstract class Handler {
- public abstract void exec();
- }
Session:
封装了用户的请求和SelectionKey和SocketChannel,每次接收到新的请求时都重置它的最后活动时间,通过状态mState=READING
or SENDING 去执行消息的接收与发送,当客户端异常断开时则从SessionManager清除该会话。
- public class Session extends Handler{
- private SocketChannel mChannel;
- private SelectionKey mKey;
- private ByteBuffer mRreceiveBuffer = ByteBuffer.allocate(10240);
- private Charset charset = Charset.forName(“UTF-8”);
- private CharsetDecoder mDecoder = charset.newDecoder();
- private CharsetEncoder mEncoder = charset.newEncoder();
- private long lastPant;//最后活动时间
- private final int TIME_OUT = 1000 * 60 * 5; //Session超时时间
- private String key;
- private String sendData = “”;
- private String receiveData = null;
- public static final int READING = 0,SENDING = 1;
- int mState = READING;
- public Session(SocketChannel socket, Selector selector) throws IOException {
- this.mChannel = socket;
- mChannel = socket;
- mChannel.configureBlocking(false);
- mKey = mChannel.register(selector, 0);
- mKey.attach(this);
- mKey.interestOps(SelectionKey.OP_READ);
- selector.wakeup();
- lastPant = Calendar.getInstance().getTimeInMillis();
- }
- public String getReceiveData() {
- return receiveData;
- }
- public void clear() {
- receiveData = null;
- }
- public void setSendData(String sendData) {
- mState = SENDING;
- mKey.interestOps(SelectionKey.OP_WRITE);
- this.sendData = sendData + “n”;
- }
- public boolean isKeekAlive() {
- return lastPant + TIME_OUT > Calendar.getInstance().getTimeInMillis();
- }
- public void setAlive() {
- lastPant = Calendar.getInstance().getTimeInMillis();
- }
- /**
- * 注销当前Session
- */
- public void distroy() {
- try {
- mChannel.close();
- mKey.cancel();
- } catch (IOException e) {}
- }
- @Override
- public synchronized void exec() {
- try {
- if(mState == READING) {
- read();
- }else if(mState == SENDING) {
- write();
- }
- } catch (IOException e) {
- SessionManager.remove(key);
- try {
- mChannel.close();
- } catch (IOException e1) {
- Log.e(e1);
- }
- mKey.cancel();
- }
- }
- public void read() throws IOException{
- mRreceiveBuffer.clear();
- int sign = mChannel.read(mRreceiveBuffer);
- if(sign == –1) { //客户端连接关闭
- mChannel.close();
- mKey.cancel();
- }
- if(sign > 0) {
- mRreceiveBuffer.flip();
- receiveData = mDecoder.decode(mRreceiveBuffer).toString();
- setAlive();
- setSign();
- SessionManager.addSession(key, this);
- }
- }
- private void setSign() {
- //设置当前Session的Key
- key = JsonParser.get(receiveData,“imei”);
- //检测消息类型是否为心跳包
- // String type = jo.getString(“type”);
- // if(type.equals(“HEART_BEAT”)) {
- // setAlive();
- // }
- }
- /**
- * 写消息
- */
- public void write() {
- try {
- mChannel.write(mEncoder.encode(CharBuffer.wrap(sendData)));
- sendData = null;
- mState = READING;
- mKey.interestOps(SelectionKey.OP_READ);
- } catch (CharacterCodingException e) {
- e.printStackTrace();
- } catch (IOException e) {
- try {
- mChannel.close();
- } catch (IOException e1) {
- Log.e(e1);
- }
- }
- }
- }
SessionManager:
将所有Session存放到ConcurrentHashMap,这里使用手机用户的imei做key,ConcurrentHashMap因为是线程安全的,所以能很大程度上避免自己去实现同步的过程,
封装了一些操作Session的方法例如get,remove等。
- public class SessionManager {
- private static ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>();
- public static void addSession(String key,Session session) {
- sessions.put(key, session);
- }
- public static Session getSession(String key) {
- return sessions.get(key);
- }
- public static Set<String> getSessionKeys() {
- return sessions.keySet();
- }
- public static int getSessionCount() {
- return sessions.size();
- }
- public static void remove(String[] keys) {
- for(String key:keys) {
- if(sessions.containsKey(key)) {
- sessions.get(key).distroy();
- sessions.remove(key);
- }
- }
- }
- public static void remove(String key) {
- if(sessions.containsKey(key)) {
- sessions.get(key).distroy();
- sessions.remove(key);
- }
- }
- }
SessionProcessor
里面使用了JDK自带的线程池,用来分发处理所有Session中当前需要处理的请求(线程池的初始化参数不是太熟,望有了解的童鞋能告诉我),内部类Process则是将Session再次封装成SocketRequest和SocketResponse(看到这里是不是有点熟悉的感觉,对没错,JavaWeb里到处都是request和response)。
- public class SessionProcessor implements Runnable{
- private static Runnable processor = new SessionProcessor();
- private static ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 200, 500, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.CallerRunsPolicy());
- public static void start() {
- new Thread(processor).start();
- }
- @Override
- public void run() {
- while(true) {
- Session tmp = null;
- for(String key:SessionManager.getSessionKeys()) {
- tmp = SessionManager.getSession(key);
- //处理Session未处理的请求
- if(tmp.getReceiveData() != null) {
- pool.execute(new Process(tmp));
- }
- }
- try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- Log.e(e);
- }
- }
- }
- class Process implements Runnable {
- private SocketRequest request;
- private SocketResponse response;
- public Process(Session session) {
- //将Session封装成Request和Response
- request = new SocketRequest(session);
- response = new SocketResponse(session);
- }
- @Override
- public void run() {
- new RequestTransform().transfer(request, response);
- }
- }
- }
RequestTransform里的transfer方法利用反射对请求参数中的请求类别和请求动作来调用不同类的不同方法(UserHandler和MessageHandler)
- public class RequestTransform {
- public void transfer(SocketRequest request,SocketResponse response) {
- String action = request.getValue(“action”);
- String handlerName = request.getValue(“handler”);
- //根据Session的请求类型,让不同的类方法去处理
- try {
- Class<?> c= Class.forName(“com.seek.server.handler.” + handlerName);
- Class<?>[] arg=new Class[]{SocketRequest.class,SocketResponse.class};
- Method method=c.getMethod(action,arg);
- method.invoke(c.newInstance(), new Object[]{request,response});
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
SocketRequest和SocketResponse
- public class SocketRequest {
- private Session mSession;
- private String mReceive;
- public SocketRequest(Session session) {
- mSession = session;
- mReceive = session.getReceiveData();
- mSession.clear();
- }
- public String getValue(String key) {
- return JsonParser.get(mReceive, key);
- }
- public String getQueryString() {
- return mReceive;
- }
- }
- public class SocketResponse {
- private Session mSession;
- public SocketResponse(Session session) {
- mSession = session;
- }
- public void write(String msg) {
- mSession.setSendData(msg);
- }
- }
最后则是两个处理请求的Handler
- public class UserHandler {
- public void login(SocketRequest request,SocketResponse response) {
- System.out.println(request.getQueryString());
- //TODO: 处理用户登录
- response.write(“你肯定收到消息了”);
- }
- }
- public class MessageHandler {
- public void send(SocketRequest request,SocketResponse response) {
- System.out.println(request.getQueryString());
- //消息发送
- String key = request.getValue(“imei”);
- Session session = SessionManager.getSession(key);
- new SocketResponse(session).write(request.getValue(“sms”));
- }
- }
还有个监测是否超时的类Looper,定期去删除Session
- public class Looper extends Thread{
- private static Looper looper = new Looper();
- private static boolean isStart = false;
- private final int INTERVAL = 1000 * 60 * 5;
- private Looper(){}
- public static Looper getInstance() {
- return looper;
- }
- public void loop() {
- if(!isStart) {
- isStart = true;
- this.start();
- }
- }
- public void run() {
- Task task = new Task();
- while(true) {
- //Session过期检测
- task.checkState();
- //心跳包检测
- //task.sendAck();
- try {
- Thread.sleep(INTERVAL);
- } catch (InterruptedException e) {
- Log.e(e);
- }
- }
- }
- }
- public class Task {
- public void checkState() {
- Set<String> keys = SessionManager.getSessionKeys();
- if(keys.size() == 0) {
- return;
- }
- List<String> removes = new ArrayList<String>();
- Iterator<String> iterator = keys.iterator();
- String key = null;
- while(iterator.hasNext()) {
- key = iterator.next();
- if(!SessionManager.getSession(key).isKeekAlive()) {
- removes.add(key);
- }
- }
- if(removes.size() > 0) {
- Log.i(“sessions is time out,remove “ + removes.size() + “session”);
- }
- SessionManager.remove(removes.toArray(new String[removes.size()]));
- }
- public void sendAck() {
- Set<String> keys = SessionManager.getSessionKeys();
- if(keys.size() == 0) {
- return;
- }
- Iterator<String> iterator = keys.iterator();
- while(iterator.hasNext()) {
- iterator.next();
- //TODO 发送心跳包
- }
- }
- }
注意,在Task和SessionProcessor类里都有对SessionManager的sessions做遍历,文中使用的方法并不是很好,主要是效率问题,推荐使用遍历Entry的方式来获取Key和Value,因为一直在JavaWeb上折腾,所以会的童鞋看到Request和Response会挺亲切,这个例子没有经过任何安全和性能测试,如果需要放到生产环境上得话请先自行做测试-
-!
客户端请求时的数据内容例如{handler:”UserHandler”,action:”login”,imei:”2364656512636″…….},这些约定就自己来定了。