Chin's Blog

The world of programming is full of wonders.

Analysis of Netty's Event Model

In Netty, each connection is represented by a Channel, events are handled by handlers, and the pipeline links the handlers into a doubly linked list along which events propagate: each handler is wrapped in a context node, and the nodes are chained between a head node and a tail node.

The "context" here refers to ChannelHandlerContext, which holds references to the channel and the handler.

So how does an event propagate through the pipeline? Does it start from the head node or from the tail node? And where does it originate in the first place?

Let's start the analysis with the read event and first find out where it originates.

Netty provides implementations for different I/O models. Taking NIO as an example, I/O events are handled in NioEventLoop, and the dispatching of ready events happens in the following method:

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

As we can see, different event types invoke different methods on unsafe; all of Netty's operations on the underlying socket go through unsafe. There are two main implementations: NioMessageUnsafe and NioByteUnsafe. NioServerSocketChannel uses NioMessageUnsafe for socket operations, while NioSocketChannel uses NioByteUnsafe. Taking the server side's NioMessageUnsafe as an example, let's look at the implementation of read():

        @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            if (!config.isAutoRead() && !isReadPending()) {
                // ChannelConfig.setAutoRead(false) was called in the meantime
                removeReadOp();
                return;
            }

            final int maxMessagesPerRead = config.getMaxMessagesPerRead();
            final ChannelPipeline pipeline = pipeline();
            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    for (;;) {
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }

                        // stop reading and remove op
                        if (!config.isAutoRead()) {
                            break;
                        }

                        if (readBuf.size() >= maxMessagesPerRead) {
                            break;
                        }
                    }
                } catch (Throwable t) {
                    exception = t;
                }
                setReadPending(false);
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    pipeline.fireChannelRead(readBuf.get(i));
                }

                readBuf.clear();
                pipeline.fireChannelReadComplete();

                if (exception != null) {
                    if (exception instanceof IOException && !(exception instanceof PortUnreachableException)) {
                        // ServerChannel should not be closed even on IOException because it can often continue
                        // accepting incoming connections. (e.g. too many open files)
                        closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
                    }

                    pipeline.fireExceptionCaught(exception);
                }

                if (closed) {
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!config.isAutoRead() && !isReadPending()) {
                    removeReadOp();
                }
            }
        }

The key is the for loop that reads the data: the call to pipeline.fireChannelRead() is the source that triggers the read event, and the read-complete event is fired in the same method via pipeline.fireChannelReadComplete().

Next let's analyze how the event propagates through the pipeline, starting with the pipeline's fireChannelRead() method:

    @Override
    public ChannelPipeline fireChannelRead(Object msg) {
        head.fireChannelRead(msg);
        return this;
    }

This shows that the read event is fired starting from the head node (all inbound events start from head). Next we look at the head node's fireChannelRead. The head node is an instance of AbstractChannelHandlerContext, and the concrete implementation is already provided in that abstract class:

    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }

        final AbstractChannelHandlerContext next = findContextInbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(msg);
        } else {
            executor.execute(new OneTimeTask() {
                @Override
                public void run() {
                    next.invokeChannelRead(msg);
                }
            });
        }
        return this;
    }

findContextInbound() walks the pipeline's handler context list to find an inbound handler on which to call invokeChannelRead(). Its logic is as follows:

    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

As the code shows, it moves forward from the current node until it finds an inbound handler node, and then calls that node's invokeChannelRead() method:

    private void invokeChannelRead(Object msg) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
     }

AbstractChannelHandlerContext's invokeChannelRead actually calls the channelRead() method of the handler held by this context. As we know, ChannelInboundHandlerAdapter's channelRead looks like this:

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws    Exception {
        ctx.fireChannelRead(msg);
    }

It simply calls AbstractChannelHandlerContext's fireChannelRead() again, which searches for the next inbound context starting from the current node, and in this way the event is passed along the pipeline.
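
To make the propagation rule concrete, here is a minimal handler sketch (my own illustration, not code from this post): whether the event continues to the next inbound handler depends entirely on whether the handler calls ctx.fireChannelRead().

// Illustration only: forwarding (or swallowing) an inbound event in a custom handler.
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class AuditHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("saw message: " + msg);
        // Pass the event on to the next inbound handler in the pipeline;
        // omitting this call would stop propagation here.
        ctx.fireChannelRead(msg);
    }
}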

A First Look at Netty

Network communication is the foundation of distributed architecture, and Netty, as a fundamental networking library, is widely used in all kinds of middleware. A deep dive into Netty is well worth the effort.

Let's get to know Netty through a demo, starting with the server-side code:


 public class TcpServer {

    private int port;

    public TcpServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8), new StringDecoder(CharsetUtil.UTF_8)).
                                    addLast(new ServerHandler(SessionManager.getManager()));
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .option(ChannelOption.TCP_NODELAY, true);

            // start the message-push thread
            Thread t = new Thread(new PushRun(new PushMessage()));
            t.start();
            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();

        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    class PushRun implements Runnable {

        private PushMessage pushMessage;

        public PushRun(PushMessage pushMessage) {
            this.pushMessage = pushMessage;
        }

        public void run() {
            int i = 0;
            while (true) {
                i++;
                pushMessage.sendMsg("send msg:" + i, 34445L);
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 4199;
        }
        new TcpServer(port).run();
    }
}

We can see that two EventLoopGroups are created first. What is an EventLoopGroup? For now, think of it as a group of threads; a later post will analyze Netty's threading model in detail. Then a ServerBootstrap is created and configured with the two groups. Next comes the handler setup: when the channel is initialized, a StringEncoder, a StringDecoder, and the business-logic handler ServerHandler are added to the pipeline. Finally a few options are set, such as whether to enable TCP_NODELAY.

After the server calls b.bind(), it starts listening for client connections. The actual business logic is handled in ServerHandler. Let's look at its code:

public class ServerHandler extends ChannelInboundHandlerAdapter {

    private SessionManager sessionManager;

    public ServerHandler(SessionManager sessionManager) {
        super();
        this.sessionManager = sessionManager;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("receive from client:" + msg);
        TcpProtocol tcpProtocol = JSON.parseObject((String) msg, TcpProtocol.class);
        sessionManager.addSession(String.valueOf(tcpProtocol.getUid()), ctx);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
        System.out.println("channel registered....");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        System.out.println("channel active.....");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        cause.printStackTrace();
        ctx.close();
    }
}

ServerHandler extends ChannelInboundHandlerAdapter and overrides several methods: channelRegistered, channelActive, channelRead, and channelInactive. These are invoked at different stages of the channel's lifecycle, which shows that Netty follows an event-driven programming model.

Now let's look at the client code:

public class TcpClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            Bootstrap b = new Bootstrap(); // (1)
            b.group(workerGroup); // (2)
            b.channel(NioSocketChannel.class); // (3)
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8),
                            new StringDecoder(CharsetUtil.UTF_8)).addLast(new ClientHandler());
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (5)
            TcpProtocol tcpProtocol = new TcpProtocol();
            tcpProtocol.setUid(34445L);
            f.channel().writeAndFlush(JSON.toJSONString(tcpProtocol));
            synchronized (TcpClient.class) {
                while (true) {
                    TcpClient.class.wait();
                }
            }
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

Since the client does not need to listen for incoming connections, it creates only one EventLoopGroup. As on the server, the business logic lives in a handler; the ClientHandler code is as follows:

package netty; /**
 * Created by Administrator on 2016/7/13.
 */

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println(msg);
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }
}

The overridden channelRead method simply prints the received message.

As we can see, whether on the server or the client, Netty handles business logic through events. Those events propagate through the pipeline, but where are they triggered from? The next post analyzes Netty's event propagation in detail.

Reading the JDK Source: ThreadLocal

Another commonly used tool in concurrent programming is ThreadLocal. Today let's lift its veil. The method we use most is ThreadLocal.set(), so let's start from its source code:

public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocal.ThreadLocalMap map = this.getMap(t);
    if(map != null) {
        map.set(this, value);
    } else {
        this.createMap(t, value);
    }

}

From the code above, set() first calls getMap to check whether a ThreadLocalMap exists. The code of getMap is:

  ThreadLocal.ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }

It is simply a field of the Thread object. Back in set(): if the current Thread's threadLocals has not been set yet, createMap is called to create a new ThreadLocalMap.

  void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocal.ThreadLocalMap(this, firstValue);
    }

It passes the current ThreadLocal object and the value to the ThreadLocalMap constructor.

Now let's look at the ThreadLocalMap constructor:

    ThreadLocalMap(ThreadLocal firstKey, Object firstValue) {
        table = new Entry[INITIAL_CAPACITY];
        int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
        table[i] = new Entry(firstKey, firstValue);
        size = 1;
        setThreshold(INITIAL_CAPACITY);
    }

ThreadLocalMap is really just an array. The slot of the first entry is computed as firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1). Each ThreadLocal's threadLocalHashCode is obtained by repeatedly adding the increment 0x61c88647, a constant derived from the golden ratio (roughly (Math.sqrt(5) - 1) shifted left by 31 bits), which spreads consecutive hash codes evenly across a power-of-two table.
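
As a quick illustration of why this constant works well (my own sketch, not JDK code): hash codes spaced by 0x61c88647 spread evenly over a power-of-two table, so the first INITIAL_CAPACITY ThreadLocals land in distinct slots.

// Illustration only: successive keys spaced by 0x61c88647 hit distinct slots of a 16-slot table.
public class MagicHashDemo {
    private static final int HASH_INCREMENT = 0x61c88647;

    public static void main(String[] args) {
        int capacity = 16;           // ThreadLocalMap's INITIAL_CAPACITY
        int hash = 0;
        for (int i = 0; i < capacity; i++) {
            System.out.println("key " + i + " -> slot " + (hash & (capacity - 1)));
            hash += HASH_INCREMENT;  // conceptually how nextHashCode() advances
        }
    }
}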

Back in ThreadLocal.set(): if the Thread's threadLocals map already exists, ThreadLocalMap.set() is called directly. Its source is:

private void set(ThreadLocal key, Object value) {

            // We don't use a fast path as with get() because it is at
            // least as common to use set() to create new entries as
            // it is to replace existing ones, in which case, a fast
            // path would fail more often than not.

            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);

            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                ThreadLocal k = e.get();

                if (k == key) {
                    e.value = value;
                    return;
                }

                if (k == null) {
                    replaceStaleEntry(key, value, i);
                    return;
                }
            }

            tab[i] = new Entry(key, value);
            int sz = ++size;
            if (!cleanSomeSlots(i, sz) && sz >= threshold)
                rehash();
        }

set() first checks whether the map already contains the same ThreadLocal; if so, the new value overwrites the old one. If the ThreadLocal in a slot is null (it has been garbage-collected), the stale entry is replaced with the current key/value. Otherwise, probing continues until an empty slot is found and a new Entry is placed there.

Now let's look at the source of get():

public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null)
            return (T)e.value;
    }
    return setInitialValue();
}

get() is straightforward: it reads from the Thread's ThreadLocalMap; if nothing is found, it initializes a value and puts it into the map.

ThreadLocal.remove() delegates directly to ThreadLocalMap.remove(), whose source is:

    private void remove(ThreadLocal key) {
        Entry[] tab = table;
        int len = tab.length;
        int i = key.threadLocalHashCode & (len-1);
        for (Entry e = tab[i];
             e != null;
             e = tab[i = nextIndex(i, len)]) {
            if (e.get() == key) {
                e.clear();
                expungeStaleEntry(i);
                return;
            }
        }
    }

As the code shows, when the key is found, Entry.clear() is called and then expungeStaleEntry() removes the entry.

Reading through the source, ThreadLocalMap is designed differently from HashMap: HashMap resolves collisions with linked lists, while ThreadLocalMap uses open addressing. In addition, set() removes stale entries along the way so the array does not fill up too quickly and trigger repeated rehashing.
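
To round this off, a minimal usage sketch (my own example, assuming Java 8+ for withInitial): each thread gets its own instance, and calling remove() when done avoids leaving stale entries behind on long-lived pool threads.

// Illustration only: per-thread state with explicit cleanup.
public class ThreadLocalUsage {
    private static final ThreadLocal<StringBuilder> BUFFER =
            ThreadLocal.withInitial(StringBuilder::new);

    static String render(String name) {
        try {
            StringBuilder sb = BUFFER.get();   // per-thread instance
            sb.setLength(0);
            return sb.append("hello, ").append(name).toString();
        } finally {
            BUFFER.remove();                   // clean up this thread's entry
        }
    }

    public static void main(String[] args) {
        new Thread(() -> System.out.println(render("t1"))).start();
        new Thread(() -> System.out.println(render("t2"))).start();
    }
}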

Reading the JDK Source: ReentrantReadWriteLock

Having covered ReentrantLock, let's look at ReentrantReadWriteLock. Before reading its source, we need to understand its characteristics: the read lock and the write lock are mutually exclusive, multiple reader threads may read at the same time (the read lock is a shared lock), and the write lock is exclusive, so writers exclude each other as well. With these basics in mind, let's look at the source.

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

ReentrantReadWriteLock is initialized by the code above; note that the read lock and the write lock share the same synchronizer. A synchronizer has only one state variable, which it uses to track lock state. So how does ReentrantReadWriteLock distinguish the read lock from the write lock? As the source below will show, it uses the high 16 bits of state for the read count and the low 16 bits for the write count.
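
A small sketch of that split (my own illustration, mirroring the constants defined in Sync): the read count occupies the high 16 bits and the write count the low 16 bits of the single state value.

// Illustration of the state split used by ReentrantReadWriteLock.Sync.
public class RwStateDemo {
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);      // adding one reader
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;  // low 16 bits

    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; } // read holds
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // write holds

    public static void main(String[] args) {
        int state = 0;
        state += SHARED_UNIT;          // one reader acquires
        state += SHARED_UNIT;          // another reader acquires
        System.out.println(sharedCount(state) + " readers, "
                + exclusiveCount(state) + " write holds");  // 2 readers, 0 write holds
    }
}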

Let's start with how the read lock is acquired, which begins with the following code:

protected final int tryAcquireShared(int unused) {
            /*
             * Walkthrough:
             * 1. If write lock held by another thread, fail.
             * 2. Otherwise, this thread is eligible for
             *    lock wrt state, so ask if it should block
             *    because of queue policy. If not, try
             *    to grant by CASing state and updating count.
             *    Note that step does not check for reentrant
             *    acquires, which is postponed to full version
             *    to avoid having to check hold count in
             *    the more typical non-reentrant case.
             * 3. If step 2 fails either because thread
             *    apparently not eligible or CAS fails or count
             *    saturated, chain to version with full retry loop.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != current.getId())
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

From the code above: if the write lock is already held by another thread, the acquisition fails immediately (returning -1, which blocks the current thread). If the if branch is entered, there are three cases: if no reader holds the lock yet, firstReader and firstReaderHoldCount are set directly; otherwise, if the current thread is the first reader, firstReaderHoldCount is incremented; otherwise a HoldCounter object records how many times this thread holds the read lock, and that HoldCounter is stored in a ThreadLocal. In the remaining cases (the CAS fails or readerShouldBlock() returns true), fullTryAcquireShared is entered.

final int fullTryAcquireShared(Thread current) {
            /*
             * This code is in part redundant with that in
             * tryAcquireShared but is simpler overall by not
             * complicating tryAcquireShared with interactions between
             * retries and lazily reading hold counts.
             */
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                    // else we hold the exclusive lock; blocking here
                    // would cause deadlock.
                } else if (readerShouldBlock()) {
                    // Make sure we're not acquiring read lock reentrantly
                    if (firstReader == current) {
                        // assert firstReaderHoldCount > 0;
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != current.getId()) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != current.getId())
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh; // cache for release
                    }
                    return 1;
                }
            }
        }

fullTryAcquireShared first checks whether another thread holds the write lock; if so, it returns -1 and the current thread blocks. If readerShouldBlock() returns true, the if branch is entered: when the current thread is not the first reader and holds no read lock yet (its HoldCounter count is 0), the stale HoldCounter is removed from the ThreadLocal and -1 is returned, blocking the current thread. The code that follows handles the case where the CAS on state in tryAcquireShared failed, with similar logic.

Next, the logic for releasing the read lock:

protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != current.getId())
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

First, if the current thread is the first reader, the code checks whether its reentrant hold count is 1: if so, firstReader is set to null; otherwise firstReaderHoldCount is decremented. If the current thread is not the first reader, its HoldCounter count is decremented instead. The for loop at the end performs the CAS on state.

Next, the process of acquiring the write lock:

protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

From the code: if any reader holds the read lock, or another thread holds the write lock, the current writer fails and will block; if the current thread already holds the write lock, its hold count is simply increased. If writerShouldBlock() returns true or the CAS on state fails, the writer also fails and blocks; otherwise the current thread is set as the exclusive owner of the write lock.

And the write lock release:

  protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }

The release code is simple: since only one thread can hold the write lock, the release runs single-threaded, so the state update does not need a CAS.
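
To close the section, a minimal usage sketch (my own example): a read-mostly cache in which any number of readers may hold the read lock at once, while a writer takes the write lock exclusively.

// Illustration only: guarding a plain HashMap with a read/write lock.
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadMostlyCache {
    private final Map<String, String> map = new HashMap<>();
    private final ReentrantReadWriteLock rw = new ReentrantReadWriteLock();

    public String get(String key) {
        rw.readLock().lock();          // shared: many readers at once
        try {
            return map.get(key);
        } finally {
            rw.readLock().unlock();
        }
    }

    public void put(String key, String value) {
        rw.writeLock().lock();         // exclusive: blocks readers and writers
        try {
            map.put(key, value);
        } finally {
            rw.writeLock().unlock();
        }
    }
}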

Reading the JDK Source: ReentrantLock

Next is another commonly used tool in the concurrency library: ReentrantLock. ReentrantLock also comes in a fair and a non-fair flavor; let's start with the non-fair one.

static final class NonfairSync extends ReentrantLock.Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        NonfairSync() {
        }

        final void lock() {
            if(this.compareAndSetState(0, 1)) {
                this.setExclusiveOwnerThread(Thread.currentThread());
            } else {
                this.acquire(1);
            }

        }

        protected final boolean tryAcquire(int acquires) {
            return this.nonfairTryAcquire(acquires);
        }
    }

The non-fair strategy first tries a CAS on state; if that fails, it falls into AQS's acquire(). Let's first look at Sync.nonfairTryAcquire:

    final boolean nonfairTryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = this.getState();
            if(c == 0) {
                if(this.compareAndSetState(0, acquires)) {
                    this.setExclusiveOwnerThread(current);
                    return true;
                }
            } else if(current == this.getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if(nextc < 0) {
                    throw new Error("Maximum lock count exceeded");
                }

                this.setState(nextc);
                return true;
            }

            return false;
        }

This method first checks whether the lock is held at all; if not, it grabs the lock and sets the exclusive owner to the current thread. Otherwise, if the current thread already holds the lock, state is increased and true is returned, which is what makes ReentrantLock reentrant. If another thread holds the lock, false is returned. In that case, let's look at the acquire() method:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

Execution then enters acquireQueued():

 final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

As the code above shows, the thread is parked at this point, which completes the analysis of acquiring the lock. Now for releasing it: unlock() calls the release() method first.

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

Let's look at tryRelease first:

 protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

From the code above, tryRelease only returns true when state drops to 0, and only then does release() unpark the second node in the queue. Back in acquireQueued: after the second node is unparked it is still inside the for loop; if its tryAcquire now succeeds (i.e. the second node in the wait queue obtains the lock), it enters the if branch, sets itself as the new head, and returns. The remaining nodes in the queue follow the same pattern.

Now for the difference between the fair and the non-fair strategy. As with Semaphore, the fair strategy checks whether other threads are already waiting in the queue; if so, the current thread is queued as well and waits until it is woken up. The non-fair strategy tries to grab the lock regardless of whether anyone is waiting, so the current thread may acquire the lock ahead of queued threads, which is why it is called unfair.
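
A small sketch of the two flavors in use (my own example): the boolean constructor argument selects FairSync or NonfairSync, and getHoldCount() shows the reentrant counting of state described above.

// Illustration only: fair vs. non-fair construction, plus reentrancy.
import java.util.concurrent.locks.ReentrantLock;

public class LockFlavors {
    public static void main(String[] args) {
        ReentrantLock nonFair = new ReentrantLock();      // default: non-fair
        ReentrantLock fair    = new ReentrantLock(true);  // FairSync: queued threads go first

        fair.lock();
        fair.lock();                                      // reentrant: state goes 1 -> 2
        System.out.println("hold count = " + fair.getHoldCount()); // 2
        fair.unlock();
        fair.unlock();                                    // state back to 0, lock released

        System.out.println(nonFair.isFair() + " / " + fair.isFair()); // false / true
    }
}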

Condition is another tool that is used a lot in concurrent code. Let's see how it works, starting with the await() method:

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

First, addConditionWaiter builds the condition queue if it is empty; otherwise it appends the current thread's node to the tail. The while loop then checks whether the node has already been moved to the synchronization queue. If not, the current thread is parked; at this point the thread is blocked and will only continue after it is woken up.

    private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

If the thread has not been interrupted, the loop continues and again checks whether the node is on the synchronization queue; once it is, the loop ends. As the code below shows, the node is moved to the synchronization queue when signal() is called. acquireQueued is then called to reacquire the lock on the synchronization queue; if that fails the thread blocks again, otherwise execution continues.

That is the whole flow of await(); now let's look at the flow of signal().

    public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

It simply checks that the current thread holds the lock and, if so, proceeds.

        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

As the code shows, doSignal starts from the first waiter and calls transferForSignal:

  final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

First it tries to CAS the node's waitStatus from CONDITION (-2) to 0; if that fails, doSignal loops and tries the next node. If it succeeds, the node is enqueued on the synchronization queue; the predecessor's status is then set to SIGNAL (-1), or the thread is unparked directly if that fails, true is returned, and doSignal is done.

In essence, ReentrantLock implements the most basic mutual exclusion and inter-thread communication mechanisms.
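
As a closing illustration (my own sketch, not from the post): the classic pairing of lock and condition is a bounded buffer, where await() releases the lock and parks the caller, and signal() moves one waiter back onto the synchronization queue.

// Minimal bounded buffer using ReentrantLock + Condition (illustration only).
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer<T> {
    private final Deque<T> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull  = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public BoundedBuffer(int capacity) { this.capacity = capacity; }

    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await();        // releases the lock and parks this thread
            }
            items.addLast(item);
            notEmpty.signal();          // moves one waiting consumer to the sync queue
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            T item = items.removeFirst();
            notFull.signal();
            return item;
        } finally {
            lock.unlock();
        }
    }
}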

Reading the JDK Source: Semaphore

Having gone through the AQS source earlier, we now have a fairly deep understanding of how AQS works, so we can look at the various concurrency tools built on top of it. Semaphore is one of the most widely used. Its source is as follows:

public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    /** All mechanics via AbstractQueuedSynchronizer subclass */
    private final Sync sync;

    /**
     * Synchronization implementation for semaphore.  Uses AQS state
     * to represent permits. Subclassed into fair and nonfair
     * versions.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

    /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

Like CountDownLatch, Semaphore uses AQS through composition and defines two AQS subclasses internally, NonfairSync and FairSync. What is the difference between them? We'll get to that shortly. A Semaphore created with the constructor below uses NonfairSync:

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

The NonfairSync class:

  static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

The Sync class:

   abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

From nonfairTryAcquireShared we can see that a thread that fails to obtain a permit is put into the wait queue. In that situation, when a permit is released, another thread may grab it before the threads already waiting in the queue, which is why this version is non-fair. Now let's look at how permits are released:

       protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

Releasing permits is likewise a simple for loop. We saw earlier that in CountDownLatch each woken node wakes the next one until the whole queue has been woken. If Semaphore behaved the same way, wouldn't that be a problem? So what actually happens? From the tryReleaseShared code above, every successful CAS makes tryReleaseShared return true, which "wakes up" the second node in the queue; but from AQS's doAcquireSharedInterruptibly we know that after waking it is still inside its for loop, and until tryAcquireShared returns a value of at least zero (i.e. a permit is actually available) it simply gets parked again. Personally I think tryReleaseShared could do better here: it should return true only when permits are actually available, which would avoid pointless unpark/park cycles.

Now let's look at the difference between the non-fair and fair versions. Below is the code of the fair class:

   static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

The difference between FairSync and NonfairSync lies in how permits are acquired: FairSync first calls hasQueuedPredecessors to check whether any threads are already waiting in the queue:

    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

In short, when permits are exhausted FairSync blocks the thread and puts it into the wait queue, while NonfairSync keeps retrying to acquire a permit regardless of whether threads are already queued, which means the current thread may obtain a permit ahead of those waiting in the queue.
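
A minimal usage sketch to tie this together (my own example): a Semaphore with three permits limits how many tasks run the guarded section at the same time.

// Illustration only: at most 3 tasks enter the guarded section concurrently.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreUsage {
    public static void main(String[] args) {
        Semaphore permits = new Semaphore(3);              // non-fair by default
        ExecutorService pool = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 10; i++) {
            final int id = i;
            pool.submit(() -> {
                try {
                    permits.acquire();                     // blocks when 3 permits are taken
                    System.out.println("task " + id + " entered");
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    permits.release();                     // hand the permit back
                }
            });
        }
        pool.shutdown();
    }
}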

LinkedHashMap Source Analysis

      After reading the HashMap source earlier, I recently went through the LinkedHashMap source as well. LinkedHashMap turns out to be quite simple: it is a subclass of HashMap, so it naturally has all of its parent's features, plus some of its own. Concretely, it contains two data structures: 1. the array-plus-linked-list structure inherited from HashMap (see the HashMap source analysis post); 2. its own circular doubly linked list. Where the first structure comes from is obvious, but how is the second one built? That starts with how a LinkedHashMap object is created.
      The simplest LinkedHashMap constructor looks like this:

    public LinkedHashMap() {
        super();
        accessOrder = false;
    }

Nothing here suggests any connection to a doubly linked list; the key is the following method:

   @Override
    void init() {
        header = new Entry<>(-1, null, null, null);
        header.before = header.after = header;
    }

This method overrides init() from HashMap and is called from HashMap's constructor. Note that the Entry class here is LinkedHashMap's own subclass of HashMap.Entry:

    private static class Entry<K,V> extends HashMap.Entry<K,V> {
        // These fields comprise the doubly linked list used for iteration.
        Entry<K,V> before, after;

        Entry(int hash, K key, V value, HashMap.Entry<K,V> next) {
            super(hash, key, value, next);
        }

        /**
         * Removes this entry from the linked list.
         */
        private void remove() {
            before.after = after;
            after.before = before;
        }

        /**
         * Inserts this entry before the specified existing entry in the list.
         */
        private void addBefore(Entry<K,V> existingEntry) {
            after  = existingEntry;
            before = existingEntry.before;
            before.after = this;
            after.before = this;
        }

        /**
         * This method is invoked by the superclass whenever the value
         * of a pre-existing entry is read by Map.get or modified by Map.set.
         * If the enclosing Map is access-ordered, it moves the entry
         * to the end of the list; otherwise, it does nothing.
         */
        void recordAccess(HashMap<K,V> m) {
            LinkedHashMap<K,V> lm = (LinkedHashMap<K,V>)m;
            if (lm.accessOrder) {
                lm.modCount++;
                remove();
                addBefore(lm.header);
            }
        }

        void recordRemoval(HashMap<K,V> m) {
            remove();
        }
    }

It adds the before and after fields plus an addBefore method. The doubly linked list is initialized by this single line in init():

   header.before = header.after = header;

That is, after initialization the doubly linked list contains only the header node, whose key and value are both null and whose before and after pointers both refer back to the header itself.

After one element is added to the LinkedHashMap, the header and the new entry point to each other through both before and after.

And so on: each newly added node is linked in just before the header, so header.after always points to the eldest entry. Elements are added to LinkedHashMap's doubly linked list mainly by the following code:

    void addEntry(int hash, K key, V value, int bucketIndex) {
        super.addEntry(hash, key, value, bucketIndex);

        // Remove eldest entry if instructed
        Entry<K,V> eldest = header.after;
        if (removeEldestEntry(eldest)) {
            removeEntryForKey(eldest.key);
        }
    }

    void createEntry(int hash, K key, V value, int bucketIndex) {
        HashMap.Entry<K,V> old = table[bucketIndex];
        Entry<K,V> e = new Entry<>(hash, key, value, old);
        table[bucketIndex] = e;
        e.addBefore(header);
        size++;
    }

These two methods also override HashMap's versions and are invoked from HashMap's put().
      That covers how LinkedHashMap is built; so how does it iterate over its elements? LinkedHashMap implements its own iterator:

    private abstract class LinkedHashIterator<T> implements Iterator<T> {
        Entry<K,V> nextEntry    = header.after;
        Entry<K,V> lastReturned = null;

        /**
         * The modCount value that the iterator believes that the backing
         * List should have.  If this expectation is violated, the iterator
         * has detected concurrent modification.
         */
        int expectedModCount = modCount;

        public boolean hasNext() {
            return nextEntry != header;
        }

        public void remove() {
            if (lastReturned == null)
                throw new IllegalStateException();
            if (modCount != expectedModCount)
                throw new ConcurrentModificationException();

            LinkedHashMap.this.remove(lastReturned.key);
            lastReturned = null;
            expectedModCount = modCount;
        }

        Entry<K,V> nextEntry() {
            if (modCount != expectedModCount)
                throw new ConcurrentModificationException();
            if (nextEntry == header)
                throw new NoSuchElementException();

            Entry<K,V> e = lastReturned = nextEntry;
            nextEntry = e.after;
            return e;
        }
    }

Again the key is nextEntry: since it follows the after pointers starting from header.after, LinkedHashMap iterates in the order elements were inserted; following the before pointers instead would iterate in reverse.
As a special kind of HashMap, LinkedHashMap still offers constant-time lookup by key. Here is its get() method:

    public V get(Object key) {
        Entry<K,V> e = (Entry<K,V>)getEntry(key);
        if (e == null)
            return null;
        e.recordAccess(this);
        return e.value;
    }

It is almost the same as HashMap's, and the lookup itself works exactly like HashMap's (see the HashMap source analysis). The call to recordAccess is worth noting; its source:

    void recordAccess(HashMap<K,V> m) {
            LinkedHashMap<K,V> lm = (LinkedHashMap<K,V>)m;
            if (lm.accessOrder) {
                lm.modCount++;
                remove();
                addBefore(lm.header);
            }
        }

From the code: if the LinkedHashMap was constructed with accessOrder = true, every element returned by get() is relinked just before the header, i.e. moved to the most-recently-accessed end of the list. In that case LinkedHashMap no longer keeps insertion order but access order, with the eldest (least recently accessed) entry at header.after.
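
This access-order mode is exactly what makes LinkedHashMap handy as an LRU cache. A minimal sketch (my own example): accessOrder = true plus an overridden removeEldestEntry gives eviction of the least recently accessed entry.

// Illustration only: a tiny LRU cache built on LinkedHashMap.
import java.util.LinkedHashMap;
import java.util.Map;

public class LruCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    public LruCache(int maxEntries) {
        super(16, 0.75f, true);          // true = access order
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;      // evict the least recently accessed entry
    }

    public static void main(String[] args) {
        LruCache<Integer, String> cache = new LruCache<>(2);
        cache.put(1, "a");
        cache.put(2, "b");
        cache.get(1);                    // touch 1, so 2 becomes the eldest
        cache.put(3, "c");               // evicts 2
        System.out.println(cache.keySet()); // [1, 3]
    }
}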

HashMap Source Analysis

     After reading through the HashMap source, its data structure becomes clear: an array of buckets in which colliding entries are chained into linked lists.

     Reading the source shows that HashMap places entries whose indexFor() result is the same into the same array slot, each entry pointing to the next through its next reference, i.e. entries with the same index form a linked list. The more index collisions there are, the longer these lists grow and the slower lookups become. So how does the JDK compute the index? The source of indexFor() is:

    static int indexFor(int h, int length) {
        return h & (length-1);
    }

Here length is the size of the HashMap's table, and h is the result of passing the key's hashCode through the hash() method, whose source is:

    static int hash(int h) {
        // This function ensures that hashCodes that differ only by
        // constant multiples at each bit position have a bounded
        // number of collisions (approximately 8 at default load factor).
        h ^= (h >>> 20) ^ (h >>> 12);
        return h ^ (h >>> 7) ^ (h >>> 4);
    }

     It is often said that iterating over a HashMap does not necessarily follow the order in which elements were put in. Why is that? Let's look at the key iteration code:

    private abstract class HashIterator<E> implements Iterator<E> {
        Entry<K,V> next;        // next entry to return
        int expectedModCount;   // For fast-fail
        int index;              // current slot
        Entry<K,V> current;     // current entry

        HashIterator() {
            expectedModCount = modCount;
            if (size > 0) { // advance to first entry
                Entry[] t = table;
                while (index < t.length && (next = t[index++]) == null)
                    ;
            }
        }

        public final boolean hasNext() {
            return next != null;
        }

        final Entry<K,V> nextEntry() {
            if (modCount != expectedModCount)
                throw new ConcurrentModificationException();
            Entry<K,V> e = next;
            if (e == null)
                throw new NoSuchElementException();

            if ((next = e.next) == null) {
                Entry[] t = table;
                while (index < t.length && (next = t[index++]) == null)
                    ;
            }
            current = e;
            return e;
        }

        public void remove() {
            if (current == null)
                throw new IllegalStateException();
            if (modCount != expectedModCount)
                throw new ConcurrentModificationException();
            Object k = current.key;
            current = null;
            HashMap.this.removeEntryForKey(k);
            expectedModCount = modCount;
        }
    }

     From HashIterator's constructor and nextEntry we can see how the entry set is traversed, bucket by bucket: after construction, next points to the first non-null bucket entry; each call to nextEntry returns the current entry and, when the current bucket's chain ends, advances next to the next non-empty bucket. When no entries remain, next becomes null, and iteration continues until that happens.
     In addition, the key and value iterators both extend this class with their own small implementations. The key iterator, KeyIterator:

    private final class KeyIterator extends HashIterator<K> {
        public K next() {
            return nextEntry().getKey();
        }
    }

The value iterator, ValueIterator:

    private final class ValueIterator extends HashIterator<V> {
        public V next() {
            return nextEntry().value;
        }
    }

     This also shows how well the JDK code is designed in object-oriented terms; it is a good reference for the architecture of our own projects. At this point most of my earlier questions are answered. One question remains: we all know a HashMap must not be modified while it is being iterated, so what in the code enforces that?
     The iterator checks modCount != expectedModCount and throws ConcurrentModificationException when the two differ. When a HashIterator is constructed, expectedModCount = modCount; iteration itself changes neither value, but put and remove bump modCount, so structural modification during iteration makes the next iterator call throw ConcurrentModificationException. Judging from the official documentation this is meant to catch the map being modified by other threads, but it also prevents the iterating thread itself from modifying the map, which I consider a questionable design: nobody would share an unsynchronized HashMap across threads anyway, so HashMap is really a single-threaded structure and such a check seems unnecessary.
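
For completeness, a small sketch (my own example) of the supported way to remove entries while iterating: go through the iterator itself, which keeps expectedModCount in sync with modCount.

// Removing entries during iteration via Iterator.remove (illustration only).
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class SafeRemoval {
    public static void main(String[] args) {
        Map<String, Integer> map = new HashMap<>();
        map.put("a", 1);
        map.put("b", 2);
        map.put("c", 3);

        Iterator<Map.Entry<String, Integer>> it = map.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() % 2 == 0) {
                it.remove();   // updates expectedModCount, so no ConcurrentModificationException
            }
        }
        System.out.println(map); // contains a=1 and c=3
    }
}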

Implementing a Distributed Lock with Redis

lock = 0
while lock != 1:
    timestamp = current Unix time + lock timeout + 1
    lock = SETNX lock.foo timestamp
    if lock == 1 or (now() > (GET lock.foo) and now() > (GETSET lock.foo timestamp)):
        break;
    else:
        sleep(10ms)

do_job()

if now() < GET lock.foo:
    DEL lock.foo
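
A rough Java transcription of this SETNX/GETSET scheme, as a sketch only: the RedisClient interface below is a stand-in I made up; map it onto whatever client you use (Jedis, for instance, exposes setnx, get, getSet and del with matching semantics).

// Sketch of the locking loop above; RedisClient is a hypothetical minimal interface.
public class RedisLock {

    public interface RedisClient {
        long setnx(String key, String value);   // 1 if the key was set, 0 if it already existed
        String get(String key);
        String getSet(String key, String value);
        void del(String key);
    }

    private final RedisClient redis;
    private final long lockTimeoutMillis;

    public RedisLock(RedisClient redis, long lockTimeoutMillis) {
        this.redis = redis;
        this.lockTimeoutMillis = lockTimeoutMillis;
    }

    public void lock(String key) throws InterruptedException {
        while (true) {
            String timestamp = Long.toString(System.currentTimeMillis() + lockTimeoutMillis + 1);
            if (redis.setnx(key, timestamp) == 1) {
                return;                                    // acquired
            }
            String current = redis.get(key);
            if (current != null && System.currentTimeMillis() > Long.parseLong(current)) {
                // Previous holder timed out; GETSET settles the race between contenders.
                String previous = redis.getSet(key, timestamp);
                if (previous != null && System.currentTimeMillis() > Long.parseLong(previous)) {
                    return;                                // we won the race
                }
            }
            Thread.sleep(10);                              // back off, then retry
        }
    }

    public void unlock(String key) {
        // Only delete if the lock has not expired, otherwise it may belong to someone else now.
        String current = redis.get(key);
        if (current != null && System.currentTimeMillis() < Long.parseLong(current)) {
            redis.del(key);
        }
    }
}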

Common Bootstrap Components

   <!DOCTYPE html>
<html lang="en">
 <head>
     <meta charset="utf-8"/>
 <link id="bootstrap_221" rel="stylesheet" type="text/css" class="library" href="/js/sandbox/bootstrap-2.2.1/css/bootstrap.min.css">
 </head>
 <style>
  .pagination ul > li > input{
         vertical-align: top;
         -webkit-border-radius: 0;
          -moz-border-radius: 0;
               border-radius: 0;
         height:auto;
         *height: 20px;
         margin-bottom:0px;
         background-color: #fff;
         border-left-width: 0;
         width:40px;
         float:left;
         min-height: auto;
         *min-height: 20px;
     }
 </style>
 <body>
     <div class="pagination pagination-right">
         <ul>
             <li><a href="#"><i class="icon-fast-backward"></i></a></li>
             <li class="disabled"><a href="#">Previous</a></li>
             <li class="disabled"><a href="#">1</a></li>
             <li class="disabled"><a href="#">2</a></li>
             <li class="disabled"><a href="#">3</a></li>
             <li class="disabled"><a href="#">4</a></li>
             <li class="disabled"><a href="#">5</a></li>
             <li><a href="#">Next</a></li>
             <li><a href="#"><i class="icon-fast-forward"></i></a></li>
             <li><input type="text" class="input"></li>
             <li><a href="javascript:void();" onclick="">Go</a></li>
         </ul>
     </div>
 </body>
</html>

Transaction Management with Tornado + SQLAlchemy

      During an earlier project I ran into transaction problems with Tornado + SQLAlchemy and never had the time to deal with them. Now that I have some spare time, let's think about how to solve this.

      As everyone knows, Django has its own middleware for transaction management. Could Tornado use its own middleware to do the same? After some research, implementing Tornado middleware yourself turns out to be fairly involved, although someone has written one (tmiddelware). Alternatively, a decorator is a simpler way to implement transaction management.

     def transactional(baseSession):
         def transaction(func):
             def __decorator(obj,*args,**kwargs):
                 session = baseSession()
                 try:
                     func(obj,session,*args,**kwargs)
                     session.commit()
                 except Exception,e:
                     session.rollback()
                 finally:
                     session.close()
             return __decorator

         return transaction

     @transactional(BaseSession)
     def get(self,session,*args,**kwargs):
         city = BaseCity(name="添加城市")
         org = BaseOrg(name="添加组织")
         session.add(city)
         session.add(org)
         self.write("添加成功")

Exploring AQS

As everyone knows, the utility classes in the java.util.concurrent package are built on top of AbstractQueuedSynchronizer, a synchronizer that Doug Lea implemented based on a CLH queue. How is this powerful synchronizer implemented? Let's find out.

Because the AQS code is hard to digest on its own, we will approach it through the concurrency utilities built on it, starting with the simplest one, CountDownLatch. First, its source:

   public class CountDownLatch {
    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

CountDownLatch defines a Sync class that extends AQS and implements AQS's tryAcquireShared and tryReleaseShared; "shared", as the name suggests, means a shared lock. Let's start with the await() method:

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

await() calls AQS's acquireSharedInterruptibly:

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

This shows that await() is interruptible: if the current thread has been interrupted, an InterruptedException is thrown straight away. Otherwise tryAcquireShared is called, which is implemented in the subclass. Back in CountDownLatch, here is its tryAcquireShared:

   protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

Very simple: it returns 1 if state is 0, and -1 otherwise. state is set from the constructor argument. We know that the number passed to CountDownLatch represents the count of concurrent tasks, so state is effectively the number of outstanding tasks. From acquireSharedInterruptibly we can see that while state != 0, i.e. the concurrent tasks have not all finished, execution enters doAcquireSharedInterruptibly:

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

First, the addWaiter method:

 private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

In other words, when the tail is still null (or the fast-path CAS fails), addWaiter falls back to enq, which initializes the queue:

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

From the code above, the queue right after initialization looks like this:

head points to an empty dummy node, which is important for understanding the code that follows. Back in doAcquireSharedInterruptibly, the new node's predecessor p is exactly this head, so we enter the if branch below (why this check exists will be explained later). For CountDownLatch, while the concurrent tasks have not finished, tryAcquireShared returns -1, so we go no further and drop straight into shouldParkAfterFailedAcquire:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

Here pred is the head, whose waitStatus is still 0 right after initialization, so we take the else branch and the head's waitStatus is updated to SIGNAL. Back in doAcquireSharedInterruptibly, if the thread has not been interrupted, the loop runs again and enters shouldParkAfterFailedAcquire a second time; this time the first if branch is taken and the method returns true, so parkAndCheckInterrupt is called and the current thread is parked. That is why a thread blocks after calling CountDownLatch.await.
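
For reference, the waitStatus values used above are defined in AQS's Node class (abridged from the JDK source):

// Abridged from java.util.concurrent.locks.AbstractQueuedSynchronizer.Node
static final int CANCELLED =  1;   // the node's thread has cancelled its acquire
static final int SIGNAL    = -1;   // the successor's thread needs unparking on release
static final int CONDITION = -2;   // the node is waiting on a condition queue
static final int PROPAGATE = -3;   // the next acquireShared should propagate unconditionally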

From the analysis above we also know that, for CountDownLatch, if another thread B calls await while the tasks have not finished, its node is appended to the end of the wait queue. When the first node was parked its own waitStatus was still 0, so this time shouldParkAfterFailedAcquire sets the first node's waitStatus to SIGNAL, and on the next loop iteration thread B is parked as well.
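
To make this concrete, here is a minimal, self-contained usage sketch (it is not part of the JDK source we are reading): both waiter threads call await while the count is still non-zero, so both are queued and parked as described above, and they only run on once the workers have counted the latch down to zero.

import java.util.concurrent.CountDownLatch;

public class LatchDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);   // state starts at 2

        // Two waiters: both end up parked inside doAcquireSharedInterruptibly.
        Runnable waiter = () -> {
            try {
                latch.await();
                System.out.println(Thread.currentThread().getName() + " released");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        new Thread(waiter, "waiter-A").start();
        new Thread(waiter, "waiter-B").start();

        // Two workers: each countDown() calls releaseShared(1); the one that
        // drives state to 0 triggers doReleaseShared and wakes the queue.
        for (int i = 0; i < 2; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + " done");
                latch.countDown();
            }, "worker-" + i).start();
        }
    }
}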

Now that the acquire path of AQS is clear, let's look at how it releases the lock, again starting from CountDownLatch: its countDown() method directly calls AQS's releaseShared:

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

As the code shows, tryReleaseShared is implemented in the subclass:

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

From its code, tryReleaseShared returns true only when the count transitions to zero, i.e. when all concurrent tasks have finished; only then is doReleaseShared executed:

private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

If the head node's waitStatus is SIGNAL, it is first reset to 0 via CAS, and then unparkSuccessor is called:

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

In the usual case s != null and its waitStatus is not cancelled, so the head node's successor is unparked. In other words, each call to releaseShared wakes up only the thread queued immediately after the head.
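
The parking and waking above ultimately rest on LockSupport. As a small standalone aside (this sketch is not taken from AQS), here is how unpark resumes a thread blocked in park, which is exactly what unparkSuccessor does for the successor's thread:

import java.util.concurrent.locks.LockSupport;

public class ParkDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread waiter = new Thread(() -> {
            System.out.println("parking current thread");
            LockSupport.park();                // blocks until a permit is available
            System.out.println("unparked, continuing");
        });
        waiter.start();

        Thread.sleep(500);                     // crude: give the waiter time to park
        LockSupport.unpark(waiter);            // grant the permit, waking the waiter
        waiter.join();
    }
}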

With this in mind, consider a typical CountDownLatch scenario: threads A and B both call await, waiting for worker threads C and D to finish their tasks. When C and D finish, the countDown() that brings the count to zero goes through releaseShared into doReleaseShared and wakes the node right after the head. The awakened thread had been parked at the bottom of the for loop in doAcquireSharedInterruptibly, so it resumes there and runs the next iteration. This time tryAcquireShared returns 1, so it enters the if branch mentioned earlier and calls setHeadAndPropagate:

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

From the code, setHeadAndPropagate removes the old head and makes the current thread's node (the second node in the queue) the new head. In shared mode it then calls doReleaseShared to wake the new head's successor; this is what "propagate" means. Each awakened node in turn wakes the node behind it, until the whole queue has been woken up.
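
To tie the template methods together, here is a minimal sketch of a custom shared synchronizer built on AQS in the same style as CountDownLatch's Sync. The class OneShotGate and its methods are made up for illustration and are not part of the JDK: await() parks callers until open() has been called once, after which all current and future callers pass through, relying on exactly the propagation behaviour described above.

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example: a one-shot "gate" built on AQS.
public class OneShotGate {
    private static final class Sync extends AbstractQueuedSynchronizer {
        // state == 0 means closed, state == 1 means open.
        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() == 1 ? 1 : -1;   // >= 0 lets the caller through
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);                       // open the gate
            return true;                       // always propagate the wake-up
        }
    }

    private final Sync sync = new Sync();

    // Blocks until open() has been called; interruptible like CountDownLatch.await().
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    // Opens the gate; queued waiters are woken via doReleaseShared and propagation.
    public void open() {
        sync.releaseShared(1);
    }
}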

At this point we have a basic understanding of how AQS works. To reinforce it, let's walk through the diagrams below, which show how the wait queue changes over time.

After thread1 calls acquireSharedInterruptibly, the wait queue looks like the figure below, and thread1 is parked.

After another thread, thread2, also calls acquireSharedInterruptibly, the wait queue looks like the figure below, and thread2 is parked.

At this point another thread triggers releaseShared; the wait queue looks like the figure below, and thread1 is unparked.

After thread1 is unparked it enters setHeadAndPropagate; after setHead the wait queue looks like the figure below.

After thread1 calls doReleaseShared and wakes thread2, the wait queue looks like the figure below.

thread2 then enters setHeadAndPropagate; after setHead the wait queue looks like the figure below.