1
2
3
4
5
6
7
8 /
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56 public abstract class AbstractScheduler
57 implements Startable
58 {
/** The default DateFormat pattern used by the UI (yyyy-MM-dd HH:mm). */
60 public static final String DATE_FORMAT_DEFAULT = "yyyy-MM-dd HH:mm";
61
62
63
64 /*** the container */
65 private MutablePicoContainer container;
66
67 /*** the configuration. */
68 protected Configuration config;
69
70 /*** The logging facility. */
71 protected Logger logger;
72
73 /*** The pool service. */
74 protected ThreadPool threadPool;
75
76 /*** The registered schedule types factories. */
77 private Map scheduleFactory = new HashMap();
78
79 /*** The registered jobs. */
80 protected Map jobs = new HashMap();
81
82 /*** The job queue. */
83 private SortedMap queue = new TreeMap();
84
85 /*** Runners of a specific job. */
86 private Map runners = new HashMap();
87
88 /*** The date format to be used for the UI. */
89 protected DateFormat format;
90
91 /***
92 * Component contructor.
93 *
94 * @param container the container to store loaded classes.
95 * @param config the configuration.
96 * @param logger the logger.
97 * @param threadPool the thread pool component.
98 * @param scheduleFactories the list of schedule factories.
99 */
100 public AbstractScheduler(MutablePicoContainer container, Configuration config, Logger logger,
101 ThreadPool threadPool, ScheduleFactory[] scheduleFactories)
102 {
103 this.container = container;
104 this.config = config;
105 this.logger = logger;
106 this.threadPool = threadPool;
107 for(int i = 0; i < scheduleFactories.length; i++)
108 {
109 scheduleFactory.put(scheduleFactories[i].getName(), scheduleFactories[i]);
110 }
111 String formatString = config.getChild("date_format").getValue(DATE_FORMAT_DEFAULT);
112 format = new SimpleDateFormat(formatString);
113 }
114
115 /***
116 * {@inheritDoc}
117 */
118 public void start()
119 {
120 loadJobs();
121 Iterator i = jobs.values().iterator();
122 while(i.hasNext())
123 {
124 AbstractJobDescriptor job = (AbstractJobDescriptor)i.next();
125 if(job.getSchedule().atStartup())
126 {
127 run(job);
128 }
129 schedule(job);
130 }
131 threadPool.runDaemon(new SchedulerTask());
132 }
133
134 /***
135 * {@inheritDoc}
136 */
137 public void stop()
138 {
139
140 }
141
142
143
/**
 * Creates a new scheduled job.
 * <p>
 * Note that newly created jobs are initially disabled. You need to call
 * {@link #enable(AbstractJobDescriptor)} to allow the job's execution.
 * </p>
 * <p>
 * Non-persistent implementations may decide to disallow creation of new jobs at run time. This
 * method would throw <code>UnsupportedOperationException</code> in that case.
 * </p>
 *
 * @param name the name of the job.
 * @param schedule the job's schedule.
 * @param jobSpec job class specification, see
 *        {@link AbstractJobDescriptor#setJobClassName(String)}
 * @return the scheduled job.
 * @throws JobModificationException if the job could not be instantiated.
 */
162 public abstract AbstractJobDescriptor createJobDescriptor(String name, Schedule schedule,
163 String jobSpec)
164 throws JobModificationException;
165
/**
 * Deletes a scheduled job.
 * <p>
 * Non-persistent implementations may decide to disallow this operation.
 * </p>
 *
 * @param job the job.
 * @throws JobModificationException if the job could not be deleted.
 */
175 public abstract void deleteJobDescriptor(AbstractJobDescriptor job)
176 throws JobModificationException;
177
178 /***
179 * Enables a scheduled job.
180 *
181 * @param job the job.
182 * @throws JobModificationException if the job state could not be saved.
183 */
184 public void enable(AbstractJobDescriptor job)
185 throws JobModificationException
186 {
187 JobModificationException e = null;
188 synchronized(job)
189 {
190 try
191 {
192 job.setEnabled(true);
193 }
194 catch(JobModificationException ee)
195 {
196 e = ee;
197 }
198 }
199 schedule(job);
200 if(e != null)
201 {
202 throw e;
203 }
204 }
205
206 /***
207 * Disables a scheduled job.
208 * <p>
209 * An attempt will be made to terminate all instances of this job running at the moment. Would
210 * this attempt be effective depends on the implementation of <code>execute()</code> and
211 * <code>terminate(Thread)</code> methods in the Job class.
212 * </p>
213 *
214 * @param job the job.
215 * @throws JobModificationException if the job state could not be saved.
216 */
217 public void disable(AbstractJobDescriptor job)
218 throws JobModificationException
219 {
220 JobModificationException e = null;
221 synchronized(job)
222 {
223 try
224 {
225 job.setEnabled(false);
226 }
227 catch(JobModificationException ee)
228 {
229 e = ee;
230 }
231 }
232 terminate(job);
233 if(e != null)
234 {
235 throw e;
236 }
237 }
238
239 /***
240 * Returns all currently configured jobs.
241 *
242 * @return all currently configured jobs.
243 */
244 public synchronized AbstractJobDescriptor[] getJobDescriptors()
245 {
246 AbstractJobDescriptor[] result = new AbstractJobDescriptor[jobs.size()];
247 jobs.values().toArray(result);
248 return result;
249 }
250
251 /***
252 * Returns a configured job with the specified name.
253 *
254 * @param name the job's name.
255 * @return the scheduled job.
256 */
257 public AbstractJobDescriptor getJobDescriptor(String name)
258 {
259 return (AbstractJobDescriptor)jobs.get(name);
260 }
261
262 /***
263 * Returns available schedule types.
264 *
265 * @return available schedule types.
266 */
267 public String[] getScheduleTypes()
268 {
269 String[] result = new String[scheduleFactory.size()];
270 scheduleFactory.keySet().toArray(result);
271 return result;
272 }
273
274 /***
275 * Creates a Schedule object.
276 *
277 * @param type the schedule type.
278 * @param config the schedule configuration.
279 * @return the schedule.
280 * @throws InvalidScheduleException if schedule factory is not registerd.
281 */
282 public Schedule createSchedule(String type, String config)
283 throws InvalidScheduleException
284 {
285 ScheduleFactory factory = (ScheduleFactory)scheduleFactory.get(type);
286 if(factory == null)
287 {
288 throw new InvalidScheduleException("Schedule factory for type '" + type
289 + "' not registered");
290 }
291 Schedule schedule = factory.getInstance();
292 schedule.init(this, config);
293 return schedule;
294 }
295
/**
 * Returns <code>true</code> if the implementation allows job manipulation at run time.
 * <p>
 * If this method returns <code>false</code>, the following methods are not supported, and
 * should not be called:
 * </p>
 * <ul>
 * <li><code>SchedulerService.createJob()</code></li>
 * <li><code>SchedulerService.deleteJob()</code></li>
 * <li><code>ScheduledJob.setSchedule()</code></li>
 * <li><code>ScheduledJob.setJobSpec()</code></li>
 * <li><code>ScheduledJob.setRunCountLimit()</code></li>
 * <li><code>ScheduledJob.setTimeLimit()</code></li>
 * <li><code>ScheduledJob.setReentrant()</code></li>
 * </ul>
 *
 * @return <code>true</code> if the implementation allows job manipulation at run time.
 */
314 public abstract boolean allowsModifications();
315
316 /***
317 * Returns the DateFormat that should be used by the UI.
318 *
319 * @return the date format.
320 */
321 public DateFormat getDateFormat()
322 {
323 return format;
324 }
325
326
327
328 /***
329 * Loads the jobs from storage / configuration.
330 */
331 protected abstract void loadJobs();
332
333 /***
334 * Schedules the next execution of a job.
335 *
336 * @param job the job.
337 */
338 void schedule(AbstractJobDescriptor job)
339 {
340 if(job.isEnabled())
341 {
342 Date nextRun = job.getSchedule().getNextRunTime(new Date(), job.getLastRunTime());
343 logger.debug("scheduling " + job.getName() + " at " + nextRun + " (last run " + job.getLastRunTime() + ")");
344 if(nextRun != null)
345 {
346 Date start = job.getTimeLimitStart();
347 Date end = job.getTimeLimitEnd();
348 if((start == null || nextRun.compareTo(start) > 0)
349 && (end == null || nextRun.compareTo(end) < 0))
350 {
351 int countLimit = job.getRunCountLimit();
352 if(countLimit < 0 || job.getRunCount() < countLimit)
353 {
354 Long target = new Long(nextRun.getTime());
355 Set set = (Set)queue.get(target);
356 if(set == null)
357 {
358 set = new HashSet();
359 queue.put(target, set);
360 }
361 set.add(job);
362 synchronized(queue)
363 {
364 queue.notify();
365 }
366 return;
367 }
368 }
369 }
370 if(job.getAutoClean())
371 {
372 try
373 {
374 deleteJobDescriptor(job);
375 }
376 catch(JobModificationException e)
377 {
378 logger.error("failed to auto-clean expired job " + job.getName(), e);
379 }
380 }
381 }
382 }
383
384 /***
385 * Terminates execution of all instances of the job.
386 */
387 private void terminate(AbstractJobDescriptor job)
388 {
389 synchronized(runners)
390 {
391 Set set = (Set)runners.get(job);
392 if(set != null)
393 {
394 Iterator i = set.iterator();
395 while(i.hasNext())
396 {
397 ((RunnerTask)i.next()).terminate();
398 }
399 }
400 }
401 }
402
403 /***
404 * Runs a specified job within a pool service's worker thread.
405 *
406 * @param job the job.
407 */
408 void run(AbstractJobDescriptor job)
409 {
410 synchronized(job)
411 {
412 if(!job.isRunning() || job.isReentrant())
413 {
414 logger.debug("starting " + job.getName() + " at " + new Date());
415 job.setRunning(true);
416 RunnerTask rt = new RunnerTask(job);
417 threadPool.runWorker(rt);
418 }
419 }
420 }
421
422 /***
423 * Get job component.
424 *
425 * @param job the job description.
426 * @return the job object.
427 * @throws JobNotFoundException if job class not found.
428 */
429 Job getJobObject(AbstractJobDescriptor job)
430 throws JobNotFoundException
431 {
432 String className = job.getJobClassName();
433 try
434 {
435 Class clazz = Class.forName(className);
436 if(container.getComponentInstance(clazz) == null)
437 {
438 container.registerComponentImplementation(clazz);
439 }
440 return (Job)container.getComponentInstance(clazz);
441 }
442 catch(ClassNotFoundException e)
443 {
444 throw new JobNotFoundException("Couldn't find job class '" + className + "' ", e);
445 }
446 }
447
448 /***
449 * The worker task that runs a sheduled job.
450 */
451 private class RunnerTask
452 extends Task
453 {
454 private AbstractJobDescriptor job;
455
456 private Thread thread;
457
458 public RunnerTask(AbstractJobDescriptor job)
459 {
460 this.job = job;
461 this.thread = Thread.currentThread();
462 }
463
464 public String getName()
465 {
466 return "scheduler: " + job.getName();
467 }
468
469 public void process(Context context)
470 {
471 synchronized(runners)
472 {
473 Set set = (Set)runners.get(job);
474 if(set == null)
475 {
476 set = new HashSet();
477 runners.put(job, set);
478 }
479 set.add(this);
480 }
481
482 synchronized(job)
483 {
484 try
485 {
486 job.setLastRunTime(new Date());
487 job.setRunCount(job.getRunCount() + 1);
488 }
489 catch(JobModificationException e)
490 {
491 logger.warn("failed to save job accountig information", e);
492 }
493 }
494
495 try
496 {
497 try
498 {
499 getJobObject(job).run(new String[] { job.getArgument() });
500 }
501 catch(JobNotFoundException e)
502 {
503 logger.error("invalid job specification " + job.getJobClassName(), e);
504 }
505 finally
506 {
507 synchronized(runners)
508 {
509 Set set = (Set)runners.get(job);
510 if(set != null)
511 {
512 set.remove(this);
513 if(set.isEmpty())
514 {
515 synchronized(job)
516 {
517 job.setRunning(false);
518 }
519 }
520 }
521 }
522
523 if(!job.isReentrant())
524 {
525 AbstractScheduler.this.schedule(job);
526 }
527 thread = null;
528
529 }
530 }
531 catch(VirtualMachineError e)
532 {
533 throw e;
534 }
535 catch(ThreadDeath e)
536 {
537 throw e;
538 }
539 catch(Throwable t)
540 {
541 logger.error("uncaught exception in scheduled job " + job.getName(), t);
542 }
543 }
544
545 public void terminate(Thread t)
546 {
547 try
548 {
549 getJobObject(job).terminate(t);
550 }
551 catch(JobNotFoundException e)
552 {
553 logger.error("invalid job specification " + job.getJobClassName(), e);
554 }
555 }
556
557 public void terminate()
558 {
559 if(thread != null)
560 {
561 terminate(thread);
562 }
563 }
564 }
565
566 /***
567 * The daemon task that performs the timekeeping
568 */
569 private class SchedulerTask
570 extends Task
571 {
572 public String getName()
573 {
574 return "Job Scheduler";
575 }
576
577 public void process(Context context)
578 {
579 synchronized(queue)
580 {
581 long now;
582 loop: while(!Thread.interrupted())
583 {
584
585 if(queue.size() == 0)
586 {
587 try
588 {
589 queue.wait();
590 }
591 catch(InterruptedException e)
592 {
593 break loop;
594 }
595 }
596
597 now = System.currentTimeMillis();
598 Long first = (Long)queue.firstKey();
599 while(!queue.isEmpty() && first.longValue() <= now)
600 {
601
602
603 Set set = (Set)queue.remove(first);
604 Iterator i = set.iterator();
605 while(i.hasNext())
606 {
607 AbstractJobDescriptor job = (AbstractJobDescriptor)i.next();
608 AbstractScheduler.this.run(job);
609 if(job.isReentrant())
610 {
611 AbstractScheduler.this.schedule(job);
612 }
613 }
614 if(!queue.isEmpty())
615 {
616 first = (Long)queue.firstKey();
617 }
618 }
619
620 if(!queue.isEmpty())
621 {
622 try
623 {
624 queue.wait(first.longValue() - now);
625 }
626 catch(InterruptedException e)
627 {
628 break loop;
629 }
630 }
631 }
632 }
633 }
634
635 public void terminate(Thread t)
636 {
637 t.interrupt();
638 }
639 }
640 }