`

mina

 
阅读更多

Mina之session

http://www.cnblogs.com/ggzwtj/archive/2011/10/14/2212095.html

 

1、IoSession与底层的传输层类型无关,表示通信双端的连接。提供用户自定义属性,可以用于在过滤器和处理器之间交换用户自定义协议相关信息。每个会话都由一个Service来提供服务,同时有一个Handler负责此会话的I/O事件处理。最重要的两个方法就是read和write,这两个方法都是异步执行,如要真正完成必须在其结果上进行等待。关闭会话的方法close也是异步执行的,也就是应等待返回的CloseFuture,此外,还有另一种关闭方式closeOnFlush,它和close的区别是会先flush掉写请求队列中的请求数据,但同样是异步的。会话的读写类型是可配置的,在运行中可设置此端是否可读写。

  一个会话主要包括两方面的数据:属性映射图和写请求队列,这里使用工厂模式来为新创建的会话提供这些数据结构,定义如下:

public interface IoSessionDataStructureFactory {
    //返回属性
    IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception;
    //返回写请求队列
    WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception;
}

2、IoSessionConfig表示会话的配置信息,主要包括:读缓冲区大小,会话数据吞吐量,计算吞吐量的时间间隔,指定会话段的空闲时间,写请求操作超时时间等。这个里面有两个方法需要注意,如下:

复制代码
/*
 * 只有在IoSession的read方法可用的时候返回true。 如果可用,受到的消息
 * 保存在BlockingQueue这样对那个客户端应用程序来说更方便取到消息。开
 * 启这个对服务器没什么好处,并且可能造成内存漏洞,默认是不开启的。
 */
boolean isUseReadOperation();
/* 
 * 打开或关闭IoSession的read方法。
 */
void setUseReadOperation(boolean useReadOperation);
复制代码

3、IoSessionInitializer定义了一个回调函数,用于把用户自定义的会话初始化行为剥离出来:

public interface IoSessionInitializer<T extends IoFuture> {
    void initializeSession(IoSession session, T future);
}

4、IoSessionRecycler为一个无连接的传输服务提供回收现有会话的服务:

复制代码
public interface IoSessionRecycler {
    /**
     * 一个虚假的recycler(并不回收任何session)。但是用这个可以使得所有session
     * 的生命周期事件被fired
     */
    static IoSessionRecycler NOOP = new IoSessionRecycler() {
        public void put(IoSession session) {}
        public IoSession recycle(SocketAddress localAddress,SocketAddress remoteAddress) {
            return null;
        }
        public void remove(IoSession session) {}
    };
    /* 
     * 创建或写Iossion的时候被调用。
     */
    void put(IoSession session);
    /*
     * 尝试获取一个被回收了的IoSession。
     */
    IoSession recycle(SocketAddress localAddress, SocketAddress remoteAddress);
    /*
     * 会话被关闭的时候调用。
     */
    void remove(IoSession session);
}
复制代码

ExpiringSessionRecycler是IoSessionRecycler的一个实现,用来回收超时失效的会话:

private ExpiringMap<Object, IoSession> sessionMap;//待处理的会话
private ExpiringMap<Object, IoSession>.Expirer mapExpirer;//负责具体的回收工作

下面来看Key是什么样的:

    private Object generateKey(SocketAddress localAddress,SocketAddress remoteAddress) {
        List<SocketAddress> key = new ArrayList<SocketAddress>(2);
        key.add(remoteAddress);
        key.add(localAddress);
        return key;
    }

ExpiringMap中保存了超过限制的对象和该对象的监听器,如下:

复制代码
public class ExpiringMap<K, V> implements Map<K, V> {
    public static final int DEFAULT_TIME_TO_LIVE = 60;
    public static final int DEFAULT_EXPIRATION_INTERVAL = 1;
    private static volatile int expirerCount = 1;
    private final ConcurrentHashMap<K, ExpiringObject> delegate;
    private final CopyOnWriteArrayList<ExpirationListener<V>> expirationListeners;
    private final Expirer expirer;
}
复制代码

其中的ExpiringObject表示一个超过限制的对象,是ExpiringMap的一个内部类,如下:

private class ExpiringObject {
    private K key;
    private V value;
    private long lastAccessTime;//上次访问时间
    private final ReadWriteLock lastAccessTimeLock = new ReentrantReadWriteLock();
}

mapExpirer用来移除sessionMap上超过临界值的项,关键代码如下:

复制代码
private void processExpires() {
    long timeNow = System.currentTimeMillis();//当前时间
    for (ExpiringObject o : delegate.values()) {
        if (timeToLiveMillis <= 0) {
            continue;
        }
        long timeIdle = timeNow - o.getLastAccessTime();
        if (timeIdle >= timeToLiveMillis) {
            delegate.remove(o.getKey());//移除
            for (ExpirationListener<V> listener : expirationListeners) {
                listener.expired(o.getValue());//终止监听?
            }
        }
    }
}
复制代码

启动关闭该县城都需要进行封锁机制。

5、Mina中的I/O事件类型如下:

复制代码
public enum IoEventType {
    SESSION_CREATED,
    SESSION_OPENED,
    SESSION_CLOSED,
    MESSAGE_RECEIVED,
    MESSAGE_SENT,
    SESSION_IDLE,
    EXCEPTION_CAUGHT,
    WRITE,
    CLOSE,
}
复制代码

IoEvent表示一个I/O事件或者一个I/O请求,包括时间类型、所属会话、时间参数:

复制代码
public class IoEvent implements Runnable {
    private final IoEventType type;
    private final IoSession session;
    private final Object parameter;
    public IoEvent(IoEventType type, IoSession session, Object parameter) {    
        //...
    }
    //根据事件类型向会话的过滤链上的众多监听者发出事件到来的信号。
    public void fire() {    
        switch (getType()) {    
        case MESSAGE_RECEIVED:    
            getSession().getFilterChain().fireMessageReceived(getParameter());break;
        case MESSAGE_SENT:
            getSession().getFilterChain().fireMessageSent((WriteRequest) getParameter());break;
        case WRITE:
            getSession().getFilterChain().fireFilterWrite((WriteRequest) getParameter());break;
        case CLOSE:
            getSession().getFilterChain().fireFilterClose();break;
        case EXCEPTION_CAUGHT:
            getSession().getFilterChain().fireExceptionCaught((Throwable) getParameter());break;
        case SESSION_IDLE:
            getSession().getFilterChain().fireSessionIdle((IdleStatus) getParameter());break;
        case SESSION_OPENED:
            getSession().getFilterChain().fireSessionOpened();
            break;
        case SESSION_CREATED:
            getSession().getFilterChain().fireSessionCreated();
            break;
        case SESSION_CLOSED:
            getSession().getFilterChain().fireSessionClosed();
            break;
        default:
            throw new IllegalArgumentException("Unknown event type: " + getType());
        }
    }    
    public String toString() {
        if (getParameter() == null) {
            return "[" + getSession() + "] " + getType().name();
        }
        return "[" + getSession() + "] " + getType().name() + ": "+ getParameter();    
    }
}
复制代码

Mina的会话中,有三种类型的闲置状态:READER_IDLE读端空闲、WRITER_IDLE写端空闲、BOTH_IDLE读写都空闲。为了节省会话资源可以让用户设置当空闲超过一定时间后关闭会话,因为此会话可能在一段出现问题,从而导致另一端空闲超过太长时间。
6、DefaultIoSessionDataStructureFactory是IoSessionDataStructureFactory的一个默认的实现:

复制代码
public class DefaultIoSessionDataStructureFactory implements IoSessionDataStructureFactory {
    public IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception {
        return new DefaultIoSessionAttributeMap();
    }    
    public WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception {
        return new DefaultWriteRequestQueue();
    }
    private static class DefaultIoSessionAttributeMap implements IoSessionAttributeMap {
        private final Map<Object, Object> attributes = Collections.synchronizedMap(new HashMap<Object, Object>(4));
        public DefaultIoSessionAttributeMap() {
            super();
        }        
        public Object getAttribute(IoSession session, Object key, Object defaultValue) {
            if (key == null) {
                throw new NullPointerException("key");
            }
            Object answer = attributes.get(key);
            if (answer == null) {
                return defaultValue;
            }            
            return answer;
        }

        public Object setAttribute(IoSession session, Object key, Object value) {
            if (key == null) {
                throw new NullPointerException("key");
            }
            if (value == null) {
                return attributes.remove(key);
            }            
            return attributes.put(key, value);
        }
        public Object setAttributeIfAbsent(IoSession session, Object key, Object value) {
            if (key == null) {
                throw new NullPointerException("key");
            }
            if (value == null) {
                return null;
            }

            Object oldValue;
            synchronized (attributes) {
                oldValue = attributes.get(key);
                if (oldValue == null) {
                    attributes.put(key, value);
                }
            }
            return oldValue;
        }
        public Object removeAttribute(IoSession session, Object key) {
            if (key == null) {
                throw new NullPointerException("key");
            }
            return attributes.remove(key);
        }
        public boolean removeAttribute(IoSession session, Object key, Object value) {
            if (key == null) {
                throw new NullPointerException("key");
            }
            if (value == null) {
                return false;
            }
            synchronized (attributes) {
                if (value.equals(attributes.get(key))) {
                    attributes.remove(key);
                    return true;
                }
            }
            return false;
        }
        public boolean replaceAttribute(IoSession session, Object key, Object oldValue, Object newValue) {
            synchronized (attributes) {
                Object actualOldValue = attributes.get(key);
                if (actualOldValue == null) {
                    return false;
                }
                if (actualOldValue.equals(oldValue)) {
                    attributes.put(key, newValue);
                    return true;
                }                
                return false;
            }
        }
        public boolean containsAttribute(IoSession session, Object key) {
            return attributes.containsKey(key);
        }
        public Set<Object> getAttributeKeys(IoSession session) {
            synchronized (attributes) {
                return new HashSet<Object>(attributes.keySet());
            }
        }
        public void dispose(IoSession session) throws Exception {}
    }    
    private static class DefaultWriteRequestQueue implements WriteRequestQueue {
        //一个队列存储传入的写请求
        private final Queue<WriteRequest> q = new CircularQueue<WriteRequest>(16);
        public DefaultWriteRequestQueue() {
            super();
        }
        //...
    }
}
复制代码

7、AbstractIoSession是IoSession的一个抽象实现类,如下:

private IoSessionAttributeMap attributes;//会话属性
private WriteRequestQueue writeRequestQueue;//写请求队列
private WriteRequest currentWriteRequest;//当前写请求

下面是关闭的时候涉及的成员:

复制代码
//要结束当前会话时发送写请求CLOSE_REQUEST
private static final WriteRequest CLOSE_REQUEST =
 new DefaultWriteRequest(new Object());
//在连接关闭时状态被设置为closed
private final CloseFuture closeFuture = new DefaultCloseFuture(this);
//关闭的监听器
private static final IoFutureListener<CloseFuture> SCHEDULED_COUNTER_RESETTER =
    new IoFutureListener<CloseFuture>() {
        public void operationComplete(CloseFuture future) {
            AbstractIoSession session = (AbstractIoSession) future.getSession();
            session.scheduledWriteBytes.set(0);
            session.scheduledWriteMessages.set(0);
            session.readBytesThroughput = 0;
            session.readMessagesThroughput = 0;
            session.writtenBytesThroughput = 0;
            session.writtenMessagesThroughput = 0;
        }
    };
复制代码

close和closeOnFlush都是异步操作的,区别是前者立即关闭连接,后者是在写请求队列中放入一个CLOSE_REQUEST并将其即时刷新储蓄,若要真正等到关闭完成,需要调用方法在返回的CloseFuture等待。下面是close的代码:

复制代码
    public final CloseFuture close() {
        synchronized (lock) {
            if (isClosing()) {
                return closeFuture;
            }
            
            closing = true;
        }
        getFilterChain().fireFilterClose();//fire出关闭事件
        return closeFuture;
    }
复制代码

下面是closeOnFlush的代码:

    private final CloseFuture closeOnFlush() {
        getWriteRequestQueue().offer(this, CLOSE_REQUEST);
        getProcessor().flush(this);
        return closeFuture;
    }

对于读的情况,下面是取得读的数据队列:

复制代码
    //返回可被读取数据队列
    private Queue<ReadFuture> getReadyReadFutures() {
        Queue<ReadFuture> readyReadFutures = (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES_KEY);
        //如果是第一次读取数据
        if (readyReadFutures == null) {
            //构造一个新的读数据队列
            readyReadFutures = new CircularQueue<ReadFuture>();
            Queue<ReadFuture> oldReadyReadFutures = (Queue<ReadFuture>) setAttributeIfAbsent(READY_READ_FUTURES_KEY, readyReadFutures);
            if (oldReadyReadFutures != null) {
                readyReadFutures = oldReadyReadFutures;
            }
        }
        return readyReadFutures;
    }
复制代码

读数据的过程:

复制代码
    public final ReadFuture read() {
        //配置不允许读数据
        if (!getConfig().isUseReadOperation()) {
            throw new IllegalStateException("useReadOperation is not enabled.");
        }
        //获得已经被许可的可被读数据队列
        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
        ReadFuture future;
        synchronized (readyReadFutures) {
            future = readyReadFutures.poll();
            if (future != null) {
                //如果关联的会话已关闭,通知读者
                if (future.isClosed()) {
                    readyReadFutures.offer(future);
                }
            } else {
                future = new DefaultReadFuture(this);
                //将数据出入等待读的队列
                getWaitingReadFutures().offer(future);
            }
        }
        return future;
    }
复制代码

写数据的过程:

复制代码
    //IoBuffer、文件、文件部分区域
    public WriteFuture write(Object message, SocketAddress remoteAddress) {
        if (message == null) {
            throw new NullPointerException("message");
        }
        //如果没有远端地址
        if (!getTransportMetadata().isConnectionless() && remoteAddress != null) {
            throw new UnsupportedOperationException();
        }
        //如果会话已被关闭
        if (isClosing() || !isConnected()) {
            WriteFuture future = new DefaultWriteFuture(this);
            WriteRequest request = new DefaultWriteRequest(message, future, remoteAddress);
            WriteException writeException = new WriteToClosedSessionException(request);
            future.setException(writeException);
            return future;
        }
        FileChannel openedFileChannel = null;
        try {            
            if (message instanceof IoBuffer && !((IoBuffer) message).hasRemaining()) {//如果是空消息
                throw new IllegalArgumentException("message is empty. Forgot to call flip()?");
            } else if (message instanceof FileChannel) {//文件的某一区域                
                FileChannel fileChannel = (FileChannel) message;
                message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());
            } else if (message instanceof File) {//要发送的是文件
                File file = (File) message;
                openedFileChannel = new FileInputStream(file).getChannel();//打开文件通道 
                message = new DefaultFileRegion(openedFileChannel, 0, openedFileChannel.size());
            }
        } catch (IOException e) {
            ExceptionMonitor.getInstance().exceptionCaught(e);
            return DefaultWriteFuture.newNotWrittenFuture(this, e);
        }
        //构造写请求,通过过滤器链发送出去
        WriteFuture writeFuture = new DefaultWriteFuture(this);
        WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
        IoFilterChain filterChain = getFilterChain();
        filterChain.fireFilterWrite(writeRequest);

        //如果打开文件通道应该在完成时关闭通道
        if (openedFileChannel != null) {
            final FileChannel finalChannel = openedFileChannel;
            writeFuture.addListener(new IoFutureListener<WriteFuture>() {
                public void operationComplete(WriteFuture future) {
                    try {
                        finalChannel.close();
                    } catch (IOException e) {
                        ExceptionMonitor.getInstance().exceptionCaught(e);
                    }
                }
            });
        }
        return writeFuture;
    }
复制代码
分享到:
评论

相关推荐

    mina2.0 含11个jar包

    mina-core-2.0.0-M6.jar mina-example-2.0.0-M6.jar mina-filter-codec-netty-2.0.0-M6.jar mina-filter-compression-2.0.0-M6.jar mina-integration-beans-2.0.0-M6.jar mina-integration-jmx-2.0.0-M6.jar mina-...

    关于apache Mina Server

    深入理解Apache_Mina_(1)----_Mina的几个类 深入理解Apache_Mina_(2)----_与IoFilter相关的几个类 深入理解Apache_Mina_(3)----_与IoHandler相关的几个类 深入理解Apache_Mina_(4)----_IoFilter和IoHandler的区别和...

    mina连接 mina心跳连接 mina断线重连

    mina连接,mina心跳连接,mina断线重连。其中客户端可直接用在android上。根据各方参考资料,经过自己的理解弄出来的。CSDN的资源分太难得了。

    Mina 2.0 User Guide(Mina 2.0 用户指南)

    MINA 2.0 User Guide Part I - Basics Chapter 1 - Getting Started Chapter 2 - Basics Chapter 3 - Service Chapter 4 - Session Chapter 5 - Filters Chapter 6 - Transports Chapter 7 - Handler Part II - ...

    mina socket客户度工程相关类

    1.mina socket客户度工程相关类,添加mina jar包后可独立运行。 2.mina若有空闲连接则使用已有连接,若无则新建mina连接; 3.mina空闲连接超过保活时间25分钟后,自动删除; 4.mina发送指令后,接收指定时长内收到的...

    Apache Mina核心jar包:mina-core-2.0.7

    Apache MINA是 Apache 组织一个较新的项目,它为开发高性能和高可用性的网络应用程序提供了非常便利的框架。 当前发行的 MINA 版本支持基于 Java NIO 技术的 TCP/UDP 应用程序开发、串口通讯程序(只在最新的预览版...

    Apache Mina Server 2.0中文参考手册V1.0,Apache Mina2.0学习笔记(修订版)

    Apache Mina Server 2.0中文参考手册V1.0,Apache Mina2.0学习笔记(修订版)Apache Mina Server 2.0中文参考手册V1.0,Apache Mina2.0学习笔记(修订版)

    mina自定义编解码器详解

    许多刚接触mina的朋友,对于mina的编解码器的编写很迷惑.希望这个文档可以帮助朋友们少走弯路。 资源中是一个比较典型的编解码器写法。生成了可执行文件。并对编解码器的代码有详细注释。

    Apache MINA开发相关jar包

    apache-mina-2.0.7-bin.zip,apache-mina-2.0.7-src.zip,log4j-1.2.17.zip,slf4j-api-1.6.6.jar,slf4j-api-1.6.6-sources.jar,slf4j-log4j12-1.6.6.jar,mina-example-2.0.7.jar,mina-example-2.0.7-sources....

    mina使用mina使用mina使用

    mina的使用初步入门mina的使用初步入门mina的使用初步入门

    MINA NIO 高性能异步并发网络通讯框架

    利用 Mina 可以高效地完成以下任务: &lt;br&gt;TCP/IP 和 UDP/IP 通讯 串口通讯 VM 间的管道通讯 SSL/TLS JXM 集成 IoC 容器集成( Spring 、 Pico 等) 状态机 &lt;br&gt;据官方评测, APR 的...

    mina开发相关jar包

    最新的 mina相关jar包 合集,里边有apache-mina-2.0.7-bin.zip,apache-mina-2.0.7-src.zip,log4j-1.2.17.zip,slf4j-api-1.6.6.jar,slf4j-api-1.6.6-sources.jar,slf4j-log4j12-1.6.6.jar,mina-example-2.0.7....

    MINA_API+MINA_DOC+mina

    里面包含mina2.0的api(英文)和mina自学手册,还有mina的开发指导

    mina心跳包机制

    项目包含有mina的服务端与客户端,客户端发送心跳包,服务端响应心跳包

    Mina状态机介绍和实例

    MINA(Multipurpose Infrastructure for Network Applications)是用于开发高性能和高可用性的网络应用程序的基础框架。通过使用MINA框架可以可以省下处理底层I/O和线程并发等复杂工作,开发人员能够把更多的精力投入...

    MINA断线重连死锁解决

    mina客户端,服务器端的demo

    mina2.0.7实例(服务器端与客户端)

    服务器端模拟SFS2X的一些实用功能对mina框架作了一定的封装,使用起来己经和SFS2X没有 太多的区别,但客户端只能使用mina组件(也就是说只能是JAVA客户端,这个实在有点麻烦,一直 想应用在unity3d中,没有实现,不懂...

    Apache MINA 2.0 用户指南中英文对照阅读版[带书签]

    本资源包含两个 pdf 文档,一本根据官方最新文档 (http://mina.apache.org/mina-project/userguide/user-guide-toc.html) 整理的 mina_2.0_user_guide_en.pdf,一个中文翻译的 mina_2.0_user_guide_cn.pdf。...

    apache-mina-2.0.4.rar_apache mina_mina

    mina内部源码,可以深入的研究下,重构修改后获得的效率更加突出

    spring mvc + Mina 配置部署

    之前的项目需要用到mina,实现的功能主要是:服务端主动发送消息到客户端,这个的服务端为外网的tomcat,客户端为内网的tomcat,由于无法知道内网tomcat 的地址,也就不能直接通过http的方式发送信息回来,最后想来...

Global site tag (gtag.js) - Google Analytics