1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.objectledge.threads;
29
30 import java.util.HashSet;
31 import java.util.Iterator;
32 import java.util.Set;
33
34 import org.jcontainer.dna.Configuration;
35 import org.jcontainer.dna.Logger;
36 import org.objectledge.context.Context;
37 import org.objectledge.pipeline.Valve;
38 import org.objectledge.threads.impl.Daemon;
39 import org.objectledge.threads.impl.WorkerPool;
40 import org.picocontainer.Startable;
41
42 /***
43 * Thread pool component.
44 *
45 * @author <a href="mailto:rafal@caltha.pl">Rafal Krzewski</a>
46 * @version $Id: ThreadPool.java,v 1.11 2006/02/08 18:24:45 zwierzem Exp $
47 */
48 public class ThreadPool
49 implements Startable
50 {
51
52
53 private static final String THREAD_GROUP_NAME = "Ledge ThreadPool";
54
55
56
57 /*** thread's processing context. */
58 private Context context;
59
60 /*** cleanup valve (possibly pipeline). */
61 private Valve cleanup;
62
63 /*** thread pool's thread group. */
64 private ThreadGroup threadGroup;
65
66 private int daemonPriority = Thread.MIN_PRIORITY;
67
68 private int workerPriority = Thread.MIN_PRIORITY;
69
70 private int workerPoolCapacity = 10;
71
72 private Set<Daemon> threads = new HashSet<Daemon>();
73
74 private Logger log;
75
76 private WorkerPool workerPool;
77
78 /***
79 * Component constructor.
80 *
81 * @param cleanup the valve that should be invoked every time the thread finishes it's work.
82 * @param context thread processing context.
83 * @param config the pool configuration.
84 * @param log the logger to use.
85 */
86 public ThreadPool(Valve cleanup, Context context, Configuration config, Logger log)
87 {
88 this.context = context;
89 this.cleanup = cleanup;
90 this.log = log;
91 if(config != null)
92 {
93 daemonPriority = config.getChild("daemon-priority").getValueAsInteger(daemonPriority);
94 workerPriority = config.getChild("worker-priority").getValueAsInteger(workerPriority);
95 workerPoolCapacity = config.getChild("worker-pool-capacity").
96 getValueAsInteger(workerPoolCapacity);
97 }
98
99 this.threadGroup = new ThreadGroup(THREAD_GROUP_NAME);
100 this.workerPool = new WorkerPool(workerPoolCapacity, workerPriority, threadGroup,
101 log, context, cleanup);
102 runDaemon(workerPool.getSchedulingTask());
103 }
104
105 /***
106 * Run the worker task.
107 *
108 * @param task the task to run.
109 */
110 public void runWorker(Task task)
111 {
112 workerPool.dispatch(task);
113 }
114
115 /***
116 * Run the daemon task.
117 *
118 * @param task the task to run.
119 */
120 public void runDaemon(Task task)
121 {
122 synchronized(threads)
123 {
124 threads.add(new Daemon(task, daemonPriority, threadGroup, log, context, cleanup));
125 }
126 }
127
128 /***
129 * {@inheritDoc}
130 */
131 public void start()
132 {
133
134 }
135
136 /***
137 * {@inheritDoc}
138 */
139 public void stop()
140 {
141 synchronized(threads)
142 {
143 Iterator i = threads.iterator();
144 while(i.hasNext())
145 {
146 Startable thread = (Startable)i.next();
147 thread.stop();
148 i.remove();
149 }
150 }
151 }
152 }