Skip to contentMethod: publish(Object)
1: /*
2: * *********************************************************************************************************************
3: *
4: * TheseFoolishThings: Miscellaneous utilities
5: * http://tidalwave.it/projects/thesefoolishthings
6: *
7: * Copyright (C) 2009 - 2023 by Tidalwave s.a.s. (http://tidalwave.it)
8: *
9: * *********************************************************************************************************************
10: *
11: * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
12: * the License. You may obtain a copy of the License at
13: *
14: * http://www.apache.org/licenses/LICENSE-2.0
15: *
16: * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
17: * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
18: * specific language governing permissions and limitations under the License.
19: *
20: * *********************************************************************************************************************
21: *
22: * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
23: * git clone https://github.com/tidalwave-it/thesefoolishthings-src
24: *
25: * *********************************************************************************************************************
26: */
27: package it.tidalwave.messagebus.spi;
28:
29: import java.lang.ref.WeakReference;
30: import javax.annotation.Nonnull;
31: import javax.annotation.concurrent.ThreadSafe;
32: import java.util.ArrayList;
33: import java.util.HashMap;
34: import java.util.HashSet;
35: import java.util.List;
36: import java.util.Map;
37: import java.util.concurrent.Executor;
38: import java.util.concurrent.Executors;
39: import it.tidalwave.messagebus.MessageBus;
40: import lombok.Getter;
41: import lombok.extern.slf4j.Slf4j;
42:
43: /***********************************************************************************************************************
44: *
45: * A partial implementation of {@link MessageBus}.
46: *
47: * @author Fabrizio Giudici
48: *
49: **********************************************************************************************************************/
50: @ThreadSafe @Slf4j
51: public class SimpleMessageBus implements MessageBus
52: {
53: private final Map<Class<?>, List<WeakReference<Listener<?>>>> listenersMapByTopic = new HashMap<>();
54:
55: private final MessageDelivery messageDelivery;
56:
57: @Getter
58: private final Executor executor;
59:
60: /*******************************************************************************************************************
61: *
62: * Creates a new instance with a {@link SimpleAsyncMessageDelivery} strategy for delivery. It will use its own
63: * thread pool.
64: *
65: ******************************************************************************************************************/
66: public SimpleMessageBus()
67: {
68: this(Executors.newFixedThreadPool(10));
69: }
70:
71: /*******************************************************************************************************************
72: *
73: * Creates a new instance given an executor and a {@link SimpleAsyncMessageDelivery} strategy for delivery.
74: *
75: * @param executor the {@link Executor}
76: *
77: ******************************************************************************************************************/
78: public SimpleMessageBus (@Nonnull final Executor executor)
79: {
80: this(executor, new SimpleAsyncMessageDelivery());
81: }
82:
83: /*******************************************************************************************************************
84: *
85: * Creates a new instance given an executor and a strategy for delivery.
86: *
87: * @param executor the {@link Executor}
88: * @param messageDelivery the strategy for delivery
89: *
90: ******************************************************************************************************************/
91: public SimpleMessageBus (@Nonnull final Executor executor, @Nonnull final MessageDelivery messageDelivery)
92: {
93: this.executor = executor;
94: this.messageDelivery = messageDelivery;
95: this.messageDelivery.initialize(this);
96: log.info("MessageBusSupport configured with {}", messageDelivery);
97: }
98:
99: /*******************************************************************************************************************
100: *
101: * {@inheritDoc}
102: *
103: ******************************************************************************************************************/
104: @Override
105: public <TOPIC> void publish (@Nonnull final TOPIC message)
106: {
107: publish((Class<TOPIC>)message.getClass(), message);
108: }
109:
110: /*******************************************************************************************************************
111: *
112: * {@inheritDoc}
113: *
114: ******************************************************************************************************************/
115: @Override
116: public <TOPIC> void publish (@Nonnull final Class<TOPIC> topic, @Nonnull final TOPIC message)
117: {
118: log.trace("publish({}, {})", topic, message);
119: messageDelivery.deliverMessage(topic, message);
120: }
121:
122: /*******************************************************************************************************************
123: *
124: * {@inheritDoc}
125: *
126: ******************************************************************************************************************/
127: @Override
128: public <TOPIC> void subscribe (@Nonnull final Class<TOPIC> topic, @Nonnull final Listener<TOPIC> listener)
129: {
130: log.debug("subscribe({}, {})", topic, listener);
131: findListenersByTopic(topic).add(new WeakReference<>(listener));
132: }
133:
134: /*******************************************************************************************************************
135: *
136: * {@inheritDoc}
137: *
138: ******************************************************************************************************************/
139: @Override
140: public void unsubscribe (@Nonnull final Listener<?> listener)
141: {
142: log.debug("unsubscribe({})", listener);
143:
144: for (final List<WeakReference<Listener<?>>> list : listenersMapByTopic.values())
145: {
146: list.removeIf(ref -> (ref.get() == null) || (ref.get() == listener));
147: }
148: }
149:
150: /*******************************************************************************************************************
151: *
152: * Dispatches a message.
153: *
154: * @param <TOPIC> the static type of the topic
155: * @param topic the dynamic type of the topic
156: * @param message the message
157: *
158: ******************************************************************************************************************/
159: protected <TOPIC> void dispatchMessage (@Nonnull final Class<TOPIC> topic, @Nonnull final TOPIC message)
160: {
161: final HashSet<Map.Entry<Class<?>, List<WeakReference<MessageBus.Listener<?>>>>> clone =
162: new HashSet<>(listenersMapByTopic.entrySet()); // FIXME: marked as dubious by SpotBugs
163:
164: for (final Map.Entry<Class<?>, List<WeakReference<MessageBus.Listener<?>>>> e : clone)
165: {
166: if (e.getKey().isAssignableFrom(topic))
167: {
168: final List<WeakReference<MessageBus.Listener<TOPIC>>> listeners = (List)e.getValue();
169:
170: for (final WeakReference<MessageBus.Listener<TOPIC>> listenerReference : listeners)
171: {
172: final MessageBus.Listener<TOPIC> listener = listenerReference.get();
173:
174: if (listener != null)
175: {
176: try
177: {
178: listener.notify(message);
179: }
180: catch (Throwable t)
181: {
182: log.warn("deliverMessage()", t);
183: }
184: }
185: }
186: }
187: }
188: }
189:
190: /*******************************************************************************************************************
191: *
192: *
193: ******************************************************************************************************************/
194: @Nonnull
195: private <TOPIC> List<WeakReference<Listener<TOPIC>>> findListenersByTopic (@Nonnull final Class<TOPIC> topic)
196: {
197: List<WeakReference<Listener<TOPIC>>> listeners = (List)listenersMapByTopic.get(topic);
198:
199: if (listeners == null)
200: {
201: listeners = new ArrayList<>();
202: listenersMapByTopic.put(topic, (List)listeners);
203: }
204:
205: return listeners;
206: }
207: }