多线程小抄集(新编四)

多线程小抄集(新编四)

ConcurrentHashMap

ConcurrentHashMap是线程安全的HashMap,键值都不能为null。

JDK7的实现:内部采用分段锁来实现,默认初始容量为16(所以理论上这个时候最多可以同时支持 16 个线程并发写,只要它们的操作分别分布在不同的 Segment 上。这个值可以在初始化的时候设置为其他值,但是一旦初始化以后,它是不可以扩容的),装载因子为0.75f,分段16,每个段的HashEntry<K,V>[]大小为2。每次扩容为原来容量的2倍,ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容。在获取size操作的时候,先尝试2(RETRIES_BEFORE_LOCK)次通过不锁住Segment的方式统计各个Segment大小,如果统计的过程中,容器的count发生了变化,再采用加锁的方式来统计所有的Segment的大小。


JDK8的实现:ConcurrentHashMap中结构为:数组、链表和红黑树。当某个槽内的元素个数增加到8个且table容量大于或者等于64时,由链表转为红黑树;当某个槽内的元素个数减少到6个时,由红黑树重新转回链表。链表转红黑树的过程,就是把给定顺序的元素构造成一颗红黑树的过程。需要注意的是,当table容量小于64时,只会扩容,并不会把链表转为红黑树。在转化过程中,使用同步块锁住当前槽的首元素,防止其他进程对当前槽进行增删改操作,转化完成后利用CAS替换原有的链表。


无论是JDK7还是JDK8,ConcurrentHashMap的size方法都只能返回一个大概数量,无法做到100%的精确,因为已经统计过的槽在size返回最终结果前有可能又出现了变化,从而导致返回大小与实际大小存在些许差异。

JDK8中size方法的实现比JDK7要简单很多:

public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
}

最大值是 Integer 类型的最大值,但是 Map 的 size 可能超过 MAX_VALUE, 所以还有一个方法 mappingCount(),JDK 的建议使用 mappingCount() 而不是size()。mappingCount() 的代码如下:

public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}

以上可以看出,无论是 size() 还是 mappingCount(), 计算大小的核心方法都是 sumCount()。sumCount() 的代码如下:

final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

CounterCell 这个类到底是什么?我们会发现它使用了 @sun.misc.Contended 标记的类,内部包含一个 volatile 变量。@sun.misc.Contended 这个注解标识着这个类防止需要防止 “伪共享”。

JDK1.8 size 是通过对 baseCount 和 counterCell 进行 CAS 计算,最终通过 baseCount 和 遍历 CounterCell 数组得出 size。

JDK8 ConcurrentHashMap CPU 100%

递归调用computeIfAbsent方法

线程安全的非阻塞队列

非阻塞队列有ConcurrentLinkedQueue, ConcurrentLinkedDeque。元素不能为null。以ConcurrentLinkedQueue为例,有头head和尾tail两个指针,遵循FIFO的原则进行入队和出队,方法有add(E e), peek()取出不删除, poll()取出删除, remove(Object o),size(), contains(Object o), addAll(Collection c), isEmpty()。ConcurrentLinkedDeque是双向队列,可以在头和尾两个方向进行相应的操作。

阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。 支持阻塞的移除方法:意思是队列为空时,获取元素的线程会等待队列变为非空。 任何阻塞队列中的元素都不能为null.

阻塞队列的插入和移除操作处理方式:

方法-处理方法 抛出异常 返回特殊值 可能阻塞等待 可设定等待时间
入队 add(e) offer(e) put(e) offer(e,timeout,unit)
出队 remove() poll() take() poll(timeout,unit)
查看 element() peek()

如果是无界队列,队列不可能出现满的情况,所以使用put或offer方法永远不会被阻塞,而且使用offer方法时,该方法永远返回true.

Java里的阻塞队列:ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。 LinkedeBlockingQueue:一个有链表结构组成的(有界/无界)阻塞队列。 PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列 。DelayQueue:一个使用优先级队列实现的无界阻塞队列。 SynchronousQueue:一个不存储元素的阻塞队列。 LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。 LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

ArrayBlockingQueue

此队列按照FIFO的原则对元素进行排序,支持公平和非公平策略,默认为不公平。初始化时必须设定容量大小。

public ArrayBlockingQueue(int capacity)
public ArrayBlockingQueue(int capacity, boolean fair)
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c)

LinkedBlockingQueue

与ArrayBlockingQueue一样,按照FIFO原则进行排序,内部采用链表结构,且不能设置为公平的。默认队列长度为Integer.MAX_VALUE。

public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity)
public LinkedBlockingQueue(Collection<? extends E> c)

PriorityBlockingQueue

是一个支持优先级的无界阻塞队列,默认初始容量为11,默认情况下采用自然顺序升序排列,不能保证同优先级元素的顺序。底层采用二叉堆实现。内部元素要么实现Comparable接口,要么在初始化的时候指定构造函数的Comparator来对元素进行排序。

public PriorityBlockingQueue()
public PriorityBlockingQueue(int initialCapacity)
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator)
public PriorityBlockingQueue(Collection<? extends E> c)

DelayQueue

支持延时获取元素的无界阻塞队列。内部包含一个PriorityQueue来实现,队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。 DelayQueue非常有用,可以将DelayQueue运用在下面应用场景。

  • 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
  • 定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行。
    public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
    }

SynchronousQueue

不存储元素的阻塞队列,支持公平和非公平策略。每一个put操作必须等待一个take操作,否则不能继续添加元素。非常适合传递性场景。

CopyOnWriteArrayList

在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。CopyOnWriteArrayList的迭代器保留一个指向底层基础数组的引用,这个数组当前位于迭代器的起始位置,由于它不会被修改,因此在对其进行同步时只需确保数组内容的可见性。因此,多个线程可以同时对这个容器进行迭代,而不会彼此干扰或者与修改容器的线程相互干扰。“写时复制”容器返回的迭代器不会抛出ConcurrentModificationException并且返回的元素与迭代器创建时的元素完全一致,而不必考虑之后修改操作所带来的影响。显然,每当修改容器时都会复制底层数组,这需要一定的开销,特别是当容器的规模较大时,仅当迭代操作远远多于修改操作时,才应该使用“写入时赋值”容器。

原子类

Java中Atomic包里一共提供了12个类,属于4种类型的原子更新方式,分别是原子更新基本类型、原子更新数组、原子更新引用、原子更新属性(字段)。Atomic包里的类基本都是使用Unsafe实现的包装类。

1)原子更新基本类型:AtomicBoolean,AtomicInteger, AtomicLong.
2)原子更新数组:AtomicIntegerArray,AtomicLongArray, AtomicReferenceArray.
3)原子更新引用类型:AtomicReference, AtomicStampedReference, AtomicMarkableReference.
4) 原子更新字段类型:AtomicReferenceFieldUpdater, AtomicIntegerFieldUpdater, AtomicLongFieldUpdater.

原子更新基本类型

AtomicBoolean,AtomicInteger, AtomicLong三个类提供的方法类似,以AtomicInteger为例:有int addAndGet(int delta), boolean compareAndSet(int expect, int update), int getAndIncrement(), void lazySet(int newValue),int getAndSet(int newValue)。

// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;

public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

sun.misc.Unsafe只提供三种CAS方法:compareAndSwapObject, compareAndSwapInt和compareAndSwapLong。对于AtomicBoolean,它是先把Boolean转换成整形,再使用compareAndSwapInt进行CAS,原子更新char,float,double变量也可以用类似的思路来实现。

原子更新数组

以AtomicIntegerArray为例,此类主要提供原子的方式更新数组里的整形,常用方法如下:

  • int addAndGet(int i, int delta):以原子的方式将输入值与数组中索引i的元素相加。
  • boolean compareAndSet(int i, int expect, int update):如果当前值等于预期值,则以原子方式将数组位置i的元素设置成update值。
  • AtomicIntegerArray的两个构造方法:
    • AtomicIntegerArray(int length):指定数组的大小,并初始化为0
    • AtomicIntegerArray(int [] array):对给定的数组进行拷贝。

案例:

int value[] = new int[]{1,2,3};
AtomicIntegerArray aia = new AtomicIntegerArray(value);
System.out.println(aia.getAndSet(1, 9));
System.out.println(aia.get(1));
System.out.println(value[1]);

运行结果:2 9 2

原子更新引用示例

public class AtomicReferenceDemo {
public static void main(String[] args) {
User user = new User("conan", 15);
AtomicReference<User> userRef = new AtomicReference<>(user);
User userChg = new User("Shinichi", 17);
userRef.compareAndSet(user, userChg);
User newUser = userRef.get();
System.out.println(user);
System.out.println(userChg);
System.out.println(userRef.get());

AtomicIntegerFieldUpdater<User> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
fieldUpdater.getAndIncrement(newUser);
System.out.println(newUser);
System.out.println(fieldUpdater.get(newUser));
}

@Data
@AllArgsConstructor
static class User{
private String name;
volatile int age;
}
}

输出:

AtomicReferenceDemo.User(name=conan, age=15)
AtomicReferenceDemo.User(name=Shinichi, age=17)
AtomicReferenceDemo.User(name=Shinichi, age=17)
AtomicReferenceDemo.User(name=Shinichi, age=18)
18

CAS

全称CompareAndSwap。假设有三个操作数:内存值V,旧的预期值A,要修改的值B,当且仅当预期值A和内存值V相同时,才会将内存值修改为B并返回true,否则什么都不做并返回false。当然CAS一定要配合volatile变量,这样才能保证每次拿到的遍历是主内存中最新的那个值,否则旧的预期值A对某条线程来说,永远是一个不会变的值A,只要某次CAS操作失败,永远都不可能成功。cmpxchg指令

AQS

全称AbstractQueuedSynchronizer。如果说JUC(java.util.concurrent)的基础是CAS的话,那么AQS就是整个JAVA并发包的核心了,ReentrantLock, ReentrantReadWriteLock, CountDownLatch, Semaphore等都用到了它。

CAS的问题

  1. ABA问题发生在类似这样的场景:线程1转变使用CAS将变量A的值替换为C,在此时线程2将变量的值由A替换为C,又由C替换为A,然后线程1执行CAS时发现变量的值仍为A,所以CAS成功。但实际上这时的现场已经和最初的不同了。大多数情况下ABA问题不会产生什么影响。如果有特殊情况下由于ABA问题导致,可用采用AtomicStampedReference来解决,原理:乐观锁+version。

AtomicStampedReference

public boolean compareAndSet(V   expectedReference,
V newReference,
int expectedStamp,
int newStamp)
  1. 循环时间长开销大

自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。

  1. 只能保证一个共享变量的原子操作

当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性。AtomicReference类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来执行CAS操作。

如何避免死锁

死锁是指两个或两个以上的进程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,他们都将无法推进下去。这是一个严重的问题,因为死锁会让你的程序挂起无法完成任务,死锁的发生必须满足以下4个条件:

  1. 互斥条件:一个资源每次只能被一个进程使用。
  2. 请求与保持条件:一个进程因请求资源而阻塞时,对已获得的资源保持不放。
  3. 不剥夺条件:进程已获得的资源,在未使用完之前,不能强行剥夺。
  4. 循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系。

避免死锁最简单的方法就是阻止循环等待条件,将系统中所有的资源设置标志位、排序,规定所有的进程申请资源必须以一定的顺序做操作来避免死锁。

支持定时的锁 boolean tryLock(long timeout, TimeUnit unit)

通过ThreadDump来分析找出死锁

怎么检测一个线程是否拥有锁

java.lang.Thread中有一个方法:public static native boolean holdsLock(Object obj). 当且仅当当前线程拥有某个具体对象的锁时返回true

JAVA中的线程调度算法

抢占式。一个线程用完CPU之后,操作系统会根据现场优先级、线程饥饿情况等数据算出一个总的优先级并分配下一个时间片给某个线程执行。

无锁

要保证现场安全,并不是一定就要进行同步,两者没有因果关系。同步只是保证共享数据争用时的正确性的手段,如果一个方法本来就不涉及共享数据,那它自然就无须任何同步措施去保证正确性,因此会有一些代码天生就是线程安全的。

  1. 无状态编程。无状态代码有一些共同的特征:不依赖于存储在对上的数据和公用的系统资源、用到的状态量都由参数中传入、不调用非无状态的方法等。可以参考Servlet。
  2. 线程本地存储。可以参考ThreadLocal
  3. volatile
  4. CAS
  5. 协程:在单线程里实现多任务的调度,并在单线程里维持多个任务间的切换。

欢迎支持笔者的作品《深入理解Kafka: 核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客(ID: hiddenkafka)。
本文作者: 朱小厮

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×