Skip to content

Method: unsubscribe()

1: /*
2: * *************************************************************************************************************************************************************
3: *
4: * TheseFoolishThings: Miscellaneous utilities
5: * http://tidalwave.it/projects/thesefoolishthings
6: *
7: * Copyright (C) 2009 - 2025 by Tidalwave s.a.s. (http://tidalwave.it)
8: *
9: * *************************************************************************************************************************************************************
10: *
11: * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
12: * You may obtain a copy of the License at
13: *
14: * http://www.apache.org/licenses/LICENSE-2.0
15: *
16: * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
17: * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
18: *
19: * *************************************************************************************************************************************************************
20: *
21: * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
22: * git clone https://github.com/tidalwave-it/thesefoolishthings-src
23: *
24: * *************************************************************************************************************************************************************
25: */
26: package it.tidalwave.actor.impl;
27:
28: import java.lang.reflect.Method;
29: import javax.annotation.Nonnull;
30: import javax.inject.Provider;
31: import java.util.ArrayList;
32: import java.util.List;
33: import it.tidalwave.actor.CollaborationCompletedMessage;
34: import it.tidalwave.actor.CollaborationStartedMessage;
35: import it.tidalwave.actor.annotation.OriginatedBy;
36: import it.tidalwave.actor.spi.ActorActivatorStats;
37: import it.tidalwave.actor.spi.CollaborationAwareMessageBus;
38: import it.tidalwave.messagebus.MessageBus;
39: import it.tidalwave.messagebus.annotation.ListensTo;
40: import lombok.RequiredArgsConstructor;
41: import lombok.extern.slf4j.Slf4j;
42: import static it.tidalwave.messagebus.spi.ReflectionUtils.*;
43:
44: /***************************************************************************************************************************************************************
45: *
46: * @author Fabrizio Giudici
47: *
48: **************************************************************************************************************************************************************/
49: @RequiredArgsConstructor @Slf4j
50: public class CollaborationAwareMessageBusAdapter implements MethodProcessor
51: {
52: @Nonnull
53: private final Object owner;
54:
55: @Nonnull
56: private final ExecutorWithPriority executor;
57:
58: @Nonnull
59: private final ActorActivatorStats stats;
60:
61: private final List<MessageBus.Listener<?>> messageBusListeners = new ArrayList<>();
62:
63: private final Provider<CollaborationAwareMessageBus> messageBus =
64: Locator.createProviderFor(CollaborationAwareMessageBus.class);
65:
66: /***********************************************************************************************************************************************************
67: *
68: **********************************************************************************************************************************************************/
69: @Override @Nonnull
70: public FilterResult filter (@Nonnull final Class<?> clazz)
71: {
72: return FilterResult.ACCEPT; // TODO: should filter with @Actor?
73: }
74:
75: /***********************************************************************************************************************************************************
76: *
77: **********************************************************************************************************************************************************/
78: @Override
79: public void process (@Nonnull final Method method)
80: {
81: final var parameterAnnotations = method.getParameterAnnotations();
82:
83: if ((parameterAnnotations.length == 1) && containsAnnotation(parameterAnnotations[0], ListensTo.class))
84: {
85: registerMessageListener(method);
86: }
87: else if ((parameterAnnotations.length == 2) && containsAnnotation(parameterAnnotations[0], ListensTo.class)
88: && containsAnnotation(parameterAnnotations[1], OriginatedBy.class))
89: {
90: registerCollaborationListener(method);
91: }
92: }
93:
94: /***********************************************************************************************************************************************************
95: *
96: **********************************************************************************************************************************************************/
97: public void unsubscribe()
98: {
99:• for (final var listener : messageBusListeners)
100: {
101: messageBus.get().unsubscribe(listener);
102: }
103: }
104:
105: /***********************************************************************************************************************************************************
106: *
107: **********************************************************************************************************************************************************/
108: private <T> void registerMessageListener (@Nonnull final Method method)
109: {
110: log.info("registerMessageListener({})", method);
111:
112: final var topic = (Class<T>)method.getParameterTypes()[0];
113: addListener(method, new MessageListenerAdapter<>(owner, method, executor, stats), topic);
114: }
115:
116: /***********************************************************************************************************************************************************
117: *
118: **********************************************************************************************************************************************************/
119: private <T> void registerCollaborationListener (@Nonnull final Method method)
120: {
121: log.info("registerCollaborationListener({})", method);
122: final var collaborationMessageType = method.getParameterTypes()[0];
123: final var messageType = method.getParameterTypes()[1];
124:
125: final MessageBus.Listener messageListener = collaborationMessageType.equals(CollaborationStartedMessage.class)
126: ? new CollaborationMessageListenerAdapter<CollaborationStartedMessage>(owner, method, executor, messageType, stats)
127: : new CollaborationMessageListenerAdapter<CollaborationCompletedMessage>(owner, method, executor, messageType, stats);
128: addListener(method, messageListener, collaborationMessageType);
129: }
130:
131: /***********************************************************************************************************************************************************
132: *
133: **********************************************************************************************************************************************************/
134: private <T> void addListener (@Nonnull final Method method,
135: @Nonnull final MessageBus.Listener<T> messageListener,
136: @Nonnull final Class<T> topic) throws SecurityException
137: {
138: method.setAccessible(true);
139: messageBusListeners.add(messageListener);
140: messageBus.get().subscribe(topic, messageListener);
141: }
142: }