static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; //表示当前的线程被取消; static final int CANCELLED = 1; //表示当前节点的后继节点包含的线程需要运行,也就是unpark; static final int SIGNAL = -1; //表示当前节点在等待condition,也就是在condition队列中; static final int CONDITION = -2; //表示当前场景下后续的acquireShared能够得以执行; static final int PROPAGATE = -3; //表示节点的状态。默认为0,表示当前节点在sync队列中,等待着获取锁。 //其它几个状态为:CANCELLED、SIGNAL、CONDITION、PROPAGATE volatile int waitStatus; //前驱节点 volatile Node prev; //后继节点 volatile Node next; //获取锁的线程 volatile Thread thread; //存储condition队列中的后继节点。 Node nextWaiter; }
static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3;
CANCELLED:因为超时或者中断,结点会被设置为取消状态,被取消状态的结点不应该去竞争锁,只能保持取消状态不变,不能转换为其他状态。处于这种状态的结点会被踢出队列,被GC回收; SIGNAL:表示这个结点的继任结点被阻塞了,所以自己被释放或取消的时候需要通知继任节点; CONDITION:表示这个结点在条件队列中,因为等待某个条件而被阻塞,它不会被作为同步节点直到其状态被置为0; PROPAGATE:使用在共享模式头结点有可能处于这种状态,表示锁的下一次获取可以无条件传播; 0:None of the above,新结点会处于这种状态。
应用之——公平锁
公平锁的获取锁
1 2 3 4
final void lock() { acquire(1); }
调用到了AQS的acquire方法:
1 2 3 4 5 6
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); // 自己给自己设置了一个中断标志,只有之前被中断过,前面才会返回true,这里相当于重新设置一个中断标志 }
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread();//获取当前线程 int c = getState(); //获取父类AQS中的标志位 if (c == 0) { // 0意味着锁还在 if (!hasQueuedPredecessors() &&//如果前面没有已在队列中的线程! compareAndSetState(0, acquires)) { //修改一下状态位,注意:这里的acquires是在lock的时候传递来的,从上面的图中可以知道,这个值是写死的1 setExclusiveOwnerThread(current);//如果通过CAS操作将状态位更新成功则代表当前线程获取锁,因此,将当前线程设置到AQS的一个变量中,说明这个线程拿走了锁。 return true; } } else if (current == getExclusiveOwnerThread()) {//如果不为0意味着,锁已经被拿走了,但是,因为ReentrantLock是重入锁,是可以重复lock,unlock的,所以这里还要再判断一次 获取锁的线程是不是当前请求锁的线程。 int nextc = c + acquires;//如果是的,累加在state字段上就可以了。 if (nextc < 0) // 重入次数太多,溢出了 throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
1 2 3 4 5 6 7 8 9 10 11
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
/** * Creates and enqueues node for current thread and given mode. *用当前线程去构造一个Node对象,mode是一个表示Node类型的字段,仅仅表示这个节点是独占的,还是共享的,或者说,AQS的这个队列中,哪些节点是独占的,哪些是共享的。 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) // 如果这一步不满足,都会return false /* * 前一节点已经设置了signal状态,释放锁后会通知后继节点 * 所以可以安全挂起 */ return true; if (ws > 0) { /* 判断如果前驱节点状态为CANCELLED,那就一直往前找,直到找到最近一个正常等待的状态 */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); //放置在其后面 pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //如果前驱节点正常,则修改前驱节点状态为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
/** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ /* * 为什么不关心是否成功却还要设置呢? * * 注意这里的Node实际就是head * * 如果设置成功,即head.waitStatus=0,则可以让这时即将被阻塞的线程有机会再次调用tryAcquire获取锁。 * 也就是让shouldParkAfterFailedAcquire方法里的compareAndSetWaitStatus(pred, ws, Node.SIGNAL)执行失败返回false,这样就能再有机会再tryAcquire了 * * 如果设置失败,新跟随在head后面的线程被阻塞,但是没关系,下面的代码会立即将这个阻塞线程释放掉 */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; // 从后往前找,但仍是找离node最近的一个有效节点(也就是它的继承者) for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
应用之——非公平锁
其实, 它和公平锁的唯一区别就是获取锁的方式不同,一个是按前后顺序一次获取锁,一个是抢占式的获取锁
获取锁
1 2 3 4 5 6 7
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }