作者微信 bishe2022

代码功能演示视频在页面下方,请先观看;如需定制开发,联系页面右侧客服
Android Handler 详解

Custom Tab

Android开发中经常使用Handler来实现“跨越线程(Activity)更新UI”。本文将从源码角度回答:为什么使用Handler能够跨线程更新UI?为什么跨线程更新UI一定要用Handler?

Demo

Demo1. 用Handler更新UI

下面这个Demo完全是为了演示“跨线程更新UI”而写的。界面上只有一个TextView和一个Button,按下Button创建一个后台线程,该后台线程每隔一秒更新一次TextView,连续更新10次,结束。

Activity的代码如下:

public class MainActivity extends Activity {
    static final String TAG = "MainActivity";

    Handler handler = null;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        final TextView text = (TextView)findViewById(R.id.txtHello);
        Button button = (Button)findViewById(R.id.btnRun);

        button.setOnClickListener(new OnClickListener(){
            @Override
            public void onClick(View v) {
                Log.d(TAG, "clicked!");
                new Thread() {
                    public void run() {
                        for(int i=0; i<10; i++) {
                            Message msg = new Message();
                            msg.what = 1;
                            msg.obj = "item-"+i;

                            handler.sendMessage(msg);
                            Log.d(TAG, "sended "+"item-"+i);

                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }.start();
            }
        });

        handler = new Handler() {
            @Override
            public void handleMessage(Message msg) {
                String str = "unknow";
                switch(msg.what) {
                    case 1:
                        str =  (String)msg.obj;
                        break;
                    default:
                        break;
                }
                Log.d(TAG, "recv " + str);
                text.setText(str);
                super.handleMessage(msg);
            }
        };
    }

    @Override
    public boolean onCreateOptionsMenu(Menu menu) {
        // Inflate the menu; this adds items to the action bar if it is present.
        getMenuInflater().inflate(R.menu.main, menu);
        return true;
    }

}

布局文件较为简单:

<relativelayout xmlns:android="https://schemas.android.com/apk/res/android" 
xmlns:tools="https://schemas.android.com/tools" 
android:layout_width="match_parent" 
android:layout_height="match_parent" 
android:paddingbottom="@dimen/activity_vertical_margin" 
android:paddingleft="@dimen/activity_horizontal_margin" 
android:paddingright="@dimen/activity_horizontal_margin" 
android:paddingtop="@dimen/activity_vertical_margin" tools:context=".MainActivity">
<textview android:id="@+id/txtHello" 
android:layout_width="wrap_content" 
android:layout_height="wrap_content" 
android:text="@string/hello_world">
<button android:id="@+id/btnStart" 
android:layout_width="wrap_content" 
android:layout_height="wrap_content" android:text="Start">
</button></textview></relativelayout>


这里展示的是Handler的典型用法——用来更新UI控件。

下面再展示一个非典型用法,仅仅是为了后面的分析方便。


Demo2. 自制ActivityThread模拟Activity

本例是为了分析方便而创建的;使用一个线程LooperThread来模拟Activity。

后面阐述为什么要这么做,代码如下:

package com.example.handlerdemo;

import android.os.Bundle;
import android.os.Message;
import android.app.Activity;
import android.util.Log;
import android.view.Menu;
import android.view.View;
import android.widget.Button;
import android.widget.TextView;

public class MainActivity extends Activity {
    static final String TAG = "MainActivity";

    ActivityThread acitivityThread = null;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        setupViews();
    }

    private void setupViews() {
        TextView tv = (TextView)findViewById(R.id.txtHello);
        Button bt = (Button)findViewById(R.id.btnStart);

        Log.d(TAG, String.format("[MainActivity] Thread %s(%d)",
                     Thread.currentThread().getName(), Thread.currentThread().getId()));
        acitivityThread = new ActivityThread();
        acitivityThread.start();

        acitivityThread.waitForHandlerReady();

        bt.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                new Thread() {
                    @Override
                    public void run() {
                        for(int i=0; i<10; i++) {
                            Message msg = new Message();
                            msg.what = i;
                            acitivityThread.mHandler.sendMessage(msg);
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }.start();
            }
        });
    }

    @Override
    public boolean onCreateOptionsMenu(Menu menu) {
        // Inflate the menu; this adds items to the action bar if it is present.
        getMenuInflater().inflate(R.menu.main, menu);
        return true;
    }
}

MainActivity.java 

package com.example.handlerdemo;

import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import android.util.Log;

public class ActivityThread extends Thread {
    static final String TAG = "LooperThread";

    public Handler mHandler = null;

    public ActivityThread() {
        super("LooperThread");
    }

    @Override
    public void run() {
        Looper.prepare();

        synchronized(this) {
            mHandler = new Handler() {
                @Override
                public void handleMessage(Message msg) {
                    Log.d(TAG, String.format("recv msg.what: %d in Thread: %s(%d)", msg.what,
                               Thread.currentThread().getName(),Thread.currentThread().getId()));
                }
            };
            this.notify();
        }

        Looper.loop();
    }

    public void waitForHandlerReady() {
            try {
                synchronized(this) {
                    while(mHandler == null)
                        this.wait();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    }
}

ActivityThread.java


这个Demo的布局文件很简单,就不贴出来了。


为什么使用Handler能够跨线程更新UI?

概览

以Demo2为例,这个Demo至少涉及三个线程:GodActivity线程,ActivityThread线程(模拟UI),匿名线程(GodActivity创建的,叫他aThread)。暂且把GodActivity当做上帝,把ActivityThread看做Demo1里的Activity。现在,我们先预览一下为什么aThread可以通过Handler来更新ActivityThread的UI(纯属虚构),这两个线程的交互关系如下图所示:

2014082909073893.jpg

(PS:此前的版本画了很多对象的生命线,结果很混乱,删了一堆无关紧要的之后,立刻清晰了,^_^)

这个序列图(Sequence Diagram)已经简洁明了地给出了答案:<喎�/kf/ware/vc/" target="_blank" class="keylink">vcD4KCkFjdGl2aXR5z9+zzLXExLu687u509DSu7j2TWVzc2FnZVF1ZXVlo7tNZXNzYWdlUXVldWW5ysP7y7zS6crH0ru49k1lc3NhZ2XX6bPJtcRRdWV1ZaO7YVRocmVhZNa7yse9q8r9vt3S1E1lc3NhZ2W1xNDOyr250rW9wctBY3Rpdml0ecS7uvO1xE1lc3NhZ2VRdWV1ZcnPwcuju0FjdGl2aXR5z9+zzLTTTWVzc2FnZVF1ZXVlyc/IoU1lc3NhZ2WyorX308NIYW5kbGVyLmhhbmRsZXJNZXNzYWdlo6zL+dLUyrW8yrXEobC4/NDCtq/X96Gxu7nKx7eiyfrU2kFjdGl2aXR5z9+zzMTao7sKPHA+PGJyPgo8L3A+CjxoMj7P6r3iPC9oMj4KPHA+z8LD5r2rtNNBbmRyb2lkIDQuNC401LTC67XEvce2yLfWzvZIYW5kbGVytcShsMS7uvO62srWobGhozwvcD4KPGgzPry4uPa52Lz8wOA8L2gzPgo8cD5EZW1vMtbQus1IYW5kbGVy09C52LXEwOCz/cHLTWVzc2FnZVF1ZXVlu7nT0E1lc3NhZ2W6zUxvb3BlcqOs1eK8uLj2wOC1xLnYz7XI58/Co7o8L3A+CjxwPjxpbWcgc3JjPQ=="/uploadfile/Collfiles/20140829/2014082909073994.jpg" alt="\">

关键点:

MessageQueue通过Message.next维护链表结构(java引用即指针);ActivityThread的消息循环被封装在Looper.loop()内,Looper.prepare()用于创建属于当前线程的Looper和MessageQueue;
每个Message可以通过target指向一个Handler,Handler实际上就是一个用来处理Message的callback;

接下来的代码,只贴代码片段(方法),如果对各类的属性有所疑惑,可以回头查看此图。

Looper.prepare()


根据Looper的注释可以看到,Looper线程“三部曲”:

Looper.prepare()new Handler() { /* override handleMessage() */ }Looper.loop();


下面逐渐切入Looper.prepare():

  public static void prepare() {
        prepare(true);
    }

Looper.java


无参数版本调用了有参数版本:

private static void prepare(boolean quitAllowed) {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper(quitAllowed)); // 放入“单例”中
    }

Looper.java 

这段代码中引用了sThreadLocal,它被定义为ThreadLocal类型,即线程私有数据类型(或者叫做线程级别单例)

ThreadLocal可以理解为Map的一层包包装(实际上Android,JVM都是按Map实现的,感兴趣的同学可自行研究;set(value)时,以当前线程对象为key,所以每个线程能够保存一份value。)

可见Looper.prepare()调用使得AcitivityThread通过Looper.sThreadLocal持有了一个Looper对象。


继续看Looper的构造方法Looper(quitAllowed):

private Looper(boolean quitAllowed) {
    mQueue = new MessageQueue(quitAllowed); 
    mThread = Thread.currentThread(); // 和当前线程关联
}

Handler.java



可以看到Looper的构造函数中创建了一个MessageQueue。



流程又转到了MessageQueue的构造函数MessageQueue(quitAllowed):

MessageQueue(boolean quitAllowed) {
    mQuitAllowed = quitAllowed;
    mPtr = nativeInit();
}

MessageQueue.java


Handler()


首先看上面调用的默认构造方法:

/**
 * Default constructor associates this handler with the {@link Looper} for the
 * current thread. 将当前线程的Looper与此handler关联。
 *   如果当前线程没有looper,这个handler将不能接收消息,从而导致异常抛出
 * If this thread does not have a looper, this handler won't be able to receive messages
 * so an exception is thrown.
 */
public Handler() {
    this(null, false);
}

Handler.java




默认构造方法又调用了另一版本的构造方法,如下:

public Handler(Callback callback, boolean async) {
    if (FIND_POTENTIAL_LEAKS) { // FIND_POTENTIAL_LEAKS 为 false;
        final Class<!--? extends Handler--> klass = getClass();
        if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
                (klass.getModifiers() & Modifier.STATIC) == 0) {
            Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
                klass.getCanonicalName());
        }
    }
 
    mLooper = Looper.myLooper(); // 获取当前线程(调用者)的Looper
    if (mLooper == null) { // 如果当前线程没有Looper,则抛异常
        throw new RuntimeException( 
            "Can't create handler inside thread that has not called Looper.prepare()");
    }
    mQueue = mLooper.mQueue; // 这里引用的MessageQueue是Looper()中创建的
    mCallback = callback;
    mAsynchronous = async;
}

Handler.java


Handler()调用了Looper.myLooper():

public static Looper myLooper() {
    return sThreadLocal.get(); // 从该线程的“单例”中取出Looper对象
}

Looper.java



Looper.loop()





Looper.loop()封装了消息循环,所以我们现在看看Looper.loop()的“真面目”:

public static void loop() {
    final Looper me = myLooper();
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    final MessageQueue queue = me.mQueue;
 
    // Make sure the identity of this thread is that of the local process,
    // and keep track of what that identity token actually is.
    Binder.clearCallingIdentity();
    final long ident = Binder.clearCallingIdentity();
 
    for (;;) {
        Message msg = queue.next(); // might block, 取出消息
        if (msg == null) {
            // No message indicates that the message queue is quitting.
            return;
        }
 
        // This must be in a local variable, in case a UI event sets the logger
        Printer logging = me.mLogging;
        if (logging != null) {
            logging.println(">>>>> Dispatching to " + msg.target + " " +
                    msg.callback + ": " + msg.what);
        }
 
            // mLatencyLock is only initialized for non USER builds
            // (e.g., USERDEBUG and ENG)
            if ((!sLatencyEnabled) || (me != sMainLooper)) {
                msg.target.dispatchMessage(msg); // 通过msg.target分派消息
            }
            else { // 记录性能数据
                long t1 = SystemClock.uptimeMillis(); // 获得当前毫秒数(自启动)
                msg.target.dispatchMessage(msg);
                long t2 = SystemClock.uptimeMillis() - t1; // t2就是dispatchMessage(msg)所用时间
                if (t2 < 50) {
                    // We don't care about these from a latency perspective
                }
                else if (t2 < 250) {
                    // Fast response that usually has low impact on user experience
                    sLatencyCountFast++;
                    sLatencySumFast += t2;
                    if (sLatencyCountFast >= 100) {
                        String name = getProcessName();
                        long avg = sLatencySumFast / sLatencyCountFast;
                        EventLog.writeEvent(2731, "mainloop2_latency1", name, avg);
                        sLatencyCountFast = 0;
                        sLatencySumFast = 0;
                    }
                }
                else if (t2 < 1000) {
                    sLatencyCountSlow++;
                    sLatencySumSlow += t2;
                    if (sLatencyCountSlow >= 10) {
                        String name = getProcessName();
                        long avg = sLatencySumSlow / sLatencyCountSlow;
                        EventLog.writeEvent(2731, "mainloop2_latency2", name, avg);
                        sLatencyCountSlow = 0;
                        sLatencySumSlow = 0;
                    }
                }
                else {
                    String name = getProcessName();
                    EventLog.writeEvent(2731, "mainloop2_bad", name, t2);
                }
            }
 
        if (logging != null) {
            logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
        }
 
        // Make sure that during the course of dispatching the
        // identity of the thread wasn't corrupted.
        final long newIdent = Binder.clearCallingIdentity();
        if (ident != newIdent) {
            Log.wtf(TAG, "Thread identity changed from 0x"
                    + Long.toHexString(ident) + " to 0x"
                    + Long.toHexString(newIdent) + " while dispatching to "
                    + msg.target.getClass().getName() + " "
                    + msg.callback + " what=" + msg.what);
        }
 
        msg.recycle();
    }
}

Looper.java


可以看到,Looper.loop()的for循环实际上就是“消息循环”,它负责从消息队列(MessageQueue)中不断地取出消息(MessageQueue.next),然后通过msg.target来派发(dispatch)消息。


How to dispatch?



下面看看Message到底是如何被dispatch的:

public void dispatchMessage(Message msg) {
    if (msg.callback != null) { // 方法 1
        handleCallback(msg); 
    } else {
        if (mCallback != null) {
            if (mCallback.handleMessage(msg)) { // 方法 2
                return;
            }
        }
        handleMessage(msg); // 方法 3
    }
}

Handler.java


从这段代码可以看出,实现正常的Message处理有三种方式:

为Message.callback注册一个Runnable实例。为Handler.mCallback注册一个Handler.Callback实例。重写Handler的handleMessage方法。

另外,这三种方法优先级依次降低,且一个Message只能有一种处理方式。


Message的发送与获取

对于一个后台线程,它要发出消息(Handler.sendMessage);对于Activity线程,它要得到其他线程发来的消息(MessageQueue.next);而这两种工作都是以MessageQueue为基础的。下面,分别分析发送和接收的具体流程:

Handler.sendMessage()

Demo中后台线程正是通过Handler.sendMessage实现向Activity发消息的,Handler.sendMessage方法的代码如下:

public final boolean sendMessage(Message msg)
{
    return sendMessageDelayed(msg, 0);
}

Handler.java

public final boolean sendMessageDelayed(Message msg, long delayMillis)
{
    if (delayMillis < 0) {
        delayMillis = 0;
    }
    return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}

Handler.java
其中,其中SystemClock.uptimeMillis()返回自启动以来CPU经过的毫秒数。

public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
    MessageQueue queue = mQueue;
    if (queue == null) {
        RuntimeException e = new RuntimeException(
                this + " sendMessageAtTime() called with no mQueue");
        Log.w("Looper", e.getMessage(), e);
        return false;
    }
    return enqueueMessage(queue, msg, uptimeMillis);
}

Handler.java

private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
    msg.target = this; // 将当前Handler(通常已重写handleMessage方法)与该Message绑定(通过target)
    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis); // 调用MessageQueue.enqueueMessage
}

Handler.java


这里看到了Looper.loop()里引用的target的来源。



流程转到了MessageQueue.enqueueMessage(),看命名基本知道它是入队操作,代码如下:

boolean enqueueMessage(Message msg, long when) {
    if (msg.isInUse()) {
        throw new AndroidRuntimeException(msg + " This message is already in use.");
    }
    if (msg.target == null) {
        throw new AndroidRuntimeException("Message must have a target.");
    }
 
    synchronized (this) { // 临界区
        if (mQuitting) {
            RuntimeException e = new RuntimeException(
                    msg.target + " sending message to a Handler on a dead thread");
            Log.w("MessageQueue", e.getMessage(), e);
            return false;
        }
 
        msg.when = when;
        Message p = mMessages; // 链表头
        boolean needWake;
        if (p == null || when == 0 || when < p.when) {
            // p == null 队列为空
            // when == 0 由 Handler.sendMessageAtFrontOfQueue() 发出
            // when < p.when 新消息的when比队头要早
            // New head, wake up the event queue if blocked.
            msg.next = p;    // 将msg放到队头,step 1
            mMessages = msg; // 将msg放到队头,step 2
            needWake = mBlocked;
        } else {
            // Inserted within the middle of the queue.  Usually we don't have to wake 插到队列中间。通常我们不必唤醒
            // up the event queue unless there is a barrier at the head of the queue 事件(event)队列,除非队头有一个barrier,
            // and the message is the earliest asynchronous message in the queue.且消息是队列中最早的同步消息。
            needWake = mBlocked && p.target == null && msg.isAsynchronous();
            Message prev;
            for (;;) { // 遍历链表
                prev = p;
                p = p.next;
                if (p == null || when < p.when) { // 到“尾部”了 或 新消息比当前消息更早
                    break;
                }
                if (needWake && p.isAsynchronous()) {
                    needWake = false;
                }
            }
            // 以下两行将msg插入prev和p之间
            msg.next = p; // invariant: p == prev.next
            prev.next = msg;
        }
 
        // We can assume mPtr != 0 because mQuitting is false.
        if (needWake) {
            nativeWake(mPtr); // 通知前台线程“有消息来啦”
        }
    }
    return true;
}

MessageQueue.java
根据这段代码可知,MessageQueue上的Message是按照when大小排列的。



MessageQueue.next()

前文的Looper.loop方法通过MessageQueue.next()取出消息,现在看看它是如何实现的:

Message next() {
    int pendingIdleHandlerCount = -1; // -1 only during first iteration
    int nextPollTimeoutMillis = 0;
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }
 
        // We can assume mPtr != 0 because the loop is obviously still running.
        // The looper will not call this method after the loop quits.
        nativePollOnce(mPtr, nextPollTimeoutMillis); // 等待通知,可能阻塞
 
        synchronized (this) {
            // Try to retrieve the next message.  Return if found.
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages; // 链表头
            if (msg != null && msg.target == null) {
                // Stalled by a barrier.  Find the next asynchronous message in the queue.
                do { // 遍历链表
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            if (msg != null) {
                if (now < msg.when) {
                    // Next message is not ready.  Set a timeout to wake up when it is ready.
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // Got a message.
                    mBlocked = false;
                    if (prevMsg != null) {
                        prevMsg.next = msg.next; // 将msg节点摘下
                    } else { // prevMsg == null, msg是链表头
                        mMessages = msg.next;
                    }
                    msg.next = null; // msg与MessageQueue“断绝关系”
                    if (false) Log.v("MessageQueue", "Returning message: " + msg);
                    msg.markInUse();
                    return msg; // 到这为止,是主体逻辑
                }
            } else {
                // No more messages.
                nextPollTimeoutMillis = -1;
            }
 
            // Process the quit message now that all pending messages have been handled.
            if (mQuitting) {
                dispose();
                return null;
            }
 
            // If first time idle, then get the number of idlers to run.
            // Idle handles only run if the queue is empty or if the first message
            // in the queue (possibly a barrier) is due to be handled in the future.
            if (pendingIdleHandlerCount < 0
                    && (mMessages == null || now < mMessages.when)) {
                pendingIdleHandlerCount = mIdleHandlers.size();
            }
            if (pendingIdleHandlerCount <= 0) {
                // No idle handlers to run.  Loop and wait some more.
                mBlocked = true;
                continue;
            }
 
            if (mPendingIdleHandlers == null) {
                mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
            }
            mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
        }
 
        // Run the idle handlers.
        // We only ever reach this code block during the first iteration.
        for (int i = 0; i < pendingIdleHandlerCount; i++) {
            final IdleHandler idler = mPendingIdleHandlers[i];
            mPendingIdleHandlers[i] = null; // release the reference to the handler
 
            boolean keep = false;
            try {
                keep = idler.queueIdle();
            } catch (Throwable t) {
                Log.wtf("MessageQueue", "IdleHandler threw exception", t);
            }
 
            if (!keep) {
                synchronized (this) {
                    mIdleHandlers.remove(idler);
                }
            }
        }
 
        // Reset the idle handler count to 0 so we do not run them again.
        pendingIdleHandlerCount = 0;
 
        // While calling an idle handler, a new message could have been delivered
        // so go back and look again for a pending message without waiting.
        nextPollTimeoutMillis = 0;
    }
}

MessageQueue.java


小结

MessageQueue.next()和MessageQueue.sendMessage()分别被Activity线程、后台线程调用,而他们两个线程可能同时在调用这两个方法,所以他们共享并修改的成员变量需要加锁,这就是synchronized (this)出现的原因。

至此,已经能够完整的回答“为什么用Handler能够实现跨线程更新UI”。简单的说,Activity线程的背后都有一个消息队列(MessageQueue),后台线程通过Handler的sendMessage方法向这个消息队列上放消息;Activity线程将消息从消息队列上取下来之后,通过具体Handler的handleMessage方法处理消息,而更新UI的代码就在这个handleMessage中;所以,后台线程并没有做实际的“更新”,只是将要更新的内容以借助MessageQueue告诉了Activity线程,Activity线程才是实际做“更新”动作的人。

简言之,Handler并没有真正的实现“跨线程”更新UI,而是将要更新的数据(Message携带)和如何更新(Handler携带)通过消息队列告诉了UI线程,UI线程才是真正的“幕后英雄”。


真正的ActivityThread

Demo2中的ActivityThread完全是虚构出来的,下面来看看Android的Activity到底是不是想我虚构的那样有一个Looper。

经过上面的分析,可以从两方面验证:

看看Activity源码中执行onCreate之前是否调用了Looper.prepare()。执行onXXX方法时的CallStack上是否有Looper.loop();

第二点很容易验证,只需在任意onXXX方法中打一个断点,然后看程序的CallStack,就一面了然了:

aaaaaaaa.png

根据这个调用栈,可以很明显的看到有Looper.loop;同时还能看到是ActivityThread.main调用它的,所以可以看看ActivityThread.main的源码:

public static void main(String[] args) {
    SamplingProfilerIntegration.start();
 
    // CloseGuard defaults to true and can be quite spammy.  We
    // disable it here, but selectively enable it later (via
    // StrictMode) on debug builds, but using DropBox, not logs.
    CloseGuard.setEnabled(false);
 
    Environment.initForCurrentUser();
 
    // Set the reporter for event logging in libcore
    EventLogger.setReporter(new EventLoggingReporter());
 
    Security.addProvider(new AndroidKeyStoreProvider());
 
    Process.setArgV0("<pre-initialized>");
 
    Looper.prepareMainLooper(); // 它和Looper.prepare类似
 
    ActivityThread thread = new ActivityThread();
    thread.attach(false);
 
    if (sMainThreadHandler == null) {
        sMainThreadHandler = thread.getHandler();
    }
 
    AsyncTask.init();
 
    if (false) {
        Looper.myLooper().setMessageLogging(new
                LogPrinter(Log.DEBUG, "ActivityThread"));
    }
 
    Looper.loop();
 
    throw new RuntimeException("Main thread loop unexpectedly exited");
}</pre-initialized>

ActivityThread.java


所以,上面提到的两方面都得到了验证。即真正的ActivityThread是有Looper的。


Native浮云


细心的朋友可能会发现,上面MessageQueue的代码中还遗留几个native开头方法:nativeInit,nativePollOnce,nativeWake。

下面就来扫清这些“遮眼”的浮云。和这几个native方法直接对应的是:

static JNINativeMethod gMessageQueueMethods[] = {
    /* name, signature, funcPtr */
    { "nativeInit", "()I", (void*)android_os_MessageQueue_nativeInit },
    { "nativeDestroy", "(I)V", (void*)android_os_MessageQueue_nativeDestroy },
    { "nativePollOnce", "(II)V", (void*)android_os_MessageQueue_nativePollOnce },
    { "nativeWake", "(I)V", (void*)android_os_MessageQueue_nativeWake },
    { "nativeIsIdling", "(I)Z", (void*)android_os_MessageQueue_nativeIsIdling }
};

android_os_MessageQueue.cpp


nativeInit

下面从adnroid_os_MessageQueue_nativeInit开始,顾名思义,nativeInit当然是完成一些初始化工作的。

static jint android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
    NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); // 创建了NativeMessageQueue
    if (!nativeMessageQueue) {
        jniThrowRuntimeException(env, "Unable to allocate native queue");
        return 0;
    }
 
    nativeMessageQueue->incStrong(env);
    return reinterpret_cast<jint>(nativeMessageQueue);
}
</jint>

android_os_MessageQueue.cpp


看看NativeMessageQueue的声明:

class NativeMessageQueue : public MessageQueue {
public:
    NativeMessageQueue();
    virtual ~NativeMessageQueue();
 
    virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj);
 
    void pollOnce(JNIEnv* env, int timeoutMillis);
 
    void wake();
 
private:
    bool mInCallback;
    jthrowable mExceptionObj;
};

android_os_MessageQueue.cpp


NativeMessageQueue继承了MessageQueue,再来看看MessageQueue的声明:

class MessageQueue : public RefBase {
public:
    /* Gets the message queue's looper. */
    inline sp<looper> getLooper() const {
        return mLooper;
    }
 
    /* Checks whether the JNI environment has a pending exception.
     *
     * If an exception occurred, logs it together with the specified message,
     * and calls raiseException() to ensure the exception will be raised when
     * the callback returns, clears the pending exception from the environment,
     * then returns true.
     *
     * If no exception occurred, returns false.
     */
    bool raiseAndClearException(JNIEnv* env, const char* msg);
 
    /* Raises an exception from within a callback function.
     * The exception will be rethrown when control returns to the message queue which
     * will typically cause the application to crash.
     *
     * This message can only be called from within a callback function.  If it is called
     * at any other time, the process will simply be killed.
     *
     * Does nothing if exception is NULL.
     *
     * (This method does not take ownership of the exception object reference.
     * The caller is responsible for releasing its reference when it is done.)
     */
    virtual void raiseException(JNIEnv* env, const char* msg, jthrowable exceptionObj) = 0;
 
protected:
    MessageQueue();
    virtual ~MessageQueue();
 
protected:
    sp<looper> mLooper;
};</looper></looper>

android_os_MessageQueue.h


现在看看NativeMessageQueue的构造函数:

NativeMessageQueue::NativeMessageQueue() : mInCallback(false), mExceptionObj(NULL) {
    mLooper = Looper::getForThread();
    if (mLooper == NULL) {
        mLooper = new Looper(false);
        Looper::setForThread(mLooper);
    }
}

android_os_MessageQueue.cpp


NativeMessageQueue的构造函数又调用了Looper::getForThread(),Looper::Looper()和Looper::setThread(),其中getForThread和setForThread都是静态函数:

sp<looper> Looper::getForThread() {
    int result = pthread_once(& gTLSOnce, initTLSKey);
    LOG_ALWAYS_FATAL_IF(result != 0, "pthread_once failed");
 
    return (Looper*)pthread_getspecific(gTLSKey);
}</looper>

Looper.cpp


这段代码中,在第一次执行pthread_once时将调用initTLSKey。

void Looper::initTLSKey() {
    int result = pthread_key_create(& gTLSKey, threadDestructor);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not allocate TLS key.");
}

Looper.cpp

void Looper::threadDestructor(void *st) {
    Looper* const self = static_cast<looper*>(st);
    if (self != NULL) {
        self->decStrong((void*)threadDestructor);
    }
}</looper*>

Looper.cpp

void Looper::setForThread(const sp<looper>& looper) {
    sp<looper> old = getForThread(); // also has side-effect of initializing TLS
 
    if (looper != NULL) {
        looper->incStrong((void*)threadDestructor);
    }
 
    pthread_setspecific(gTLSKey, looper.get());
 
    if (old != NULL) {
        old->decStrong((void*)threadDestructor);
    }
}</looper></looper>

Looper.cpp


Looper::setForThread和getForThread中分别使用了pthread_setspecific,pthread_getsepcific,pthread_key_create,实现了线程私有的looper引用,这和Java层Looper类似。


Looper的构造函数如下:

Looper::Looper(bool allowNonCallbacks) :
        mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
        mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
    int wakeFds[2];
    int result = pipe(wakeFds);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe.  errno=%d", errno);
 
    mWakeReadPipeFd = wakeFds[0];
    mWakeWritePipeFd = wakeFds[1];
 
    result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake read pipe non-blocking.  errno=%d",
            errno);
 
    result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not make wake write pipe non-blocking.  errno=%d",
            errno);
 
    mIdling = false;
 
    // Allocate the epoll instance and register the wake pipe.
    mEpollFd = epoll_create(EPOLL_SIZE_HINT); // 用epoll实现IO多路复用,EPOLL_SIZE_HINT定义为8
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance.  errno=%d", errno);
 
    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeReadPipeFd;
    result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem); // 将Wake管道的读端添加到mEpollFd上
    LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake read pipe to epoll instance.  errno=%d",
            errno);
}

Looper.cpp
从Looper的构造函数可以看到,Looper的Wake是由管道+epoll实现的,且管道的两端fd都被设置为NONBLOCK的,并通过epoll实现IO多路复用。Looper的数据成员(data member)声明如下:

struct Request {
        int fd;
        int ident;
        sp<loopercallback> callback;
        void* data;
    };
 
    struct Response {
        int events;
        Request request;
    };
 
    struct MessageEnvelope {
        MessageEnvelope() : uptime(0) { }
 
        MessageEnvelope(nsecs_t uptime, const sp<messagehandler> handler,
                const Message& message) : uptime(uptime), handler(handler), message(message) {
        }
 
        nsecs_t uptime;
        sp<messagehandler> handler;
        Message message;
    };
 
    const bool mAllowNonCallbacks; // immutable
 
    int mWakeReadPipeFd;  // immutable
    int mWakeWritePipeFd; // immutable
    Mutex mLock;
 
    Vector<messageenvelope> mMessageEnvelopes; // guarded by mLock
    bool mSendingMessage; // guarded by mLock
 
    // Whether we are currently waiting for work.  Not protected by a lock,
    // any use of it is racy anyway.
    volatile bool mIdling;
 
    int mEpollFd; // immutable
 
    // Locked list of file descriptor monitoring requests.
    KeyedVector<int, request=""> mRequests;  // guarded by mLock
 
    // This state is only used privately by pollOnce and does not require a lock since
    // it runs on a single thread.
    Vector<response> mResponses;
    size_t mResponseIndex;
    nsecs_t mNextMessageUptime; // set to LLONG_MAX when none</response></int,>
    </messageenvelope></messagehandler></messagehandler></loopercallback>

Looper.h


Looper数据成员涉及的类型还有有:作为callback的LooperCallback,MessageHandler,以及Message:

class MessageHandler : public virtual RefBase {
protected:
    virtual ~MessageHandler() { }
 
public:
    /**
     * Handles a message.
     */
    virtual void handleMessage(const Message& message) = 0;
};

Looper.h

class LooperCallback : public virtual RefBase {
protected:
    virtual ~LooperCallback() { }
 
public:
    /**
     * Handles a poll event for the given file descriptor.
     * It is given the file descriptor it is associated with,
     * a bitmask of the poll events that were triggered (typically ALOOPER_EVENT_INPUT),
     * and the data pointer that was originally supplied.
     *
     * Implementations should return 1 to continue receiving callbacks, or 0
     * to have this file descriptor and callback unregistered from the looper.
     */
    virtual int handleEvent(int fd, int events, void* data) = 0;
};

Looper.h

struct Message {
    Message() : what(0) { }
    Message(int what) : what(what) { }
 
    /* The message type. (interpretation is left up to the handler) */
    int what;
};

Looper.h


至此,android_os_MessageQueue_nativeInit分析完毕。


nativeWake

接下来看看android_os_MessageQueue_nativeWake和android_os_MessageQueue_nativePollOnce。

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jint ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<nativemessagequeue*>(ptr);
    return nativeMessageQueue->wake();
}</nativemessagequeue*>

android_os_MessageQueue.cpp


android_os_MessageQueue_nativeWake调用了NativeMessageQueue::wake:

void NativeMessageQueue::wake() {
    mLooper->wake();
}

android_os_MessageQueue.cpp
NativeMessageQueue::wake直接将工作转交给了Looper::wake:

void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ wake", this);
#endif
 
    ssize_t nWrite;
    do {
        nWrite = write(mWakeWritePipeFd, "W", 1); // 向pipe的写段写入一个字节
    } while (nWrite == -1 && errno == EINTR);
 
    if (nWrite != 1) {
        if (errno != EAGAIN) {
            ALOGW("Could not write wake signal, errno=%d", errno);
        }
    }
}

Looper.cpp
可以看到nativeWake非常简单,只是向pipe上写一个字节。但这是如何唤醒等待的线程的呢?猜想:等待线程必然通过epoll_wait等在mEpollFd上,稍后将得到验证。



nativePollOnce

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jclass clazz,
        jint ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<nativemessagequeue*>(ptr);
    nativeMessageQueue->pollOnce(env, timeoutMillis); // 调用NativeMessageQueue::pollOnce()
}</nativemessagequeue*>

android_os_MessageQueue.cpp


android_os_MessageQueue_nativeWake调用了NativeMessageQueue::pollOnce:

void NativeMessageQueue::pollOnce(JNIEnv* env, int timeoutMillis) {
    mInCallback = true;
    mLooper->pollOnce(timeoutMillis);
    mInCallback = false;
    if (mExceptionObj) {
        env->Throw(mExceptionObj);
        env->DeleteLocalRef(mExceptionObj);
        mExceptionObj = NULL;
    }
}

android_os_MessageQueue.cpp

NativeMessageQueue::pollOnce调用了Looper::pollOnce:

inline int pollOnce(int timeoutMillis) {
    return pollOnce(timeoutMillis, NULL, NULL, NULL);
}

Looper.h


Looper::pollOnce(int)调用了另一版本的Looper::pollOnce:

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        while (mResponseIndex < mResponses.size()) {
            const Response& response = mResponses.itemAt(mResponseIndex++); // 取出一个response
            int ident = response.request.ident;
            if (ident >= 0) {
                int fd = response.request.fd;
                int events = response.events;
                void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE
                ALOGD("%p ~ pollOnce - returning signalled identifier %d: "
                        "fd=%d, events=0x%x, data=%p",
                        this, ident, fd, events, data);
#endif
                if (outFd != NULL) *outFd = fd;
                if (outEvents != NULL) *outEvents = events;
                if (outData != NULL) *outData = data;
                return ident;
            }
        }
 
        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }
 
        result = pollInner(timeoutMillis);
    }
}

Looper.cpp


pollOnce的for(;;)循环里先查看是否还有没有取出的response,若有,取出一个立即返回;否则,调用Looper::pollInner,poll出一个IO事件(wake通知,后面能够看到):

int Looper::pollInner(int timeoutMillis) {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - waiting: timeoutMillis=%d", this, timeoutMillis);
#endif
 
    // Adjust the timeout based on when the next message is due.
    if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
        if (messageTimeoutMillis >= 0
                && (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
            timeoutMillis = messageTimeoutMillis;
        }
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - next message in %lldns, adjusted timeout: timeoutMillis=%d",
                this, mNextMessageUptime - now, timeoutMillis);
#endif
    }
 
    // Poll.
    int result = ALOOPER_POLL_WAKE;
    mResponses.clear();
    mResponseIndex = 0;
 
    // We are about to idle.
    mIdling = true;
 
    struct epoll_event eventItems[EPOLL_MAX_EVENTS];
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); // 关键!等待wake通知
 
    // No longer idling.
    mIdling = false;
 
    // Acquire lock.
    mLock.lock();
 
    // Check for poll error.
    if (eventCount < 0) {
        if (errno == EINTR) {
            goto Done;
        }
        ALOGW("Poll failed with an unexpected error, errno=%d", errno);
        result = ALOOPER_POLL_ERROR;
        goto Done;
    }
 
    // Check for poll timeout.
    if (eventCount == 0) {
#if DEBUG_POLL_AND_WAKE
        ALOGD("%p ~ pollOnce - timeout", this);
#endif
        result = ALOOPER_POLL_TIMEOUT;
        goto Done;
    }
 
    // Handle all events.
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ pollOnce - handling events from %d fds", this, eventCount);
#endif
 
    for (int i = 0; i < eventCount; i++) { // 处理所有事件
        int fd = eventItems[i].data.fd;
        uint32_t epollEvents = eventItems[i].events;
        if (fd == mWakeReadPipeFd) { 
            if (epollEvents & EPOLLIN) {
                awoken(); // 调用Looper::awoken(),执行实际的wake通知
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
            }
        } else {
            ssize_t requestIndex = mRequests.indexOfKey(fd);
            if (requestIndex >= 0) {
                int events = 0;
                if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
                if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
                if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
                if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
                pushResponse(events, mRequests.valueAt(requestIndex)); // push到mRequest上
            } else {
                ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
                        "no longer registered.", epollEvents, fd);
            }
        }
    }
Done: ;
 
    // Invoke pending message callbacks.调用等待的消息回调
    mNextMessageUptime = LLONG_MAX;
    while (mMessageEnvelopes.size() != 0) {
        nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
        const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
        if (messageEnvelope.uptime <= now) {
            // Remove the envelope from the list.
            // We keep a strong reference to the handler until the call to handleMessage
            // finishes.  Then we drop it so that the handler can be deleted *before*
            // we reacquire our lock.
            { // obtain handler
                sp<messagehandler> handler = messageEnvelope.handler;
                Message message = messageEnvelope.message;
                mMessageEnvelopes.removeAt(0);
                mSendingMessage = true;
                mLock.unlock();
 
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
                ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d",
                        this, handler.get(), message.what);
#endif
                handler->handleMessage(message); // 调用Message回调(MessageHandler)
            } // release handler
 
            mLock.lock();
            mSendingMessage = false;
            result = ALOOPER_POLL_CALLBACK;
        } else {
            // The last message left at the head of the queue determines the next wakeup time.
            mNextMessageUptime = messageEnvelope.uptime;
            break;
        }
    }
 
    // Release lock.
    mLock.unlock();
 
    // Invoke all response callbacks.调用所有响应回调
    for (size_t i = 0; i < mResponses.size(); i++) {
        Response& response = mResponses.editItemAt(i);
        if (response.request.ident == ALOOPER_POLL_CALLBACK) {
            int fd = response.request.fd;
            int events = response.events;
            void* data = response.request.data;
#if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS
            ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p",
                    this, response.request.callback.get(), fd, events, data);
#endif
            int callbackResult = response.request.callback->handleEvent(fd, events, data); // 调用事件回调(LooperCallback)
            if (callbackResult == 0) {
                removeFd(fd);
            }
            // Clear the callback reference in the response structure promptly because we
            // will not clear the response vector itself until the next poll.
            response.request.callback.clear();
            result = ALOOPER_POLL_CALLBACK;
        }
    }
    return result;
}</messagehandler>

Looper.cpp

void Looper::awoken() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ awoken", this);
#endif
 
    char buffer[16];
    ssize_t nRead;
    do {
        nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer)); // 读到临时的buffer,
    } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}

Looper.cpp


Looper::awoken的read从mWakeReadFd上读出的消息被放在一个临时的buffer上,这再次表明了这个pipe之作唤醒通知之用,并不关心实际内容。


nativeIsIdling 和 nativeDestroy

剩下的两个native方法的实现都非常简单,先看nativeIdling:

static jboolean android_os_MessageQueue_nativeIsIdling(JNIEnv* env, jclass clazz, jint ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<nativemessagequeue*>(ptr);
    return nativeMessageQueue->getLooper()->isIdling();
}</nativemessagequeue*>

android_os_MessageQueue.cpp



NativeMessageQueue::getLooper:

inline sp<looper> getLooper() const {
    return mLooper;
}</looper>

android_os_MessageQueue.cpp

bool Looper::isIdling() const {
    return mIdling;
}

Looper.cpp


再看nativeDestroy:

static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jclass clazz, jint ptr) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<nativemessagequeue*>(ptr);
    nativeMessageQueue->decStrong(env);
}</nativemessagequeue*>

android_os_MessageQueue.cpp


nativeDestroy将nativeMessageQueue的强引用减1,引用计数减为0时,对象会自动被析构并回收。


小结

隐藏在nativePollOnce和nativeWake背后起着重要作用的其实是pipe。nativeWake向pipe的写端写一个字节,通知前台线程“有消息来了”。


总结

后台线程使用Handler更新UI的本质上是“生产者消费者问题”。后台线程扮演生产者,生产消息(Message),并放到消息队列上;前台线程扮演消费者,从消息队列上取消息,并处理(消费)它。

在这个过程中Handler扮演了两个角色:

消息队列的窗口,后台线程通过Handler.sendMessage()向消息队列放消息;处理消息的回调,前台线程通过Handler.handleMessage()处理从队列上取下来的消息;

引申

本文开头所给的两个Demo都是“单生产者单消费者问题”。

这个问题中需要指出的是,消费者必然唯一。因为每个线程最多只能只有一个Looper(通过Looper.prepare创建),而MessageQueue是由Looper的构造方法创建的,所以每个Looper对应一个MessageQueue;所以不可能有多个消费者线程共享一个MessageQueue。

但生产者可以不必唯一,比如本文开头的Demo1,按下Button之后,会创建一个后台线程,这个线程每个1秒更新一次TextView,更新10次后结束。当你点下Button后不到10秒(比如5秒)时,再次点下Button,此时又创建了一个后台线程;这时两个后台线程都是生产者。感兴趣的朋友可以自己试试,看看实际运行的效果。

pipe是只有两个端的结构,多生产者时,有多个线程向写端write,但始终只有一个线程从读端read。所以,nativePollOnce可以实现为阻塞的,即pipe的读端mWakeReadPipeFd可以不设为NONBLOCK(当然也就不需要要用epoll了)。但由于可能存在多个生产者,所以pipe的写端设为NONBLOCK还是很有必要的。




转载自:https://www.2cto.com/kf/201408/329543.html

Home