AsyncTask是google公司封裝的一個輕量級的異步任務類。實際上它內部也是通過Thread + handler實現的。如果沒有AsyncTask類,我們完全可以用thread+handler來處理。這個時候就很可能自己回去封裝一下thread+handler了。正是因為這類需求很多,google就幫我們封裝了一下。其實我們也可以自己封裝,但是我相信99%程序員自己封裝的東西比不上google的。所以還是有必要學習一下AsyncTask。
public abstract class AsyncTask{ private static final String LOG_TAG = "AsyncTask"; private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); private static final int CORE_POOL_SIZE = CPU_COUNT + 1; private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1; private static final int KEEP_ALIVE = 1; private static final ThreadFactory sThreadFactory = new ThreadFactory() { private final AtomicInteger mCount = new AtomicInteger(1); public Thread newThread(Runnable r) { return new Thread(r, "AsyncTask #" + mCount.getAndIncrement()); } }; private static final BlockingQueue sPoolWorkQueue = new LinkedBlockingQueue (128); /** * An {@link Executor} that can be used to execute tasks in parallel. */ public static final Executor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory); /** * An {@link Executor} that executes tasks one at a time in serial * order. This serialization is global to a particular process. */ public static final Executor SERIAL_EXECUTOR = new SerialExecutor(); private static final int MESSAGE_POST_RESULT = 0x1; private static final int MESSAGE_POST_PROGRESS = 0x2; private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR; private static InternalHandler sHandler; private final WorkerRunnable mWorker; private final FutureTask mFuture; private volatile Status mStatus = Status.PENDING; ... };
1. static Executor THREAD_POOL_EXECUTOR
2. static Executor SERIAL_EXECUTOR;
3. static Executor sDefaultExecutor; 這玩意默認指向了SERIAL_EXECUTOR。看源代碼就知道了:
private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;
4. static InternalHandler sHandler;
5. WorkerRunnable
6. FutureTask
7. Status mStatus;
private static class SerialExecutor implements Executor { final ArrayDequemTasks = new ArrayDeque (); Runnable mActive; public synchronized void execute(final Runnable r) { mTasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (mActive == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((mActive = mTasks.poll()) != null) { THREAD_POOL_EXECUTOR.execute(mActive); } } }
private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors(); private static final int CORE_POOL_SIZE = CPU_COUNT + 1; private static final int MAXIMUM_POOL_SIZE = CPU_COUNT * 2 + 1; private static final int KEEP_ALIVE = 1; private static final ThreadFactory sThreadFactory = new ThreadFactory() { private final AtomicInteger mCount = new AtomicInteger(1); public Thread newThread(Runnable r) { return new Thread(r, "AsyncTask #" + mCount.getAndIncrement()); } }; private static final BlockingQueuesPoolWorkQueue = new LinkedBlockingQueue (128); public static final Executor THREAD_POOL_EXECUTOR = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS, sPoolWorkQueue, sThreadFactory);
當創建ThreadPoolExecutor對象的時候,構造函數裡面第一個參數就是核心線程數量,這裡取得是CPU個數 + 1, 第二個參數是最大線程數量,這裡是CPU個數 * 2 + 1,第五個參數是緩沖區的隊列,這裡是個LinkedBlockingQueue,這個隊列的最大容量是128.
1. 如果正在運行的線程數量小於核心線程數量(由調用者設置,像AsyncTask就設置了cpu個數+1),那麼就新創建要給線程,來執行任務(execute的傳入參數)
2. 如果正在運行的線程數量大於等於核心線程數量,這個時候就分兩種情況:
a. 可以把任務丟進緩沖區,那就丟進去,等待空閒線程來執行。
b. 如果緩沖區滿了,那就看最大線程數 - 運行線程數是不是>0。如果 > 0, 就創建線程來運行新的任務。如果=0,那就丟出異常,也就是ThreadPoolExecutor不接受這個任務了。(所以使用ThreadPoolExecutor的時候需要注意異常,因為它有可能不接受任務)以下是ThreadPoolExecutor的execute代碼。
/** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
1. 沒有任何線程在運行
2. 只有一個線程在運行,執行完一個任務就繼續執行SerialExecutor的容器裡面的下一個任務,如果有,就在當前線程裡面繼續執行,如果沒有線程結束或者空閒。當serialExecutor有新任務來的時候,就再啟動一個線程(或者用某個空閒線程)來執行任務。
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
mWorker & mFuture
public AsyncTask() { mWorker = new WorkerRunnable() { public Result call() throws Exception { mTaskInvoked.set(true); Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); //noinspection unchecked Result result = doInBackground(mParams); Binder.flushPendingCommands(); return postResult(result); } }; mFuture = new FutureTask (mWorker) { @Override protected void done() { try { postResultIfNotInvoked(get()); } catch (InterruptedException e) { android.util.Log.w(LOG_TAG, e); } catch (ExecutionException e) { throw new RuntimeException("An error occurred while executing doInBackground()", e.getCause()); } catch (CancellationException e) { postResultIfNotInvoked(null); } } }; }
private static abstract class WorkerRunnableimplements Callable { Params[] mParams; }
就放了個數據成員:Params[] mParams。
1. 首先AsyncTask的構造函數會創建mWorker和mFuture。
2. 調用AsyncTask的execute過程如:
public final AsyncTaskexecute的函數params被丟到了mWorker裡面去,然後exec.execute(mFuture)執行任務,這裡exec是sDefaultExecutor,而sDefaultExecutor就是SerialExecutor,execute(Params... params) { return executeOnExecutor(sDefaultExecutor, params); } public final AsyncTask executeOnExecutor(Executor exec, Params... params) { if (mStatus != Status.PENDING) { switch (mStatus) { case RUNNING: throw new IllegalStateException("Cannot execute task:" + " the task is already running."); case FINISHED: throw new IllegalStateException("Cannot execute task:" + " the task has already been executed " + "(a task can be executed only once)"); } } mStatus = Status.RUNNING; onPreExecute(); mWorker.mParams = params; exec.execute(mFuture); return this; }
(private static volatile Executor sDefaultExecutor = SERIAL_EXECUTOR;)
AsyncTaskasyncTask = new AsyncTask (){ @Override protected String doInBackground(Void... param) { Log.v("AsyncTask", "doInBackground"); return "hello asyncTask"; } @Override public void onPostExecute(String response) { // callback.onSendRequestFinished(JsonUtil.jsonToBean(response, mBeanType)); Toast.makeText(MainActivity.this, "result: " + response, Toast.LENGTH_LONG).show(); } }; asyncTask.execute();
public AsyncTask() { mWorker = new WorkerRunnable() { public Result call() throws Exception { mTaskInvoked.set(true); Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); //noinspection unchecked Result result = doInBackground(mParams); Binder.flushPendingCommands(); return postResult(result); } }; mFuture = new FutureTask (mWorker) { @Override protected void done() { try { postResultIfNotInvoked(get()); } catch (InterruptedException e) { android.util.Log.w(LOG_TAG, e); } catch (ExecutionException e) { throw new RuntimeException("An error occurred while executing doInBackground()", e.getCause()); } catch (CancellationException e) { postResultIfNotInvoked(null); } } }; }
mWorker是WorkerRunnable的子類(匿名內嵌子類)對象,mWorker被傳給了mFuture,FutureTask的 callable就是mWorker。
public FutureTask(Callablecallable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
1. ThreadPoolExecutor裡面的一個線程執行任務(mFuture)
2. FutureTask的run()會在線程裡面被執行
3. Future的run()裡面會嘗試獲得callable,然後調用callable的call()函數。callable就是mWorker,也就是WorkRunnable,而WorkRunnable實現了接口Callable,Callable裡面有個方法就是call()。
public void run() { if (state != NEW || !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread())) return; try { Callablec = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
4. 這樣,mWorker是WorkerRunnable的子類對象,而且剛好實現了call函數,而call函數通過接口Callable在FutureTask的run()裡面被調用了。所以AsyncTask的構造函數裡面的匿名內嵌類裡面的call實現被ThreadPoolExecutor的線程調用了。也就是這段代碼:
mWorker = new WorkerRunnable() { public Result call() throws Exception { mTaskInvoked.set(true); Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); //noinspection unchecked Result result = doInBackground(mParams); Binder.flushPendingCommands(); return postResult(result); } };
整個AsyncTask的大致流程就是這樣,當然還有其他一些內容,如cancel, onPreExecute, onPostExecute, onCancelled等等,還有ThreadPoolExecutor執行完任務後,怎麼通知主線程的等等問題。