Clover coverage report - Ledge Components - SNAPSHOT
Coverage timestamp: Fri Nov 17 2006 05:13:20 CET
file stats: LOC: 640   Methods: 22
NCLOC: 405   Classes: 3
 
 Source file Conditionals Statements Methods TOTAL
AbstractScheduler.java 62.5% 81.1% 81.8% 76.5%
coverage coverage
 1    //
 2    //Copyright (c) 2003, Caltha - Gajda, Krzewski, Mach, Potempski Sp.J.
 3    //All rights reserved.
 4    //
 5    //Redistribution and use in source and binary forms, with or without modification,
 6    //are permitted provided that the following conditions are met:
 7    //
 8    //* Redistributions of source code must retain the above copyright notice,
 9    //this list of conditions and the following disclaimer.
 10    //* Redistributions in binary form must reproduce the above copyright notice,
 11    //this list of conditions and the following disclaimer in the documentation
 12    //and/or other materials provided with the distribution.
 13    //* Neither the name of the Caltha - Gajda, Krzewski, Mach, Potempski Sp.J.
 14    //nor the names of its contributors may be used to endorse or promote products
 15    //derived from this software without specific prior written permission.
 16    //
 17    //THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 18    //AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 19    //WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
 20    //IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
 21    //INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
 22    //BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
 23    //OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 24    //WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 25    //ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 26    //POSSIBILITY OF SUCH DAMAGE.
 27    //
 28   
 29    package org.objectledge.scheduler;
 30   
 31    import java.text.DateFormat;
 32    import java.text.SimpleDateFormat;
 33    import java.util.Date;
 34    import java.util.HashMap;
 35    import java.util.HashSet;
 36    import java.util.Iterator;
 37    import java.util.Map;
 38    import java.util.Set;
 39    import java.util.SortedMap;
 40    import java.util.TreeMap;
 41   
 42    import org.jcontainer.dna.Configuration;
 43    import org.jcontainer.dna.Logger;
 44    import org.objectledge.context.Context;
 45    import org.objectledge.threads.Task;
 46    import org.objectledge.threads.ThreadPool;
 47    import org.picocontainer.MutablePicoContainer;
 48    import org.picocontainer.Startable;
 49   
 50    /**
 51    * Base class for scheduler components.
 52    *
 53    * @author <a href="mailto:pablo@caltha.pl">Pawel Potempski </a>
 54    * @author <a href="mailto:rafal@caltha.pl">Rafal Krzewski </a>
 55    */
 56    public abstract class AbstractScheduler
 57    implements Startable
 58    {
 59    /** The default DateFormat pattern used by the UI (yyy-MM-dd HH:mm). */
 60    public static final String DATE_FORMAT_DEFAULT = "yyyy-MM-dd HH:mm";
 61   
 62    // instance variables ////////////////////////////////////////////////////
 63   
 64    /** the container */
 65    private MutablePicoContainer container;
 66   
 67    /** the configuration. */
 68    protected Configuration config;
 69   
 70    /** The logging facility. */
 71    protected Logger logger;
 72   
 73    /** The pool service. */
 74    protected ThreadPool threadPool;
 75   
 76    /** The registered schedule types factories. */
 77    private Map scheduleFactory = new HashMap();
 78   
 79    /** The registered jobs. */
 80    protected Map jobs = new HashMap();
 81   
 82    /** The job queue. */
 83    private SortedMap queue = new TreeMap();
 84   
 85    /** Runners of a specific job. */
 86    private Map runners = new HashMap();
 87   
 88    /** The date format to be used for the UI. */
 89    protected DateFormat format;
 90   
 91    /**
 92    * Component contructor.
 93    *
 94    * @param container the container to store loaded classes.
 95    * @param config the configuration.
 96    * @param logger the logger.
 97    * @param threadPool the thread pool component.
 98    * @param scheduleFactories the list of schedule factories.
 99    */
 100  920 public AbstractScheduler(MutablePicoContainer container, Configuration config, Logger logger,
 101    ThreadPool threadPool, ScheduleFactory[] scheduleFactories)
 102    {
 103  920 this.container = container;
 104  920 this.config = config;
 105  920 this.logger = logger;
 106  920 this.threadPool = threadPool;
 107  920 for(int i = 0; i < scheduleFactories.length; i++)
 108    {
 109  1380 scheduleFactory.put(scheduleFactories[i].getName(), scheduleFactories[i]);
 110    }
 111  920 String formatString = config.getChild("date_format").getValue(DATE_FORMAT_DEFAULT);
 112  920 format = new SimpleDateFormat(formatString);
 113    }
 114   
 115    /**
 116    * {@inheritDoc}
 117    */
 118  920 public void start()
 119    {
 120  920 loadJobs();
 121  920 Iterator i = jobs.values().iterator();
 122  920 while(i.hasNext())
 123    {
 124  1748 AbstractJobDescriptor job = (AbstractJobDescriptor)i.next();
 125  1748 if(job.getSchedule().atStartup())
 126    {
 127  0 run(job);
 128    }
 129  1748 schedule(job);
 130    }
 131  920 threadPool.runDaemon(new SchedulerTask());
 132    }
 133   
 134    /**
 135    * {@inheritDoc}
 136    */
 137  0 public void stop()
 138    {
 139    // I wish startable interface was split back to Startable/Stoppable
 140    }
 141   
 142    // Scheduler interface ////////////////////////////////////////////
 143   
 144    /**
 145    * Creates a new scheduled job.
 146    * <p>
 147    * Note that newly created jobs are initialy disabled. You need to call
 148    * {@link #enable(AbstractJobDescriptor)}to allow the job's execution.
 149    * </p>
 150    * <p>
 151    * Non-perisistent implementation may decide to disallow creation of new jobs at run time. This
 152    * mehtod would throw <code>UnsupportedOperationException</code> in that case.
 153    * </p>
 154    *
 155    * @param name the name of the job.
 156    * @param schedule the job's schedule.
 157    * @param jobSpec job class specification, see
 158    * {@link AbstractJobDescriptor#setJobClassName(String)}
 159    * @return the scheduled job.
 160    * @throws JobModificationException if the job could not be instantiated.
 161    */
 162    public abstract AbstractJobDescriptor createJobDescriptor(String name, Schedule schedule,
 163    String jobSpec)
 164    throws JobModificationException;
 165   
 166    /**
 167    * Deletes a scheduled job.
 168    * <p>
 169    * Non-persisten implementaion may decide to dissalow this operation.
 170    * </p>
 171    *
 172    * @param job the job.
 173    * @throws JobModificationException if the job could not be deleted.
 174    */
 175    public abstract void deleteJobDescriptor(AbstractJobDescriptor job)
 176    throws JobModificationException;
 177   
 178    /**
 179    * Enables a scheduled job.
 180    *
 181    * @param job the job.
 182    * @throws JobModificationException if the job state could not be saved.
 183    */
 184  92 public void enable(AbstractJobDescriptor job)
 185    throws JobModificationException
 186    {
 187  92 JobModificationException e = null;
 188  92 synchronized(job)
 189    {
 190  92 try
 191    {
 192  92 job.setEnabled(true);
 193    }
 194    catch(JobModificationException ee)
 195    {
 196  0 e = ee;
 197    }
 198    }
 199  92 schedule(job);
 200  92 if(e != null)
 201    {
 202  0 throw e;
 203    }
 204    }
 205   
 206    /**
 207    * Disables a scheduled job.
 208    * <p>
 209    * An attempt will be made to terminate all instances of this job running at the moment. Would
 210    * this attempt be effective depends on the implementation of <code>execute()</code> and
 211    * <code>terminate(Thread)</code> methods in the Job class.
 212    * </p>
 213    *
 214    * @param job the job.
 215    * @throws JobModificationException if the job state could not be saved.
 216    */
 217  46 public void disable(AbstractJobDescriptor job)
 218    throws JobModificationException
 219    {
 220  46 JobModificationException e = null;
 221  46 synchronized(job)
 222    {
 223  46 try
 224    {
 225  46 job.setEnabled(false);
 226    }
 227    catch(JobModificationException ee)
 228    {
 229  0 e = ee;
 230    }
 231    }
 232  46 terminate(job);
 233  46 if(e != null)
 234    {
 235  0 throw e;
 236    }
 237    }
 238   
 239    /**
 240    * Returns all currently configured jobs.
 241    *
 242    * @return all currently configured jobs.
 243    */
 244  92 public synchronized AbstractJobDescriptor[] getJobDescriptors()
 245    {
 246  92 AbstractJobDescriptor[] result = new AbstractJobDescriptor[jobs.size()];
 247  92 jobs.values().toArray(result);
 248  92 return result;
 249    }
 250   
 251    /**
 252    * Returns a configured job with the specified name.
 253    *
 254    * @param name the job's name.
 255    * @return the scheduled job.
 256    */
 257  230 public AbstractJobDescriptor getJobDescriptor(String name)
 258    {
 259  230 return (AbstractJobDescriptor)jobs.get(name);
 260    }
 261   
 262    /**
 263    * Returns available schedule types.
 264    *
 265    * @return available schedule types.
 266    */
 267  92 public String[] getScheduleTypes()
 268    {
 269  92 String[] result = new String[scheduleFactory.size()];
 270  92 scheduleFactory.keySet().toArray(result);
 271  92 return result;
 272    }
 273   
 274    /**
 275    * Creates a Schedule object.
 276    *
 277    * @param type the schedule type.
 278    * @param config the schedule configuration.
 279    * @return the schedule.
 280    * @throws InvalidScheduleException if schedule factory is not registerd.
 281    */
 282  2392 public Schedule createSchedule(String type, String config)
 283    throws InvalidScheduleException
 284    {
 285  2392 ScheduleFactory factory = (ScheduleFactory)scheduleFactory.get(type);
 286  2392 if(factory == null)
 287    {
 288  92 throw new InvalidScheduleException("Schedule factory for type '" + type
 289    + "' not registered");
 290    }
 291  2300 Schedule schedule = factory.getInstance();
 292  2300 schedule.init(this, config);
 293  2208 return schedule;
 294    }
 295   
 296    /**
 297    * Returns <code>true</code> if the implemenation allows job manipulation at run time.
 298    * <p>
 299    * If this method returns <code>false</code>, the following methods are not supported, and
 300    * should not be called:
 301    * </p>
 302    * <ul>
 303    * <li><code>SchedulerService.createJob()</code></li>
 304    * <li><code>SchedulerService.deleteJob()</code></li>
 305    * <li><code>ScheduledJob.setSchedule()</code></li>
 306    * <li><code>ScheduledJob.setJobSpec()</code></li>
 307    * <li><code>ScheduledJob.setRunCountLimit()</code></li>
 308    * <li><code>ScheduledJob.setTimeLimit()</code></li>
 309    * <li><code>ScheduledJob.setReentrant()</code></li>
 310    * </ul>
 311    *
 312    * @return <code>true</code> if the implemenation allows job manipulation at run time.
 313    */
 314    public abstract boolean allowsModifications();
 315   
 316    /**
 317    * Returns the DateFormat that should be used by the UI.
 318    *
 319    * @return the date format.
 320    */
 321  1104 public DateFormat getDateFormat()
 322    {
 323  1104 return format;
 324    }
 325   
 326    // implementation ////////////////////////////////////////////////////////
 327   
 328    /**
 329    * Loads the jobs from storage / configuration.
 330    */
 331    protected abstract void loadJobs();
 332   
 333    /**
 334    * Schedules the next execution of a job.
 335    *
 336    * @param job the job.
 337    */
 338  1851 void schedule(AbstractJobDescriptor job)
 339    {
 340  1851 if(job.isEnabled())
 341    {
 342  1391 Date nextRun = job.getSchedule().getNextRunTime(new Date(), job.getLastRunTime());
 343  1391 logger.debug("scheduling " + job.getName() + " at " + nextRun + " (last run " + job.getLastRunTime() + ")");
 344  1391 if(nextRun != null)
 345    {
 346  471 Date start = job.getTimeLimitStart();
 347  471 Date end = job.getTimeLimitEnd();
 348  471 if((start == null || nextRun.compareTo(start) > 0)
 349    && (end == null || nextRun.compareTo(end) < 0))
 350    {
 351  471 int countLimit = job.getRunCountLimit();
 352  471 if(countLimit < 0 || job.getRunCount() < countLimit)
 353    {
 354  460 Long target = new Long(nextRun.getTime());
 355  460 Set set = (Set)queue.get(target);
 356  460 if(set == null)
 357    {
 358  460 set = new HashSet();
 359  460 queue.put(target, set);
 360    }
 361  460 set.add(job);
 362  460 synchronized(queue)
 363    {
 364  460 queue.notify();
 365    }
 366  460 return;
 367    }
 368    }
 369    }
 370  931 if(job.getAutoClean())
 371    {
 372  0 try
 373    {
 374  0 deleteJobDescriptor(job);
 375    }
 376    catch(JobModificationException e)
 377    {
 378  0 logger.error("failed to auto-clean expired job " + job.getName(), e);
 379    }
 380    }
 381    }
 382    }
 383   
 384    /**
 385    * Terminates execution of all instances of the job.
 386    */
 387  46 private void terminate(AbstractJobDescriptor job)
 388    {
 389  46 synchronized(runners)
 390    {
 391  46 Set set = (Set)runners.get(job);
 392  46 if(set != null)
 393    {
 394  0 Iterator i = set.iterator();
 395  0 while(i.hasNext())
 396    {
 397  0 ((RunnerTask)i.next()).terminate();
 398    }
 399    }
 400    }
 401    }
 402   
 403    /**
 404    * Runs a specified job within a pool service's worker thread.
 405    *
 406    * @param job the job.
 407    */
 408  11 void run(AbstractJobDescriptor job)
 409    {
 410  11 synchronized(job)
 411    {
 412  11 if(!job.isRunning() || job.isReentrant())
 413    {
 414  11 logger.debug("starting " + job.getName() + " at " + new Date());
 415  11 job.setRunning(true);
 416  11 RunnerTask rt = new RunnerTask(job);
 417  11 threadPool.runWorker(rt);
 418    }
 419    }
 420    }
 421   
 422    /**
 423    * Get job component.
 424    *
 425    * @param job the job description.
 426    * @return the job object.
 427    * @throws JobNotFoundException if job class not found.
 428    */
 429  11 Job getJobObject(AbstractJobDescriptor job)
 430    throws JobNotFoundException
 431    {
 432  11 String className = job.getJobClassName();
 433  11 try
 434    {
 435  11 Class clazz = Class.forName(className);
 436  0 if(container.getComponentInstance(clazz) == null)
 437    {
 438  0 container.registerComponentImplementation(clazz);
 439    }
 440  0 return (Job)container.getComponentInstance(clazz);
 441    }
 442    catch(ClassNotFoundException e)
 443    {
 444  11 throw new JobNotFoundException("Couldn't find job class '" + className + "' ", e);
 445    }
 446    }
 447   
 448    /**
 449    * The worker task that runs a sheduled job.
 450    */
 451    private class RunnerTask
 452    extends Task
 453    {
 454    private AbstractJobDescriptor job;
 455   
 456    private Thread thread;
 457   
 458  11 public RunnerTask(AbstractJobDescriptor job)
 459    {
 460  11 this.job = job;
 461  11 this.thread = Thread.currentThread();
 462    }
 463   
 464  33 public String getName()
 465    {
 466  33 return "scheduler: " + job.getName();
 467    }
 468   
 469  11 public void process(Context context)
 470    {
 471  11 synchronized(runners)
 472    {
 473  11 Set set = (Set)runners.get(job);
 474  11 if(set == null)
 475    {
 476  11 set = new HashSet();
 477  11 runners.put(job, set);
 478    }
 479  11 set.add(this);
 480    }
 481   
 482  11 synchronized(job)
 483    {
 484  11 try
 485    {
 486  11 job.setLastRunTime(new Date());
 487  11 job.setRunCount(job.getRunCount() + 1);
 488    }
 489    catch(JobModificationException e)