View Javadoc

1   // 
2   //Copyright (c) 2003, Caltha - Gajda, Krzewski, Mach, Potempski Sp.J. 
3   //All rights reserved. 
4   //   
5   //Redistribution and use in source and binary forms, with or without modification,  
6   //are permitted provided that the following conditions are met: 
7   //   
8   //* Redistributions of source code must retain the above copyright notice,  
9   //this list of conditions and the following disclaimer. 
10  //* Redistributions in binary form must reproduce the above copyright notice,  
11  //this list of conditions and the following disclaimer in the documentation  
12  //and/or other materials provided with the distribution. 
13  //* Neither the name of the Caltha - Gajda, Krzewski, Mach, Potempski Sp.J.  
14  //nor the names of its contributors may be used to endorse or promote products  
15  //derived from this software without specific prior written permission. 
16  // 
17  //THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"  
18  //AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED  
19  //WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 
20  //IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,  
21  //INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,  
22  //BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, 
23  //OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,  
24  //WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)  
25  //ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE  
26  //POSSIBILITY OF SUCH DAMAGE. 
27  //
28  
29  package org.objectledge.scheduler;
30  
31  import java.text.DateFormat;
32  import java.text.SimpleDateFormat;
33  import java.util.Date;
34  import java.util.HashMap;
35  import java.util.HashSet;
36  import java.util.Iterator;
37  import java.util.Map;
38  import java.util.Set;
39  import java.util.SortedMap;
40  import java.util.TreeMap;
41  
42  import org.jcontainer.dna.Configuration;
43  import org.jcontainer.dna.Logger;
44  import org.objectledge.context.Context;
45  import org.objectledge.threads.Task;
46  import org.objectledge.threads.ThreadPool;
47  import org.picocontainer.MutablePicoContainer;
48  import org.picocontainer.Startable;
49  
50  /**
51   * Base class for scheduler components.
52   * 
53   * @author <a href="mailto:pablo@caltha.pl">Pawel Potempski </a>
54   * @author <a href="mailto:rafal@caltha.pl">Rafal Krzewski </a>
55   */
56  public abstract class AbstractScheduler
57      implements Startable
58  {
59      /** The default DateFormat pattern used by the UI (yyyy-MM-dd HH:mm). */
60      public static final String DATE_FORMAT_DEFAULT = "yyyy-MM-dd HH:mm";
61  
62      // instance variables ////////////////////////////////////////////////////
63  
64      /** The container used to look up and register job classes. */
65      private MutablePicoContainer container;
66  
67      /** The configuration. */
68      protected Configuration config;
69  
70      /** The logging facility. */
71      protected Logger logger;
72  
73      /** The pool service used to run the worker and daemon tasks. */
74      protected ThreadPool threadPool;
75  
76      /** The registered schedule factories, keyed by the name each factory reports. */
77      private Map scheduleFactory = new HashMap();
78  
79      /** The registered jobs (looked up by job name, values are AbstractJobDescriptor objects). */
80      protected Map jobs = new HashMap();
81  
82      /** The job queue: target time in milliseconds (Long) mapped to the Set of jobs due then. */
83      private SortedMap queue = new TreeMap();
84  
85      /** Active runners: job descriptor mapped to the Set of RunnerTask objects executing it. */
86      private Map runners = new HashMap();
87  
88      /** The date format to be used for the UI. */
89      protected DateFormat format;
90  
91      /***
92       * Component contructor.
93       * 
94       * @param container the container to store loaded classes.
95       * @param config the configuration.
96       * @param logger the logger.
97       * @param threadPool the thread pool component.
98       * @param scheduleFactories the list of schedule factories.
99       */
100     public AbstractScheduler(MutablePicoContainer container, Configuration config, Logger logger,
101         ThreadPool threadPool, ScheduleFactory[] scheduleFactories)
102     {
103         this.container = container;
104         this.config = config;
105         this.logger = logger;
106         this.threadPool = threadPool;
107         for(int i = 0; i < scheduleFactories.length; i++)
108         {
109             scheduleFactory.put(scheduleFactories[i].getName(), scheduleFactories[i]);
110         }
111         String formatString = config.getChild("date_format").getValue(DATE_FORMAT_DEFAULT);
112         format = new SimpleDateFormat(formatString);
113     }
114 
115     /***
116      * {@inheritDoc}
117      */
118     public void start()
119     {
120         loadJobs();
121         Iterator i = jobs.values().iterator();
122         while(i.hasNext())
123         {
124             AbstractJobDescriptor job = (AbstractJobDescriptor)i.next();
125             if(job.getSchedule().atStartup())
126             {
127                 run(job);
128             }
129             schedule(job);
130         }
131         threadPool.runDaemon(new SchedulerTask());
132     }
133 
134     /**
135      * {@inheritDoc}
136      */
137     public void stop()
138     {
139         // Intentionally empty. (I wish the Startable interface was split back to Startable/Stoppable.)
140     }
141 
142     // Scheduler interface ////////////////////////////////////////////
143 
144     /**
145      * Creates a new scheduled job.
146      * <p>
147      * Note that newly created jobs are initially disabled. You need to call
148      * {@link #enable(AbstractJobDescriptor)} to allow the job's execution.
149      * </p>
150      * <p>
151      * Non-persistent implementations may decide to disallow creation of new jobs at run time. This
152      * method would throw <code>UnsupportedOperationException</code> in that case.
153      * </p>
154      * 
155      * @param name the name of the job.
156      * @param schedule the job's schedule.
157      * @param jobSpec job class specification, see
158      *        {@link AbstractJobDescriptor#setJobClassName(String)}
159      * @return the scheduled job.
160      * @throws JobModificationException if the job could not be instantiated.
161      */
162     public abstract AbstractJobDescriptor createJobDescriptor(String name, Schedule schedule,
163         String jobSpec)
164         throws JobModificationException;
165 
166     /**
167      * Deletes a scheduled job.
168      * <p>
169      * Non-persistent implementations may decide to disallow this operation.
170      * </p>
171      * 
172      * @param job the job.
173      * @throws JobModificationException if the job could not be deleted.
174      */
175     public abstract void deleteJobDescriptor(AbstractJobDescriptor job)
176         throws JobModificationException;
177 
178     /***
179      * Enables a scheduled job.
180      * 
181      * @param job the job.
182      * @throws JobModificationException if the job state could not be saved.
183      */
184     public void enable(AbstractJobDescriptor job)
185         throws JobModificationException
186     {
187         JobModificationException e = null;
188         synchronized(job)
189         {
190             try
191             {
192                 job.setEnabled(true);
193             }
194             catch(JobModificationException ee)
195             {
196                 e = ee;
197             }
198         }
199         schedule(job);
200         if(e != null)
201         {
202             throw e;
203         }
204     }
205 
206     /***
207      * Disables a scheduled job.
208      * <p>
209      * An attempt will be made to terminate all instances of this job running at the moment. Would
210      * this attempt be effective depends on the implementation of <code>execute()</code> and
211      * <code>terminate(Thread)</code> methods in the Job class.
212      * </p>
213      * 
214      * @param job the job.
215      * @throws JobModificationException if the job state could not be saved.
216      */
217     public void disable(AbstractJobDescriptor job)
218         throws JobModificationException
219     {
220         JobModificationException e = null;
221         synchronized(job)
222         {
223             try
224             {
225                 job.setEnabled(false);
226             }
227             catch(JobModificationException ee)
228             {
229                 e = ee;
230             }
231         }
232         terminate(job);
233         if(e != null)
234         {
235             throw e;
236         }
237     }
238 
239     /***
240      * Returns all currently configured jobs.
241      * 
242      * @return all currently configured jobs.
243      */
244     public synchronized AbstractJobDescriptor[] getJobDescriptors()
245     {
246         AbstractJobDescriptor[] result = new AbstractJobDescriptor[jobs.size()];
247         jobs.values().toArray(result);
248         return result;
249     }
250 
251     /***
252      * Returns a configured job with the specified name.
253      * 
254      * @param name the job's name.
255      * @return the scheduled job.
256      */
257     public AbstractJobDescriptor getJobDescriptor(String name)
258     {
259         return (AbstractJobDescriptor)jobs.get(name);
260     }
261 
262     /***
263      * Returns available schedule types.
264      * 
265      * @return available schedule types.
266      */
267     public String[] getScheduleTypes()
268     {
269         String[] result = new String[scheduleFactory.size()];
270         scheduleFactory.keySet().toArray(result);
271         return result;
272     }
273 
274     /***
275      * Creates a Schedule object.
276      * 
277      * @param type the schedule type.
278      * @param config the schedule configuration.
279      * @return the schedule.
280      * @throws InvalidScheduleException if schedule factory is not registerd.
281      */
282     public Schedule createSchedule(String type, String config)
283         throws InvalidScheduleException
284     {
285         ScheduleFactory factory = (ScheduleFactory)scheduleFactory.get(type);
286         if(factory == null)
287         {
288             throw new InvalidScheduleException("Schedule factory for type '" + type
289                 + "' not registered");
290         }
291         Schedule schedule = factory.getInstance();
292         schedule.init(this, config);
293         return schedule;
294     }
295 
296     /**
297      * Returns <code>true</code> if the implementation allows job manipulation at run time.
298      * <p>
299      * If this method returns <code>false</code>, the following methods are not supported, and
300      * should not be called:
301      * </p>
302      * <ul>
303      * <li><code>SchedulerService.createJob()</code></li>
304      * <li><code>SchedulerService.deleteJob()</code></li>
305      * <li><code>ScheduledJob.setSchedule()</code></li>
306      * <li><code>ScheduledJob.setJobSpec()</code></li>
307      * <li><code>ScheduledJob.setRunCountLimit()</code></li>
308      * <li><code>ScheduledJob.setTimeLimit()</code></li>
309      * <li><code>ScheduledJob.setReentrant()</code></li>
310      * </ul>
311      * 
312      * @return <code>true</code> if the implementation allows job manipulation at run time.
313      */
314     public abstract boolean allowsModifications();
315 
316     /***
317      * Returns the DateFormat that should be used by the UI.
318      * 
319      * @return the date format.
320      */
321     public DateFormat getDateFormat()
322     {
323         return format;
324     }
325 
326     // implementation ////////////////////////////////////////////////////////
327 
328     /**
329      * Loads the jobs from storage / configuration; invoked by {@link #start()} before any job is run.
330      */
331     protected abstract void loadJobs();
332 
333     /***
334      * Schedules the next execution of a job.
335      * 
336      * @param job the job.
337      */
338     void schedule(AbstractJobDescriptor job)
339     {
340         if(job.isEnabled())
341         {
342             Date nextRun = job.getSchedule().getNextRunTime(new Date(), job.getLastRunTime());
343             logger.debug("scheduling " + job.getName() + " at " + nextRun + " (last run " + job.getLastRunTime() + ")");
344             if(nextRun != null)
345             {
346                 Date start = job.getTimeLimitStart();
347                 Date end = job.getTimeLimitEnd();
348                 if((start == null || nextRun.compareTo(start) > 0)
349                     && (end == null || nextRun.compareTo(end) < 0))
350                 {
351                     int countLimit = job.getRunCountLimit();
352                     if(countLimit < 0 || job.getRunCount() < countLimit)
353                     {
354                         Long target = new Long(nextRun.getTime());
355                         Set set = (Set)queue.get(target);
356                         if(set == null)
357                         {
358                             set = new HashSet();
359                             queue.put(target, set);
360                         }
361                         set.add(job);
362                         synchronized(queue)
363                         {
364                             queue.notify();
365                         }
366                         return;
367                     }
368                 }
369             }
370             if(job.getAutoClean())
371             {
372                 try
373                 {
374                     deleteJobDescriptor(job);
375                 }
376                 catch(JobModificationException e)
377                 {
378                     logger.error("failed to auto-clean expired job " + job.getName(), e);
379                 }
380             }
381         }
382     }
383 
384     /***
385      * Terminates execution of all instances of the job.
386      */
387     private void terminate(AbstractJobDescriptor job)
388     {
389         synchronized(runners)
390         {
391             Set set = (Set)runners.get(job);
392             if(set != null)
393             {
394                 Iterator i = set.iterator();
395                 while(i.hasNext())
396                 {
397                     ((RunnerTask)i.next()).terminate();
398                 }
399             }
400         }
401     }
402 
403     /***
404      * Runs a specified job within a pool service's worker thread.
405      * 
406      * @param job the job.
407      */
408     void run(AbstractJobDescriptor job)
409     {
410         synchronized(job)
411         {
412             if(!job.isRunning() || job.isReentrant())
413             {
414                 logger.debug("starting " + job.getName() + " at " + new Date());
415                 job.setRunning(true);
416                 RunnerTask rt = new RunnerTask(job);
417                 threadPool.runWorker(rt);
418             }
419         }
420     }
421 
422     /***
423      * Get job component.
424      * 
425      * @param job the job description.
426      * @return the job object.
427      * @throws JobNotFoundException if job class not found.
428      */
429     Job getJobObject(AbstractJobDescriptor job)
430         throws JobNotFoundException
431     {
432         String className = job.getJobClassName();
433         try
434         {
435             Class clazz = Class.forName(className);
436             if(container.getComponentInstance(clazz) == null)
437             {
438                 container.registerComponentImplementation(clazz);
439             }
440             return (Job)container.getComponentInstance(clazz);
441         }
442         catch(ClassNotFoundException e)
443         {
444             throw new JobNotFoundException("Couldn't find job class '" + className + "' ", e);
445         }
446     }
447 
448     /***
449      * The worker task that runs a sheduled job.
450      */
451     private class RunnerTask
452         extends Task
453     {
454         private AbstractJobDescriptor job;
455 
456         private Thread thread;
457 
458         public RunnerTask(AbstractJobDescriptor job)
459         {
460             this.job = job;
461             this.thread = Thread.currentThread();
462         }
463 
464         public String getName()
465         {
466             return "scheduler: " + job.getName();
467         }
468 
469         public void process(Context context)
470         {
471             synchronized(runners)
472             {
473                 Set set = (Set)runners.get(job);
474                 if(set == null)
475                 {
476                     set = new HashSet();
477                     runners.put(job, set);
478                 }
479                 set.add(this);
480             }
481 
482             synchronized(job)
483             {
484                 try
485                 {
486                     job.setLastRunTime(new Date());
487                     job.setRunCount(job.getRunCount() + 1);
488                 }
489                 catch(JobModificationException e)
490                 {
491                     logger.warn("failed to save job accountig information", e);
492                 }
493             }
494 
495             try
496             {
497                 try
498                 {
499                     getJobObject(job).run(new String[] { job.getArgument() });
500                 }
501                 catch(JobNotFoundException e)
502                 {
503                     logger.error("invalid job specification " + job.getJobClassName(), e);
504                 }
505                 finally
506                 {
507                     synchronized(runners)
508                     {
509                         Set set = (Set)runners.get(job);
510                         if(set != null)
511                         {
512                             set.remove(this);
513                             if(set.isEmpty())
514                             {
515                                 synchronized(job)
516                                 {
517                                     job.setRunning(false);
518                                 }
519                             }
520                         }
521                     }
522 
523                     if(!job.isReentrant())
524                     {
525                         AbstractScheduler.this.schedule(job);
526                     }
527                     thread = null;
528                     
529                 }
530             }
531             catch(VirtualMachineError e)
532             {
533                 throw e;
534             }
535             catch(ThreadDeath e)
536             {
537                 throw e;
538             }
539             catch(Throwable t)
540             {
541                 logger.error("uncaught exception in scheduled job " + job.getName(), t);
542             }
543         }
544 
545         public void terminate(Thread t)
546         {
547             try
548             {
549                 getJobObject(job).terminate(t);
550             }
551             catch(JobNotFoundException e)
552             {
553                 logger.error("invalid job specification " + job.getJobClassName(), e);
554             }
555         }
556 
557         public void terminate()
558         {
559             if(thread != null)
560             {
561                 terminate(thread);
562             }
563         }
564     }
565 
566     /***
567      * The daemon task that performs the timekeeping
568      */
569     private class SchedulerTask
570         extends Task
571     {
572         public String getName()
573         {
574             return "Job Scheduler";
575         }
576 
577         public void process(Context context)
578         {
579             synchronized(queue)
580             {
581                 long now;
582                 loop: while(!Thread.interrupted())
583                 {
584                     // queue is empty - wait indefinetely
585                     if(queue.size() == 0)
586                     {
587                         try
588                         {
589                             queue.wait();
590                         }
591                         catch(InterruptedException e)
592                         {
593                             break loop;
594                         }
595                     }
596                     // there is something in the queue
597                     now = System.currentTimeMillis();
598                     Long first = (Long)queue.firstKey();
599                     while(!queue.isEmpty() && first.longValue() <= now)
600                     {
601                         // the first element of the que has reached or passed
602                         // it's target time
603                         Set set = (Set)queue.remove(first);
604                         Iterator i = set.iterator();
605                         while(i.hasNext())
606                         {
607                             AbstractJobDescriptor job = (AbstractJobDescriptor)i.next();
608                             AbstractScheduler.this.run(job);
609                             if(job.isReentrant())
610                             {
611                                 AbstractScheduler.this.schedule(job);
612                             }
613                         }
614                         if(!queue.isEmpty())
615                         {
616                             first = (Long)queue.firstKey();
617                         }
618                     }
619                     // wait for the first element's target time
620                     if(!queue.isEmpty())
621                     {
622                         try
623                         {
624                             queue.wait(first.longValue() - now);
625                         }
626                         catch(InterruptedException e)
627                         {
628                             break loop;
629                         }
630                     }
631                 }
632             }
633         }
634 
635         public void terminate(Thread t)
636         {
637             t.interrupt();
638         }
639     }
640 }