手写事件发布订阅框架

2022年5月15日10:38:16

一、前言

发布订阅模式也叫观察者模式,利用该模式可以进行代码解耦,很多框架都用到该设计模式,比如Spring的事件机制,guava的EventBus(事件总线)等,不清楚观察者模式的话可以查看本人之前写的博客《设计模式之观察者模式》。

为了更好的了解现有的事件框架实现原理,便手写了一个简单的事件发布/订阅框架供大家参考。

二、设计编码

首先创建一个事件类继承,所有的事件都继承该类。

/**
 * @author 2YSP
 * @date 2022/4/16 16:00
 */
public class Event extends EventObject {


    /**
     * Constructs a prototypical Event.
     *
     * @param source The object on which the Event initially occurred.
     * @throws IllegalArgumentException if source is null.
     */
    public Event(Object source) {
        super(source);
    }
}

JDK要求所有事件都继承EventObject,并通过source得到事件源。

然后定义事件监听器接口EventListener

/**
 * @author 2YSP
 * @description: 事件监听器
 * @date 2022/4/10 14:45
 */
public interface EventListener<E extends Event> {

    /**
     * 触发事件
     * @param e
     */
    void onEvent(E e);

}

核心部分就是需要一个类来管理所有的事件监听器,分别具备以下三个方法:

registerListener():注册一个事件监听器

removeListener():移除事件监听器

notifyListener():通知该事件触发的所有监听器

package cn.sp.event;

import com.google.common.collect.Lists;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;

import javax.annotation.PostConstruct;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author 2YSP
 * @date 2022/4/16 16:12
 */
@Component
public class EventManager implements ApplicationContextAware {
    /**
     * 事件map
     */
    private static Map<Class<? extends Event>, List<EventListener>> map = new HashMap<>(64);


    private static ApplicationContext applicationContext;

    private static final String EVENT_METHOD = "onEvent";

    /**
     * 初始化事件缓存map
     */
    @PostConstruct
    private void initEventMap() {
        Map<String, EventListener> beanMap = applicationContext.getBeansOfType(EventListener.class);
        if (beanMap == null) {
            return;
        }
        beanMap.forEach((key, value) -> {
            // 反射获取onEvent方法的参数类型
            Method[] methods = ReflectionUtils.getDeclaredMethods(value.getClass());
            for (Method method : methods) {
                if (method.getName().equals(EVENT_METHOD)) {
                    Parameter parameter = method.getParameters()[0];
                    // 参数必须为Event的子类
                    if (parameter.getType().getName().equals(Event.class.getName())) {
                        continue;
                    }
                    registerListener((Class<? extends Event>) parameter.getType(), value);
                }
            }
        });
    }

    /**
     * 注册一个事件监听器
     *
     * @param clazz
     * @param eventListener
     * @param <E>
     */
    public <E extends Event> void registerListener(Class<? extends Event> clazz, EventListener<E> eventListener) {
        List<EventListener> list = map.get(clazz);
        if (CollectionUtils.isEmpty(list)) {
            map.put(clazz, Lists.newArrayList(eventListener));
        } else {
            list.add(eventListener);
            map.put(clazz, list);
        }
    }

    /**
     * 移除一个事件监听器
     *
     * @param clazz
     * @param <E>
     */
    public <E extends Event> void removeListener(Class<E> clazz) {
        map.remove(clazz);
    }

    /**
     * 通知所有该事件的监听器
     *
     * @param <E>
     */
    public <E extends Event> void notifyListener(E e) {
        List<EventListener> eventListeners = map.get(e.getClass());
        if (CollectionUtils.isEmpty(eventListeners)) {
            return;
        }
        eventListeners.forEach(eventListener -> {
                // 同步执行
                eventListener.onEvent(e);
        });
    }


    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        EventManager.applicationContext = applicationContext;
    }
}

通过initEventMap()方法在项目启动后,利用反射注册所有的事件监听器,但是notifyListener()方法是串行执行,如果想要异步执行增加一个标记注解@AsyncExecute就行了,优化后版本如下:

/**
 * @author 2YSP
 * @date 2022/4/16 17:35
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface AsyncExecute {

}

EventManager

/**
 * @author 2YSP
 * @date 2022/4/16 16:12
 */
@Component
public class EventManager implements ApplicationContextAware {
    /**
     * 事件map
     */
    private static Map<Class<? extends Event>, List<EventListener>> map = new HashMap<>(64);


    private static ApplicationContext applicationContext;

    private static final String EVENT_METHOD = "onEvent";

    /**
     * 事件执行线程池
     */
    private static ExecutorService eventPool = new ThreadPoolExecutor(4,
            8, 30L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(512), new ThreadFactoryBuilder().setNameFormat("event-pool-%d").build());

    /**
     * 初始化事件缓存map
     */
    @PostConstruct
    private void initEventMap() {
        Map<String, EventListener> beanMap = applicationContext.getBeansOfType(EventListener.class);
        if (beanMap == null) {
            return;
        }
        beanMap.forEach((key, value) -> {
            // 反射获取onEvent方法的参数类型
            Method[] methods = ReflectionUtils.getDeclaredMethods(value.getClass());
            for (Method method : methods) {
                if (method.getName().equals(EVENT_METHOD)) {
                    Parameter parameter = method.getParameters()[0];
                    // 参数必须为Event的子类
                    if (parameter.getType().getName().equals(Event.class.getName())) {
                        continue;
                    }
                    registerListener((Class<? extends Event>) parameter.getType(), value);
                }
            }
        });
    }

    /**
     * 注册一个事件监听器
     *
     * @param clazz
     * @param eventListener
     * @param <E>
     */
    public <E extends Event> void registerListener(Class<? extends Event> clazz, EventListener<E> eventListener) {
        List<EventListener> list = map.get(clazz);
        if (CollectionUtils.isEmpty(list)) {
            map.put(clazz, Lists.newArrayList(eventListener));
        } else {
            list.add(eventListener);
            map.put(clazz, list);
        }
    }

    /**
     * 移除一个事件监听器
     *
     * @param clazz
     * @param <E>
     */
    public <E extends Event> void removeListener(Class<E> clazz) {
        map.remove(clazz);
    }

    /**
     * 通知所有该事件的监听器
     *
     * @param <E>
     */
    public <E extends Event> void notifyListener(E e) {
        List<EventListener> eventListeners = map.get(e.getClass());
        if (CollectionUtils.isEmpty(eventListeners)) {
            return;
        }
        eventListeners.forEach(eventListener -> {
            AsyncExecute asyncExecute = eventListener.getClass().getAnnotation(AsyncExecute.class);
            if (asyncExecute == null) {
                // 同步执行
                eventListener.onEvent(e);
            } else {
                // 异步执行
                eventPool.execute(() -> eventListener.onEvent(e));
            }
        });
    }


    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        EventManager.applicationContext = applicationContext;
    }
}

@AsyncExecute注解可用在类上,每次调用notifyListener()方法时通过反射判断是否存在@AsyncExecute注解,如果存在则用线程池异步执行,其实反射的性能不是很好,如果追求性能的话可以考虑在初始化时就将是否异步执行的信息维护到事件缓存map中。

现在就差一个发布事件的工具类EventPublisher

/**
 * @author 2YSP
 * @date 2022/4/16 16:07
 */
@Component
public class EventPublisher<E extends Event> {

    @Resource
    private EventManager eventManager;

    public <E extends Event> void publish(E event) {
        eventManager.notifyListener(event);
    }
}

三、测试

测试的场景是订单创建后,发生订单创建事件,然后有两个监听器都监听了该事件,区别是一个用了@AsyncExecute注解,一个没有。

  1. 创建Order实体
public class Order {

    private String orderNo;

    public String getOrderNo() {
        return orderNo;
    }

    public void setOrderNo(String orderNo) {
        this.orderNo = orderNo;
    }
}

2.创建订单创建事件

public class OrderCreateEvent extends Event {

    private Order order;

    public OrderCreateEvent(Object source, Order order) {
        super(source);
        this.order = order;
    }

    public Order getOrder() {
        return order;
    }

    public void setOrder(Order order) {
        this.order = order;
    }
}

3.创建事件监听器

@Component
public class OrderCreateEventListener implements EventListener<OrderCreateEvent> {

    @Override
    public void onEvent(OrderCreateEvent orderCreateEvent) {
        System.out.println(Thread.currentThread().getName() + "--监听订单创建事件。。。。。。。。。");
        Order order = orderCreateEvent.getOrder();
        System.out.println(order.getOrderNo());
    }
}


@AsyncExecute
@Component
public class OrderCreateEventListener2 implements EventListener<OrderCreateEvent> {

    @Override
    public void onEvent(OrderCreateEvent orderCreateEvent) {
        System.out.println(Thread.currentThread().getName() + "--监听订单创建事件2。。。。。。。。。");
        Order order = orderCreateEvent.getOrder();
        System.out.println(order.getOrderNo());
    }
}

4.发布事件

@RequestMapping("/order")
@RestController
public class OrderController {

    @Resource
    private OrderService orderService;

    @PostMapping("")
    public void create(@RequestBody Order order) {
        orderService.create(order);
    }
}


@Service
public class OrderService {

    @Resource
    private EventPublisher<OrderCreateEvent> publisher;


    /**
     * 创建订单
     *
     * @param order
     */
    public void create(Order order) {
        // 发送订单创建事件
        order.setOrderNo("sssss");
        publisher.publish(new OrderCreateEvent(this, order));
    }
}

测试代码编写完毕,启动项目请求订单创建接口http://localhost:8080/order,控制台输出如下

http-nio-8080-exec-2--监听订单创建事件。。。。。。。。。
sssss
event-pool-0--监听订单创建事件2。。。。。。。。。
sssss

说明两个事件监听器都被触发了,且线程名字不同,说明一个是主线程同步执行,另一个是线程池异步,至此测试成功。

四、总结

写完发现实现一个发布/订阅框架并不难,当然这个功能比较简单,还有优化的空间,代码已经上传到github,点击即可查看

  • 作者:烟味i
  • 原文链接:https://www.cnblogs.com/2YSP/p/16182840.html
    更新时间:2022年5月15日10:38:16 ,共 7423 字。