View Javadoc

1   /*
2    * Copyright 2000-2001,2004 The Apache Software Foundation.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.apache.jetspeed.services.threadpool;
18  
19  // Java Stuff
20  import java.util.*;
21  import javax.servlet.ServletConfig;
22  
23  // Turbine Stuff
24  import org.apache.turbine.services.TurbineBaseService;
25  
26  // Jetspeed classes
27  import org.apache.jetspeed.services.logging.JetspeedLogFactoryService;
28  import org.apache.jetspeed.services.logging.JetspeedLogger;
29  
30  /***
31   * This is a Service that provides a simple thread pool usable by all
32   * thread-intensive classes in order to optimize resource utilization.
33   * <br>
34   *
35   * <p>It uses 3 parameters for controlling resource usage:
36   *   <dl>
37   *     <dt>init.count</dt>
38   *     <dd>The number of threads to start at initialization</dd>
39   *     <dt>max.count</dt>
40   *     <dd>The maximum number of threads started by this service</dd>
41   *     <dt>minspare.count</dt>
42   *     <dd>The pool always tries to keep this minimum number of threads
43   *      available</dd>
44   *   </dl>
45   * </p>
46   *
47   * @author <a href="mailto:burton@apache.org">Kevin A. Burton</a>
48   * @author <a href="mailto:raphael@apache.org">Raphaël Luta</a>
49   * @author <a href="mailto:sgala@hisitech.com">Santiago Gala</a>
50   * @version $Id: JetspeedThreadPoolService.java,v 1.10 2004/02/23 03:51:31 jford Exp $
51   */
52  public class JetspeedThreadPoolService
53      extends TurbineBaseService
54      implements ThreadPoolService
55  {
56      /***
57       * Static initialization of the logger for this class
58       */    
59      protected static final JetspeedLogger logger = JetspeedLogFactoryService.getLogger(JetspeedThreadPoolService.class.getName());
60      
61      /***
62       * The number of threads to create on initialization
63       */
64      private int initThreads = 50;
65  
66      /***
67       * The maximum number of threads that should ever be created.
68       */
69      private int maxThreads = 100;
70  
71      /***
72       * The minimum amount of threads that should always be available
73       */
74      private int minSpareThreads = 15;
75  
76      /***
77       * The default priority to use when creating new threads.
78       */
79      public static final int DEFAULT_THREAD_PRIORITY = Thread.MIN_PRIORITY;
80  
81      /***
82       * Stores threads that are available within the pool.
83       */    
84      private Vector availableThreads = new Vector();
85  
86      
87      /***
88       * The thread group used for all created threads.
89       */
90      private ThreadGroup tg = new ThreadGroup("JetspeedThreadPoolService");
91      
92      /***
93       * Create a new queue for adding Runnable objects to.
94       */
95      private Queue queue = new Queue();
96  
97      /***
98       * Holds the total number of threads that have ever been processed.
99       */
100     private int count = 0;
101 
102 
103     /***
104      * Constructor.
105      *
106      * @exception Exception, a generic exception.
107      */
108     public JetspeedThreadPoolService()
109         throws Exception
110     {
111     }
112 
113 
114     /***
115      * Late init. Don't return control until early init says we're done.
116      */
117     public void init( )
118     {
119         while( !getInit() ) {
120             try {
121                 Thread.sleep(500);
122             } catch (InterruptedException ie ) {
123                 logger.info("ThreadPool service: Waiting for init()..." );
124             }
125         }
126     }
127 
128     /***
129      * Called during Turbine.init()
130      *
131      * @param config A ServletConfig.
132      */
133     public synchronized void init( ServletConfig config )
134     {
135         if( getInit() ) {
136             //Already inited
137             return;
138         }
139 
140         try
141         {
142             logger.info ( "JetspeedThreadPoolService early init()....starting!");
143             initThreadpool(config);
144             setInit(true);
145             logger.info ( "JetspeedThreadPoolService early init()....finished!");
146         }
147         catch (Exception e)
148         {
149             logger.error ( "Cannot initialize JetspeedThreadPoolService!", e );
150         }
151 
152         // we don't call setInit(true) yet, because we want init() to be called also
153     }
154 
155     /***
156      * Processes the Runnable object with an available thread at default priority
157      *
158      * @see #process( Runnable, int )
159      * @param runnable the runnable code to process
160      */
161     public void process( Runnable runnable )  {
162         
163         process( runnable, Thread.MIN_PRIORITY );
164         
165     }
166     
167     /***
168      * Process a Runnable object by allocating a Thread for it
169      * at the given priority
170      *
171      * @param runnable the runnable code to process
172      * @param priority the priority used be the thread that will run this runnable
173      */
174     public void process( Runnable runnable, int priority ) {
175         
176         RunnableThread thread = this.getAvailableThread();
177         
178         if ( thread == null ) {
179 
180             this.getQueue().add( runnable );
181         
182         } else {
183 
184             try {
185                 synchronized ( thread ) {
186                     //get the default priority of this Thread
187                     int defaultPriority = thread.getPriority();
188                     if( defaultPriority != priority ) {
189                         //setting priority triggers security checks,
190                         //so we do it only if needed.
191                         thread.setPriority( priority );
192                     }
193                     thread.setRunnable( runnable );
194                     thread.notify();
195                 }
196             } catch ( Throwable t ) {
197                 logger.error("Throwable",  t);
198             }
199             
200         }
201         
202         
203     }
204     
205     /***
206      * Get the number of threads that have been created
207      *
208      * @return the number of threads currently created by the pool
209      */
210     public int getThreadCount() {
211         return this.tg.activeCount();        
212     }
213     
214     /***
215      * Get the number of threads that are available.
216      *
217      * @return the number of threads available in the pool
218      */
219     public int getAvailableThreadCount() {
220         return this.availableThreads.size();
221     }
222     
223     /***
224      * Get the current length of the Runnable queue, waiting for processing
225      *
226      * @return the length of the queue of waiting processes
227      */
228     public int getQueueLength() {
229         return this.getQueue().size();
230     }
231    
232     /***
233      * Get the number of threads that have successfully been processed
234      * for logging and debugging purposes.
235      *
236      * @return the number of processes executed since initialization
237      */
238     public int getThreadProcessedCount() {
239         return this.count;
240     }
241    
242     /***
243      * Get the queue used by the JetspeedThreadPoolService
244      *
245      * @return the queue holding the waiting processes
246      */
247     Queue getQueue() {
248         return this.queue;
249     }
250 
251     /***
252      * Place this thread back into the pool so that it can be used again
253      *
254      * @param thread the thread to release back to the pool
255      */
256     void release( RunnableThread thread ) {
257         
258         synchronized ( this.availableThreads )  {
259 
260             this.availableThreads.addElement( thread );
261         
262             ++this.count;
263 
264             /*
265             It is important to synchronize here because it is possible that
266             between the time we check the queue and we get this another
267             thread might return and fetch the queue to the end.
268             */
269             synchronized( this.getQueue() ) {
270             
271                 //now if there are any objects in the queue add one for processing to 
272                 //the thread that you just freed up.
273                 if ( this.getQueue().size() > 0 ) {
274 
275                     Runnable r = this.getQueue().get();
276 
277                     if ( r != null ) {
278                         this.process( r );
279                     } else {
280                         logger.info( "JetspeedThreadPoolService: no Runnable found." );
281                     }
282             
283                 }
284             
285             }
286             
287         }
288         
289     }
290     
291     /***
292      * This method initialized the ThreadPool
293      *
294      * @param config A ServletConfig.
295      */
296     private void initThreadpool( ServletConfig config )
297     {
298         Properties props = getProperties();
299 
300         try {
301 
302             this.initThreads = Integer.parseInt( props.getProperty( "init.count" ) );
303             this.maxThreads = Integer.parseInt( props.getProperty( "max.count" ) );
304             this.minSpareThreads = Integer.parseInt( props.getProperty( "minspare.count" ) );
305 
306         } catch ( NumberFormatException e ) {
307             logger.error("Invalid number format in properties", e);
308         }
309                                           
310         //create the number of threads needed for initialization
311         createThreads( this.initThreads );
312 
313     }
314 
315     /***
316      * Create "count" number of threads and make them available.  
317      *
318      * @param count the number of threads to create     
319      */    
320     private synchronized void createThreads( int count ) {
321         
322         //if the amount of threads you are about to create would end up being
323         //greater than maxThreads then just cap this off to the end point so that
324         //you end up with exactly maxThreads
325         if ( this.getThreadCount() < this.maxThreads && 
326              this.getThreadCount() + count > this.maxThreads ) {
327             
328             count = this.maxThreads - this.getThreadCount();
329             
330         } else if ( this.getThreadCount() >= this.maxThreads ) {
331 
332             return;
333         }
334         
335         logger.info( "JetspeedThreadPoolService:  creating " + 
336                    count + 
337                    " more thread(s) for a total of: " + 
338                    ( this.getThreadCount() + count ) );
339         
340         for (int i = 0; i < count; ++i ) {
341             
342 
343             //RunnableThread has a static numbering counter
344             RunnableThread thread = new RunnableThread( this.tg);
345             thread.setPriority( DEFAULT_THREAD_PRIORITY );
346             
347             thread.start(); //The thread calls release to add...
348             //SGP this.availableThreads.addElement( thread );
349             
350         }
351         
352     }
353 
354     /***
355      * Get a thread that is available from the pool or null if there are no more 
356      * threads left.
357      *
358      * @return a thread from the pool or null if non available
359      */
360     private RunnableThread getAvailableThread() {
361         
362        
363         synchronized( this.availableThreads ) {
364 
365             //if the current number of available threads is less than minSpareThreads
366             //then we need to create more
367         
368             if ( this.getAvailableThreadCount() < this.minSpareThreads ) {
369                 this.createThreads( this.minSpareThreads );
370             }
371 
372             //now if there aren't any threads available then just return null.
373             if ( this.getAvailableThreadCount() == 0 ) {
374                 return null;
375             }
376         
377             RunnableThread thread = null;
378             
379             
380             
381             //get the element to use
382             int id = this.availableThreads.size() - 1;
383 
384             
385             
386             thread = (RunnableThread)this.availableThreads.elementAt( id );
387             this.availableThreads.removeElementAt( id );
388 
389             return thread;
390         }
391         
392     
393     }
394 
395 }
396 
397 /***
398  * Handles holding Runnables until they are ready to be processed.  This is an impl
399  * of a FIFO (First In First Out) Queue.  This makes it possible to add Runnable
400  * objects so that they get processed and they pass through the queue in a predictable
401  * fashion.
402  *
403  * @author <a href="mailto:burton@apache.org">Kevin A. Burton</a>
404  * @version $Id: JetspeedThreadPoolService.java,v 1.10 2004/02/23 03:51:31 jford Exp $
405  */
406 class Queue {
407     
408     /***
409      * Holds Runnables that have been requested to process but there are no 
410      * threads available.
411      */
412     private Vector queue = new Vector();
413     
414     /***
415      * Add a Runnable object into the queue.
416      *
417      * @param runnable the process to add to the queue
418      */
419     public synchronized void add( Runnable runnable ) {
420         queue.insertElementAt( runnable, 0 );
421     }
422     
423     /***
424      * Get a Runnable object from the queue, and then remove it.  Return null
425      * if no more Runnable objects exist.
426      *
427      * @return the first Runnable stored in the queue or null if empty
428      */
429     public synchronized Runnable get() {
430         
431         if ( this.queue.size() == 0 ) {
432             JetspeedThreadPoolService.logger.info( "JetspeedThreadPoolService->Queue: No more Runnables left in queue.  Returning null" );
433             return null;
434         }
435         
436         int id = queue.size() - 1;
437         Runnable runnable = (Runnable)queue.elementAt( id );
438         this.queue.removeElementAt( id );
439 
440         return runnable;
441     }
442     
443     /***
444      * Return the size of the queue.
445      *
446      * @return the size of the queue
447      */
448     public int size() {
449         return this.queue.size();
450     }
451     
452 }