Package: RoundRobinAsyncMessageDelivery$1
RoundRobinAsyncMessageDelivery$1
name | instruction | branch | complexity | line | method | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
dispatchMessage(MultiQueue.TopicAndMessage) |
|
|
|
|
|
||||||||||||||||||||
run() |
|
|
|
|
|
||||||||||||||||||||
{...} |
|
|
|
|
|
Coverage
1: /*
2: * *********************************************************************************************************************
3: *
4: * TheseFoolishThings: Miscellaneous utilities
5: * http://tidalwave.it/projects/thesefoolishthings
6: *
7: * Copyright (C) 2009 - 2023 by Tidalwave s.a.s. (http://tidalwave.it)
8: *
9: * *********************************************************************************************************************
10: *
11: * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
12: * the License. You may obtain a copy of the License at
13: *
14: * http://www.apache.org/licenses/LICENSE-2.0
15: *
16: * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
17: * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
18: * specific language governing permissions and limitations under the License.
19: *
20: * *********************************************************************************************************************
21: *
22: * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
23: * git clone https://github.com/tidalwave-it/thesefoolishthings-src
24: *
25: * *********************************************************************************************************************
26: */
27: package it.tidalwave.messagebus.spi;
28:
29: import javax.annotation.Nonnull;
30: import java.util.concurrent.Executor;
31: import it.tidalwave.messagebus.spi.MultiQueue.TopicAndMessage;
32: import lombok.Getter;
33: import lombok.Setter;
34: import lombok.ToString;
35: import lombok.extern.slf4j.Slf4j;
36:
37: /***********************************************************************************************************************
38: *
39: * An implementation of {@link MessageDelivery} that dispatches messages in a round-robin fashion, topic by topic.
40: * Each delivery is performed in a separated thread.
41: *
42: * @author Fabrizio Giudici
43: * @since 2.2
44: *
45: **********************************************************************************************************************/
46: @Slf4j @ToString(of = "workers")
47: public class RoundRobinAsyncMessageDelivery implements MessageDelivery
48: {
49: @Nonnull
50: private SimpleMessageBus messageBusSupport;
51:
52: @Getter @Setter
53: private int workers = 10;
54:
55: private final MultiQueue multiQueue = new MultiQueue();
56:
57: /*******************************************************************************************************************
58: *
59: *
60: ******************************************************************************************************************/
61: private final Runnable dispatcher = new Runnable()
62: {
63: @Override
64: public void run()
65: {
66: for (;;)
67: {
68: try
69: {
70: dispatchMessage(multiQueue.remove());
71: }
72: catch (InterruptedException e)
73: {
74: break;
75: }
76: }
77: }
78:
79: private <TOPIC> void dispatchMessage (@Nonnull final TopicAndMessage<TOPIC> tam)
80: {
81: messageBusSupport.dispatchMessage(tam.getTopic(), tam.getMessage());
82: }
83: };
84:
85: /*******************************************************************************************************************
86: *
87: * {@inheritDoc}
88: *
89: ******************************************************************************************************************/
90: @Override
91: public void initialize (@Nonnull final SimpleMessageBus messageBusSupport)
92: {
93: this.messageBusSupport = messageBusSupport;
94: final Executor executor = this.messageBusSupport.getExecutor();
95:
96: for (int i = 0; i < workers; i++)
97: {
98: executor.execute(dispatcher);
99: }
100: }
101:
102: /*******************************************************************************************************************
103: *
104: * {@inheritDoc}
105: *
106: ******************************************************************************************************************/
107: @Override
108: public <TOPIC> void deliverMessage (@Nonnull final Class<TOPIC> topic, @Nonnull final TOPIC message)
109: {
110: multiQueue.add(topic, message);
111: }
112: }