并发的意义

并发通常是提高运行在单处理器上的程序的性能。如果程序中的某个任务因为该程序控制范围之外的某些条件(I/O)而导致不能继续执行,那么这个任务或线程就阻塞了。如果没有并发,整个程序都将停下来。从性能的角度来看,单处理器上使用并发没有任何好处。

进程与线程

实现并发有两种方式,多进程与多线程,最直接的方式是使用操作系统级别的进程。进程是运行在它自己地址空间内的自包容的程序。多任务操作系统可以周期性地将CPU从一个进程切换到另一个进程,来实现多进程。操作系统会将多进程隔离开,保证互相不会干涉。但进程通常会有数量和开销的限制,避免它们在不同的并发系统之间的可应用性。Java使用的多线程并发会共享内存和I/O这样的资源,编写多线程程序最基本的困难在于协调不同线程之间的资源使用,保证资源不会被同时访问。函数式编程语言做到了每个函数的调用都不能干涉其他函数,可以当做独立的任务来驱动。如果程序中的某个部分必须大量使用并发,可以考虑使用函数式编程实现。

  1. 进程

进程是资源分配的基本单位

进程有独立的空间地址,一个进程崩溃后,不会对其他进程造成影响,例如最常见的chrome浏览器的多标签也就是用的多进程,防止chrome卡死。

进程控制块 (Process Control Block, PCB) 描述进程的基本信息和运行状态,所谓的创建进程和撤销进程,都是指对 PCB 的操作

  1. 线程

    线程是基本调度的单位

一个进程可以有多个线程,共享进程资源

线程是进程中程序执行的不同路径。线程有自己的堆栈空间和局部变量,单线程之间没有单独的地址空间。一个线程挂了会导致整个进程挂掉。

  1. 区别
  • 资源

    进程是资源分配的基本单位,一个进程可以有多个线程,线程可以访问隶属进程的资源。进程间不会相互影响,多进程程序比较强健,鲁棒性高。线程间会相互影响,一个线程的崩溃会导致整个线程的崩溃。

  • 调度

    线程是调度的基本单位,线程切换开销小,进程切换开销大

  • 系统开销

    由于创建或撤销进程时,系统都要为之分配或回收资源,如内存空间、I/O 设备等,所付出的开销远大于创建或撤销线程时的开销。类似地,在进行进程切换时,涉及当前执行进程 CPU 环境的保存及新调度进程 CPU 环境的设置,而线程切换时只需保存和设置少量寄存器内容,开销很小。

  • 通信

    线程间可以通过直接读写同一进程中的数据(资源共享)进行通信,但是进程通信需要借助IPC。

进程调度算法

  1. 批处理系统

    先来先服务 first-come first-serverd(FCFS)、短作业优先 shortest job first(SJF)、最短剩余时间优先 shortest remaining time next(SRTN)

  2. 交互式系统

    时间片轮转、优先级调度、、多级反馈队列

    1. 实时系统

    进程同步

进程同步是一个操作系统级别的概念,是在多道程序的环境下,存在着不同的制约关系,为了协调这种互相制约的关系,实现资源共享和进程协作,从而避免进程之间的冲突,引入了进程同步。

临界资源

在操作系统中,进程是占有资源的最小单位(线程可以访问其所在进程内的所有资源,但线程本身并不占有资源或仅仅占有一点必须资源)。但对于某些资源来说,其在同一时间只能被一个进程所占用。这些一次只能被一个进程所占用的资源就是所谓的临界资源。典型的临界资源比如物理上的打印机,或是存在硬盘或内存中被多个进程所共享的一些变量和数据等(如果这类资源不被看成临界资源加以保护,那么很有可能造成丢数据的问题)。

对于临界资源的访问,必须是互诉进行。也就是当临界资源被占用时,另一个申请临界资源的进程会被阻塞,直到其所申请的临界资源被释放。而进程内访问临界资源的代码被成为临界区。

对于临界区的访问过程分为四个部分:

1.进入区:查看临界区是否可访问,如果可以访问,则转到步骤二,否则进程会被阻塞

2.临界区:在临界区做操作

3.退出区:清除临界区被占用的标志

4.剩余区:进程与临界区不相关部分的代码

进程同步方式

  • 临界区

    通过多线程串行化访问公共资源或一段代码,速度快,适合控制数据访问

    优点:保证在某一时刻只有一个线程能访问数据

    缺点:虽然临界区同步速度很快,但却只能用来同步本进程内的线程,而不可用来同步多个进程中的线程。

  • 信号量

    信号量(Semaphore)是一个整型变量,可以对其执行 down 和 up 操作,也就是常见的 P 和 V 操作。为控制一个具有有限数量用户资源而设计。它允许多个进程在同一时刻访问同一资源,但是需要限制在同一时刻访问此资源的最大进程数目。

    • down : 如果信号量大于 0 ,执行 -1 操作;如果信号量等于 0,进程睡眠,等待信号量大于 0;
    • up :对信号量执行 +1 操作,唤醒睡眠的进程让其完成 down 操作。

    down 和 up 操作需要被设计成原语,不可分割,通常的做法是在执行这些操作的时候屏蔽中断。

    如果信号量的取值只能为 0 或者 1,那么就成为了 互斥量(Mutex),0 表示临界区已经加锁,1 表示临界区解锁。

    优点:

    适用于对Socket(套接字)程序中线程的同步。(例如,网络上的HTTP服务器要对同一时间内访问同一页面的用户数加以限制,只有不大于设定的最大用户数目的线程能够进行访问,而其他的访问企图则被挂起,只有在有用户退出对此页面的访问后才有可能进入。)

    缺点:

    ①信号量机制必须有公共内存,不能用于分布式操作系统,这是它最大的弱点;

    ②信号量机制功能强大,但使用时对信号量的操作分散, 而且难以控制,读写和维护都很困难,加重了程序员的编码负担;

    ③核心操作P-V分散在各用户程序的代码中,不易控制和管理,一旦错误,后果严重,且不易发现和纠正。

进程通信

每个进程各自有不同的用户地址空间,任何一个进程的全局变量在另一个进程中都看不到,所以进程之间的数据交换必须通过内核,在内核中开辟出一块缓冲区域,进程1把数据拷到缓冲区,进程2在把数据从缓冲区拷走。

1. 管道(匿名管道)

管道的实质是一个内核缓冲区,进程以先进先出的方式从缓冲区存取数据,管道一端的进程顺序的将数据写入缓冲区,另一端的进程则顺序的读出数据。

  • 管道是半双工的,实现双方通信需要建立两个管道
  • 只能用于父子或者兄弟进程之间(具有血缘关系的进程)
  • 单独构成一个文件系统,管道对于管道两端的进程而言,就是一个文件,但他不是普通的文件,不属于某种系统文件系统,而是自立门户,单独构成一种文件系统,只存在于内存中。
  • 一个进程向管道写数据,另一个进程从另一端读出。写入的内容每次都添加在管道缓冲区的末尾,并且每次都是从缓冲区的头部读出数据。
  • 管道所传送的是无格式字节流,这就要求管道通信双方约定好数据格式

2. 有名管道(FIFO)

匿名管道,由于没有名字,只能用于亲缘关系的进程间通信。为了克服这个缺点,提出了有名管道(FIFO)。

有名管道不同于匿名管道之处在于它提供了一个路径名与之关联,以有名管道的文件形式存在于文件系统中,这样,即使与有名管道的创建进程不存在亲缘关系的进程,只要可以访问该路径,就能够彼此通过有名管道相互通信,因此,通过有名管道不相关的进程也能交换数据。值的注意的是,有名管道严格遵循先进先出(first in first out),对匿名管道及有名管道的读总是从开始处返回数据,对它们的写则把数据添加到末尾。它们不支持诸如lseek()等文件定位操作。有名管道的名字存在于文件系统中,内容存放在内存中。

3. 消息队列

  • 消息队列可以独立于读写进程存在,从而避免了 FIFO 中同步管道的打开和关闭时可能产生的困难;
  • 避免了 FIFO 的同步阻塞问题,不需要进程自己提供同步方法;
  • 读进程可以根据消息类型有选择地接收消息,而不像 FIFO 那样只能默认地接收。
  • 与管道(匿名管道:只存在于内存中的文件;有名管道:存在于实际的磁盘介质或者文件系统)不同的是消息队列存放在内核中,只有在内核重启(即,操作系统重启)或者显示地删除一个消息队列时,该消息队列才会被真正的删除。

4.信号

  • 信号是Linux系统中用于进程间互相通信或者操作的一种机制,信号可以在任何时候发给某一进程,而无需知道该进程的状态。
  • 如果该进程当前并未处于执行状态,则该信号就有内核保存起来,直到该进程回复执行并传递给它为止。
  • 如果一个信号被进程设置为阻塞,则该信号的传递被延迟,直到其阻塞被取消是才被传递给进程。

信号是软件层次上对中断机制的一种模拟,是一种异步通信方式,信号可以在用户空间进程和内核之间直接交互,内核可以利用信号来通知用户空间的进程发生了哪些系统事件,信号事件主要有两个来源:

  • 硬件来源:用户按键输入Ctrl+C退出、硬件异常如无效的存储访问等。
  • 软件终止:终止进程信号、其他进程调用kill函数、软件异常产生信号。

5. 信号量

信号量是一个计数器,用于多进程对共享数据的访问,信号量的意图在于进程间同步。

(1)创建一个信号量:这要求调用者指定初始值,对于二值信号量来说,它通常是1,也可是0。

(2)等待一个信号量:该操作会测试这个信号量的值,如果小于0,就阻塞。也称为P操作。

(3)挂出一个信号量:该操作将信号量的值加1,也称为V操作。

6.共享内存(shared memory)

允许多个进程共享一个给定的存储区。因为数据不需要在进程之间复制,所以这是最快的一种 IPC。为了在多个进程间交换信息,内核专门留出了一块内存区,可以由需要访问的进程将其映射到自己的私有地址空间。进程就可以直接读写这一块内存而不需要进行数据的拷贝,从而大大提高效率。由于多个进程共享一段内存,因此需要依靠某种同步机制(如信号量)来达到进程间的同步及互斥

7. 套接字

套接字是一种通信机制,凭借这种机制,客户/服务器(即要进行通信的进程)系统的开发工作既可以在本地单机上进行,也可以跨网络进行。也就是说它可以让不在同一台计算机但通过网络连接计算机上的进程进行通信。

socket是应用层和传输层之间的桥梁

java并发

1. 线程的状态

  • new(新建)

    线程已经创建但未启动

  • Runnable (可运行)

    线程正在运行或者正在等待cpu时间切片

  • Blocked (阻塞)

    等待获取排它锁中

  • Time waiting (限期等待)

    无需等待其它线程显式地唤醒,在一定时间之后会被系统自动唤醒。阻塞和等待的区别在于,阻塞是被动的,它是在等待获取一个排它锁。而等待是主动的,通过调用 Thread.sleep() 和 Object.wait() 等方法进入。

    | 进入方法 | 退出方法 |
    | ———————————————————— | ———————————————————————- |
    | Thread.sleep() 方法 | 时间结束 |
    | 设置了 Timeout 参数的 Object.wait() 方法 | 时间结束 / Object.notify() / Object.notifyAll() |
    | 设置了 Timeout 参数的 Thread.join() 方法 | 时间结束 / 被调用的线程执行完毕 |
    | LockSupport.parkNanos() 方法 | LockSupport.unpark(Thread) |
    | LockSupport.parkUntil() 方法 | LockSupport.unpark(Thread) |

  • Waiting (无限期等待)

    等待其它线程显式地唤醒,否则不会被分配 CPU 时间片。

    | 进入方法 | 退出方法 |
    | ————————————————————— | —————————————————— |
    | 没有设置 Timeout 参数的 Object.wait() 方法 | Object.notify() / Object.notifyAll() |
    | 没有设置 Timeout 参数的 Thread.join() 方法 | 被调用的线程执行完毕 |
    | LockSupport.park() 方法 | LockSupport.unpark(Thread) |

  • Terminated (死亡)

    可以是线程结束任务之后自己结束,或者产生了异常而结束

2. 实现多线程的方式

  • 实现Runnable接口并实现run()方法

错误实例

1
2
3
4
5
6
7
8
9
public class MyRunnable implements Runnable {
public void run() {
System.out.println("Thread is running");
}
public static void main(String[] args) {
MyRunnable instance = new MyRunnable();
instance.run(); //这里并没有使用额外线程,仍在main线程中执行,这个类本身不会产生任何内在的线程能力,要实现线程行为,必须显示地将任务附着到线程上
}
}

正确实例

1
2
3
4
5
6
7
8
9
10
public class MyRunnable implements Runnable {
public void run() {
System.out.println("Thread is running");
}
public static void main(String[] args) {
MyRunnable instance = new MyRunnable();
Thread thread = new Thread(instance);
thread.start(); //调用Thread的start方法为该线程执行必须的初始化操作然后调用Runnable的run()方法
}
}

思考:

Q1:主线程和子线程结束的顺序?

任何线程都可以启动另一个线程,这两个线程是独立的两个线程会并行执行,结束的顺序完全取决于线程本身的运行时间。通过对调用setDaemon(true)可以设置线程为后台线程,后台线程是程序运行时提供的通用服务线程并且这种线程不属于程序不可或缺的一部分,当所有非后台线程结束时,程序就已经结束了。

**Q2: 执行完

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

一个线程会创建一个单独的执行线程,在对start()方法调用后,它仍会继续存在。每个Thread都注册了自己,有一个对自己的引用,在run()方法结束并死亡之前,垃圾回收器无法回收它

- 继承Thread

同样也是需要实现 run() 方法,因为 Thread 类也实现了 Runable 接口。

当调用 start() 方法启动一个线程时,虚拟机会将该线程放入就绪队列中等待被调度,当一个线程被调度时会执行该线程的 run() 方法。

```java
public class MyThread extends Thread {
public void run() {
System.out.println("Thread is running");
}
public static void main(String[] args) {
MyThread mt = new MyThread();
mt.start();
}
}

  • 实现callable接口

与 Runnable 相比,Callable 可以有返回值(从call方法获得返回值),返回值通过 FutureTask 进行封装。

1
2
3
4
5
6
7
8
9
10
11
12
public class MyCallable implements Callable<Integer> {
public Integer call() {
return 123;
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyCallable mc = new MyCallable();
FutureTask<Integer> ft = new FutureTask<>(mc);
Thread thread = new Thread(ft);
thread.start();
System.out.println(ft.get());
}
}

3. Executor

java.util.concurrent包中的Executor可以管理Thread对象,从而简化并发编程。Executor在客户端和执行任务之间提供了一个间接层,与客户端直接执行任务不同,这个中介方法将执行任务。Executor可以管理多个异步任务(多个任务执行互不干扰,不需要进行同步),无须显示地管理线程生命周期。

ExecutorService对象是由Executors的静态方法实现的

  • CachedThreadPool 一个任务创建一个线程
  • FixedThreadPool 所有任务共享数量有限的线程,不需要为每个任务都固定地付出创建线程的开销(线程池技术)
  • SingleThreadPool 线程数量为1的FixedThreadPool。如果向SingleThreadPool提交多个任务,每个任务将会串行执行,所有任务使用相同线程。
1
2
3
4
5
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new MyRunnable()); //execute执行一个任务
Future<Integer> f = exec.submit(new MyCallable()); //submit方法会产生一个feature对象,java通过泛型实现
f.isDone(); //返回一个boolean值,查询Future是否已完成
f.get(); //方法将阻塞,直到call方法执行结束并返回结果

4. 线程方法

线程的异常必须在线程内处理,异常不能跨线程传播

  • Thread.currentThread

    获取当前线程对象的引用

  • Thread.sleep

    休眠当前正在执行的线程

  • Thread.yield

    对静态方法 Thread.yield() 的调用声明了当前线程已经完成了生命周期中最重要的部分,可以切换给其它线程来执行。该方法只是对线程调度器的一个建议,而且也只是建议具有相同优先级的其它线程可以运行。

5. 线程协作

  • join

    在线程中调用另一个线程的 join() 方法,会将当前线程挂起,直到目标线程结束后恢复。也可以带一个超时参数,保证目标线程在超时后,join方法可以返回

  • wait() notify() notifyAll()

    调用 wait() 使得线程等待某个条件满足,线程在等待时会被挂起,当其他线程的运行使得这个条件满足时,其它线程会调用 notify() 或者 notifyAll() 来唤醒挂起的线程。使用 wait() 挂起期间,线程会释放锁。这是因为,如果没有释放锁,那么其它线程就无法进入对象的同步方法或者同步控制块中,那么就无法执行 notify() 或者 notifyAll() 来唤醒挂起的线程,造成死锁。

  • await() signal() signalAll()

    java.util.concurrent 类库中提供了 Condition 类来实现线程之间的协调,可以在 Condition 上调用 await() 方法使线程等待,其它线程调用 signal() 或 signalAll() 方法唤醒等待的线程。相比于 wait() 这种等待方式,await() 可以指定等待的条件,因此更加灵活。

5. Java内存模型

  1. 原子性

Java 内存模型保证了 read、load、use、assign、store、write、lock 和 unlock 操作具有原子性

  1. 可见性

可见性指当一个线程修改了共享变量的值,其它线程能够立即得知这个修改。Java 内存模型是通过在变量修改后将新值同步回主内存,在变量读取前从主内存刷新变量值来实现可见性的。

  1. 有序性

有序性是指:在本线程内观察,所有操作都是有序的。在一个线程观察另一个线程,所有操作都是无序的,无序是因为发生了指令重排序。在 Java 内存模型中,允许编译器和处理器对指令进行重排序,重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性。

6. 线程安全

多个线程不管以何种方式方式访问某个类,并且在主调函数中不需要进行任何同步,都能表现出正确的行为。

引发线程安全问题主要是因为多线程会同时操作共享数据(临界资源)

6.1 不可变

不可变(Immutable)的对象一定是线程安全的,不需要再采取任何的线程安全保障措施。只要一个不可变的对象被正确地构建出来,永远也不会看到它在多个线程之中处于不一致的状态。多线程环境下,应当尽量使对象成为不可变,来满足线程安全。

  • final关键词修饰
  • String
  • 枚举类型
  • Number 部分子类,如 Long 和 Double 等数值包装类型,BigInteger 和 BigDecimal 等大数据类型。但同为 Number 的原子类 AtomicInteger 和 AtomicLong 则是可变的。

6.2 互斥同步

  • synchronized
  • ReentrantLock

6.3 非阻塞同步

互斥同步最主要的问题就是线程阻塞和唤醒所带来的性能问题,因此这种同步也称为阻塞同步。

互斥同步属于一种悲观的并发策略,总是认为只要不去做正确的同步措施,那就肯定会出现问题。无论共享数据是否真的会出现竞争,它都要进行加锁(这里讨论的是概念模型,实际上虚拟机会优化掉很大一部分不必要的加锁)、用户态核心态转换、维护锁计数器和检查是否有被阻塞的线程需要唤醒等操作。

6.3.1 CAS

随着硬件指令集的发展,我们可以使用基于冲突检测的乐观并发策略:先进行操作,如果没有其它线程争用共享数据,那操作就成功了,否则采取补偿措施(不断地重试,直到成功为止)。这种乐观的并发策略的许多实现都不需要将线程阻塞,因此这种同步操作称为非阻塞同步。

乐观锁需要操作和冲突检测这两个步骤具备原子性,这里就不能再使用互斥同步来保证了,只能靠硬件来完成。硬件支持的原子性操作最典型的是:比较并交换(Compare-and-Set,CAS)。CAS 指令需要有 3 个操作数,分别是内存地址 V、旧的预期值 A 和新值 B。当执行操作时,只有当 V 的值等于 A,才将 V 的值更新为 B。

6.3.2 原子类

AtomicInteger/AtomicLong/AtomicReference

6.4 无同步方案

要保证线程安全,并不是一定就要进行同步。如果一个方法本来就不涉及共享数据,那它自然就无须任何同步措施去保证正确性。

6.4.1 栈封闭

多个线程访问同一个方法的局部变量时,不会出现线程安全问题,因为局部变量存储在虚拟机栈中,属于线程私有的。

6.4.2 线程本地存储(Thread Local Storage)

如果一段代码中所需要的数据必须与其他代码共享,那就看看这些共享数据的代码是否能保证在同一个线程中执行。如果能保证,我们就可以把共享数据的可见范围限制在同一个线程之内,这样,无须同步也能保证线程之间不出现数据争用的问题。

符合这种特点的应用并不少见,大部分使用消费队列的架构模式(如“生产者-消费者”模式)都会将产品的消费过程尽量在一个线程中消费完。其中最重要的一个应用实例就是经典 Web 交互模型中的“一个请求对应一个服务器线程”(Thread-per-Request)的处理方式,这种处理方式的广泛应用使得很多 Web 服务端应用都可以使用线程本地存储来解决线程安全问题

可以使用 java.lang.ThreadLocal 类来实现线程本地存储功能。每个 Thread 都有一个 ThreadLocal.ThreadLocalMap 对象。

6.4.3 可重入代码(Reentrant Code)

这种代码也叫做纯代码(Pure Code),可以在代码执行的任何时刻中断它,转而去执行另外一段代码(包括递归调用它本身),而在控制权返回后,原来的程序不会出现任何错误。

可重入代码有一些共同的特征,例如不依赖存储在堆上的数据和公用的系统资源、用到的状态量都由参数中传入、不调用非可重入的方法等。

锁优化

Java中的最常见的两种锁为Sychronized和ReentrantLock

Sychronized原理

Java 虚拟机中的同步(Synchronization)基于进入和退出管程(Monitor)对象实现,无论是显式同步(有明确的 monitorenter 和 monitorexit 指令,即同步代码块)还是隐式同步都是如此。

  • 同步语句块

    使用的是monitorenter 和 monitorexit 指令,其中monitorenter指令指向同步代码块的开始位置,monitorexit指令则指明同步代码块的结束位置

  • 同步方法

    Synchronized修饰的方法并没有monitorenter指令和monitorexit指令,取得代之的是ACC_SYNCHRONIZED标识,该标识指明了该方法是一个同步方法,JVM通过该ACC_SYNCHRONIZED访问标志来辨别一个方法是否声明为同步方法,从而执行相应的同步调用。在JVM中,对象在内存中的布局分为三块区域:对象头、实例数据和对齐填充。Java对象头是实现synchronized的锁对象的基础

在Java早期版本中,synchronized属于重量级锁,效率低下,因为监视器锁(monitor)是依赖于底层的操作系统的Mutex Lock来实现的,而操作系统实现线程之间的切换时需要从用户态转换到核心态,这个状态之间的转换需要相对比较长的时间,时间成本相对较高,这也是为什么早期的synchronized效率低的原因。庆幸的是在Java 6之后Java官方对从JVM层面对synchronized较大优化,所以现在的synchronized锁效率也优化得很不错了

自旋锁

互斥同步进入阻塞状态的开销都很大,应该尽量避免。在许多应用中,共享数据的锁定状态只会持续很短的一段时间。自旋锁的思想是让一个线程在请求一个共享数据的锁时执行忙循环(自旋)一段时间,如果在这段时间内能获得锁,就可以避免进入阻塞状态。

锁消除

锁消除是指对于被检测出不可能存在竞争的共享数据的锁进行消除。锁消除主要是通过逃逸分析来支持,如果堆上的共享数据不可能逃逸出去被其它线程访问到,那么就可以把它们当成私有数据对待,也就可以将它们的锁进行消除。

锁粗化

如果一系列的连续操作都对同一个对象反复加锁和解锁,频繁的加锁操作就会导致性能损耗。如果虚拟机探测到由这样的一串零碎的操作都对同一个对象加锁,将会把加锁的范围扩展(粗化)到整个操作序列的外部。

轻量级锁和偏向锁

JDK 1.6 引入了偏向锁和轻量级锁,从而让锁拥有了四个状态:无锁状态(unlocked)、偏向锁状态(biasble)、轻量级锁状态(lightweight locked)和重量级锁状态(inflated)。

轻量级锁是相对于传统的重量级锁而言,它使用 CAS 操作来避免重量级锁使用互斥量的开销。对于绝大部分的锁,在整个同步周期内都是不存在竞争的,因此也就不需要都使用互斥量进行同步,可以先采用 CAS 操作进行同步,如果 CAS 失败了再改用互斥量进行同步。

偏向锁的思想是偏向于让第一个获取锁对象的线程,这个线程在之后获取该锁就不再需要进行同步操作,甚至连 CAS 操作也不再需要。当有另外一个线程去尝试获取这个锁对象时,偏向状态就宣告结束,此时撤销偏向(Revoke Bias)后恢复到未锁定状态或者轻量级锁状态。

java.util.concurrent

ConcurrentHashMap

ConcurrentHashMap 类中包含两个静态内部类 HashEntry 和 Segment

  • HashEntry 用来封装映射表的键 / 值对

  • Segment 用来充当锁的角色,每个 Segment 对象守护整个散列映射表的若干个桶(分段锁)

    Segment类继承自ReentrantLock类,从而使得Segment对象可以充当锁的角色。

使用分段锁来实现多线程并发写操作,在 ConcurrentHashMap 中,线程对映射表做读操作时,一般情况下不需要加锁就可以完成,对容器做结构性修改的操作才需要加锁。

1
2
3
4
5
6
7
public V put(K key, V value) { 
if (value == null) //ConcurrentHashMap 中不允许用 null 作为映射值
throw new NullPointerException();
int hash = hash(key.hashCode()); // 计算键对应的散列码
// 根据散列码找到对应的 Segment
return segmentFor(hash).put(key, hash, value, false);
}

CountDownLatch

使用CountDownLatch对象设置一个初始计数器,任何在这个对象上调用wait()的方法都会被阻塞,直到计数值达到0,其他任务结束时,可以调用countDown()来减少这个计数值。CountDown设计为只能被触发一次,计数不能被重置

CyclicBarrier

适用于一组任务,他们并行地执行,然后在进行下一个步骤之前等待,直到所有的任务都完成。它使得所有的并行任务都将在栅栏处等待,可以一致性地向前移动。类似于可以多次触发的CountDownLatch

BlockingQueue

  • FIFO 队列 :LinkedBlockingQueue、ArrayBlockingQueue(固定长度)
  • 优先级队列 :PriorityBlockingQueue
  • 延时队列:DelayQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能取走,队头对象的延迟到期时间最长

提供了阻塞的 take() 和 put() 方法:如果队列为空 take() 将阻塞,直到队列中有内容;如果队列为满 put() 将阻塞,直到队列有空闲位置。

ForkJoin

ForkJoin 使用 ForkJoinPool 来启动,它是一个特殊的线程池,线程数量取决于 CPU 核数。ForkJoinPool 实现了工作窃取算法来提高 CPU 的利用率。每个线程都维护了一个双端队列,用来存储需要执行的任务。工作窃取算法允许空闲的线程从其它线程的双端队列中窃取一个任务来执行。窃取的任务必须是最晚的任务,避免和队列所属线程发生竞争。例如下图中,Thread2 从 Thread1 的队列中拿出最晚的 Task1 任务,Thread1 会拿出 Task2 来执行,这样就避免发生竞争。但是如果队列中只有一个任务时还是会发生竞争。img

计算从0累加到10000

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class ForkJoinExample extends RecursiveTask<Integer> {

private final int threshold = 5;
private int first;
private int last;

public ForkJoinExample(int first, int last) {
this.first = first;
this.last = last;
}

@Override
protected Integer compute() {
int result = 0;
if (last - first <= threshold) {
// 任务足够小则直接计算
for (int i = first; i <= last; i++) {
result += i;
}
} else {
// 拆分成小任务
int middle = first + (last - first) / 2;
ForkJoinExample leftTask = new ForkJoinExample(first, middle);
ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);
leftTask.fork();
rightTask.fork();
result = leftTask.join() + rightTask.join();
}
return result;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinExample example = new ForkJoinExample(1, 10000);
ForkJoinPool forkJoinPool = new ForkJoinPool();
Future result = forkJoinPool.submit(example);
System.out.println(result.get());
}