首页 诗词 字典 板报 句子 名言 友答 励志 学校 网站地图
当前位置: 首页 > 教程频道 > 服务器 > Apache >

深入解析Apache Mina源码(二)——Mina的事件模型

2012-08-01 
深入解析Apache Mina源码(2)——Mina的事件模型1、深入解析Apache Mina源码(1)——Mina的过滤器机制实现2、深入

深入解析Apache Mina源码(2)——Mina的事件模型

1、深入解析Apache Mina源码(1)——Mina的过滤器机制实现2、深入解析Apache Mina源码(2)——Mina的事件模型

?

一、观察者模式的本来面目

工作时间长了,会发现代码中的很多东西都是相通相似的,就说JAVA的事件机制其实就是观察者模式的实现,学会了观察者模式,事件机制自己无师自通,先以我的角度看看什么是观察者模式。

观察者模式,顾名思义,应该有观察者(抽象观察者“Observer”角色)和被观察者(抽象主题“Subject”角色),被观察者一旦有变化就通知观察者更新自己。

形象图:


深入解析Apache Mina源码(二)——Mina的事件模型

下面是观察者模式的类图:


深入解析Apache Mina源码(二)——Mina的事件模型

从我的角度简单点理解,我们知道面向对象编程不过是对象之间的相互调用,那么被观察者应该有个列表(List)来把所有观察者引用起来,当有事件要通知观察者时,我们从List中把所有观察者类取出来挨个调用观察者的方法(如update())。这个过程中只有充分利用了面向对象编程的多态,继承等特性才能实现。

下面是代码实现:

被观察者(抽象主题类):

package com.lifanghu.observer;import java.util.ArrayList;import java.util.List;/** * @author lifh * @mail wslfh2005@163.com * @since 2012-6-14 下午06:18:26 * @name com.lifanghu.observer.Subject.java * @version 1.0 */public abstract class Subject {    private List<Observer> observers = new ArrayList<Observer>();    /**     * 增加一个观察者     * @param observer     * @author lifh     */    public void addObserver(Observer observer) {        observers.add(observer);    }    /**     * 删除一个观察者     * @param observer     * @author lifh     */    public void removeObserver(Observer observer) {        observers.remove(observer);    }    /**     * 通知所有观察者     * @author lifh     */    public void notifyObservers() {        for (Observer observer : observers) {            observer.update(this);        }    }}?

??

实际被观察者(真实主题角色):

package com.lifanghu.observer;/** * 实际被观察者 * @author lifh * @mail wslfh2005@163.com * @since 2012-6-14 下午10:48:02 * @name com.lifanghu.observer.XiaoMing.java * @version 1.0 */public class XiaoMing extends Subject {    private String state;    //实际要通进行通知的方法    public void change() {        this.notifyObservers();    }    public String getState() {        return state;    }    public void setState(String state) {        this.state = state;    }}
?

?

抽象观察者:

package com.lifanghu.observer;/** * 实际观察者 * @author lifh * @mail wslfh2005@163.com * @since 2012-6-14 下午10:54:07 * @name com.lifanghu.observer.XiaoWang.java * @version 1.0 */public class XiaoWang implements Observer {    public void update(Subject subject) {        XiaoMing xm = (XiaoMing) subject;        System.out.println("小王得到通知:" + xm.getState() + ";小王说:活该!");    }}?

??

具体观察者1:

package com.lifanghu.observer;/** * 实际观察者 * @author lifh * @mail wslfh2005@163.com * @since 2012-6-14 下午10:54:07 * @name com.lifanghu.observer.XiaoWang.java * @version 1.0 */public class XiaoWang implements Observer {    public void update(Subject subject) {        XiaoMing xm = (XiaoMing) subject;        System.out.println("小王得到通知:" + xm.getState() + ";小王说:活该!");    }}
?

具体观察者2:

package com.lifanghu.observer;/** * 实际观察者 * @author lifh * @mail wslfh2005@163.com * @since 2012-6-14 下午10:56:17 * @name com.lifanghu.observer.ZhangShan.java * @version 1.0 */public class ZhangShan implements Observer {    public void update(Subject subject) {        XiaoMing xm = (XiaoMing) subject;        System.out.println("张三得到通知:" + xm.getState() + ";张三说:快吃药吧!");    }}
?

?

客户端调用逻辑:

package com.lifanghu.observer;/** * 实际观察者 * @author lifh * @mail wslfh2005@163.com * @since 2012-6-14 下午10:56:17 * @name com.lifanghu.observer.ZhangShan.java * @version 1.0 */public class ZhangShan implements Observer {    public void update(Subject subject) {        XiaoMing xm = (XiaoMing) subject;        System.out.println("张三得到通知:" + xm.getState() + ";张三说:快吃药吧!");    }}
?

?

输出结果:

说明:对于观察者模式JDK是有相应的实现支持的,内置观察者模式主要有2个类,一个是类Observable,一个是接口类Observer?,大家可以上网去查,或者直接看它的源码,这里就不多说了。

?

二、从观察者模式到事件机制

关于事件机制上面也谈了,它其实就是观察者模式的实现,关于两者的联系也可以看iteye上的另一讨论http://www.iteye.com/topic/182643,里面说的已经很清楚了。

关于事件模型我想说的是里面的三个元素:事件源,事件本身,监听者,对应观察者模式的三个元素分别是:被观察者,观察者方法要传入的参数,观察者。

关于事件机制JDK也有相应的实现java.util.EventListener和java.util.EventObject?。

?

?

三、Mina中的事件以及监听器的实现

Mina?中有主要有三种事件:

1、org.apache.mina.core.service包下面对IoService接口实现和IoSession接口实现的监听,它不是一个纯的事件模型,没有事件本身(EventObject)的存在。

2、org.apache.mina.core.session包下在线程池环境下对于事件的触发从而进入过滤器链进行的处理,它不是一个纯事件的模型,少了监听器(EventListener)的实现。

3、org.apache.mina.core.future包下对于异步IO操作(IoFuture)进行监听。它也不是一个纯的事件模型,也是少了事件本身(EventObject)的存在。

下面分别介绍下这三种事件细节。

1、service包下面主要有两个类是用于监听模式的,监听者IoServiceListener和它的帮助类IoServiceListenerSupport,IoServiceListener接口继承了EventListener,标明自己是个监听者接口,主要监听service和session的创建,空闲,销毁事件的。IoServiceListenerSupport作为它的帮助类,实际上充当了事件源的角色,它存储了IoServiceListener的成员列表。

/** A list of {@link IoServiceListener}s. */private final List<IoServiceListener> listeners = new CopyOnWriteArrayList<IoServiceListener>();

/** * Adds a new listener. * * @param listener The added listener */ public void add(IoServiceListener listener) { if (listener != null) { listeners.add(listener); } } /** * Removes an existing listener. * * @param listener The listener to remove */ public void remove(IoServiceListener listener) { if (listener != null) { listeners.remove(listener); } }?

?

通知监听者的方法:

?

    /**     * Calls {@link IoServiceListener#serviceActivated(IoService)}     * for all registered listeners.     * 通知方法     */    public void fireServiceActivated() {        if (!activated.compareAndSet(false, true)) {            // The instance is already active            return;        }        activationTime = System.currentTimeMillis();        // Activate all the listeners now        for (IoServiceListener listener : listeners) {//循环提取出listeners执行本身的方法            try {                listener.serviceActivated(service);            } catch (Throwable e) {                ExceptionMonitor.getInstance().exceptionCaught(e);            }        }    }
?

?

再来看接口IoService,有对监听器的操作:

?

    /**     * Adds an {@link IoServiceListener} that listens any events related with     * this service.     */    void addListener(IoServiceListener listener);    /**     * Removed an existing {@link IoServiceListener} that listens any events     * related with this service.     */    void removeListener(IoServiceListener listener);

?它的实现AbstractIoService里面可以看到,实际是调用的IoServiceListenerSupport的方法执行的有关操作:

?

    /**     * {@inheritDoc}     */    public final void addListener(IoServiceListener listener) {        listeners.add(listener);    }    /**     * {@inheritDoc}     */    public final void removeListener(IoServiceListener listener) {        listeners.remove(listener);    }

?看下AbstractIoAcceptor类的调用方法:

?

        if (activate) {            //触发监听器的服务创建事件            getListeners().fireServiceActivated();        }

?2、session包下的事件模型主要还是基于多线程模型来实现的,可以看到IoEvent实现了Runnable接口,它作为任务放入到线程池中去执行,看它的run方法:

?

    // 此类实现了 Runnable 它的存在主要为了线程池过滤器ExecutorFilter来使用的。    // 这样如果加入了ExecutorFilter后,后面所有的数据处理都是多线程方式进行的。    // 一般放在handler前面,也就是所有过滤器的后面,因为handler主要做业务处理,可能会有数据库的操作,比较耗时,适合多线程。    public void run() {        //激活各种事件        fire();    }

?IO事件类型:

?

package org.apache.mina.core.session;/** * An {@link Enum} that represents the type of I/O events and requests. * Most users won't need to use this class.  It is usually used by internal * components to store I/O events. * * @author <a href="http://mina.apache.org">Apache MINA Project</a> */public enum IoEventType {    SESSION_CREATED,//session创建    SESSION_OPENED,//session打开    SESSION_CLOSED,//session关闭    MESSAGE_RECEIVED,//消息接收    MESSAGE_SENT,//消息发送    SESSION_IDLE,//session空闲    EXCEPTION_CAUGHT,//发生异常    WRITE,//写事件    CLOSE,//关闭session事件}

?它有一个实现类IoFilterEvent,我们知道如果我们想要利用mina的多线程处理,需要如下加一个过滤器:

?

connector.getFilterChain().addLast("executor", new ExecutorFilter());

?在这个时候我们的IoEvent就派上了用场,ExecutorFilter是一个实现了线程池(线程池在后面的文章中会介绍)的过滤器,看下ExecutorFilter的调用:

?

@Overridepublic final void sessionOpened(NextFilter nextFilter, IoSession session) {    if (eventTypes.contains(IoEventType.SESSION_OPENED)) {        IoFilterEvent event = new IoFilterEvent(nextFilter, IoEventType.SESSION_OPENED,            session, null);         //触发事件        fireEvent(event);    } else {        nextFilter.sessionOpened(session);    }}/** * Fires the specified event through the underlying executor. *  * @param event The filtered event */protected void fireEvent(IoFilterEvent event) {    //将事件提交给线程池执行    executor.execute(event);}

?可以看到这个session下的事件模型没有监听器,所以基本就没有事件模式的概念了,但是还是有一些事件模型的一些影子,所以可以认为它是一个伪事件模型。

?

?

/** * Invoked when the operation associated with the {@link IoFuture} * has been completed even if you add the listener after the completion. * 只有一个处理完成的回调方法,当线程池将我们的任务处理完成后会调用此方法。 * @param future The source {@link IoFuture} which called this * callback. */void operationComplete(F future);

package org.apache.mina.core.future;import java.util.concurrent.TimeUnit;import org.apache.mina.core.session.IoSession;/** * Represents the completion of an asynchronous I/O operation on an * {@link IoSession}. * Can be listened for completion using a {@link IoFutureListener}. * 因为有了线程池模型才会有异步的概念。 * 我们将各种IO操作(连接,关闭,读,写)以任务的方式放入队列中并返回IoFuture供线程池去处理。 * 处理过程和处理完成的操作会改变IoFuture的状态。 * @author <a href="http://mina.apache.org">Apache MINA Project</a> */public interface IoFuture { /** * Returns the {@link IoSession} which is associated with this future. */ IoSession getSession(); /** * Wait for the asynchronous operation to complete. * The attached listeners will be notified when the operation is * completed. */ IoFuture await() throws InterruptedException; /** * Wait for the asynchronous operation to complete with the specified timeout. * 等待操作在指定时间内完成 * @return <tt>true</tt> if the operation is completed. */ boolean await(long timeout, TimeUnit unit) throws InterruptedException; /** * Wait for the asynchronous operation to complete with the specified timeout. * * @return <tt>true</tt> if the operation is completed. */ boolean await(long timeoutMillis) throws InterruptedException; /** * Wait for the asynchronous operation to complete uninterruptibly. * The attached listeners will be notified when the operation is * completed. * 等待异步操作完成,完成后会触发注册的监听器。 * @return the current IoFuture */ IoFuture awaitUninterruptibly(); /** * Wait for the asynchronous operation to complete with the specified timeout * uninterruptibly. * * @return <tt>true</tt> if the operation is completed. */ boolean awaitUninterruptibly(long timeout, TimeUnit unit); /** * Wait for the asynchronous operation to complete with the specified timeout * uninterruptibly. * * @return <tt>true</tt> if the operation is finished. */ boolean awaitUninterruptibly(long timeoutMillis); /** * @deprecated Replaced with {@link #awaitUninterruptibly()}. */ @Deprecated void join(); /** * @deprecated Replaced with {@link #awaitUninterruptibly(long)}. */ @Deprecated boolean join(long timeoutMillis); /** * Returns if the asynchronous operation is completed. */ boolean isDone(); /** * Adds an event <tt>listener</tt> which is notified when * this future is completed. If the listener is added * after the completion, the listener is directly notified. * 增加异步完成后的监听者 */ IoFuture addListener(IoFutureListener<?> listener); /** * Removes an existing event <tt>listener</tt> so it won't be notified when * the future is completed. * 删除异步完成后的监听者 */ IoFuture removeListener(IoFutureListener<?> listener);}

/** * Sets the result of the asynchronous operation, and mark it as finished. */public void setValue(Object newValue) { synchronized (lock) { // Allow only once. if (ready) { return; } result = newValue; ready = true; if (waiters > 0) { lock.notifyAll(); } } //调用监听者的方法 notifyListeners();}

参考文章:

1.?观察者模式

http://ttitfly.iteye.com/blog/152512

?

2.?《JAVA与模式》之观察者模式

http://www.cnblogs.com/java-my-life/archive/2012/05/16/2502279.html

?

3. Java事件机制理解及应用

http://blog.csdn.net/JianZhiZG/article/details/1427073

?

4. java?事件机制

http://www.blogjava.net/chenweicai/archive/2007/04/13/110350.html

?

五、总结

?

?

事件模型作为mina中一个重要的设计模式很好的体现了JAVA高内聚,低耦合的设计思想,也让用户的调用代码简洁易用,本文章本着抛砖引玉的态度希望大家能指出缺点,共同讨论。

每天进步一点点,不做无为的码农。。。。。

2012年6月17日星期日

码农虎虎

http://weibo.com/hurtigf

http://www.lifanghu.com/

wslfh2005@163.com

热点排行