Skip to content

Method: post(ImagingTask)

1: /*
2: * *********************************************************************************************************************
3: *
4: * Mistral: open source imaging engine
5: * http://tidalwave.it/projects/mistral
6: *
7: * Copyright (C) 2003 - 2023 by Tidalwave s.a.s. (http://tidalwave.it)
8: *
9: * *********************************************************************************************************************
10: *
11: * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
12: * the License. You may obtain a copy of the License at
13: *
14: * http://www.apache.org/licenses/LICENSE-2.0
15: *
16: * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
17: * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
18: * specific language governing permissions and limitations under the License.
19: *
20: * *********************************************************************************************************************
21: *
22: * git clone https://bitbucket.org/tidalwave/mistral-src
23: * git clone https://github.com/tidalwave-it/mistral-src
24: *
25: * *********************************************************************************************************************
26: */
27: package it.tidalwave.image.processor;
28:
29: import java.util.ArrayList;
30: import java.util.Collection;
31: import java.util.Collections;
32: import java.util.Iterator;
33: import java.util.List;
34: import java.util.NoSuchElementException;
35: import java.io.Serializable;
36: import it.tidalwave.image.processor.event.ImagingTaskProcessorListener;
37: import lombok.extern.slf4j.Slf4j;
38:
39: /***********************************************************************************************************************
40: *
41: * @author Fabrizio Giudici
42: *
43: **********************************************************************************************************************/
44: @Slf4j
45: public abstract class ImagingTaskProcessor // NOT Serializable
46: {
47: /**
48: * The singleton instance.
49: */
50: private static ImagingTaskProcessor instance;
51:
52: /**
53: * The class to be used for the concrete implementation.
54: */
55: private static Class<? extends ImagingTaskProcessor> defaultClass = LocalImagingTaskProcessor.class;
56:
57: private static int eventNotifierCounter;
58:
59: /**
60: * The max number of workers.
61: */
62: protected static int maxWorkers = Integer.MAX_VALUE;
63:
64: /**
65: * The current number of free workers.
66: */
67: protected int freeWorkers;
68:
69: /**
70: * A lock used to manipulate the inner state.
71: */
72: protected final Object lock = new Object();
73:
74: /**
75: * The list of pending tasks.
76: */
77: private final List<ImagingTask> pendingTasks = Collections.synchronizedList(new ArrayList<>());
78:
79: /**
80: * The list of running tasks.
81: */
82: private final List<ImagingTask> runningTasks = Collections.synchronizedList(new ArrayList<>());
83:
84: /**
85: * The list of completed tasks.
86: */
87: private final List<ImagingTask> completedTasks = Collections.synchronizedList(new ArrayList<>());
88:
89: private final Statistics statistics = new Statistics();
90:
91: private final ImagingTaskProcessorEventManager eventManager = new ImagingTaskProcessorEventManager(this);
92:
93: /*******************************************************************************************************************
94: *
95: *
96: *
97: ******************************************************************************************************************/
98: protected ImagingTaskProcessor()
99: {
100: }
101:
102: /*******************************************************************************************************************
103: *
104: * Gets the singleton instance of the processor.
105: *
106: * @return the ImagingTaskProcessor
107: *
108: ******************************************************************************************************************/
109: // change getInstance()->getDefault() since this is no more a singleton
110: public static synchronized ImagingTaskProcessor getInstance()
111: {
112: if (instance == null)
113: {
114: try
115: {
116: instance = defaultClass.newInstance();
117: }
118: catch (Exception e)
119: {
120: throw new RuntimeException(e); // FIXME
121: }
122: }
123:
124: return instance;
125: }
126:
127: /*******************************************************************************************************************
128: *
129: * Sets the default implementation of the ImagingTaskProcessor
130: *
131: * @param defaultClass the implementation class
132: *
133: ******************************************************************************************************************/
134: public static void setDefault (final Class<? extends ImagingTaskProcessor> defaultClass)
135: {
136: ImagingTaskProcessor.defaultClass = defaultClass;
137: instance = null; // force a creation of new instance next time
138: }
139:
140: /*******************************************************************************************************************
141: *
142: * Sets a limit to the number of concurrent workers.
143: *
144: * @param maxWorkers the max. number of workers
145: *
146: ******************************************************************************************************************/
147: public static void setMaxWorkers (final int maxWorkers)
148: {
149: ImagingTaskProcessor.maxWorkers = maxWorkers;
150: }
151:
152: /*******************************************************************************************************************
153: *
154: *
155: ******************************************************************************************************************/
156: public int getMaxWorkers()
157: {
158: return maxWorkers;
159: }
160:
161: /*******************************************************************************************************************
162: *
163: *
164: ******************************************************************************************************************/
165: public abstract int getWorkerCount();
166:
167: /*******************************************************************************************************************
168: *
169: *
170: ******************************************************************************************************************/
171: public abstract Collection<Serializable> getWorkerIds();
172:
173: /*******************************************************************************************************************
174: *
175: * Returns true if the tasks will be executed in a distributed context
176: * (i.e. with different physical nodes - in single-node, multi-core contexts
177: * this method returns false.
178: *
179: ******************************************************************************************************************/
180: public abstract boolean isDistributed();
181:
182: /*******************************************************************************************************************
183: *
184: * Returns true if the tasks will be executed in a distributed context
185: * with the support of a distributed file system - that is, if each task
186: * is guaranteed to have a Path access to the same set of data. Or if
187: * the context is a single-node, multi-core system, which of course has a
188: * single filesystem.
189: *
190: ******************************************************************************************************************/
191: public abstract boolean hasFileAccess();
192:
193: /*******************************************************************************************************************
194: *
195: *
196: ******************************************************************************************************************/
197: public void addListener (final ImagingTaskProcessorListener listener)
198: {
199: eventManager.addListener(listener);
200: }
201:
202: /*******************************************************************************************************************
203: *
204: *
205: ******************************************************************************************************************/
206: public void removeListener (final ImagingTaskProcessorListener listener)
207: {
208: eventManager.removeListener(listener);
209: }
210:
211: /*******************************************************************************************************************
212: *
213: *
214: *
215: ******************************************************************************************************************/
216: public boolean processingResourcesAvailable()
217: {
218: synchronized (lock)
219: {
220: return freeWorkers > 0;
221: }
222: }
223:
224: /*******************************************************************************************************************
225: *
226: * Posts a new task into the queue. The task will be scheduled later as soon
227: * there are favorable conditions. This method returns immediately.
228: *
229: * @param task the ImagingTask to post
230: *
231: ******************************************************************************************************************/
232: public void post (final Collection<? extends ImagingTask> tasks)
233: {
234: log.info(String.format("post(%s) - free workers: %d", tasks, freeWorkers));
235:
236: synchronized (lock)
237: {
238: pendingTasks.addAll(tasks);
239: lock.notify();
240:
241: for (final ImagingTask task : tasks)
242: {
243: log.info(String.format(">>>> %s added to pending task list", task.getName()));
244: eventManager.fireNotifyTaskPosted(task);
245: }
246: }
247: }
248:
249: /*******************************************************************************************************************
250: *
251: * Posts a new task into the queue. The task will be scheduled later as soon
252: * there are favorable conditions. This method returns immediately.
253: *
254: * @param task the ImagingTask to post
255: *
256: ******************************************************************************************************************/
257: public void post (final ImagingTask task)
258: {
259: post(Collections.singletonList(task));
260: }
261:
262: /*******************************************************************************************************************
263: *
264: * @param task the ImagingTask to post
265: *
266: ******************************************************************************************************************/
267: public void postWithPriority (final Collection<? extends ImagingTask> tasks)
268: {
269: log.info(String.format("postWithPriority(%s) - free workers: %d", tasks, freeWorkers));
270:
271: synchronized (lock)
272: {
273: pendingTasks.addAll(0, tasks);
274: lock.notify();
275:
276: for (final ImagingTask task : tasks)
277: {
278: log.info(String.format(">>>> %s added to pending task list", task.getName()));
279: eventManager.fireNotifyTaskPosted(task);
280: }
281: }
282: }
283:
284: /*******************************************************************************************************************
285: *
286: * @param task the ImagingTasks to post
287: *
288: ******************************************************************************************************************/
289: public void postWithPriority (final ImagingTask task)
290: {
291: postWithPriority(Collections.singletonList(task));
292: }
293:
294: /*******************************************************************************************************************
295: *
296: * Cancels all the pending tasks of the given type. This method returns
297: * immediately. Tasks in running state will be completed.
298: *
299: * @param taskClass the kind of task to remove (null means all)
300: * @return the list of completed tasks
301: *
302: ******************************************************************************************************************/
303: public final <T extends ImagingTask> Collection<T> cancellPendingTasks (final Class<T> taskClass)
304: {
305: log.info("cancellPendingTasks(" + taskClass.getName() + ")");
306: final List<T> result = new ArrayList<>();
307:
308: synchronized (lock)
309: {
310: for (final Iterator<? extends ImagingTask> i = pendingTasks.iterator(); i.hasNext(); )
311: {
312: final var task = i.next();
313: // if ((taskClass == null) || taskClass.equals(task.getClass()))
314: if ((taskClass == null) || taskClass.getName().equals(task.getClass().getName()))
315: {
316: i.remove();
317: result.add(taskClass.cast(task));
318: }
319: }
320: }
321:
322: return result;
323: }
324:
325: /*******************************************************************************************************************
326: *
327: * Returns the number of pending tasks of the given class.
328: *
329: * @param taskClass the class of the task (null for any class)
330: * @return the task count
331: *
332: ******************************************************************************************************************/
333: public int getPendingTaskCount (final Class<? extends ImagingTask> taskClass)
334: {
335: return getTaskCount(taskClass, pendingTasks);
336: }
337:
338: /*******************************************************************************************************************
339: *
340: * Returns the number of running tasks of the given class.
341: *
342: * @param taskClass the class of the task (null for any class)
343: * @return the task count
344: *
345: ******************************************************************************************************************/
346: public int getRunningTaskCount (final Class<? extends ImagingTask> taskClass)
347: {
348: return getTaskCount(taskClass, runningTasks);
349: }
350:
351: /*******************************************************************************************************************
352: *
353: * Returns the number of completed tasks of the given class. This number is
354: * decreased as popCompletedTask() is called.
355: *
356: * @param taskClass the class of the task (null for any class)
357: * @return the task count
358: *
359: ******************************************************************************************************************/
360: public int getCompletedTaskCount (final Class<? extends ImagingTask> taskClass)
361: {
362: return getTaskCount(taskClass, completedTasks);
363: }
364:
365: /*******************************************************************************************************************
366: *
367: * Pops a completed task out of the list. This will decrease the count of
368: * completed task. This operation is usually performed by the controller to
369: * consume the task result and eventually schedule a new task.
370: *
371: * @param taskClass the class of the task
372: * @return the completed task
373: * @throws NoSuchElementException if no tasks of the given class
374: * is available
375: *
376: ******************************************************************************************************************/
377: @SuppressWarnings("unchecked")
378: public <T extends ImagingTask> T popCompletedTask (final Class<T> taskClass)
379: {
380: synchronized (lock)
381: {
382: for (final var task : completedTasks)
383: {
384: // if ((taskClass == null) || taskClass.equals(task.getClass()))
385: if ((taskClass == null) || taskClass.getName().equals(task.getClass().getName()))
386: {
387: completedTasks.remove(task);
388: eventManager.fireNotifyTaskPopped(task);
389: return (T)task;
390: }
391: }
392: }
393:
394: throw new NoSuchElementException("No completed task of class " + taskClass);
395: }
396:
397: /*******************************************************************************************************************
398: *
399: *
400: *
401: ******************************************************************************************************************/
402: public Statistics getStatistics()
403: {
404: return statistics;
405: }
406:
407: /*******************************************************************************************************************
408: *
409: *
410: *
411: ******************************************************************************************************************/
412: // protected boolean canScheduleMore()
413: // {
414: // return freeWorkers > 0;
415: // }
416:
417: /*******************************************************************************************************************
418: *
419: *
420: *
421: ******************************************************************************************************************/
422: protected void changeFreeWorkerCount (final int delta)
423: {
424: synchronized (lock)
425: {
426: freeWorkers += delta;
427: lock.notify();
428: }
429: }
430:
431: /*******************************************************************************************************************
432: *
433: * Returns the next pending task to execute. This method blocks until a
434: * pending task is available.
435: *
436: * @return the task to execute
437: *
438: ******************************************************************************************************************/
439: protected final ImagingTask getNextTask (final Serializable workerId, final boolean remoteExecution)
440: {
441: ImagingTask task = null;
442:
443: while (task == null)
444: {
445: synchronized (lock)
446: {
447: while (pendingTasks.isEmpty())
448: {
449: try
450: {
451: lock.wait();
452: }
453: catch (InterruptedException e)
454: {
455: return null; // FIXME: better to relaunch InterruptedException
456: }
457: }
458:
459: for (final var i = pendingTasks.iterator(); i.hasNext(); )
460: {
461: final var task2 = i.next();
462:
463: if (!remoteExecution || task2.isRemoteExecutionOk())
464: {
465: i.remove();
466: task = task2;
467: break;
468: }
469: }
470: }
471:
472: if (task != null)
473: {
474: synchronized (lock)
475: {
476: runningTasks.add(task);
477: }
478:
479: eventManager.fireNotifyTaskStarted(task, workerId);
480: }
481:
482: else
483: {
484: try
485: {
486: Thread.sleep(100); // Don't go CPU-bound
487: }
488: catch (InterruptedException e)
489: {
490: // FIXME: better to relaunch InterruptedException
491: }
492: }
493: }
494:
495: return task;
496: }
497:
498: /*******************************************************************************************************************
499: *
500: * <p>Notifies that a task has been completed. The task is removed from the
501: * list of running tasks and added into the list of completed tasks. If
502: * an ImagingTaskProcessorController has been specified, it is notified too.
503: * </p>
504: *
505: * <p>This method is able to filter out duplicated tasks (that could occur
506: * with some distributed implementations.</p>
507: *
508: * <p>It is important that this method blocks until the controller has been
509: * invoked - otherwise e.g. the Computer Server implementation could fail,
510: * for instance if there are no running tasks and the controller has not
511: * been able to post new tasks yet. In this circumstances, the facility
512: * erroneously thinks that everything is over and quits.</p>
513: *
514: * @param task the completed task
515: *
516: ******************************************************************************************************************/
517: protected final void notifyTaskCompleted (final ImagingTask task)
518: {
519: log.info("notifyTaskCompleted(" + task + ")");
520: var duplicate = false;
521:
522: synchronized (lock)
523: {
524: duplicate = !runningTasks.contains(task);
525:
526: if (!duplicate)
527: {
528: runningTasks.remove(task);
529: completedTasks.add(task);
530: }
531: }
532:
533: if (duplicate)
534: {
535: log.warn("Filtering out duplicated task: " + task);
536: }
537:
538: else
539: {
540: statistics.merge(task.getStatistics());
541: eventManager.fireNotifyTaskCompleted(task);
542: }
543: }
544:
545: /*******************************************************************************************************************
546: *
547: *
548: *
549: ******************************************************************************************************************/
550: private int getTaskCount (final Class taskClass, final Collection<? extends ImagingTask> taskCollection)
551: {
552: var count = 0;
553:
554: synchronized (lock) // implement in a smarter way
555: {
556: for (final ImagingTask task : taskCollection)
557: {
558: // if ((taskClass == null) || taskClass.equals(task.getClass()))
559: if ((taskClass == null) || taskClass.getName().equals(task.getClass().getName()))
560: {
561: count++;
562: }
563: }
564: }
565:
566: return count;
567: }
568: }