`
uniseraph
  • 浏览: 82517 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

[mina指南]mina中Half Sync/Half Async模式

阅读更多
在解释Half Sync/Half Async模式之前,先介绍一个亲身经历的项目。曾经使用一个通讯支撑模块EMF,该模块完成了底层的socket通讯功能,和外部应用建立长连接,同时为上层应用提供一个回调接口如下:


public interface Hook {
          void call(Message msg);
}



上层应用可以根据自己业务逻辑的需要,实现该接口。


class MyHook implements Hook{
          public void callback(Message msg){
                    //应用的业务逻辑
          }

}

  然后应用可以将该实现注册到通讯支撑模块中。

//注册应用的回调实现
EmfHook hook = new MyHook();
EmfService.regist(hook);


在EMF的实现中负责将受到的socket数据拼装成应用需要的消息结构,然后在一定的匹配规则,找到一个应用的Hook,回调之(其实是多个hook,有可能多个应用对同一种消息感兴趣,简化之)。


//从socket上收到数据,
Byte[] buf=.....;

//将数据解析成应用的消息结构
Message msg=parse(buf);

//根据消息内容查找相应的EmfHook
EmfHook  hook = findHook(msg);

//调用应用的实现
hook.callback(msg);


一开始,这个模块运行的很好,性能不错,应用扩展也很方便,大家都很是满意,看来这个月的kpi一定不错,呵呵。然有一天,在做压力测试的时候发现,性能很差了,好久才能处理一条消息,数据吞吐量非常小,cpu却也不忙。这是怎么回事哪?


后来发现问题在于EMF的socket数据读取并解析工作和上层应用的hook操作是在一个线程中的,而某些应用的Hook实现有一些比较耗时操作,而所以导致在执行上层应用操作的时候,EMF并没有去进行I/O操作,读取数据,整个系统停在哪儿了。


最后稍作改动,将hook.callback(data)放在另外一个线程池中,这样EMF的线程和应用逻辑的线程就不再相互干扰,性能大大的提供。


从这个项目中得到的一个教训是:[size=small;]I/O密集操作的线程应该和业务逻辑的线程尽量分开[/size]。



再后来看POSA2,发现这不就是一个Half Sync/Half Async模式吗?

POSA2中写道
引用

半同步半异步(Half Sync/Half Async)体系结构模式将并发系统中的异步和同步服务分开,简化了编程,同时又没有降低性能。



通俗一点说,Half Sync/Half Async模式其实就是将异步请求排队到一个同步队列中,然后再从队列中取出请求处理,其实就是一个扩大的生产者/消费者问题。在socket编程的关键在于,将业务逻辑操作和底层的io操作分散到不同的线程中,避免业务逻辑操作可能导致整个线程的堵塞。


在mina中也存在Half Sync/Half Async模式,在默认情况下,mina的业务逻辑处理接口IoHandler的实现类就是在NioProcessor 的worker现在中执行的(NioProcessor的执行机制请参考)。如果业务逻辑接口堵塞或者耗时都将导致NioProcessor线程无法充分利用。


 

  ExecutorFilter

        IoAcceptor acceptor = new NioSocketAcceptor();
        
        acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
        acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
        acceptor.getFilterChain().addLast("executor", new ExecutorFilter());

        acceptor.setHandler(  new TimeServerHandler() );

        acceptor.getSessionConfig().setReadBufferSize( 2048 );
        acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
        
        acceptor.bind( new InetSocketAddress(PORT) );
        


如上述代码,通过在FilterChain中增加一个ExecutorFilter,mina将NioProcessor的IO操作线程和TimeServerHandler的业务处理线程分开了。在上述例子中,当NioProcessor收到socket上的数据,filter和handler的执行顺序为:logger->codec->executor->TimeServerHandler(有关IoFilter内容具体请参考http://uniseraph.iteye.com/blog/228194)。


其中logger->codec就在NioProcessor的工作者线程worker中,而executor和TimeServerHandler则是在executorFilter的线程池中。具体参考代码如下:

 private ExecutorFilter(Executor executor, boolean createdExecutor, IoEventType... eventTypes) {
        if (executor == null) {
            throw new NullPointerException("executor");
        }/*这里将evenType初始化为所有事件类型的集合*/
        if (eventTypes == null || eventTypes.length == 0) {
            eventTypes = new IoEventType[] { IoEventType.EXCEPTION_CAUGHT,
                    IoEventType.MESSAGE_RECEIVED, IoEventType.MESSAGE_SENT,
                    IoEventType.SESSION_CLOSED, IoEventType.SESSION_IDLE,
                    IoEventType.SESSION_OPENED, };
        }
        //下面省略
    }
    public final void messageReceived(NextFilter nextFilter, IoSession session,
            Object message) {
    /*executorFilter对该事件有效*/   if (eventTypes.contains(IoEventType.MESSAGE_RECEIVED)) {
            fireEvent(new IoFilterEvent(nextFilter,
                    IoEventType.MESSAGE_RECEIVED, session, message));
        } else {
            nextFilter.messageReceived(session, message);
        }
    } 
    protected void fireEvent(IoFilterEvent event) {
        getExecutor().execute(event);
    } 


由上可知,如果executorFilter对某个事件有效,那么将在线程池中执行该事件,如果无效则在原有的NioProcessor.Worker线程中执行下一个IoFilter或者IoHandler。


当然也可以不用ExecutorFilter,而是在IoHandler的实现类中放一个线程池,但是多数情况下是没有这样的必要。








分享到:
评论
1 楼 xiaopeng187 2014-10-23  
I/O密集操作的线程应该和业务逻辑的线程尽量分开,mina中是通过ExecutorFilter实现的,但是在项目中ExecutorFilter的线程池大小应该配置为多大才合适呢?

相关推荐

Global site tag (gtag.js) - Google Analytics