Skip to content

Method: eventuallySendCompletionMessage()

1: /*
2: * *********************************************************************************************************************
3: *
4: * TheseFoolishThings: Miscellaneous utilities
5: * http://tidalwave.it/projects/thesefoolishthings
6: *
7: * Copyright (C) 2009 - 2023 by Tidalwave s.a.s. (http://tidalwave.it)
8: *
9: * *********************************************************************************************************************
10: *
11: * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
12: * the License. You may obtain a copy of the License at
13: *
14: * http://www.apache.org/licenses/LICENSE-2.0
15: *
16: * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
17: * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
18: * specific language governing permissions and limitations under the License.
19: *
20: * *********************************************************************************************************************
21: *
22: * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
23: * git clone https://github.com/tidalwave-it/thesefoolishthings-src
24: *
25: * *********************************************************************************************************************
26: */
27: package it.tidalwave.actor.impl;
28:
29: import javax.annotation.Nonnegative;
30: import javax.annotation.Nonnull;
31: import java.util.ArrayList;
32: import java.util.List;
33: import java.io.Serializable;
34: import com.eaio.uuid.UUID;
35: import it.tidalwave.actor.Collaboration;
36: import it.tidalwave.actor.CollaborationCompletedMessage;
37: import it.tidalwave.actor.CollaborationStartedMessage;
38: import it.tidalwave.actor.annotation.Message;
39: import org.joda.time.DateTime;
40: import org.joda.time.Duration;
41: import lombok.EqualsAndHashCode;
42: import lombok.Getter;
43: import lombok.RequiredArgsConstructor;
44: import lombok.ToString;
45: import lombok.extern.slf4j.Slf4j;
46: import static lombok.AccessLevel.PRIVATE;
47:
48: /***********************************************************************************************************************
49: *
50: * @author Fabrizio Giudici
51: *
52: **********************************************************************************************************************/
53: @RequiredArgsConstructor(access = PRIVATE) @EqualsAndHashCode(of = "id") @Slf4j @ToString(of = "id")
54: public class DefaultCollaboration implements Serializable, Collaboration
55: {
/*******************************************************************************************************************
 *
 * Shared placeholder returned by {@link #getCollaboration(Object)} when the given object is not a
 * {@link Provider}. Both thread-binding operations are overridden with empty bodies, so calling them on this
 * instance touches neither the thread-local binding nor the list of running threads.
 *
 ******************************************************************************************************************/
private static final DefaultCollaboration NULL_DEFAULT_COLLABORATION = new DefaultCollaboration(new Object())
{
    @Override
    public void bindToCurrentThread()
    {
        // Intentionally empty: the placeholder is never bound to a thread.
    }

    @Override
    public void unbindFromCurrentThread()
    {
        // Intentionally empty: nothing was bound, so there is nothing to undo.
    }
};
73:
74: /*******************************************************************************************************************
75: *
76: *
77: *
78: ******************************************************************************************************************/
79: @RequiredArgsConstructor @ToString
80: private static class IdentityWrapper
81: {
82: @Nonnull
83: private final Object object;
84:
85: @Override
86: public boolean equals (final Object object)
87: {
88: if ((object == null) || (getClass() != object.getClass()))
89: {
90: return false;
91: }
92:
93: final IdentityWrapper other = (IdentityWrapper)object;
94: return this.object == other.object;
95: }
96:
97: @Override
98: public int hashCode()
99: {
100: return object.hashCode();
101: }
102: }
103:
104: private static final ThreadLocal<DefaultCollaboration> THREAD_LOCAL = new ThreadLocal<>();
105:
106: private final UUID id = new UUID();
107:
108: @Nonnull @Getter
109: private final Object originatingMessage;
110:
111: private final long startTime = System.currentTimeMillis();
112:
113: @Getter
114: private boolean completed;
115:
116: private final List<Object> suspensionTokens = new ArrayList<>();
117:
118: /** List of threads currently working for this Collaboration. */
119: // No need for being weak, since objects are explicitly removed
120: private final List<Thread> runningThreads = new ArrayList<>();
121:
122: /** List of messages currently being delivered as part of this Collaboration. */
123: // No need for being weak, since objects are explicitly removed
124: private final List<Object> deliveringMessages = new ArrayList<>();
125:
126: /** List of messages pending to be consumed as part of this Collaboration. */
127: // No need for being weak, since objects are explicitly removed
128: private final List<IdentityWrapper> pendingMessages = new ArrayList<>();
129:
130: private boolean collaborationStartedMessageSent = false;
131:
132: /*******************************************************************************************************************
133: *
134: * Factory method to retrieve a {@link Collaboration}. This method accepts any object; if it is an implementation
135: * of {@link Provider}, the object is queried; otherwise {@link #NULL_DEFAULT_COLLABORATION} is returned.
136: *
137: * @param object the object associated to the {@code Collaboration}
138: * @return the {@code Collaboration}
139: *
140: ******************************************************************************************************************/
141: @Nonnull
142: public static DefaultCollaboration getCollaboration (@Nonnull final Object object)
143: {
144: return (object instanceof Provider) ? (DefaultCollaboration)((Provider)object).getCollaboration()
145: : NULL_DEFAULT_COLLABORATION;
146: }
147:
148: /*******************************************************************************************************************
149: *
150: * Gets the {@link Collaboration} bound to the current thread or creates a new one.
151: *
152: * @param originator the object that will be considered the originator of the {@code Collaboration} in case it
153: * is created
154: * @return the {@code Collaboration}
155: *
156: ******************************************************************************************************************/
157: @Nonnull
158: public static DefaultCollaboration getOrCreateCollaboration (@Nonnull final Object originator)
159: {
160: DefaultCollaboration collaboration = THREAD_LOCAL.get();
161:
162: if (collaboration == null)
163: {
164: collaboration = new DefaultCollaboration(originator);
165: }
166:
167: return collaboration;
168: }
169:
170: /*******************************************************************************************************************
171: *
172: * {@inheritDoc}
173: *
174: ******************************************************************************************************************/
175: @Override @Nonnull
176: public DateTime getStartTime()
177: {
178: return new DateTime(startTime);
179: }
180:
181: /*******************************************************************************************************************
182: *
183: * {@inheritDoc}
184: *
185: ******************************************************************************************************************/
186: @Override @Nonnull
187: public Duration getDuration()
188: {
189: return new Duration(startTime, System.currentTimeMillis());
190: }
191:
192: /*******************************************************************************************************************
193: *
194: * {@inheritDoc}
195: *
196: ******************************************************************************************************************/
197: @Override
198: public synchronized void waitForCompletion()
199: throws InterruptedException
200: {
201: while (!isCompleted())
202: {
203: wait();
204: }
205: }
206:
207: /*******************************************************************************************************************
208: *
209: * {@inheritDoc}
210: *
211: ******************************************************************************************************************/
212: @Override @Nonnegative
213: public synchronized int getDeliveringMessagesCount()
214: {
215: return deliveringMessages.size();
216: }
217:
218: /*******************************************************************************************************************
219: *
220: * {@inheritDoc}
221: *
222: ******************************************************************************************************************/
223: @Override @Nonnegative
224: public synchronized int getPendingMessagesCount()
225: {
226: return pendingMessages.size();
227: }
228:
229: /*******************************************************************************************************************
230: *
231: * {@inheritDoc}
232: *
233: ******************************************************************************************************************/
234: @Override @Nonnegative
235: public synchronized int getRunningThreadsCount()
236: {
237: return runningThreads.size();
238: }
239:
240: /*******************************************************************************************************************
241: *
242: * {@inheritDoc}
243: *
244: ******************************************************************************************************************/
245: @Override
246: public void resume (@Nonnull final Object suspensionToken, @Nonnull final Runnable runnable)
247: {
248: try
249: {
250: bindToCurrentThread();
251: runnable.run();
252: suspensionTokens.remove(suspensionToken);
253: }
254: finally
255: {
256: unbindFromCurrentThread();
257: }
258: }
259:
260: /*******************************************************************************************************************
261: *
262: * {@inheritDoc}
263: *
264: ******************************************************************************************************************/
265: @Override
266: public synchronized void resumeAndDie (@Nonnull final Object suspensionToken)
267: {
268: resume(suspensionToken, () -> {});
269: }
270:
271: /*******************************************************************************************************************
272: *
273: * {@inheritDoc}
274: *
275: ******************************************************************************************************************/
276: @Override
277: public synchronized Object suspend()
278: {
279: final Object suspensionToken = new UUID();
280: suspensionTokens.add(suspensionToken);
281: return suspensionToken;
282: }
283:
284: /*******************************************************************************************************************
285: *
286: * {@inheritDoc}
287: *
288: ******************************************************************************************************************/
289: @Override
290: public synchronized boolean isSuspended()
291: {
292: return !suspensionTokens.isEmpty();
293: }
294:
295: /*******************************************************************************************************************
296: *
297: *
298: *
299: ******************************************************************************************************************/
300: public synchronized void bindToCurrentThread()
301: {
302: log.trace("bindToCurrentThread()");
303: THREAD_LOCAL.set(this);
304: runningThreads.add(Thread.currentThread());
305: notifyAll();
306: log();
307: }
308:
309: /*******************************************************************************************************************
310: *
311: *
312: *
313: ******************************************************************************************************************/
314: public synchronized void unbindFromCurrentThread()
315: {
316: log.trace("unbindFromCurrentThread()");
317: runningThreads.remove(Thread.currentThread());
318: THREAD_LOCAL.remove();
319: notifyAll();
320: log();
321: eventuallySendCompletionMessage();
322: }
323:
324: /*******************************************************************************************************************
325: *
326: * Registers that the given {@link Message} is being delivered.
327: *
328: * @param message the {@code Message}
329: *
330: ******************************************************************************************************************/
331: public synchronized void registerDeliveringMessage (@Nonnull final Object message)
332: {
333: log.trace("registerDeliveringMessage({})", message);
334:
335: final Message annotation = message.getClass().getAnnotation(Message.class);
336:
337: if (annotation == null)
338: {
339: throw new IllegalArgumentException("Message must be annotated with @Message: " + message.getClass());
340: }
341:
342: if (annotation.daemon())
343: {
344: deliveringMessages.add(message);
345:
346: // Do this *after* enlisting message in deliveringMessages
347: if (!collaborationStartedMessageSent && !(message instanceof CollaborationStartedMessage))
348: {
349: CollaborationStartedMessage.forCollaboration(this).send();
350: collaborationStartedMessageSent = true;
351: }
352:
353: notifyAll();
354: log();
355: }
356: }
357:
358: /*******************************************************************************************************************
359: *
360: * Registers that the given {@link Message} is no more being delivered.
361: *
362: * @param message the {@code Message}
363: *
364: ******************************************************************************************************************/
365: public synchronized void unregisterDeliveringMessage (@Nonnull final Object message)
366: {
367: log.trace("unregisterDeliveringMessage({})", message);
368:
369: if (message.getClass().getAnnotation(Message.class).daemon())
370: {
371: deliveringMessages.remove(message);
372: notifyAll();
373: log();
374: eventuallySendCompletionMessage();
375: }
376: }
377:
378: /*******************************************************************************************************************
379: *
380: * Registers that the given {@link Message} is pending - this means it is in the recipient's queue, waiting to be
381: * consumed.
382: *
383: * @param message the {@code Message}
384: *
385: ******************************************************************************************************************/
386: public synchronized void registerPendingMessage (@Nonnull final Object message)
387: {
388: log.trace("registerPendingMessage({})", message);
389:
390: if (message.getClass().getAnnotation(Message.class).daemon())
391: {
392: pendingMessages.add(new IdentityWrapper(message));
393: notifyAll();
394: log();
395: }
396: }
397:
398: /*******************************************************************************************************************
399: *
400: * Registers that the given {@link Message} is no more pending.
401: *
402: * @param message the {@code Message}
403: *
404: ******************************************************************************************************************/
405: public synchronized void unregisterPendingMessage (@Nonnull final Object message)
406: {
407: log.trace("unregisterPendingMessage({})", message);
408:
409: if (message.getClass().getAnnotation(Message.class).daemon())
410: {
411: pendingMessages.remove(new IdentityWrapper(message));
412: notifyAll();
413: log();
414: eventuallySendCompletionMessage();
415: }
416: }
417:
418: /*******************************************************************************************************************
419: *
420: * If this {@link Collaboration} has been completed (that is, there are no more messages around for it), sends a
421: * {@link CollaborationCompletedMessage}.
422: *
423: ******************************************************************************************************************/
424: private void eventuallySendCompletionMessage()
425: {
426: final int enqueuedMessageCount = deliveringMessages.size()
427: + pendingMessages.size()
428: + runningThreads.size()
429: + suspensionTokens.size();
430:
431:• if (!completed && (enqueuedMessageCount == 0))
432: {
433: log.debug(">>>> sending completion message for {}", this);
434: completed = true;
435: THREAD_LOCAL.remove();
436: CollaborationCompletedMessage.forCollaboration(this).send();
437: }
438: }
439:
/*******************************************************************************************************************
 *
 * Hook for tracing the internal counters of this Collaboration. The whole body is currently commented out, so
 * calling this method has no effect; it is invoked by the state-changing methods of this class.
 *
 ******************************************************************************************************************/
private void log() // FIXME: drop or move out of synchronized
{
    // Disabled tracing code, kept for reference:
    //
    // log.trace("{}: delivering messages: {}, pending messages: {}, running threads: {}, suspension tokens: {}",
    // new Object[] {this, deliveringMessages.size(), pendingMessages.size(), runningThreads.size(), suspensionTokens.size()});
    //
    // if (pendingMessages.size() < 2)
    // {
    // log.trace(">>>> pending messages: {}", pendingMessages);
    // }
}
455: }