How to interrupt a Quartz job that is doing IO
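If you want to interrupt a Quartz job that is doing IO, you can do so provided the job performs its IO through an InterruptibleChannel. From the Oracle documentation: a channel that implements this interface is interruptible. If a thread is blocked in an I/O operation on an interruptible channel, another thread may invoke the blocked thread's interrupt method. This closes the channel, the blocked thread receives a ClosedByInterruptException, and the blocked thread's interrupt status is set.
So the job can obtain a reference to the thread that is executing it and save that reference for later use. When the Quartz scheduler interrupts the job, the job's interrupt() method can call interrupt() on the saved thread to stop the read/write. Here is a simple example.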
package demo;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Date;

import org.quartz.InterruptableJob;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobKey;
import org.quartz.UnableToInterruptJobException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyJob implements InterruptableJob {
    private static final Logger LOG = LoggerFactory.getLogger(MyJob.class);
    private volatile boolean isJobInterrupted = false;
    private JobKey jobKey = null;
    // the thread running execute(); saved so that interrupt() can reach it
    private volatile Thread thisThread;

    public MyJob() {
    }

    public void execute(JobExecutionContext context) throws JobExecutionException {
        thisThread = Thread.currentThread();
        LOG.info("Thread name of the current job: " + thisThread.getName());
        jobKey = context.getJobDetail().getKey();
        LOG.info("Job " + jobKey + " executing at " + new Date());
        try {
            String fileUrl = "http://d2zwv9pap9ylyd.cloudfront.net/terracotta-3.6.1.tar.gz"; // 59 MB
            String localFile = "terracotta-3.6.1.tar.gz";
            download(fileUrl, localFile);
        } catch (ClosedByInterruptException e) {
            LOG.info("Caught ClosedByInterruptException... exiting job.");
        } catch (IOException e) {
            LOG.info("Caught IOException... exiting job.", e);
        } finally {
            if (isJobInterrupted) {
                LOG.info("Job " + jobKey + " did not complete");
            } else {
                LOG.info("Job " + jobKey + " completed at " + new Date());
            }
        }
    }

    // this method is called by the scheduler
    public void interrupt() throws UnableToInterruptJobException {
        LOG.info("Job " + jobKey + " -- INTERRUPTING --");
        isJobInterrupted = true;
        if (thisThread != null) {
            // this call causes the ClosedByInterruptException in the job thread
            thisThread.interrupt();
        }
    }

    private void download(String address, String localFileName) throws ClosedByInterruptException, IOException {
        URL url = new URL(address);
        // try-with-resources closes both channels, even if one close() fails or the copy is interrupted
        try (ReadableByteChannel src = Channels.newChannel(url.openStream());
             WritableByteChannel dest = new FileOutputStream(new File(localFileName)).getChannel()) {
            System.out.println("Downloading " + address + " to " + new File(localFileName).getCanonicalPath());
            int size = fastChannelCopy(src, dest);
            System.out.println("Download completed! " + (size / 1024 / 1024) + " MB");
        }
    }

    // Code copied from http://thomaswabner.wordpress.com/2007/10/09/fast-stream-copy-using-javanio-channels/
    private static int fastChannelCopy(final ReadableByteChannel src, final WritableByteChannel dest) throws IOException {
        final ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024);
        int count = 0;
        int total = 0;
        while ((count = src.read(buffer)) != -1) {
            total += count;
            // prepare the buffer to be drained
            buffer.flip();
            // write to the channel, may block
            dest.write(buffer);
            // If partial transfer, shift remainder down
            // If buffer is empty, same as doing clear()
            buffer.compact();
        }
        // EOF will leave buffer in fill state
        buffer.flip();
        // make sure the buffer is fully drained.
        while (buffer.hasRemaining()) {
            dest.write(buffer);
        }
        return total;
    }
}
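The channel behaviour the job relies on can also be seen without Quartz. The following is a minimal sketch, not part of the original example: the class name ChannelInterruptDemo and the use of java.nio.channels.Pipe as the blocking channel are assumptions chosen only to keep the demo self-contained. A worker thread blocks reading from an empty pipe, another thread interrupts it, and the worker reports what it observed.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.Pipe;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class (not from the original article).
class ChannelInterruptDemo {

    /** Blocks a worker on a pipe read, interrupts it, and returns what the worker saw. */
    public static String runDemo() throws Exception {
        Pipe pipe = Pipe.open(); // nothing is ever written, so read() blocks
        AtomicReference<String> outcome = new AtomicReference<>("no exception");
        CountDownLatch started = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            started.countDown();
            try {
                // The pipe's source channel is an InterruptibleChannel.
                pipe.source().read(ByteBuffer.allocate(16));
            } catch (ClosedByInterruptException e) {
                outcome.set("ClosedByInterruptException, channelOpen=" + pipe.source().isOpen()
                        + ", interruptFlag=" + Thread.currentThread().isInterrupted());
            } catch (IOException e) {
                outcome.set("IOException: " + e);
            }
        });
        worker.start();
        started.await();
        Thread.sleep(200);  // give the worker time to block inside read()
        worker.interrupt(); // same call MyJob.interrupt() makes on the job thread
        worker.join();
        pipe.sink().close();
        return outcome.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runDemo());
    }
}
```

The worker should report that the channel is closed and that its interrupt status is still set, which matches the three effects listed in the Oracle documentation.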
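This is my main class, which creates the Quartz Scheduler and simulates the intended interrupt. The download takes about 40 seconds to finish (59 MB file). To make sure the job is interrupted while the download is still in progress, we start the scheduler and then sleep for 5 seconds before interrupting. Note: if you want to see the job run to completion, sleep for about 40 seconds instead.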
package demo;

import static org.quartz.DateBuilder.nextGivenSecondDate;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;
import static org.quartz.TriggerBuilder.newTrigger;

import java.util.Date;

import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleTrigger;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InterruptExample {

    public void run() throws Exception {
        final Logger log = LoggerFactory.getLogger(InterruptExample.class);
        log.info("------- Initializing ----------------------");
        // First we must get a reference to a scheduler
        SchedulerFactory sf = new StdSchedulerFactory();
        Scheduler sched = sf.getScheduler();
        log.info("------- Initialization Complete -----------");
        log.info("------- Scheduling Jobs -------------------");
        // get a "nice round" time a few seconds in the future...
        Date startTime = nextGivenSecondDate(null, 1);
        JobDetail job = newJob(MyJob.class).withIdentity("myJob", "group1").build();
        SimpleTrigger trigger = newTrigger().withIdentity("trigger1", "group1").startAt(startTime)
                .withSchedule(simpleSchedule()).build();
        sched.scheduleJob(job, trigger);
        // start up the scheduler (jobs do not start to fire until
        // the scheduler has been started)
        sched.start();
        log.info("Scheduler thread's name: " + Thread.currentThread().getName());
        log.info("------- Started Scheduler -----------------");
        try {
            // to see the job finish successfully, sleep for about 40 seconds instead
            Thread.sleep(5 * 1000L);
            // tell the scheduler to interrupt our job
            sched.interrupt(job.getKey());
            Thread.sleep(3 * 1000L);
        } catch (Exception e) {
            e.printStackTrace();
        }
        log.info("------- Shutting Down ---------------------");
        sched.shutdown(true);
        log.info("------- Shutdown Complete -----------------");
    }

    public static void main(String[] args) throws Exception {
        InterruptExample example = new InterruptExample();
        example.run();
    }
}
Here is the log. It shows that our job was interrupted and exited early.
INFO [main] ------- Initializing ----------------------
INFO [main] Using default implementation for ThreadExecutor
INFO [main] Job execution threads will use class loader of thread: main
INFO [main] Initialized Scheduler Signaller of type: class org.quartz.core.SchedulerSignalerImpl
INFO [main] Quartz Scheduler v.2.1.3 created.
INFO [main] RAMJobStore initialized.
INFO [main] Scheduler meta-data: Quartz Scheduler (v2.1.3) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED'
Scheduler class: 'org.quartz.core.QuartzScheduler' - running locally.
NOT STARTED.
Currently in standby mode.
Number of jobs executed: 0
Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads.
Using job-store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered.
INFO [main] Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
INFO [main] Quartz scheduler version: 2.1.3
INFO [main] ------- Initialization Complete -----------
INFO [main] ------- Scheduling Jobs -------------------
INFO [main] Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
INFO [main] Scheduler thread's name: main
INFO [main] ------- Started Scheduler -----------------
INFO [DefaultQuartzScheduler_Worker-1] Thread name of the current job: DefaultQuartzScheduler_Worker-1
INFO [DefaultQuartzScheduler_Worker-1] Job group1.myJob executing at Mon Apr 16 16:24:40 PDT 2012
Downloading http://d2zwv9pap9ylyd.cloudfront.net/terracotta-3.6.1.tar.gz to S:\quartz-interrupt-demo\terracotta-3.6.1.tar.gz
INFO [main] Job group1.myJob -- INTERRUPTING --
INFO [DefaultQuartzScheduler_Worker-1] Caught ClosedByInterruptException... exiting job.
INFO [DefaultQuartzScheduler_Worker-1] Job group1.myJob did not complete
ERROR [DefaultQuartzScheduler_Worker-1] Worker thread was interrupt()'ed.
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:552)
INFO [main] ------- Shutting Down ---------------------
INFO [main] Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED shutting down.
INFO [main] Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED paused.
INFO [main] Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED shutdown complete.
INFO [main] ------- Shutdown Complete -----------------
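The ERROR entry in this log appears to come from the interrupt status still being set when the worker thread goes back to waiting in the pool: the stack trace shows an InterruptedException raised from Object.wait in SimpleThreadPool$WorkerThread. One possible way to avoid it, which is an assumption and not part of the original example, is to clear the flag with Thread.interrupted() before execute() returns. The sketch below illustrates the effect in plain Java; ClearInterruptDemo, jobBody and poolWaitInterrupted are hypothetical names, and Thread.sleep stands in for the pool's idle wait.

```java
// Hypothetical demo class (not from the original article).
class ClearInterruptDemo {

    /** Stand-in for a job body whose blocked I/O was interrupted. */
    static void jobBody(boolean clearFlag) {
        // Leaves the thread in the same state as after a ClosedByInterruptException:
        // the interrupt status is set.
        Thread.currentThread().interrupt();
        if (clearFlag) {
            Thread.interrupted(); // reads and clears the interrupt status
        }
    }

    /** Returns true if the wait that follows the job was hit by InterruptedException. */
    public static boolean poolWaitInterrupted(boolean clearFlag) throws InterruptedException {
        boolean[] hit = new boolean[1];
        Thread worker = new Thread(() -> {
            jobBody(clearFlag);
            try {
                Thread.sleep(50); // stands in for the thread pool's idle wait
            } catch (InterruptedException e) {
                hit[0] = true;
            }
        });
        worker.start();
        worker.join(); // join() makes the write to hit[0] visible here
        return hit[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("flag left set: wait interrupted = " + poolWaitInterrupted(false));
        System.out.println("flag cleared:  wait interrupted = " + poolWaitInterrupted(true));
    }
}
```

In MyJob, the equivalent change would be a call to Thread.interrupted() in the finally block of execute(). Whether clearing the flag is appropriate depends on whether anything else running on that thread needs to observe the interrupt.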
Original article: http://itindex.net/blog/2012/04/23/1335149680608.html