フレームワークから見たスレッドプールクラス
9508 ワード
public class BackgroundExecutor {
private static final String TAG = "BackgroundExecutor";
public static Executor DEFAULT_EXECUTOR = Executors.newScheduledThreadPool(2 * Runtime.getRuntime().availableProcessors());
private static Executor executor = DEFAULT_EXECUTOR;
private static final List<Task> tasks = new ArrayList<Task>();
/**
* Execute a runnable after the given delay.
*
* @param runnable
* the task to execute
* @param delay
* the time from now to delay execution, in milliseconds
* @throws IllegalArgumentException
* if <code>delay</code> is strictly positive and the current
* executor does not support scheduling (if
* {@link #setExecutor(Executor)} has been called with such an
* executor)
* @return Future associated to the running task
*/
private static Future<?> directExecute(Runnable runnable, int delay) {
Future<?> future = null;
if (delay > 0) {
/* no serial, but a delay: schedule the task */
if (!(executor instanceof ScheduledExecutorService)) {
throw new IllegalArgumentException("The executor set does not support scheduling");
}
ScheduledExecutorService scheduledExecutorService = (ScheduledExecutorService) executor;
future = scheduledExecutorService.schedule(runnable, delay, TimeUnit.MILLISECONDS);
} else {
if (executor instanceof ExecutorService) {
ExecutorService executorService = (ExecutorService) executor;
future = executorService.submit(runnable);
} else {
/* non-cancellable task */
executor.execute(runnable);
}
}
return future;
}
/**
* Execute a task after (at least) its delay <strong>and</strong> after all
* tasks added with the same non-null <code>serial</code> (if any) have
* completed execution.
*
* @param task
* the task to execute
* @throws IllegalArgumentException
* if <code>task.delay</code> is strictly positive and the
* current executor does not support scheduling (if
* {@link #setExecutor(Executor)} has been called with such an
* executor)
*/
public static synchronized void execute(Task task) {
Future<?> future = null;
if (task.serial == null || !hasSerialRunning(task.serial)) {
task.executionAsked = true;
future = directExecute(task, task.remainingDelay);
}
if (task.id != null || task.serial != null) {
/* keep task */
task.future = future;
tasks.add(task);
}
}
/**
* Execute a task.
*
* @param runnable
* the task to execute
* @param id
* identifier used for task cancellation
* @param delay
* the time from now to delay execution, in milliseconds
* @param serial
* the serial queue (<code>null</code> or <code>""</code> for no
* serial execution)
* @throws IllegalArgumentException
* if <code>delay</code> is strictly positive and the current
* executor does not support scheduling (if
* {@link #setExecutor(Executor)} has been called with such an
* executor)
*/
public static void execute(final Runnable runnable, String id, int delay, String serial) {
execute(new Task(id, delay, serial) {
@Override
public void execute() {
runnable.run();
}
});
}
/**
* Execute a task after the given delay.
*
* @param runnable
* the task to execute
* @param delay
* the time from now to delay execution, in milliseconds
* @throws IllegalArgumentException
* if <code>delay</code> is strictly positive and the current
* executor does not support scheduling (if
* {@link #setExecutor(Executor)} has been called with such an
* executor)
*/
public static void execute(Runnable runnable, int delay) {
directExecute(runnable, delay);
}
/**
* Execute a task.
*
* @param runnable
* the task to execute
*/
public static void execute(Runnable runnable) {
directExecute(runnable, 0);
}
/**
* Execute a task after all tasks added with the same non-null
* <code>serial</code> (if any) have completed execution.
*
* Equivalent to {@link #execute(Runnable, String, int, String)
* execute(runnable, id, 0, serial)}.
*
* @param runnable
* the task to execute
* @param id
* identifier used for task cancellation
* @param serial
* the serial queue to use (<code>null</code> or <code>""</code>
* for no serial execution)
*/
public static void execute(Runnable runnable, String id, String serial) {
execute(runnable, id, 0, serial);
}
/**
* Change the executor.
*
* Note that if the given executor is not a {@link ScheduledExecutorService}
* then executing a task after a delay will not be supported anymore. If it
* is not even a {@link ExecutorService} then tasks will not be cancellable
* anymore.
*
* @param executor
* the new executor
*/
public static void setExecutor(Executor executor) {
BackgroundExecutor.executor = executor;
}
/**
* Cancel all tasks having the specified <code>id</code>.
*
* @param id
* the cancellation identifier
* @param mayInterruptIfRunning
* <code>true</code> if the thread executing this task should be
* interrupted; otherwise, in-progress tasks are allowed to
* complete
*/
public static synchronized void cancelAll(String id, boolean mayInterruptIfRunning) {
for (int i = tasks.size() - 1; i >= 0; i--) {
Task task = tasks.get(i);
if (id.equals(task.id)) {
if (task.future != null) {
task.future.cancel(mayInterruptIfRunning);
if (!task.managed.getAndSet(true)) {
/*
* the task has been submitted to the executor, but its
* execution has not started yet, so that its run()
* method will never call postExecute()
*/
task.postExecute();
}
} else if (task.executionAsked) {
Log.w(TAG, "A task with id " + task.id + " cannot be cancelled (the executor set does not support it)");
} else {
/* this task has not been submitted to the executor */
tasks.remove(i);
}
}
}
}
/**
* Indicates whether a task with the specified <code>serial</code> has been
* submitted to the executor.
*
* @param serial
* the serial queue
* @return <code>true</code> if such a task has been submitted,
* <code>false</code> otherwise
*/
private static boolean hasSerialRunning(String serial) {
for (Task task : tasks) {
if (task.executionAsked && serial.equals(task.serial)) {
return true;
}
}
return false;
}
/**
* Retrieve and remove the first task having the specified
* <code>serial</code> (if any).
*
* @param serial
* the serial queue
* @return task if found, <code>null</code> otherwise
*/
private static Task take(String serial) {
int len = tasks.size();
for (int i = 0; i < len; i++) {
if (serial.equals(tasks.get(i).serial)) {
return tasks.remove(i);
}
}
return null;
}
public static abstract class Task implements Runnable {
private String id;
private int remainingDelay;
private long targetTimeMillis; /* since epoch */
private String serial;
private boolean executionAsked;
private Future<?> future;
/*
* A task can be cancelled after it has been submitted to the executor
* but before its run() method is called. In that case, run() will never
* be called, hence neither will postExecute(): the tasks with the same
* serial identifier (if any) will never be submitted.
*
* Therefore, cancelAll() *must* call postExecute() if run() is not
* started.
*
* This flag guarantees that either cancelAll() or run() manages this
* task post execution, but not both.
*/
private AtomicBoolean managed = new AtomicBoolean();
public Task(String id, int delay, String serial) {
if (!"".equals(id)) {
this.id = id;
}
if (delay > 0) {
remainingDelay = delay;
targetTimeMillis = System.currentTimeMillis() + delay;
}
if (!"".equals(serial)) {
this.serial = serial;
}
}
@Override
public void run() {
if (managed.getAndSet(true)) {
/* cancelled and postExecute() already called */
return;
}
try {
execute();
} finally {
/* handle next tasks */
postExecute();
}
}
public abstract void execute();
private void postExecute() {
if (id == null && serial == null) {
/* nothing to do */
return;
}
synchronized (BackgroundExecutor.class) {
/* execution complete */
tasks.remove(this);
if (serial != null) {
Task next = take(serial);
if (next != null) {
if (next.remainingDelay != 0) {
/* the delay may not have elapsed yet */
next.remainingDelay = Math.max(0, (int) (targetTimeMillis - System.currentTimeMillis()));
}
/* a task having the same serial was queued, execute it */
BackgroundExecutor.execute(next);
}
}
}
}
}
}