Skip to contentMethod: unsubscribe()
1: /*
2: * *********************************************************************************************************************
3: *
4: * TheseFoolishThings: Miscellaneous utilities
5: * http://tidalwave.it/projects/thesefoolishthings
6: *
7: * Copyright (C) 2009 - 2023 by Tidalwave s.a.s. (http://tidalwave.it)
8: *
9: * *********************************************************************************************************************
10: *
11: * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
12: * the License. You may obtain a copy of the License at
13: *
14: * http://www.apache.org/licenses/LICENSE-2.0
15: *
16: * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
17: * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
18: * specific language governing permissions and limitations under the License.
19: *
20: * *********************************************************************************************************************
21: *
22: * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
23: * git clone https://github.com/tidalwave-it/thesefoolishthings-src
24: *
25: * *********************************************************************************************************************
26: */
27: package it.tidalwave.actor.impl;
28:
29: import java.lang.annotation.Annotation;
30: import java.lang.reflect.Method;
31: import javax.annotation.Nonnull;
32: import javax.inject.Provider;
33: import java.util.ArrayList;
34: import java.util.List;
35: import it.tidalwave.actor.CollaborationCompletedMessage;
36: import it.tidalwave.actor.CollaborationStartedMessage;
37: import it.tidalwave.actor.annotation.OriginatedBy;
38: import it.tidalwave.actor.spi.ActorActivatorStats;
39: import it.tidalwave.actor.spi.CollaborationAwareMessageBus;
40: import it.tidalwave.messagebus.MessageBus;
41: import it.tidalwave.messagebus.annotation.ListensTo;
42: import lombok.RequiredArgsConstructor;
43: import lombok.extern.slf4j.Slf4j;
44: import static it.tidalwave.messagebus.spi.ReflectionUtils.*;
45:
46: /***********************************************************************************************************************
47: *
48: * @author Fabrizio Giudici
49: *
50: **********************************************************************************************************************/
51: @RequiredArgsConstructor @Slf4j
52: public class CollaborationAwareMessageBusAdapter implements MethodProcessor
53: {
54: @Nonnull
55: private final Object owner;
56:
57: @Nonnull
58: private final ExecutorWithPriority executor;
59:
60: @Nonnull
61: private final ActorActivatorStats stats;
62:
63: private final List<MessageBus.Listener<?>> messageBusListeners = new ArrayList<>();
64:
65: private final Provider<CollaborationAwareMessageBus> messageBus =
66: Locator.createProviderFor(CollaborationAwareMessageBus.class);
67:
68: /*******************************************************************************************************************
69: *
70: *
71: *
72: ******************************************************************************************************************/
73: @Override @Nonnull
74: public FilterResult filter (@Nonnull final Class<?> clazz)
75: {
76: return FilterResult.ACCEPT; // TODO: should filter with @Actor?
77: }
78:
79: /*******************************************************************************************************************
80: *
81: *
82: *
83: ******************************************************************************************************************/
84: @Override
85: public void process (@Nonnull final Method method)
86: {
87: final Annotation[][] parameterAnnotations = method.getParameterAnnotations();
88:
89: if ((parameterAnnotations.length == 1) && containsAnnotation(parameterAnnotations[0], ListensTo.class))
90: {
91: registerMessageListener(method);
92: }
93: else if ((parameterAnnotations.length == 2) && containsAnnotation(parameterAnnotations[0], ListensTo.class)
94: && containsAnnotation(parameterAnnotations[1], OriginatedBy.class))
95: {
96: registerCollaborationListener(method);
97: }
98: }
99:
100: /*******************************************************************************************************************
101: *
102: *
103: *
104: ******************************************************************************************************************/
105: public void unsubscribe()
106: {
107:• for (final MessageBus.Listener<?> listener : messageBusListeners)
108: {
109: messageBus.get().unsubscribe(listener);
110: }
111: }
112:
113: /*******************************************************************************************************************
114: *
115: *
116: *
117: ******************************************************************************************************************/
118: private <Topic> void registerMessageListener (@Nonnull final Method method)
119: {
120: log.info("registerMessageListener({})", method);
121:
122: final Class<Topic> topic = (Class<Topic>)method.getParameterTypes()[0];
123: addListener(method, new MessageListenerAdapter<>(owner, method, executor, stats), topic);
124: }
125:
126: /*******************************************************************************************************************
127: *
128: *
129: *
130: ******************************************************************************************************************/
131: private <Topic> void registerCollaborationListener (@Nonnull final Method method)
132: {
133: log.info("registerCollaborationListener({})", method);
134: final Class<?> collaborationMessageType = method.getParameterTypes()[0];
135: final Class<?> messageType = method.getParameterTypes()[1];
136:
137: final MessageBus.Listener messageListener = collaborationMessageType.equals(CollaborationStartedMessage.class)
138: ? new CollaborationMessageListenerAdapter<CollaborationStartedMessage>(owner, method, executor, messageType, stats)
139: : new CollaborationMessageListenerAdapter<CollaborationCompletedMessage>(owner, method, executor, messageType, stats);
140: addListener(method, messageListener, collaborationMessageType);
141: }
142:
143: /*******************************************************************************************************************
144: *
145: *
146: *
147: ******************************************************************************************************************/
148: private <Topic> void addListener (@Nonnull final Method method,
149: @Nonnull final MessageBus.Listener<Topic> messageListener,
150: @Nonnull final Class<Topic> topic) throws SecurityException
151: {
152: method.setAccessible(true);
153: messageBusListeners.add(messageListener);
154: messageBus.get().subscribe(topic, messageListener);
155: }
156: }