监听者模式,大家应该已经再熟悉不过了。会计Q和销售员A说:”你那边卖了点儿啥就通知我一声,我这边得记上一笔”,描述很简单,这也就是监听者模式的核心,触发者 + 源事件 + 监听器。Spring在应用内部也对事件监听提供了相关的实现,那么这回我就就来看看Spring的事件组件,聊点…你熟系的陌生的,知道的,不知道的。
先吃个栗子
Java对事件相关的构建进行了一些抽象去规范行为,包括源事件EventObject
,监听器类EventListener
,而Spring对此进行了实现,所以在内容开始之前,我们得先知道怎么创建一个事件并触发它。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| class SampleEvent extends ApplicationEvent { private Object someData; public SampleEvent(Object source, Object someData) { super(source); } }
class SampleListener implements ApplicationListener<SampleEvent> { @Override public void onApplication(SampleEvent event) { } }
public static void main(String[] args) { new ApplicationEventPublisher().publish(new SampleEvent(getClass(), "data")); }
|
一个标准的事件定义触发流程并不复杂,但是其中监听器的注册逻辑以及在事件触发时Spring是如何找到对应的监听器的呢?别急,跟着我慢慢看,说不定看着的同时,还会收获一些关于Spring ApplicationListener
小小的新玩法。
Ps: 这回的内容相对于之前我们聊过的Spring AOP
以及Spring Transcation
会简单一些,虽然会依赖到一点点AOP
相关的逻辑,想了解的同学也可以点这里去看看,这次也不需要依赖某个smoke-test
项目。我们可以稍微放松一下,新建一个Spring Boot项目,和我一起慢慢的去理解ApplicationListener
的设计逻辑。
那个发言人呢?
消息实例也好,监听器也罢,中间总得有个玩意把消息发布出来,让监听器能知道事件发生了这才有意义。Spring
抽象出了事件发布器ApplicationEventListener
并规定了事件发布行为方法publishEvent
。我们可以看一下实现了ApplicationEventListener
接口的AbstractApplicationContext
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| public abstract class AbstractApplicationContext extends DefaultResourceLoader implements ConfigurableApplicationContext {
... @Override public void publishEvent(ApplicationEvent event) { publishEvent(event, null); }
@Override public void publishEvent(Object event) { publishEvent(event, null); }
protected void publishEvent(Object event, @Nullable ResolvableType eventType) { Assert.notNull(event, "Event must not be null");
ApplicationEvent applicationEvent; if (event instanceof ApplicationEvent) { applicationEvent = (ApplicationEvent) event; } else { applicationEvent = new PayloadApplicationEvent<>(this, event); if (eventType == null) { eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType(); } }
if (this.earlyApplicationEvents != null) { this.earlyApplicationEvents.add(applicationEvent); } else { getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); }
if (this.parent != null) { if (this.parent instanceof AbstractApplicationContext) { ((AbstractApplicationContext) this.parent).publishEvent(event, eventType); } else { this.parent.publishEvent(event); } } } }
|
找到一个大喇叭
从AbstractApplicationContext
中触发事件的逻辑可以知道触发的核心是ApplicationEventMulticaster
,那这个对象是哪来的呢?这就得复习一下我们Spring应用启动的refresh
方法了。在完成SpringBean
扫描之后会通过initApplicationEventMulticaster
方法进行广播器的初始化动作,我们可以看一下它的初始化逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| protected void initApplicationEventMulticaster() { ConfigurableListableBeanFactory beanFactory = getBeanFactory();
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) { this.applicationEventMulticaster = beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class); } else { this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory); beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster); } }
|
Tips
由于我们应用中的上下文AnnotationConfigApplicationContext(标准Spring应用上下文对象)
或是AnnotationConfigServletWebServerApplicationContext(应用上下文对象)
,都基于AbstractApplicationContext
进行实现,因此我们也可以直接使用ApplcationContext
对象进行消息的发布。换句话说,在需要进行消息推送的时候我们既可以通过自动注入的ApplicationEventPublisher
对象进行消息发布,也可以直接通过ApplicationContext
对象进行消息发布,毕竟从核心来说,两者是同一个对象。
我们可以通过一个简单的注入证明这一点。但是以语义的方面来看,还是建议使用ApplicationEventPublisher
进行消息发布。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| @SpringBootApplication public class SampleApplication implements ApplicationRunner, ApplicationContextAware, ApplicationEventPublisherAware { public static void main(String[] args) { SpringApplication.run(SampleApplication.class, args); } private ApplicationContext applicationContext; private ApplicationEventPublisher eventPublisher;
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; }
@Override public void setApplicationEventPublisher(ApplicationEventPublisher eventPublisher) throws BeansException { this.eventPublisher = eventPublisher; } @Override public void run(ApplicationArguments args) throws Exception { Assert.isTrue(this.applicationContext == this.eventPublisher, "not equals"); } }
|
发言人有了,听众也得有
作为Spring的应用组件,当然也与IoC
容器脱离不开干系,我们需要把我们的监听器注册为Bean交付予IoC
容器托管。但是只是单纯的注册为Bean自然是不够的,我们还需要把监听器添加进事件发布器也就是我们的应用上下文中,以在有消息发生时知道有哪些候选监听器。
Spring的监听器定义有两种方式:
- 类实现
ApplicationListener
接口,并标记为SpringBean
声明为监听器 - 在被标记为
SpringBean
的类上,通过对方法标记@EventListener
注解声明为监听器
两种声明方式当然也对应着两种扫描时机和注册时机,我们一个一个来看。
标准的监听器
标准模式的监听器我们可以通过实现接口进行定义,譬如通过实现ApplicationListener
接口定义一个标准的监听器,或实现SmartApplicationListener
接口去根据事件类型和事件源进行事件处理,或实现GenericApplicationListener
接口根据消息对象类型(不再局限于事件类型)和消息源进行事件的处理。感兴趣的同学也可以去上面这些接口看看接口可以实现的行为。
在我们完成一个标准监听器定义并将其标记为SpringBean
之后,Spring在启动时会扫描将其装载进IoC
容器,这时候…如果你是Spring Listener
的责任开发,你会通过什么方式把监听器塞进应用上下文的监听器列表中。
要是我的话,可能会通过这两种方式实现装载:
- 通过定义一个
BeanPostProcessor
对监听器相关的SpringBean
进行处理并将其装载到应用上下文中- 这样实现需要注意要在其它
BeanPostProcessor
完成bean后处理逻辑处理/增强完后再进行装载操作,保证装载的bean已经是后处理的终态
- 在所有Bean初始化完成之后,通过Spring的回调(实现
SmartInitializingSingleton
,InitializingBean
等回调接口类),并注入BeanFactory
,在beanFactory
中扫描符合条件的监听器类,进行处理并将其装载到应用上下文中
Spring对标准的监听器装配采用了第一种方法,至于第二种方法,咱们先撂一撂过会再说。在Spring的应用启动方法refresh()
中,有这么一个方法调用,registerBeanPostProcessors(beanFactory)
,该方法会扫描应用内的beanPostProcessor
并将其配置进BeanFactory
,我们看一下里面的大体逻辑(我会删掉一些与本文不太重要的逻辑内容)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public static void registerBeanPostProcessors( ConfigurableListableBeanFactory beanFactory, AbstractApplicationContext applicationContext) {
String[] postProcessorNames = beanFactory.getBeanNamesForType(BeanPostProcessor.class, true, false);
beanFactory.addBeanPostProcessor(new ApplicationListenerDetector(applicationContext)); }
|
诶嘿嘿,好像被忽略掉的方法内容有点多,但是不打紧,我们找到了我们装配标准监听器的主角ApplicationListenerDetector
。其会在所有后处理其都处理完成之后,通过beanPostProcessor
的钩子方法postProcessAfterInitialization
将监听器装载进上下文,我们可以看看它的具体逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Override public Object postProcessAfterInitialization(Object bean, String beanName) { if (bean instanceof ApplicationListener) { Boolean flag = this.singletonNames.get(beanName); if (Boolean.TRUE.equals(flag)) { this.applicationContext.addApplicationListener((ApplicationListener<?>) bean); } else if (Boolean.FALSE.equals(flag)) {
this.singletonNames.remove(beanName); } } return bean; }
|
从以上逻辑我们大致能获取到两个信息:
ApplicationListenerDetector
只能装载类实例为ApplicationListener
的bean,不会处理通过@EventListener
注解标记的方法模式的监听器- 非单例
SpringBean
无法作为监听器被配置进应用上下文中
那么既然ApplicationListenerDetector
无法处理@EventListener
注解,那这部分注解模式的监听器又是如何被添加进应用上下文的呢?不急,下面就说它了。
@EventListener
标注的监听器
关于通过@EventListener
注解进行标记监听器的装配方式,诶,那咱么就得回头看看我们应用上下文ApplicationContext
在初始化的时候都做了些什么了。
我们都知道SpringBoot
应用在启动时会根据类路径中是否存在WebFlux
或者Servlet
相关类对象进行应用的类型推断(不知道的你现在知道了ヽ(✿゚▽゚)ノ),并根据应用类型选择对应的上下文进行初始化,但是无论是哪种上下文,在构造函数内都会创建一个AnnotatedBeanDefinitionReader
的实例。
1 2 3 4 5
| public AnnotationConfigServletWebServerApplicationContext() { this.reader = new AnnotatedBeanDefinitionReader(this); this.scanner = new ClassPathBeanDefinitionScanner(this); }
|
而在AnnotatedBeanDefinitionReader
的初始化过程中,会通过AnnotationConfigUtils#registerAnnotationConfigProcessors
方法向我们的IoC
容器中注册一些对象,而我们要关注扫描装配类也就在这里。让我们来看看其向IoC
容器中注册bean的逻辑(我还是会忽略掉一些与本文关系不大代码)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public static Set<BeanDefinitionHolder> registerAnnotationConfigProcessors( BeanDefinitionRegistry registry, @Nullable Object source) {
DefaultListableBeanFactory beanFactory = unwrapDefaultListableBeanFactory(registry); if (beanFactory != null) { if (!(beanFactory.getDependencyComparator() instanceof AnnotationAwareOrderComparator)) { beanFactory.setDependencyComparator(AnnotationAwareOrderComparator.INSTANCE); } if (!(beanFactory.getAutowireCandidateResolver() instanceof ContextAnnotationAutowireCandidateResolver)) { beanFactory.setAutowireCandidateResolver(new ContextAnnotationAutowireCandidateResolver()); } }
Set<BeanDefinitionHolder> beanDefs = new LinkedHashSet<>(8);
if (!registry.containsBeanDefinition(EVENT_LISTENER_PROCESSOR_BEAN_NAME)) { RootBeanDefinition def = new RootBeanDefinition(EventListenerMethodProcessor.class); def.setSource(source); beanDefs.add(registerPostProcessor(registry, def, EVENT_LISTENER_PROCESSOR_BEAN_NAME)); }
if (!registry.containsBeanDefinition(EVENT_LISTENER_FACTORY_BEAN_NAME)) { RootBeanDefinition def = new RootBeanDefinition(DefaultEventListenerFactory.class); def.setSource(source); beanDefs.add(registerPostProcessor(registry, def, EVENT_LISTENER_FACTORY_BEAN_NAME)); }
return beanDefs; }
|
监听器方法处理器EventListenerMethodProcessor
,实现了BeanFactoryAware
接口和SmartInitializingSingleton
接口,前者用于获取beanFactory
对象,后者用于通过Spring的钩子回调对IoC
容器中的bean进行扫描,并将标记了@EventListener
的方法通过eventListenerFactory
封装为监听器包装类并添加进应用上下文中。这段描述是不是有点耳熟,没错….在标准监听器小节中提到的第二种监听器装配方式,就是给注解模式的监听器所使用的。我们可以具体看看它的扫描装配逻辑,可能有点点长。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| @Override public void afterSingletonsInstantiated() { ConfigurableListableBeanFactory beanFactory = this.beanFactory; String[] beanNames = beanFactory.getBeanNamesForType(Object.class); for (String beanName : beanNames) {
processBean(beanName, type); } }
private void processBean(final String beanName, final Class<?> targetType) {
if (!this.nonAnnotatedClasses.contains(targetType) && AnnotationUtils.isCandidateClass(targetType, EventListener.class) && !isSpringContainerClass(targetType)) {
Map<Method, EventListener> annotatedMethods = null; try { annotatedMethods = MethodIntrospector.selectMethods( targetType, (MethodIntrospector.MetadataLookup<EventListener>) method -> AnnotatedElementUtils .findMergedAnnotation(method, EventListener.class) ); } catch (Throwable ex) { }
if (CollectionUtils.isEmpty(annotatedMethods)) { this.nonAnnotatedClasses.add(targetType); } else { ConfigurableApplicationContext context = this.applicationContext; List<EventListenerFactory> factories = this.eventListenerFactories; for (Method method : annotatedMethods.keySet()) { for (EventListenerFactory factory : factories) { if (factory.supportsMethod(method)) {
Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName)); ApplicationListener<?> applicationListener = factory.createApplicationListener(beanName, targetType, methodToUse); if (applicationListener instanceof ApplicationListenerMethodAdapter) { ((ApplicationListenerMethodAdapter) applicationListener).init(context, this.evaluator); } context.addApplicationListener(applicationListener); break; } } } } } }
|
至此,我们所定义的两种模式的监听器,都被顺利的装在进了应用上下文中。那么万事俱备,现在就拿起我们前边儿准备好的大喇叭喊一嗓子(指发布事件),看看后续的故事又是怎么发展的吧。
Tips - 2
默认的监听器包装工厂类会将@EventListener
标注的方法封装进ApplicationListenerMethodAdapter
对象中,而包装类在构造时会解析监听器方法的其它属性,包括@Order
、@EventListener(condition)
,在事件触发时通过调用method.invoke(args)
方法进行本身方法的调用。标准的监听器方法返回值是Void
,但通过注解标注的监听器方法可以拥有返回值,我们可看一下包装类对方法返回值的处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| protected void handleResult(Object result) {
if (result instanceof CompletionStage) { ((CompletionStage<?>) result).whenComplete((event, ex) -> { if (ex != null) { handleAsyncError(ex); } else if (event != null) { publishEvent(event); } }); } else if (result instanceof ListenableFuture) { ((ListenableFuture<?>) result).addCallback(this::publishEvents, this::handleAsyncError); } else { publishEvents(result); }
}
|
我们可以通过这个特性完成一些异步的操作,如通过事件A触发监听器A进行逻辑处理,监听器A完成逻辑处理后可以返回事件B,让监听器B进行后续操作,完成一个事件驱动的逻辑设计。
发布事件 - 喊一嗓子
监听器和广播器已经全部准备好了,上下文中publishEvent
逻辑我们在找广播器的阶段也已经看过了,那么现在剩下的…就是在一个事件被发布时,要如何找到对应的监听该事件的监听器呢?我们直接进入到ApplicationEvnetMulticaster
的multicastEvent
方法,看看发布事件时的实际操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
|
@Override public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) { ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event)); Executor executor = getTaskExecutor(); for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
if (executor != null) { executor.execute(() -> invokeListener(listener, event)); } else { invokeListener(listener, event); } } }
|
在这里阿肖要插一段,在实际开发过程中主流程在完成一些业务逻辑后发布了事件,而隐藏在事件广播器身后的监听器对于主流程是不可知的,若此时事件广播器的执行流程是串行的,那么监听器的处理逻辑要是稍有纰漏抛出异常,就会导致主流程原本的逻辑无法继续进行,这当然不是我们所期望的,所以在进行事件驱动逻辑开发的时候,一定要做好异常的捕获处理,保证主流程的正常流转。
当然,我们可也以用一些别的小手段,还记得我们在找事件广播器ApplicationEventMulticaster
的时候提到过我们可以自定义广播器么,根据广播器初始化逻辑,会在IoC
容器中查找bean名称为applicationEventMulticaster
的bean,若bean不存在,则进行初始化,反之则会使用已存在的广播器。我们可以利用这部分逻辑定义我们自己的广播器并在配置类中注册为bean,SimpleApplicationEventMilticaster
的类上有两个字段taskExecutor
和errorHandler
,在注册时我们可以配置上时间的异步任务执行器和通用异常处理器,类似这样。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Configuration public class MulticasterConfiguration { @Bean public ApplicationEventMulticaster applicationEventMulticaster(BeanFactory beanFactory) { SimpleApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster(beanFactory); multicaster.setTaskExecutor(Executors.newSingleThreadExecutor()); multicaster.setErrorHandler((ex) -> { logger.warn("has something error occur", ex); }); return multicaster; } }
|
通过事件监听器逻辑的异步执行,我们能保证监听器逻辑不会影响主流程的继续进行。但是由于广播器所配置的任务执行器是固定的,若是线程池的配置不当,可能会导致线程池任务溢出。其次会导致有前后依赖的监听器逻辑执行有误(譬如事件A有两个监听器,监听器2号的执行逻辑需要监听器1号执行完成,异步执行无法保证完成后再执行2号监听器逻辑)。所以我们也可以通过对监听器方法标记@Async
注解实现方法的异步执行,同时可以指明使用的线程池,避免所有事件监听器依赖同一个线程池的问题。当然两种方式各有利弊,还是根据实际的业务开发场景细细斟酌会更好一点。
回到我们事件发布的逻辑上,广播器通过getApplicationListeners
方法获取与之匹配的监听器列表,我们一起来看看。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| protected Collection<ApplicationListener<?>> getApplicationListeners( ApplicationEvent event, ResolvableType eventType) { Object source = event.getSource(); Class<?> sourceType = (source != null ? source.getClass() : null); ListenerCacheKey cacheKey = new ListenerCacheKey(eventType, sourceType);
ListenerRetriever retriever = this.retrieverCache.get(cacheKey); if (retriever != null) { return retriever.getApplicationListeners(); }
synchronized (this.retrievalMutex) { retriever = this.retrieverCache.get(cacheKey); if (retriever != null) { return retriever.getApplicationListeners(); } retriever = new ListenerRetriever(true); Collection<ApplicationListener<?>> listeners = retrieveApplicationListeners(eventType, sourceType, retriever); this.retrieverCache.put(cacheKey, retriever); return listeners; } }
|
在事件触发时,广播器会根据EventType + Source
在监听器列表里查找符合条件的监听器并将其缓存起来。实现SmartApplicationListener
接口或GenericApplicationListener
接口的监听器可以通过supportsEventType
方法和supportsSourceType
方法支持多种事件类型,虽匹配类型可以根据逻辑动态变动,但若在第一次事件触发时没有成功匹配,则不会有第二次匹配机会,监听器也不会按期望被加入监听器列表中。
在retrieveApplicationListeners
方法中,会取出所有的ApplicationListener
实例,并通过supportsEvent
方法判断是是否是事件的目标监听器,在找齐了监听器之后,会根据Order
配置对监听器进行排序,并循环调用各个监听器的onApplication
方法触发事件处理逻辑。由于retrieveApplicationListeners
方法整体逻辑较多,我们就单独把判断类型支持的supportsEvent
方法择出来看看,想知道方法整体逻辑的同学也可以线性的看一看。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| protected boolean supportsEvent( ApplicationListener<?> listener, ResolvableType eventType, @Nullable Class<?> sourceType) { GenericApplicationListener smartListener = (listener instanceof GenericApplicationListener ? (GenericApplicationListener) listener : new GenericApplicationListenerAdapter(listener)); return (smartListener.supportsEventType(eventType) && smartListener.supportsSourceType(sourceType)); }
@Override public boolean supportsEventType(ResolvableType eventType) { if (this.delegate instanceof SmartApplicationListener) { Class<? extends ApplicationEvent> eventClass = (Class<? extends ApplicationEvent>) eventType.resolve(); return (eventClass != null && ((SmartApplicationListener) this.delegate).supportsEventType(eventClass)); } else {
return (this.declaredEventType == null || this.declaredEventType.isAssignableFrom(eventType)); } }
|
小结
至此,我们找到了广播器,找到了监听器,然后把他们组合在了一起,好好的利用它们能极大的解除系统内逻辑的耦合,使系统可扩展性和可维护性大大提升。我们开发过程中引入的Mq中间件也是在分布式应用的情况下实现了这些模式,只不过中间件所要考虑的问题就会偏向于RPC、消息丢失、持久化等逻辑。Spring Listener
组件的整体架构用到了很多设计模式,监听器模式(根本),装饰模式(各处的Adapter),代理模式(Cglib
的类增强)等等…
还是那句话,Spring是个大家伙,我也不知道什么时候能把它啃完,但是其中很多的组件开发思路也值得我们参考,譬如@EventListener
标记方法的的扫描装配方法,我们也可以用这个思路在项目里实现一些自己的组件。
尾巴
整篇内容从开始准备到写到这里,总共花了十天的时间,本来以为Spring Listener
的内容会比较简单,没想到发散开写也有这么多,不管终归还是写完了。有点点累,但是也鼓励鼓励坚持到现在的自己,下一篇要写什么还没有想好,所有大家伙有什么想看的也可以告诉我,这样我就不用自己想选题了诶嘿嘿,歇一歇也要把20年的年终总结和21年的计划写了。总之…谢谢你能看到这里。