Skip to contentPackage: MultiQueue$TopicAndMessage
MultiQueue$TopicAndMessage
Coverage
1: /*
2: * *************************************************************************************************************************************************************
3: *
4: * TheseFoolishThings: Miscellaneous utilities
5: * http://tidalwave.it/projects/thesefoolishthings
6: *
7: * Copyright (C) 2009 - 2024 by Tidalwave s.a.s. (http://tidalwave.it)
8: *
9: * *************************************************************************************************************************************************************
10: *
11: * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
12: * You may obtain a copy of the License at
13: *
14: * http://www.apache.org/licenses/LICENSE-2.0
15: *
16: * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
17: * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
18: *
19: * *************************************************************************************************************************************************************
20: *
21: * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
22: * git clone https://github.com/tidalwave-it/thesefoolishthings-src
23: *
24: * *************************************************************************************************************************************************************
25: */
26: package it.tidalwave.messagebus.spi;
27:
28: import javax.annotation.Nonnull;
29: import java.util.ArrayList;
30: import java.util.Comparator;
31: import java.util.List;
32: import java.util.Queue;
33: import java.util.concurrent.ConcurrentNavigableMap;
34: import java.util.concurrent.ConcurrentSkipListMap;
35: import java.util.concurrent.LinkedBlockingQueue;
36: import lombok.Getter;
37: import lombok.RequiredArgsConstructor;
38: import lombok.ToString;
39: import lombok.extern.slf4j.Slf4j;
40:
41: /***************************************************************************************************************************************************************
42: *
43: * @author Fabrizio Giudici
44: *
45: **************************************************************************************************************************************************************/
46: @Slf4j
47: public class MultiQueue
48: {
49: @RequiredArgsConstructor @Getter @ToString
50: static class TopicAndMessage<T>
51: {
52: @Nonnull
53: private final Class<T> topic;
54:
55: @Nonnull
56: private final T message;
57: }
58:
59: private final ConcurrentNavigableMap<Class<?>, Queue<?>> queueMapByTopic =
60: new ConcurrentSkipListMap<>(Comparator.comparing(Class::getName));
61:
62: private Class<?> latestSentTopic = null;
63:
64: /***********************************************************************************************************************************************************
65: * Adds a message of the given topic to this queue and issues a notification.
66: *
67: * @param <T> the static type of the message
68: * @param topic the dynamic type of the message
69: * @param message the message
70: **********************************************************************************************************************************************************/
71: public synchronized <T> void add (@Nonnull final Class<T> topic, @Nonnull final T message)
72: {
73: getQueue(topic).add(message);
74: notifyAll();
75: }
76:
77: /***********************************************************************************************************************************************************
78: * Removes and returns the next pair (topic, message) from the queue. Blocks until one is available.
79: *
80: * @param <T> the static type of the topic
81: * @return the topic and message
82: * @throws InterruptedException if interrupted while waiting
83: **********************************************************************************************************************************************************/
84: @Nonnull
85: public synchronized <T> TopicAndMessage<T> remove()
86: throws InterruptedException
87: {
88: for (;;)
89: {
90: for (final var topic : reorderedTopics())
91: {
92: final var queue = queueMapByTopic.get(topic);
93: final var message = queue.poll();
94:
95: if (message != null)
96: {
97: latestSentTopic = topic;
98:
99: if (log.isTraceEnabled())
100: {
101: log.trace("stats {}", stats());
102: }
103:
104: return new TopicAndMessage<>((Class<T>)topic, (T)message);
105: }
106: }
107:
108: if (log.isTraceEnabled())
109: {
110: log.trace("all queues empty; stats {}", stats());
111: }
112:
113: wait();
114: }
115: }
116:
117: /***********************************************************************************************************************************************************
118: * Returns the list of topics reordered, so it starts just after latestSentTopic and wraps around.
119: **********************************************************************************************************************************************************/
120: @Nonnull
121: private List<Class<?>> reorderedTopics()
122: {
123: final var keySet = queueMapByTopic.navigableKeySet();
124: final List<Class<?>> scanSet = new ArrayList<>();
125:
126: if (latestSentTopic == null)
127: {
128: scanSet.addAll(keySet);
129: }
130: else
131: {
132: scanSet.addAll(keySet.subSet(latestSentTopic, false, keySet.last(), true));
133: scanSet.addAll(keySet.subSet(keySet.first(), true, latestSentTopic, true));
134: }
135:
136: return scanSet;
137: }
138:
139: /***********************************************************************************************************************************************************
140: **********************************************************************************************************************************************************/
141: private synchronized String stats()
142: {
143: final var b = new StringBuilder();
144: var separator = "";
145:
146: for (final var e : queueMapByTopic.entrySet())
147: {
148: b.append(separator).append(String.format("%s[%s]: %d",
149: e.getKey().getSimpleName(), e.getKey().equals(latestSentTopic) ? "X" : " ", e.getValue().size()));
150: separator = ", ";
151: }
152:
153: return b.toString();
154: }
155:
156: /***********************************************************************************************************************************************************
157: * Returns the queue associated to a given topic. The queue is created if the topic is new.
158: *
159: * @param topic the topic
160: * @return the queue
161: **********************************************************************************************************************************************************/
162: @Nonnull
163: private synchronized <T> Queue<T> getQueue (@Nonnull final Class<T> topic)
164: {
165: // TODO Java 8 would make this easier
166: var queue = (Queue<T>)queueMapByTopic.get(topic);
167:
168: if (queue == null)
169: {
170: queue = new LinkedBlockingQueue<>();
171: queueMapByTopic.put(topic, queue);
172: }
173:
174: return queue;
175: }
176: }