`
uniseraph
  • 浏览: 82515 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

[mina指南]诡异的IoFilterChain实现

阅读更多
IoFilterChain位于通讯层与业务层之间,负责将byte[]转化成业务层需要的业务逻辑bean,在mina框架中起着承前启后的作用。




DefaultIoFilterChain的构建
在初始话的时候,DefaultIoFilterChain的构造函数如下:
    public DefaultIoFilterChain(AbstractIoSession session) {
        if (session == null) {
            throw new NullPointerException("session");
        }

        this.session = session;
        head = new EntryImpl(null, null, "head", new HeadFilter());
        tail = new EntryImpl(head, null, "tail", new TailFilter());
        head.nextEntry = tail;
    }


整体结构图:


IoFilterChain初始化的典型代码
        IoAcceptor acceptor = new NioSocketAcceptor();
        acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
        acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
   
        acceptor.setHandler(  new TimeServerHandler() );


初始化以后的结构图为


Message received流程
1. NioProcessor的工作者线程一旦发现有数据来则分别处理各个Session上的数据;
public void run() {
            int nSessions = 0;
            lastIdleCheckTime = System.currentTimeMillis();

            for (;;) {
                try {
                    boolean selected = select(1000);

                    nSessions += add();
                    updateTrafficMask();

                    if (selected) {
                        process();
                    }

          //以下省略



    private void process() throws Exception {
        for (Iterator<T> i = selectedSessions(); i.hasNext();) {
            process(i.next());
            i.remove();
        }
    }
    private void process(T session) {

        if (isReadable(session) && session.getTrafficMask().isReadable()) {
            read(session);
        }

        if (isWritable(session) && session.getTrafficMask().isWritable()) {
            scheduleFlush(session);
        }
    }

2. 读取session上的所有数据,传给session的IoFilterChain
 private void read(T session) {
            //读取socket的数据,具体省略

            if (readBytes > 0) {


                session.getFilterChain().fireMessageReceived(buf);
          

                buf = null;

                if (hasFragmentation) {
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }
            }
            if (ret < 0) {
                scheduleRemove(session);
            }
        } catch (Throwable e) {
            if (e instanceof IOException) {
                scheduleRemove(session);
            }
            session.getFilterChain().fireExceptionCaught(e);
        }
    }


3. DefaultIoFilterChain调用HeadFilter的
    public void fireMessageReceived(Object message) {
        if (message instanceof IoBuffer) {
            session.increaseReadBytes(
                    ((IoBuffer) message).remaining(),
                    System.currentTimeMillis());
        }

        Entry head = this.head;
        callNextMessageReceived(head, session, message);
    }

    private void callNextMessageReceived(
            Entry entry, IoSession session, Object message) {
        try {
            entry.getFilter().messageReceived(
                    entry.getNextFilter(), session, message);
        } catch (Throwable e) {
            fireExceptionCaught(e);
        }
    }


4. 如上图head.getFilter()得到的是HeadFilter,注意EntryImpl的构造函数
  public DefaultIoFilterChain(AbstractIoSession session) {
        if (session == null) {
            throw new NullPointerException("session");
        }

        this.session = session;
        head = new EntryImpl(null, null, "head", new HeadFilter());
        tail = new EntryImpl(head, null, "tail", new TailFilter());
        head.nextEntry = tail;
    }

      private EntryImpl(EntryImpl prevEntry, EntryImpl nextEntry,
                String name, IoFilter filter) {
            if (filter == null) {
                throw new NullPointerException("filter");
            }
            if (name == null) {
                throw new NullPointerException("name");
            }

            this.prevEntry = prevEntry;
            this.nextEntry = nextEntry;
            this.name = name;
            this.filter = filter;
            this.nextFilter = new NextFilter() {
                public void sessionCreated(IoSession session) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextSessionCreated(nextEntry, session);
                }

                public void sessionOpened(IoSession session) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextSessionOpened(nextEntry, session);
                }

                public void sessionClosed(IoSession session) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextSessionClosed(nextEntry, session);
                }

                public void sessionIdle(IoSession session, IdleStatus status) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextSessionIdle(nextEntry, session, status);
                }

                public void exceptionCaught(IoSession session, Throwable cause) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextExceptionCaught(nextEntry, session, cause);
                }

                public void messageReceived(IoSession session, Object message) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextMessageReceived(nextEntry, session, message);
                }

                public void messageSent(IoSession session,
                        WriteRequest writeRequest) {
                    Entry nextEntry = EntryImpl.this.nextEntry;
                    callNextMessageSent(nextEntry, session, writeRequest);
                }

                public void filterWrite(IoSession session,
                        WriteRequest writeRequest) {
                    Entry nextEntry = EntryImpl.this.prevEntry;
                    callPreviousFilterWrite(nextEntry, session, writeRequest);
                }

                public void filterClose(IoSession session) {
                    Entry nextEntry = EntryImpl.this.prevEntry;
                    callPreviousFilterClose(nextEntry, session);
                }

                public void filterSetTrafficMask(IoSession session,
                        TrafficMask trafficMask) {
                    Entry nextEntry = EntryImpl.this.prevEntry;
                    callPreviousFilterSetTrafficMask(nextEntry, session, trafficMask);
                }
            };
        }





5. HeadFilter除了调用LogingFilter.MessageRecevied什么都不做
        
        public void messageReceived(NextFilter nextFilter, IoSession session,
                Object message) {
            nextFilter.messageReceived(session, message);
        }


6. LoggingFilter和ProtocolCodecFilter都是在执行自己的逻辑之后调用下一个IoFilter
    public void messageReceived(NextFilter nextFilter, IoSession session,
            Object message) throws Exception {
        log(IoEventType.MESSAGE_RECEIVED, "RECEIVED: {}", message);
        nextFilter.messageReceived(session, message);
    }



7. 在TailFilter的MessageReceved则将消息转发给Session的handler,完成传递
        @Override
        public void messageReceived(NextFilter nextFilter, IoSession session,
                Object message) throws Exception {
            AbstractIoSession s = (AbstractIoSession) session;
            if (!(message instanceof IoBuffer)) {
                s.increaseReadMessages(System.currentTimeMillis());
            } else if (!((IoBuffer) message).hasRemaining()) {
                s.increaseReadMessages(System.currentTimeMillis());
            }

            try {
                session.getHandler().messageReceived(s, message);
            } finally {
                if (s.getConfig().isUseReadOperation()) {
                    s.offerReadFuture(message);
                }
            }
        }


IoFilter事件执行


IoFilter一共处理10种事件
1 created
2 opened
3 closed
4 sessionIdle
5 exceptionCaught
6 messageReceived
7 messageSent
8 fireWrite
9 fireFilterClose
10 filterSetTrafficMask

其中1-7种事件都是从前往后执行,依次为:
HeadFilter->filter1->filter2->...->filterN->TailFilter->IoHandler

而8-10正好相反,如fireWrite的执行顺序为:
iosession.write->TailFilter->filterN->....-->filter1->HeadFilter







  • 大小: 116.1 KB
  • 大小: 86 KB
  • 大小: 96 KB
分享到:
评论
3 楼 uniseraph 2011-11-01  
ubuntu dia
2 楼 thomescai 2011-10-14  
请问,你这个类图用什么工具画得?
1 楼 nextw3 2011-03-03  
MINA为什么要采用这种过滤链结构那?请赐教

相关推荐

Global site tag (gtag.js) - Google Analytics