View Javadoc

1   // 
2   // Copyright (c) 2003-2005, Caltha - Gajda, Krzewski, Mach, Potempski Sp.J. 
3   // All rights reserved. 
4   // 
5   // Redistribution and use in source and binary forms, with or without modification,  
6   // are permitted provided that the following conditions are met: 
7   //  
8   // * Redistributions of source code must retain the above copyright notice,  
9   //	 this list of conditions and the following disclaimer. 
10  // * Redistributions in binary form must reproduce the above copyright notice,  
11  //	 this list of conditions and the following disclaimer in the documentation  
12  //	 and/or other materials provided with the distribution. 
13  // * Neither the name of the Caltha - Gajda, Krzewski, Mach, Potempski Sp.J.  
14  //	 nor the names of its contributors may be used to endorse or promote products  
15  //	 derived from this software without specific prior written permission. 
16  // 
17  // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"  
18  // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED  
19  // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
20  // IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,  
21  // INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,  
22  // BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
23  // OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,  
24  // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)  
25  // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE  
26  // POSSIBILITY OF SUCH DAMAGE. 
27  // 
28  package org.objectledge.pipeline;
29  
30  import java.util.concurrent.Semaphore;
31  
32  import org.jcontainer.dna.Configuration;
33  import org.jcontainer.dna.ConfigurationException;
34  import org.objectledge.context.Context;
35  import org.objectledge.statistics.DataSource;
36  import org.objectledge.statistics.Graph;
37  import org.objectledge.statistics.ReflectiveStatisticsProvider;
38  
39  /***
40   * A valve that provides control over the number of threads executing another valve.
41   * 
42   * @author <a href="mailto:rafal@caltha.pl">Rafal Krzewski</a>
43   * @version $Id: ConcurrencyControlValve.java,v 1.8 2005/08/04 11:54:09 rafal Exp $
44   */
45  public class ConcurrencyControlValve
46      extends ReflectiveStatisticsProvider
47      implements Valve
48  {
49      private final Valve nestedValve;
50  
51      private final Semaphore semaphore;
52  
53      private final int limit;
54  
55      private volatile int threadCount = 0;
56  
57      /***
58       * Creates new ConcurrencyControlValve instance.
59       * 
60       * @param nestedValve the valve to control.
61       * @param limit the maximum number of threads allowed to execute, or 0 for unlimited.
62       */
63      public ConcurrencyControlValve(final Valve nestedValve, final int limit)
64      {
65          this.nestedValve = nestedValve;
66          this.limit = limit;
67          if(limit > 0)
68          {
69              semaphore = new Semaphore(limit, true);
70          }
71          else
72          {
73              semaphore = null;
74          }
75      }
76  
77      /***
78       * Creates a new ConcurrencyControlValve instance.
79       *
80       * @param nestedValve the valve to control.
81       * @param config the confguration object.
82       * @throws ConfigurationException if the configuration is incorrect.
83       */
84      public ConcurrencyControlValve(final Valve nestedValve, final Configuration config)
85          throws ConfigurationException
86      {
87          this(nestedValve, config.getChild("limit").getValueAsInteger());
88      }
89      
90      /***
91       * {@inheritDoc}
92       */
93      public void process(Context context)
94          throws ProcessingException
95      {
96          if(semaphore != null)
97          {
98              try
99              {
100                 semaphore.acquireUninterruptibly();
101                 nestedValve.process(context);
102             }
103             finally
104             {
105                 semaphore.release();
106             }
107         }
108         else
109         {
110             try
111             {
112                 threadCount++;
113                 nestedValve.process(context);
114             }
115             finally
116             {
117                 threadCount--;
118             }
119         }
120     }
121 
122     // statistics ///////////////////////////////////////////////////////////////////////////////
123 
124     /***
125      * {@inheritDoc}
126      */
127     public String getName()
128     {
129         return "Concurrency control";
130     }
131 
132     private static final DataSource[] DATA_SOURCES = {
133                     new DataSource("concurrency_threads_running", "Running threads",
134                         DataSource.Type.GAUGE, DataSource.Graph.LINE1),
135                     new DataSource("concurrency_threads_waiting", "Waiting threads",
136                         DataSource.Type.GAUGE, DataSource.Graph.LINE1) };
137 
138     private static final Graph[] GRAPHS = { new Graph("concurrency", "Execution concurrency", null,
139         DATA_SOURCES, "threads") };
140 
141     /***
142      * {@inheritDoc}
143      */
144     public Graph[] getGraphs()
145     {
146         return GRAPHS;
147     }
148 
149     /***
150      * {@inheritDoc}
151      */
152     public DataSource[] getDataSources()
153     {
154         return DATA_SOURCES;
155     }
156 
157     /***
158      * Returns the number of concurrently executing threads.
159      * 
160      * @return the number of concurrently executing threads.
161      */
162     public Number getConcurrencyThreadsRunning()
163     {
164         if(semaphore != null)
165         {
166             return new Integer(limit - semaphore.availablePermits());
167         }
168         else
169         {
170             return threadCount;
171         }
172     }
173 
174     /***
175      * Returns the number of threads waiting for execution.
176      * 
177      * @return the number of threads waiting for execution.
178      */
179     public Number getConcurrencyThreadsWaiting()
180     {
181         if(semaphore != null)
182         {
183             return new Integer(semaphore.getQueueLength());
184         }
185         else
186         {
187             return 0;
188         }
189     }
190 }