Skip to contentMethod: suspend()
1: /*
2: * *************************************************************************************************************************************************************
3: *
4: * TheseFoolishThings: Miscellaneous utilities
5: * http://tidalwave.it/projects/thesefoolishthings
6: *
7: * Copyright (C) 2009 - 2025 by Tidalwave s.a.s. (http://tidalwave.it)
8: *
9: * *************************************************************************************************************************************************************
10: *
11: * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
12: * You may obtain a copy of the License at
13: *
14: * http://www.apache.org/licenses/LICENSE-2.0
15: *
16: * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
17: * CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
18: *
19: * *************************************************************************************************************************************************************
20: *
21: * git clone https://bitbucket.org/tidalwave/thesefoolishthings-src
22: * git clone https://github.com/tidalwave-it/thesefoolishthings-src
23: *
24: * *************************************************************************************************************************************************************
25: */
26: package it.tidalwave.actor.impl;
27:
28: import javax.annotation.Nonnegative;
29: import jakarta.annotation.Nonnull;
30: import java.time.Duration;
31: import java.time.ZonedDateTime;
32: import java.util.ArrayList;
33: import java.util.List;
34: import java.util.UUID;
35: import java.io.Serializable;
36: import it.tidalwave.actor.Collaboration;
37: import it.tidalwave.actor.CollaborationCompletedMessage;
38: import it.tidalwave.actor.CollaborationStartedMessage;
39: import it.tidalwave.actor.annotation.Message;
40: import lombok.EqualsAndHashCode;
41: import lombok.Getter;
42: import lombok.RequiredArgsConstructor;
43: import lombok.ToString;
44: import lombok.extern.slf4j.Slf4j;
45: import static lombok.AccessLevel.PRIVATE;
46:
47: /***************************************************************************************************************************************************************
48: *
49: * @author Fabrizio Giudici
50: *
51: **************************************************************************************************************************************************************/
52: @RequiredArgsConstructor(access = PRIVATE) @EqualsAndHashCode(of = "id") @Slf4j @ToString(of = "id")
53: public class DefaultCollaboration implements Serializable, Collaboration
54: {
55: /***********************************************************************************************************************************************************
56: *
57: **********************************************************************************************************************************************************/
58: private static final DefaultCollaboration NULL_DEFAULT_COLLABORATION = new DefaultCollaboration(new Object())
59: {
60: @Override
61: public void bindToCurrentThread()
62: {
63: }
64:
65: @Override
66: public void unbindFromCurrentThread()
67: {
68: }
69: };
70:
71: /***********************************************************************************************************************************************************
72: *
73: **********************************************************************************************************************************************************/
74: @RequiredArgsConstructor @ToString
75: private static class IdentityWrapper
76: {
77: @Nonnull
78: private final Object object;
79:
80: @Override
81: public boolean equals (final Object object)
82: {
83: if ((object == null) || (getClass() != object.getClass()))
84: {
85: return false;
86: }
87:
88: final var other = (IdentityWrapper)object;
89: return this.object == other.object;
90: }
91:
92: @Override
93: public int hashCode()
94: {
95: return object.hashCode();
96: }
97: }
98:
99: private static final ThreadLocal<DefaultCollaboration> THREAD_LOCAL = new ThreadLocal<>();
100:
101: private final UUID id = UUID.randomUUID();
102:
103: @Nonnull @Getter
104: private final Object originatingMessage;
105:
106: private final ZonedDateTime startTime = ZonedDateTime.now();
107:
108: @Getter
109: private boolean completed;
110:
111: private final List<Object> suspensionTokens = new ArrayList<>();
112:
113: /** List of threads currently working for this Collaboration. */
114: // No need for being weak, since objects are explicitly removed
115: private final List<Thread> runningThreads = new ArrayList<>();
116:
117: /** List of messages currently being delivered as part of this Collaboration. */
118: // No need for being weak, since objects are explicitly removed
119: private final List<Object> deliveringMessages = new ArrayList<>();
120:
121: /** List of messages pending to be consumed as part of this Collaboration. */
122: // No need for being weak, since objects are explicitly removed
123: private final List<IdentityWrapper> pendingMessages = new ArrayList<>();
124:
125: private boolean collaborationStartedMessageSent = false;
126:
127: /***********************************************************************************************************************************************************
128: * Factory method to retrieve a {@link Collaboration}. This method accepts any object; if it is an implementation
129: * of {@link Provider}, the object is queried; otherwise {@link #NULL_DEFAULT_COLLABORATION} is returned.
130: *
131: * @param object the object associated to the {@code Collaboration}
132: * @return the {@code Collaboration}
133: **********************************************************************************************************************************************************/
134: @Nonnull
135: public static DefaultCollaboration getCollaboration (@Nonnull final Object object)
136: {
137: return (object instanceof Provider) ? (DefaultCollaboration)((Provider)object).getCollaboration()
138: : NULL_DEFAULT_COLLABORATION;
139: }
140:
141: /***********************************************************************************************************************************************************
142: * Gets the {@link Collaboration} bound to the current thread or creates a new one.
143: *
144: * @param originator the object that will be considered the originator of the {@code Collaboration} in case it
145: * is created
146: * @return the {@code Collaboration}
147: **********************************************************************************************************************************************************/
148: @Nonnull
149: public static DefaultCollaboration getOrCreateCollaboration (@Nonnull final Object originator)
150: {
151: var collaboration = THREAD_LOCAL.get();
152:
153: if (collaboration == null)
154: {
155: collaboration = new DefaultCollaboration(originator);
156: }
157:
158: return collaboration;
159: }
160:
161: /***********************************************************************************************************************************************************
162: * {@inheritDoc}
163: **********************************************************************************************************************************************************/
164: @Override @Nonnull
165: public ZonedDateTime getStartTime()
166: {
167: return startTime;
168: }
169:
170: /***********************************************************************************************************************************************************
171: * {@inheritDoc}
172: **********************************************************************************************************************************************************/
173: @Override @Nonnull
174: public Duration getDuration()
175: {
176: return Duration.between(startTime, ZonedDateTime.now());
177: }
178:
179: /***********************************************************************************************************************************************************
180: * {@inheritDoc}
181: **********************************************************************************************************************************************************/
182: @Override
183: public synchronized void waitForCompletion()
184: throws InterruptedException
185: {
186: while (!isCompleted())
187: {
188: wait();
189: }
190: }
191:
192: /***********************************************************************************************************************************************************
193: * {@inheritDoc}
194: **********************************************************************************************************************************************************/
195: @Override @Nonnegative
196: public synchronized int getDeliveringMessagesCount()
197: {
198: return deliveringMessages.size();
199: }
200:
201: /***********************************************************************************************************************************************************
202: * {@inheritDoc}
203: **********************************************************************************************************************************************************/
204: @Override @Nonnegative
205: public synchronized int getPendingMessagesCount()
206: {
207: return pendingMessages.size();
208: }
209:
210: /***********************************************************************************************************************************************************
211: * {@inheritDoc}
212: **********************************************************************************************************************************************************/
213: @Override @Nonnegative
214: public synchronized int getRunningThreadsCount()
215: {
216: return runningThreads.size();
217: }
218:
219: /***********************************************************************************************************************************************************
220: * {@inheritDoc}
221: **********************************************************************************************************************************************************/
222: @Override
223: public void resume (@Nonnull final Object suspensionToken, @Nonnull final Runnable runnable)
224: {
225: try
226: {
227: bindToCurrentThread();
228: runnable.run();
229: suspensionTokens.remove(suspensionToken);
230: }
231: finally
232: {
233: unbindFromCurrentThread();
234: }
235: }
236:
237: /***********************************************************************************************************************************************************
238: * {@inheritDoc}
239: **********************************************************************************************************************************************************/
240: @Override
241: public synchronized void resumeAndDie (@Nonnull final Object suspensionToken)
242: {
243: resume(suspensionToken, () -> {});
244: }
245:
246: /***********************************************************************************************************************************************************
247: * {@inheritDoc}
248: **********************************************************************************************************************************************************/
249: @Override
250: public synchronized Object suspend()
251: {
252: final Object suspensionToken = UUID.randomUUID();
253: suspensionTokens.add(suspensionToken);
254: return suspensionToken;
255: }
256:
257: /***********************************************************************************************************************************************************
258: * {@inheritDoc}
259: **********************************************************************************************************************************************************/
260: @Override
261: public synchronized boolean isSuspended()
262: {
263: return !suspensionTokens.isEmpty();
264: }
265:
266: /***********************************************************************************************************************************************************
267: *
268: **********************************************************************************************************************************************************/
269: public synchronized void bindToCurrentThread()
270: {
271: log.trace("bindToCurrentThread()");
272: THREAD_LOCAL.set(this);
273: runningThreads.add(Thread.currentThread());
274: notifyAll();
275: log();
276: }
277:
278: /***********************************************************************************************************************************************************
279: *
280: **********************************************************************************************************************************************************/
281: public synchronized void unbindFromCurrentThread()
282: {
283: log.trace("unbindFromCurrentThread()");
284: runningThreads.remove(Thread.currentThread());
285: THREAD_LOCAL.remove();
286: notifyAll();
287: log();
288: eventuallySendCompletionMessage();
289: }
290:
291: /***********************************************************************************************************************************************************
292: * Registers that the given {@link Message} is being delivered.
293: *
294: * @param message the {@code Message}
295: **********************************************************************************************************************************************************/
296: public synchronized void registerDeliveringMessage (@Nonnull final Object message)
297: {
298: log.trace("registerDeliveringMessage({})", message);
299:
300: final var annotation = message.getClass().getAnnotation(Message.class);
301:
302: if (annotation == null)
303: {
304: throw new IllegalArgumentException("Message must be annotated with @Message: " + message.getClass());
305: }
306:
307: if (annotation.daemon())
308: {
309: deliveringMessages.add(message);
310:
311: // Do this *after* enlisting message in deliveringMessages
312: if (!collaborationStartedMessageSent && !(message instanceof CollaborationStartedMessage))
313: {
314: CollaborationStartedMessage.forCollaboration(this).send();
315: collaborationStartedMessageSent = true;
316: }
317:
318: notifyAll();
319: log();
320: }
321: }
322:
323: /***********************************************************************************************************************************************************
324: * Registers that the given {@link Message} is no more being delivered.
325: *
326: * @param message the {@code Message}
327: **********************************************************************************************************************************************************/
328: public synchronized void unregisterDeliveringMessage (@Nonnull final Object message)
329: {
330: log.trace("unregisterDeliveringMessage({})", message);
331:
332: if (message.getClass().getAnnotation(Message.class).daemon())
333: {
334: deliveringMessages.remove(message);
335: notifyAll();
336: log();
337: eventuallySendCompletionMessage();
338: }
339: }
340:
341: /***********************************************************************************************************************************************************
342: * Registers that the given {@link Message} is pending - this means it is in the recipient's queue, waiting to be
343: * consumed.
344: *
345: * @param message the {@code Message}
346: **********************************************************************************************************************************************************/
347: public synchronized void registerPendingMessage (@Nonnull final Object message)
348: {
349: log.trace("registerPendingMessage({})", message);
350:
351: if (message.getClass().getAnnotation(Message.class).daemon())
352: {
353: pendingMessages.add(new IdentityWrapper(message));
354: notifyAll();
355: log();
356: }
357: }
358:
359: /***********************************************************************************************************************************************************
360: * Registers that the given {@link Message} is no more pending.
361: *
362: * @param message the {@code Message}
363: **********************************************************************************************************************************************************/
364: public synchronized void unregisterPendingMessage (@Nonnull final Object message)
365: {
366: log.trace("unregisterPendingMessage({})", message);
367:
368: if (message.getClass().getAnnotation(Message.class).daemon())
369: {
370: pendingMessages.remove(new IdentityWrapper(message));
371: notifyAll();
372: log();
373: eventuallySendCompletionMessage();
374: }
375: }
376:
377: /***********************************************************************************************************************************************************
378: * If this {@link Collaboration} has been completed (that is, there are no more messages around for it), sends a
379: * {@link CollaborationCompletedMessage}.
380: **********************************************************************************************************************************************************/
381: private void eventuallySendCompletionMessage()
382: {
383: final var enqueuedMessageCount = deliveringMessages.size()
384: + pendingMessages.size()
385: + runningThreads.size()
386: + suspensionTokens.size();
387:
388: if (!completed && (enqueuedMessageCount == 0))
389: {
390: log.debug(">>>> sending completion message for {}", this);
391: completed = true;
392: THREAD_LOCAL.remove();
393: CollaborationCompletedMessage.forCollaboration(this).send();
394: }
395: }
396:
397: /***********************************************************************************************************************************************************
398: *
399: **********************************************************************************************************************************************************/
400: private void log() // FIXME: drop or move out of synchronized
401: {
402: // log.trace("{}: delivering messages: {}, pending messages: {}, running threads: {}, suspension tokens: {}",
403: // new Object[] {this, deliveringMessages.size(), pendingMessages.size(), runningThreads.size(), suspensionTokens.size()});
404: //
405: // if (pendingMessages.size() < 2)
406: // {
407: // log.trace(">>>> pending messages: {}", pendingMessages);
408: // }
409: }
410: }