`
jinyanhui2008
  • 浏览: 311608 次
  • 性别: Icon_minigender_1
  • 来自: 青岛
社区版块
存档分类
最新评论

常见NIO开源框架(MINA、xSocket)学习 (转自javaeye博客)

    博客分类:
  • Java
阅读更多

http://unbounder.iteye.com/blog/481396

http://unbounder.iteye.com/blog/481668

 

 

基于io包的阻塞式socket通信代码简单,在连接数很少的情况下是一个不错的选择。不过实际应用中一个socket服务器采用传统的阻塞式socket方式通信可能会是一场灾难,一路socket同时进行读写操作可能就需要两条线程,如果需要并发一百路socket(这个量其实很小了),可能就是两百条线程,大概几分钟后cpu占用率就是高居不下了。 

基于原生nio的socket通信时一种很好的解决方案,基于事件的通知模式使得多并发时不用维持高数量的线程,高并发的socket服务器的java实现成为现实。不过原生nio代码十分复杂,无论编写还是修改都是一件头疼的事。“屏蔽底层的繁琐工作,让程序员将注意力集中于业务逻辑本身”,有需求就有生产力进步,于是各式各样的nio框架涌现而出,而笔者使用到的是其中最常见的两种:xSocket和MINA。 

1 xsocket框架 

官网:http://xsocket.sourceforge.net/ 

xSocket是一套非常简洁的nio框架,利用这套框架,你可以在完全不了解nio的情况下设计出高并发的socket服务器。 
server端代码: 

Java代码  收藏代码
  1. public class ProjectServer extends Thread {  
  2.       
  3.     private static final int PORT=9099;  
  4.       
  5.     public void run() {  
  6.         IServer srv = null;  
  7.         try {  
  8.             //建立handler  
  9.             srv = new Server(PORT, new ProjectHandle());  
  10.         } catch (UnknownHostException e) {  
  11.             // TODO Auto-generated catch block  
  12.             e.printStackTrace();  
  13.         } catch (IOException e) {  
  14.             // TODO Auto-generated catch block  
  15.             e.printStackTrace();  
  16.         }  
  17.         //服务器运行  
  18.         srv.run();  
  19.         System.out.println("The ProjectServer start on port: "+PORT);  
  20.     }  
  21.       
  22.     public static void main(String[] args) {  
  23.         ProjectServer projectServer = new ProjectServer();  
  24.         projectServer.start();  
  25.     }  
  26. }  

handler代码 
Java代码  收藏代码
  1. public class ProjectHandle implements IDataHandler, IConnectHandler,  
  2.         IDisconnectHandler {  
  3.     /* 处理连接建立事件 */  
  4.     @Override  
  5.     public boolean onConnect(INonBlockingConnection nbc) throws IOException,  
  6.             BufferUnderflowException, MaxReadSizeExceededException {  
  7.         // TODO Auto-generated method stub  
  8.         System.out.println(nbc.getId() + "is connect!");  
  9.         return true;  
  10.     }  
  11.   
  12.     /* 处理连接断开事件 */  
  13.     @Override  
  14.     public boolean onDisconnect(INonBlockingConnection nbc) throws IOException {  
  15.         // TODO Auto-generated method stub  
  16.         System.out.println(nbc.getId() + "is disconnect!");  
  17.         return true;  
  18.     }  
  19.   
  20.     /* 处理接受数据事件 */  
  21.     @Override  
  22.     public boolean onData(INonBlockingConnection nbc) throws IOException,  
  23.             BufferUnderflowException, ClosedChannelException,  
  24.             MaxReadSizeExceededException {  
  25.         // TODO Auto-generated method stub  
  26.         String str = nbc.readStringByDelimiter("\0");  
  27.         System.out.println(str);  
  28.         return true;  
  29.     }  
  30. }  

这里我们以"\0"为间隔读取数据,xSocket还提供按照长度读取以及全部读取到一个ByteBuffer几种读取数据的方式,如果传递过程中采取byte数组格式,在接受数据时还可以使用xSocket自带的工具类DataConverter进行转化。譬如接收数据时如此操作: 
Java代码  收藏代码
  1. ByteBuffer copyBuffer = ByteBuffer.allocate(20000);  
  2. nbc.read(copyBuffer);  
  3. copyBuffer.flip();  
  4. String str = DataConverter.toString(copyBuffer, "utf-8");  


就可以将二进制格式的传输内容还原为原始字符串了。 

需要说明的是虽然xSocket代码简单、开发快捷,但是由于太“上层”了,所以很多地方不利于coder自己控制。譬如socket通信中一个很常见的“半包连包”问题用xsocket处理起来就会很麻烦,而且如果传输协议是经过特殊设计的,xsocket也无法像mina那样自己编写解码器。当然如果你不是特别计较这些方面,那么非常适于上手的xSocket就是适合你的nio框架。 

 

 

3 MINA 
项目主页:http://mina.apache.org/ 

闲话不说,上代码 

Java代码  收藏代码
  1. public class Server extends Thread {  
  2.     private static final int PORT = 23;  
  3.   
  4.     public void run() {  
  5.         IoAcceptor acceptor = new NioSocketAcceptor();  
  6.         acceptor.setHandler(new TestHandler());  
  7.         acceptor.getFilterChain().addLast("codec",  
  8.                 new ProtocolCodecFilter(new TextLineCodecFactory()));  
  9.         acceptor.getSessionConfig().setReadBufferSize(2048);  
  10.         acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);  
  11.         try {  
  12.             acceptor.bind(new InetSocketAddress(PORT));  
  13.         } catch (IOException e) {  
  14.             // TODO Auto-generated catch block  
  15.             e.printStackTrace();  
  16.         }  
  17.         System.out.println("The ProjectServer start on port: " + PORT);  
  18.     }  
  19.   
  20.     public static void main(String[] args) {  
  21.         Server server = new Server();  
  22.         server.start();  
  23.     }  
  24. }  


Java代码  收藏代码
  1. public class TestHandler extends IoHandlerAdapter {  
  2.   
  3.     @Override  
  4.     public void exceptionCaught(IoSession session, Throwable cause)  
  5.             throws Exception {  
  6.         // TODO Auto-generated method stub  
  7.         session.close(true);  
  8.     }  
  9.   
  10.     @Override  
  11.     public void sessionCreated(IoSession session) throws Exception {  
  12.         // TODO Auto-generated method stub  
  13.         System.out.println("the new session is connecting");  
  14.     }  
  15.   
  16.     @Override  
  17.     public void messageReceived(IoSession session, Object message)  
  18.             throws Exception {  
  19.         // TODO Auto-generated method stub  
  20.         String str = message.toString();  
  21.         System.out.println(str);  
  22.     }  
  23. }  

最简单的一个mina服务器实现,telnet就可以看到效果。 
稍微解释一下 
mina的实现主要在于给IoAcceptor增加过滤器 
new ProtocolCodecFilter(new TextLineCodecFactory()):这是一个解码器,可以自己实现ProtocolCodecFactory接口编写特定的解码器。不过注意一下,解码器必须和编码器同时使用,服务器端实现特定解码器的同时需要客户端应用特定编码。 
常用过滤器还有日志LoggingFilter()等,具体可以查看api。 

这里笔者想说的是对于一般的socket服务器,可能客户端并不会使用mina,譬如j2me或者干脆是一个c++的socket请求,此时我们上面的demo将毫无作用,具体来说就是解码过滤器失效。在实际应用中,我们一般是这样的处理的: 

Java代码  收藏代码
  1. public class Server extends Thread {  
  2.     private static final int PORT = 11001;  
  3.   
  4.     public void run() {  
  5.         IoAcceptor acceptor = new NioSocketAcceptor();  
  6.         acceptor.setHandler(new TestHandler());  
  7.             acceptor.getFilterChain().addLast("ddd"new StreamWriteFilter());  
  8.         acceptor.getSessionConfig().setReadBufferSize(2048);  
  9.         acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);  
  10.         try {  
  11.             acceptor.bind(new InetSocketAddress(PORT));  
  12.         } catch (IOException e) {  
  13.             // TODO Auto-generated catch block  
  14.             e.printStackTrace();  
  15.         }  
  16.         System.out.println("The ProjectServer start on port: " + PORT);  
  17.     }  
  18.   
  19.     public static void main(String[] args) {  
  20.         Server server = new Server();  
  21.         server.start();  
  22.     }  
  23. }  

注意我们并没有再使用acceptor.getFilterChain().addLast("codec",new ProtocolCodecFilter(new TextLineCodecFactory())); 
而是acceptor.getFilterChain().addLast("ddd", new StreamWriteFilter()); 
这个过滤器是直接对写入流操作,即原始的数据流 

handler端代码也要修改 
Java代码  收藏代码
  1. public class TestHandler extends IoHandlerAdapter {  
  2.   
  3.     @Override  
  4.     public void exceptionCaught(IoSession session, Throwable cause)  
  5.             throws Exception {  
  6.         // TODO Auto-generated method stub  
  7.         session.close(true);  
  8.     }  
  9.   
  10.     @Override  
  11.     public void sessionCreated(IoSession session) throws Exception {  
  12.         // TODO Auto-generated method stub  
  13.         System.out.println("the new session is connecting");  
  14.     }  
  15.   
  16.     @Override  
  17.     public void messageReceived(IoSession session, Object message)  
  18.             throws Exception {  
  19.         // TODO Auto-generated method stub  
  20.   
  21.         IoBuffer buffer=(IoBuffer)message;  
  22.         ByteBuffer bf= buffer.buf();  
  23.         byte[] tempBuffer=new byte[bf.limit()];  
  24.         bf.get(tempBuffer);  
  25.         String str=new String(tempBuffer);  
  26.         System.out.println(str);  
  27.     }  
  28. }  

注意这段 
Java代码  收藏代码
  1. IoBuffer buffer=(IoBuffer)message;  
  2. ByteBuffer bf= buffer.buf();  
  3. byte[] tempBuffer=new byte[bf.limit()];  
  4. bf.get(tempBuffer);  
  5. String str=new String(tempBuffer);  
  6. System.out.println(str);  

将原始的数据流还原为字符串,如果传输协议不是字符串而是byte数组,就直接对tempBuffer操作即可。 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics