Skip to contentMethod: SimpleMessageBus()
1: /*
2: * *************************************************************************************************************************************************************
3: *
4: * TheseFoolishThings: Miscellaneous utilities
5: * http://tidalwave.it/projects/thesefoolishthings
6: *
7: * Copyright (C) 2009 - 2025 by Tidalwave s.a.s. (http://tidalwave.it)
8: *
9: * *************************************************************************************************************************************************************
10: *
11: * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
12: * You may obtain a copy of the License at
13: *
14: * http://www.apache.org/licenses/LICENSE-2.0
15: *
16: * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
17: * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
18: *
19: * *************************************************************************************************************************************************************
20: *
21: * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
22: * git clone https://github.com/tidalwave-it/thesefoolishthings-src
23: *
24: * *************************************************************************************************************************************************************
25: */
26: package it.tidalwave.messagebus.spi;
27:
28: import java.lang.ref.WeakReference;
29: import javax.annotation.Nonnull;
30: import javax.annotation.concurrent.ThreadSafe;
31: import java.util.ArrayList;
32: import java.util.HashMap;
33: import java.util.HashSet;
34: import java.util.List;
35: import java.util.Map;
36: import java.util.concurrent.Executor;
37: import java.util.concurrent.Executors;
38: import it.tidalwave.messagebus.MessageBus;
39: import lombok.Getter;
40: import lombok.extern.slf4j.Slf4j;
41:
42: /***************************************************************************************************************************************************************
43: *
44: * A partial implementation of {@link MessageBus}.
45: *
46: * @author Fabrizio Giudici
47: *
48: **************************************************************************************************************************************************************/
49: @ThreadSafe @Slf4j
50: public class SimpleMessageBus implements MessageBus
51: {
52: private final Map<Class<?>, List<WeakReference<Listener<?>>>> listenersMapByTopic = new HashMap<>();
53:
54: private final MessageDelivery messageDelivery;
55:
56: @Getter
57: private final Executor executor;
58:
59: /***********************************************************************************************************************************************************
60: * Creates a new instance with a {@link SimpleAsyncMessageDelivery} strategy for delivery. It will use its own
61: * thread pool.
62: **********************************************************************************************************************************************************/
63: public SimpleMessageBus()
64: {
65: this(Executors.newFixedThreadPool(10));
66: }
67:
68: /***********************************************************************************************************************************************************
69: * Creates a new instance given an executor and a {@link SimpleAsyncMessageDelivery} strategy for delivery.
70: *
71: * @param executor the {@link Executor}
72: **********************************************************************************************************************************************************/
73: public SimpleMessageBus (@Nonnull final Executor executor)
74: {
75: this(executor, new SimpleAsyncMessageDelivery());
76: }
77:
78: /***********************************************************************************************************************************************************
79: * Creates a new instance given an executor and a strategy for delivery.
80: *
81: * @param executor the {@link Executor}
82: * @param messageDelivery the strategy for delivery
83: **********************************************************************************************************************************************************/
84: public SimpleMessageBus (@Nonnull final Executor executor, @Nonnull final MessageDelivery messageDelivery)
85: {
86: this.executor = executor;
87: this.messageDelivery = messageDelivery;
88: this.messageDelivery.initialize(this);
89: log.info("MessageBusSupport configured with {}", messageDelivery);
90: }
91:
92: /***********************************************************************************************************************************************************
93: * {@inheritDoc}
94: **********************************************************************************************************************************************************/
95: @Override
96: public <T> void publish (@Nonnull final T message)
97: {
98: publish((Class<T>)message.getClass(), message);
99: }
100:
101: /***********************************************************************************************************************************************************
102: * {@inheritDoc}
103: **********************************************************************************************************************************************************/
104: @Override
105: public <T> void publish (@Nonnull final Class<T> topic, @Nonnull final T message)
106: {
107: log.trace("publish({}, {})", topic, message);
108: messageDelivery.deliverMessage(topic, message);
109: }
110:
111: /***********************************************************************************************************************************************************
112: * {@inheritDoc}
113: **********************************************************************************************************************************************************/
114: @Override
115: public <T> void subscribe (@Nonnull final Class<T> topic, @Nonnull final Listener<T> listener)
116: {
117: log.debug("subscribe({}, {})", topic, listener);
118: findListenersByTopic(topic).add(new WeakReference<>(listener));
119: }
120:
121: /***********************************************************************************************************************************************************
122: * {@inheritDoc}
123: **********************************************************************************************************************************************************/
124: @Override
125: public void unsubscribe (@Nonnull final Listener<?> listener)
126: {
127: log.debug("unsubscribe({})", listener);
128:
129: for (final var list : listenersMapByTopic.values())
130: {
131: list.removeIf(ref -> (ref.get() == null) || (ref.get() == listener));
132: }
133: }
134:
135: /***********************************************************************************************************************************************************
136: * Dispatches a message.
137: *
138: * @param <T> the static type of the topic
139: * @param topic the dynamic type of the topic
140: * @param message the message
141: **********************************************************************************************************************************************************/
142: protected <T> void dispatchMessage (@Nonnull final Class<T> topic, @Nonnull final T message)
143: {
144: final var clone = new HashSet<>(listenersMapByTopic.entrySet()); // FIXME: marked as dubious by SpotBugs
145:
146: for (final var e : clone)
147: {
148: if (e.getKey().isAssignableFrom(topic))
149: {
150: final List<WeakReference<MessageBus.Listener<T>>> listeners = (List)e.getValue();
151:
152: for (final var listenerReference : listeners)
153: {
154: final var listener = listenerReference.get();
155:
156: if (listener != null)
157: {
158: try
159: {
160: listener.notify(message);
161: }
162: catch (Throwable t)
163: {
164: log.warn("deliverMessage()", t);
165: }
166: }
167: }
168: }
169: }
170: }
171:
172: /***********************************************************************************************************************************************************
173: **********************************************************************************************************************************************************/
174: @Nonnull
175: private <T> List<WeakReference<Listener<T>>> findListenersByTopic (@Nonnull final Class<T> topic)
176: {
177: // FIXME: use putIfAbsent()
178: List<WeakReference<Listener<T>>> listeners = (List)listenersMapByTopic.get(topic);
179:
180: if (listeners == null)
181: {
182: listeners = new ArrayList<>();
183: listenersMapByTopic.put(topic, (List)listeners);
184: }
185:
186: return listeners;
187: }
188: }