1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.objectledge.pipeline;
29
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

import org.jcontainer.dna.Configuration;
import org.jcontainer.dna.ConfigurationException;
import org.objectledge.context.Context;
import org.objectledge.statistics.DataSource;
import org.objectledge.statistics.Graph;
import org.objectledge.statistics.ReflectiveStatisticsProvider;
38
39 /***
40 * A valve that provides control over the number of threads executing another valve.
41 *
42 * @author <a href="mailto:rafal@caltha.pl">Rafal Krzewski</a>
43 * @version $Id: ConcurrencyControlValve.java,v 1.8 2005/08/04 11:54:09 rafal Exp $
44 */
45 public class ConcurrencyControlValve
46 extends ReflectiveStatisticsProvider
47 implements Valve
48 {
49 private final Valve nestedValve;
50
51 private final Semaphore semaphore;
52
53 private final int limit;
54
55 private volatile int threadCount = 0;
56
57 /***
58 * Creates new ConcurrencyControlValve instance.
59 *
60 * @param nestedValve the valve to control.
61 * @param limit the maximum number of threads allowed to execute, or 0 for unlimited.
62 */
63 public ConcurrencyControlValve(final Valve nestedValve, final int limit)
64 {
65 this.nestedValve = nestedValve;
66 this.limit = limit;
67 if(limit > 0)
68 {
69 semaphore = new Semaphore(limit, true);
70 }
71 else
72 {
73 semaphore = null;
74 }
75 }
76
77 /***
78 * Creates a new ConcurrencyControlValve instance.
79 *
80 * @param nestedValve the valve to control.
81 * @param config the confguration object.
82 * @throws ConfigurationException if the configuration is incorrect.
83 */
84 public ConcurrencyControlValve(final Valve nestedValve, final Configuration config)
85 throws ConfigurationException
86 {
87 this(nestedValve, config.getChild("limit").getValueAsInteger());
88 }
89
90 /***
91 * {@inheritDoc}
92 */
93 public void process(Context context)
94 throws ProcessingException
95 {
96 if(semaphore != null)
97 {
98 try
99 {
100 semaphore.acquireUninterruptibly();
101 nestedValve.process(context);
102 }
103 finally
104 {
105 semaphore.release();
106 }
107 }
108 else
109 {
110 try
111 {
112 threadCount++;
113 nestedValve.process(context);
114 }
115 finally
116 {
117 threadCount--;
118 }
119 }
120 }
121
122
123
124 /***
125 * {@inheritDoc}
126 */
127 public String getName()
128 {
129 return "Concurrency control";
130 }
131
132 private static final DataSource[] DATA_SOURCES = {
133 new DataSource("concurrency_threads_running", "Running threads",
134 DataSource.Type.GAUGE, DataSource.Graph.LINE1),
135 new DataSource("concurrency_threads_waiting", "Waiting threads",
136 DataSource.Type.GAUGE, DataSource.Graph.LINE1) };
137
138 private static final Graph[] GRAPHS = { new Graph("concurrency", "Execution concurrency", null,
139 DATA_SOURCES, "threads") };
140
141 /***
142 * {@inheritDoc}
143 */
144 public Graph[] getGraphs()
145 {
146 return GRAPHS;
147 }
148
149 /***
150 * {@inheritDoc}
151 */
152 public DataSource[] getDataSources()
153 {
154 return DATA_SOURCES;
155 }
156
157 /***
158 * Returns the number of concurrently executing threads.
159 *
160 * @return the number of concurrently executing threads.
161 */
162 public Number getConcurrencyThreadsRunning()
163 {
164 if(semaphore != null)
165 {
166 return new Integer(limit - semaphore.availablePermits());
167 }
168 else
169 {
170 return threadCount;
171 }
172 }
173
174 /***
175 * Returns the number of threads waiting for execution.
176 *
177 * @return the number of threads waiting for execution.
178 */
179 public Number getConcurrencyThreadsWaiting()
180 {
181 if(semaphore != null)
182 {
183 return new Integer(semaphore.getQueueLength());
184 }
185 else
186 {
187 return 0;
188 }
189 }
190 }