View Javadoc

1   // 
2   // Copyright (c) 2003, Caltha - Gajda, Krzewski, Mach, Potempski Sp.J. 
3   // All rights reserved. 
4   // 
5   // Redistribution and use in source and binary forms, with or without modification,  
6   // are permitted provided that the following conditions are met: 
7   //  
8   // * Redistributions of source code must retain the above copyright notice,  
9   //	 this list of conditions and the following disclaimer. 
10  // * Redistributions in binary form must reproduce the above copyright notice,  
11  //	 this list of conditions and the following disclaimer in the documentation  
12  //	 and/or other materials provided with the distribution. 
13  // * Neither the name of the Caltha - Gajda, Krzewski, Mach, Potempski Sp.J.  
14  //	 nor the names of its contributors may be used to endorse or promote products  
15  //	 derived from this software without specific prior written permission. 
16  // 
17  // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"  
18  // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED  
19  // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
20  // IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,  
21  // INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,  
22  // BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
23  // OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,  
24  // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)  
25  // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE  
26  // POSSIBILITY OF SUCH DAMAGE. 
27  // 
28  package org.objectledge.threads;
29  
30  import java.util.HashSet;
31  import java.util.Iterator;
32  import java.util.Set;
33  
34  import org.jcontainer.dna.Configuration;
35  import org.jcontainer.dna.Logger;
36  import org.objectledge.context.Context;
37  import org.objectledge.pipeline.Valve;
38  import org.objectledge.threads.impl.Daemon;
39  import org.objectledge.threads.impl.WorkerPool;
40  import org.picocontainer.Startable;
41  
42  /***
43   * Thread pool component.
44   * 
45   * @author <a href="mailto:rafal@caltha.pl">Rafal Krzewski</a>
46   * @version $Id: ThreadPool.java,v 1.11 2006/02/08 18:24:45 zwierzem Exp $
47   */
48  public class ThreadPool
49      implements Startable
50  {
51      // constants ////////////////////////////////////////////////////////////////////////////////
52      
53      private static final String THREAD_GROUP_NAME = "Ledge ThreadPool";
54      
55      // instance variables ///////////////////////////////////////////////////////////////////////
56      
57      /*** thread's processing context. */
58      private Context context;    
59      
60      /*** cleanup valve (possibly pipeline). */
61      private Valve cleanup;
62      
63      /*** thread pool's thread group. */
64      private ThreadGroup threadGroup;
65      
66      private int daemonPriority = Thread.MIN_PRIORITY;
67      
68      private int workerPriority = Thread.MIN_PRIORITY;
69      
70      private int workerPoolCapacity = 10;
71      
72      private Set<Daemon> threads = new HashSet<Daemon>();
73  
74      private Logger log;
75      
76      private WorkerPool workerPool;
77      
78      /***
79       * Component constructor.
80       * 
81       * @param cleanup the valve that should be invoked every time the thread finishes it's work.
82       * @param context thread processing context.
83       * @param config the pool configuration.
84       * @param log the logger to use.
85       */
86      public ThreadPool(Valve cleanup, Context context, Configuration config, Logger log)
87      {
88          this.context = context;
89          this.cleanup = cleanup;
90          this.log = log;
91          if(config != null)
92          {
93              daemonPriority = config.getChild("daemon-priority").getValueAsInteger(daemonPriority);
94              workerPriority = config.getChild("worker-priority").getValueAsInteger(workerPriority);
95              workerPoolCapacity = config.getChild("worker-pool-capacity").   
96                  getValueAsInteger(workerPoolCapacity);
97          }
98          
99          this.threadGroup = new ThreadGroup(THREAD_GROUP_NAME);
100         this.workerPool = new WorkerPool(workerPoolCapacity, workerPriority, threadGroup, 
101             log, context, cleanup);
102         runDaemon(workerPool.getSchedulingTask());
103     }
104     
105     /***
106      * Run the worker task.
107      * 
108      * @param task the task to run.
109      */
110     public void runWorker(Task task)
111     {
112         workerPool.dispatch(task);     
113     }
114     
115     /***
116      * Run the daemon task.
117      * 
118      * @param task the task to run.
119      */
120     public void runDaemon(Task task)
121     {
122         synchronized(threads)
123         {
124             threads.add(new Daemon(task, daemonPriority, threadGroup, log, context, cleanup));     
125         }
126     }
127 
128     /***
129      * {@inheritDoc}
130      */
131     public void start()
132     {
133         // I wish Startable interface was split
134     }
135     
136     /***
137      * {@inheritDoc}
138      */
139     public void stop()
140     {
141         synchronized(threads)
142         {
143             Iterator i = threads.iterator();
144             while(i.hasNext())
145             {
146                 Startable thread = (Startable)i.next();
147                 thread.stop();
148                 i.remove();
149             }
150         }
151     }
152 }