IoFilterChain位于通讯层与业务层之间,负责将byte[]转化成业务层需要的业务逻辑bean,在mina框架中起着承前启后的作用。
DefaultIoFilterChain的构建
在初始话的时候,DefaultIoFilterChain的构造函数如下:
public DefaultIoFilterChain(AbstractIoSession session) {
if (session == null) {
throw new NullPointerException("session");
}
this.session = session;
head = new EntryImpl(null, null, "head", new HeadFilter());
tail = new EntryImpl(head, null, "tail", new TailFilter());
head.nextEntry = tail;
}
整体结构图:
IoFilterChain初始化的典型代码
IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));
acceptor.setHandler( new TimeServerHandler() );
初始化以后的结构图为
Message received流程
1. NioProcessor的工作者线程一旦发现有数据来则分别处理各个Session上的数据;
public void run() {
int nSessions = 0;
lastIdleCheckTime = System.currentTimeMillis();
for (;;) {
try {
boolean selected = select(1000);
nSessions += add();
updateTrafficMask();
if (selected) {
process();
}
//以下省略
private void process() throws Exception {
for (Iterator<T> i = selectedSessions(); i.hasNext();) {
process(i.next());
i.remove();
}
}
private void process(T session) {
if (isReadable(session) && session.getTrafficMask().isReadable()) {
read(session);
}
if (isWritable(session) && session.getTrafficMask().isWritable()) {
scheduleFlush(session);
}
}
2. 读取session上的所有数据,传给session的IoFilterChain
private void read(T session) {
//读取socket的数据,具体省略
if (readBytes > 0) {
session.getFilterChain().fireMessageReceived(buf);
buf = null;
if (hasFragmentation) {
if (readBytes << 1 < config.getReadBufferSize()) {
session.decreaseReadBufferSize();
} else if (readBytes == config.getReadBufferSize()) {
session.increaseReadBufferSize();
}
}
}
if (ret < 0) {
scheduleRemove(session);
}
} catch (Throwable e) {
if (e instanceof IOException) {
scheduleRemove(session);
}
session.getFilterChain().fireExceptionCaught(e);
}
}
3. DefaultIoFilterChain调用HeadFilter的
public void fireMessageReceived(Object message) {
if (message instanceof IoBuffer) {
session.increaseReadBytes(
((IoBuffer) message).remaining(),
System.currentTimeMillis());
}
Entry head = this.head;
callNextMessageReceived(head, session, message);
}
private void callNextMessageReceived(
Entry entry, IoSession session, Object message) {
try {
entry.getFilter().messageReceived(
entry.getNextFilter(), session, message);
} catch (Throwable e) {
fireExceptionCaught(e);
}
}
4. 如上图head.getFilter()得到的是HeadFilter,注意EntryImpl的构造函数
public DefaultIoFilterChain(AbstractIoSession session) {
if (session == null) {
throw new NullPointerException("session");
}
this.session = session;
head = new EntryImpl(null, null, "head", new HeadFilter());
tail = new EntryImpl(head, null, "tail", new TailFilter());
head.nextEntry = tail;
}
private EntryImpl(EntryImpl prevEntry, EntryImpl nextEntry,
String name, IoFilter filter) {
if (filter == null) {
throw new NullPointerException("filter");
}
if (name == null) {
throw new NullPointerException("name");
}
this.prevEntry = prevEntry;
this.nextEntry = nextEntry;
this.name = name;
this.filter = filter;
this.nextFilter = new NextFilter() {
public void sessionCreated(IoSession session) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextSessionCreated(nextEntry, session);
}
public void sessionOpened(IoSession session) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextSessionOpened(nextEntry, session);
}
public void sessionClosed(IoSession session) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextSessionClosed(nextEntry, session);
}
public void sessionIdle(IoSession session, IdleStatus status) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextSessionIdle(nextEntry, session, status);
}
public void exceptionCaught(IoSession session, Throwable cause) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextExceptionCaught(nextEntry, session, cause);
}
public void messageReceived(IoSession session, Object message) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextMessageReceived(nextEntry, session, message);
}
public void messageSent(IoSession session,
WriteRequest writeRequest) {
Entry nextEntry = EntryImpl.this.nextEntry;
callNextMessageSent(nextEntry, session, writeRequest);
}
public void filterWrite(IoSession session,
WriteRequest writeRequest) {
Entry nextEntry = EntryImpl.this.prevEntry;
callPreviousFilterWrite(nextEntry, session, writeRequest);
}
public void filterClose(IoSession session) {
Entry nextEntry = EntryImpl.this.prevEntry;
callPreviousFilterClose(nextEntry, session);
}
public void filterSetTrafficMask(IoSession session,
TrafficMask trafficMask) {
Entry nextEntry = EntryImpl.this.prevEntry;
callPreviousFilterSetTrafficMask(nextEntry, session, trafficMask);
}
};
}
5. HeadFilter除了调用LogingFilter.MessageRecevied什么都不做
public void messageReceived(NextFilter nextFilter, IoSession session,
Object message) {
nextFilter.messageReceived(session, message);
}
6. LoggingFilter和ProtocolCodecFilter都是在执行自己的逻辑之后调用下一个IoFilter
public void messageReceived(NextFilter nextFilter, IoSession session,
Object message) throws Exception {
log(IoEventType.MESSAGE_RECEIVED, "RECEIVED: {}", message);
nextFilter.messageReceived(session, message);
}
7. 在TailFilter的MessageReceved则将消息转发给Session的handler,完成传递
@Override
public void messageReceived(NextFilter nextFilter, IoSession session,
Object message) throws Exception {
AbstractIoSession s = (AbstractIoSession) session;
if (!(message instanceof IoBuffer)) {
s.increaseReadMessages(System.currentTimeMillis());
} else if (!((IoBuffer) message).hasRemaining()) {
s.increaseReadMessages(System.currentTimeMillis());
}
try {
session.getHandler().messageReceived(s, message);
} finally {
if (s.getConfig().isUseReadOperation()) {
s.offerReadFuture(message);
}
}
}
IoFilter事件执行
IoFilter一共处理10种事件
1 created
2 opened
3 closed
4 sessionIdle
5 exceptionCaught
6 messageReceived
7 messageSent
8 fireWrite
9 fireFilterClose
10 filterSetTrafficMask
其中1-7种事件都是从前往后执行,依次为:
HeadFilter->filter1->filter2->...->filterN->TailFilter->IoHandler
而8-10正好相反,如fireWrite的执行顺序为:
iosession.write->TailFilter->filterN->....-->filter1->HeadFilter
- 大小: 116.1 KB
- 大小: 86 KB
- 大小: 96 KB
分享到:
相关推荐
mina 通讯 实现server端与基于Android系统的client端通讯
Mina框架研究与实现 Mina框架研究与实现
基于Mina的网络通讯,分为服务端和客户端。 研究selector NIO实现时,发现了这个架构。...Mina的底层实现实际就是selector和SocketChannel。所以如果对Mina源码感兴趣的可以先去看下selector相关的例子。
公司需求,做的简单的Demo,可以拓展,Mina自定义协议简单实现,象征性得收取2积分
我自己写的使用mina框架实现cmpp2.0服务端,经过一段使用解决了几个bug现在比较稳定。
本资源包含两个 pdf 文档,一本根据官方最新文档 (http://mina.apache.org/mina-project/userguide/user-guide-toc.html) 整理的 mina_2.0_user_guide_en.pdf,一个中文翻译的 mina_2.0_user_guide_cn.pdf。...
实现了mina 的简单通信通信,内部配置了累积协议编解码器、工具类和客户端与服务端的端口配置。能够实现基本功能,下载完成需要四个基本jar包才能实现功能。我的博客上有相应资源支持下载。
工作中的一个小项目,分享给大家参考,望大家不吝批评指教,本人常年从事JAVA软件开发,有丰富的MINA通信软件开发经验,现在已经有成熟的底层框架(结合了反射、DynaBean、Spring等多种技术),可以实现程序自动对...
本源码是《NIO框架入门(三):iOS与MINA2、Netty4的跨平台UDP双向通信实战》一文的服务端实现(MINA2版),详见:http://www.52im.net/thread-378-1-1.html
mina服务器--实现纯文本和非纯文本的加密通讯
Apache_MINA_2_用户指南.pdf
MINA入门实例,实现长连接,短连接通讯。
Apache MINA 2.0 用户指南
MINA 2.0 User Guide Part I - Basics Chapter 1 - Getting Started Chapter 2 - Basics Chapter 3 - Service Chapter 4 - Session Chapter 5 - Filters Chapter 6 - Transports Chapter 7 - Handler Part II - ...
mina服务器和客服端实现,包含编解码,等一系列的代码。
mina通信基本代码实现和学习文件,里面包含学习的文件和基本代码,
通过Mina与Socket实现通信,其包含客户端与服务端的实现代码
mina实现简单的登录功能,详细见博客:http://blog.csdn.net/guozeming122/article/details/18605937
Mina长连接框架实现Android客户端与服务器端通信
mina2.0用户指南,mina_2.0_user_guide_cn.pdf