Package: MultiQueue$TopicAndMessage
MultiQueue$TopicAndMessage
| name | instruction | branch | complexity | line | method |
|---|---|---|---|---|---|
| MultiQueue.TopicAndMessage(Class, Object) | | | | | |
| getMessage() | | | | | |
| getTopic() | | | | | |
| toString() | | | | | |
Coverage
1: /*
2: * *********************************************************************************************************************
3: *
4: * TheseFoolishThings: Miscellaneous utilities
5: * http://tidalwave.it/projects/thesefoolishthings
6: *
7: * Copyright (C) 2009 - 2023 by Tidalwave s.a.s. (http://tidalwave.it)
8: *
9: * *********************************************************************************************************************
10: *
11: * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
12: * the License. You may obtain a copy of the License at
13: *
14: * http://www.apache.org/licenses/LICENSE-2.0
15: *
16: * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
17: * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
18: * specific language governing permissions and limitations under the License.
19: *
20: * *********************************************************************************************************************
21: *
22: * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
23: * git clone https://github.com/tidalwave-it/thesefoolishthings-src
24: *
25: * *********************************************************************************************************************
26: */
27: package it.tidalwave.messagebus.spi;
28:
29: import javax.annotation.Nonnull;
30: import java.util.ArrayList;
31: import java.util.Comparator;
32: import java.util.List;
33: import java.util.Map;
34: import java.util.NavigableSet;
35: import java.util.Queue;
36: import java.util.concurrent.ConcurrentNavigableMap;
37: import java.util.concurrent.ConcurrentSkipListMap;
38: import java.util.concurrent.LinkedBlockingQueue;
39: import lombok.Getter;
40: import lombok.RequiredArgsConstructor;
41: import lombok.ToString;
42: import lombok.extern.slf4j.Slf4j;
43:
44: /***********************************************************************************************************************
45: *
46: * @author Fabrizio Giudici
47: *
48: **********************************************************************************************************************/
49: @Slf4j
50: public class MultiQueue
51: {
52:• @RequiredArgsConstructor @Getter @ToString
53: static class TopicAndMessage<TOPIC>
54: {
55: @Nonnull
56: private final Class<TOPIC> topic;
57:
58: @Nonnull
59: private final TOPIC message;
60: }
61:
62: private final ConcurrentNavigableMap<Class<?>, Queue<?>> queueMapByTopic =
63: new ConcurrentSkipListMap<>(Comparator.comparing(Class::getName));
64:
65: private Class<?> latestSentTopic = null;
66:
67: /*******************************************************************************************************************
68: *
69: * Adds a message of the given topic to this queue and issues a notification.
70: *
71: * @param <TOPIC> the static type of the message
72: * @param topic the dynamic type of the message
73: * @param message the message
74: *
75: ******************************************************************************************************************/
76: public synchronized <TOPIC> void add (@Nonnull final Class<TOPIC> topic, @Nonnull final TOPIC message)
77: {
78: getQueue(topic).add(message);
79: notifyAll();
80: }
81:
82: /*******************************************************************************************************************
83: *
84: * Removes and returns the next pair (topic, message) from the queue. Blocks until one is available.
85: *
86: * @param <TOPIC> the static type of the topic
87: * @return the topic and message
88: * @throws InterruptedException if interrupted while waiting
89: *
90: ******************************************************************************************************************/
91: @Nonnull
92: public synchronized <TOPIC> TopicAndMessage<TOPIC> remove()
93: throws InterruptedException
94: {
95: for (;;)
96: {
97: for (final Class<?> topic : reorderedTopics())
98: {
99: final Queue<?> queue = queueMapByTopic.get(topic);
100: final Object message = queue.poll();
101:
102: if (message != null)
103: {
104: latestSentTopic = topic;
105:
106: if (log.isTraceEnabled())
107: {
108: log.trace("stats {}", stats());
109: }
110:
111: return new TopicAndMessage<>((Class<TOPIC>)topic, (TOPIC)message);
112: }
113: }
114:
115: if (log.isTraceEnabled())
116: {
117: log.trace("all queues empty; stats {}", stats());
118: }
119:
120: wait();
121: }
122: }
123:
124: /*******************************************************************************************************************
125: *
126: * Returns the list of topics reordered, so it starts just after latestSentTopic and wraps around.
127: *
128: ******************************************************************************************************************/
129: @Nonnull
130: private List<Class<?>> reorderedTopics()
131: {
132: final NavigableSet<Class<?>> keySet = queueMapByTopic.navigableKeySet();
133: final List<Class<?>> scanSet = new ArrayList<>();
134:
135: if (latestSentTopic == null)
136: {
137: scanSet.addAll(keySet);
138: }
139: else
140: {
141: scanSet.addAll(keySet.subSet(latestSentTopic, false, keySet.last(), true));
142: scanSet.addAll(keySet.subSet(keySet.first(), true, latestSentTopic, true));
143: }
144:
145: return scanSet;
146: }
147:
148: /*******************************************************************************************************************
149: *
150: *
151: ******************************************************************************************************************/
152: private synchronized String stats()
153: {
154: final StringBuilder b = new StringBuilder();
155: String separator = "";
156:
157: for (final Map.Entry<Class<?>, Queue<?>> e : queueMapByTopic.entrySet())
158: {
159: b.append(separator).append(String.format("%s[%s]: %d",
160: e.getKey().getSimpleName(), e.getKey().equals(latestSentTopic) ? "X" : " ", e.getValue().size()));
161: separator = ", ";
162: }
163:
164: return b.toString();
165: }
166:
167: /*******************************************************************************************************************
168: *
169: * Returns the queue associated to a given topic. The queue is created if the topic is new.
170: *
171: * @param topic the topic
172: * @return the queue
173: *
174: ******************************************************************************************************************/
175: @Nonnull
176: private synchronized <TOPIC> Queue<TOPIC> getQueue (@Nonnull final Class<TOPIC> topic)
177: {
178: // TODO Java 8 would make this easier
179: Queue<TOPIC> queue = (Queue<TOPIC>)queueMapByTopic.get(topic);
180:
181: if (queue == null)
182: {
183: queue = new LinkedBlockingQueue<>();
184: queueMapByTopic.put(topic, queue);
185: }
186:
187: return queue;
188: }
189: }