Java常用类 - BigDecimal

BigDecimal

BigDecimal 可以实现对浮点数的运算,不会造成精度丢失。通常情况下,大部分需要浮点数精确运算结果的业务场景(比如涉及到钱的场景)都是通过 BigDecimal 来做的。

BigDecimal 常见方法

创建

使用**BigDecimal(String val)构造方法或者 BigDecimal.valueOf(double val) 静态方法来创建对象。禁止使用BigDecimal(double val)**方式把double值转化为BigDecimal对象,因为存在精度损失风险。

加减乘除

add 方法用于将两个 BigDecimal 对象相加,subtract 方法用于将两个 BigDecimal 对象相减。multiply 方法用于将两个 BigDecimal 对象相乘,divide 方法用于将两个 BigDecimal 对象相除。

1
2
3
4
5
6
7
BigDecimal a = new BigDecimal("1.0");
BigDecimal b = new BigDecimal("0.9");
System.out.println(a.add(b));// 1.9
System.out.println(a.subtract(b));// 0.1
System.out.println(a.multiply(b));// 0.90
System.out.println(a.divide(b));// 无法除尽,抛出 ArithmeticException 异常
System.out.println(a.divide(b, 2, RoundingMode.HALF_UP));// 1.11

使用 divide 方法的时候尽量使用 3 个参数版本,并且RoundingMode 不要选择 UNNECESSARY,否则很可能会遇到 ArithmeticException(无法除尽出现无限循环小数的时候),其中 scale 表示要保留几位小数,roundingMode 代表保留规则,常见保留规则的枚举类型为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public enum RoundingMode {
// 2.5 -> 3 , 1.6 -> 2
// -1.6 -> -2 , -2.5 -> -3
UP(BigDecimal.ROUND_UP),
// 2.5 -> 2 , 1.6 -> 1
// -1.6 -> -1 , -2.5 -> -2
DOWN(BigDecimal.ROUND_DOWN),
// 2.5 -> 3 , 1.6 -> 2
// -1.6 -> -1 , -2.5 -> -2
CEILING(BigDecimal.ROUND_CEILING),
// 2.5 -> 2 , 1.6 -> 1
// -1.6 -> -2 , -2.5 -> -3
FLOOR(BigDecimal.ROUND_FLOOR),
// 2.5 -> 3 , 1.6 -> 2
// -1.6 -> -2 , -2.5 -> -3
HALF_UP(BigDecimal.ROUND_HALF_UP),
//......
}

大小比较

a.compareTo(b) : 返回 -1 表示 a 小于 b,0 表示 a 等于 b , 1 表示 a 大于 b。

保留几位小数

通过 setScale方法设置保留几位小数以及保留规则。保留规则有挺多种,不需要记,IDEA 会提示。

1
2
3
BigDecimal m = new BigDecimal("1.255433");
BigDecimal n = m.setScale(3,RoundingMode.HALF_DOWN);
System.out.println(n);// 1.255

等值比较

BigDecimal 使用 equals() 方法进行等值比较出现问题的代码示例:

1
2
3
BigDecimal a = new BigDecimal("1");
BigDecimal b = new BigDecimal("1.0");
System.out.println(a.equals(b));//false

这是因为 equals() 方法不仅仅会比较值的大小(value)还会比较精度(scale),而 compareTo() 方法比较的时候会忽略精度。1.0 的 scale 是 1,1 的 scale 是 0,因此 a.equals(b) 的结果是 false。

compareTo()** 方法可以比较两个 BigDecimal 的值,如果相等就返回 0,如果第 1 个数比第 2 个数大则返回 1,反之返回-1。**

Bigdecimal一定不会丢失精度吗?

不是的,Bigdecimal也会丢失精度。精度丢失情况分两种:

第一种:不能分解为以(1/2)^n为单位的十进制小数,例如无限循环的小数未指定位数

Bigdecimal可以比double更能保证精度的原因在于:****十进制整数在转化成二进制数时不会有精度问题,那么把十进制小数扩大N倍让它在整数的维度上进行计算,并保留相应的精度信息就能保证精度。

但当出现无限循环的计算结果时,Bigdecimal也只能通过开发者指定保留位数和保留规则来计算结果,例如计算结果为1/3:

1
2
3
BigDecimal b1 = new BigDecimal("1");
BigDecimal b2 = new BigDecimal("3");
System.out.println(b1.divide(b2));

则会报错:

1
2
3
Exception in thread "main" java.lang.ArithmeticException: Non-terminating decimal expansion; no exact representable decimal result.
at java.math.BigDecimal.divide(BigDecimal.java:1707)
at test01.main(test01.java:26)

需要指定保留位数和保留规则,如下:

1
2
3
BigDecimal b1 = new BigDecimal("1");
BigDecimal b2 = new BigDecimal("3");
System.out.println(b1.divide(b2,3, RoundingMode.UP));
1
0.334

第二种:使用Bigdecimal(String s)之外的其他构造器

1
2
3
4
5
6
BigDecimal a = new BigDecimal(1.01);
BigDecimal b = new BigDecimal(1.02);
BigDecimal c = new BigDecimal("1.01");
BigDecimal d = new BigDecimal("1.02");
System.out.println(a.add(b));
System.out.println(c.add(d));
1
2
2.0300000000000000266453525910037569701671600341796875
2.03

double类型的变量在用二进制保存时本身就存在精度丢失问题,如果将其作为构造器的输入则无法正确保留精度。

枚举类

枚举的意义

我们在很多时候会拿枚举和常量来做对比,可实际我们在程序中大量使用枚举的地方就是为了代替常量。因为相对于静态的常量,枚举类显得更加直观,类型也相对更加安全

枚举在生活中非常常见,列举如下:

  • 表示星期几:SUNDAY、MONDAY、TUESTDAY、WEDNESDAY、THURSDAY、FRIDAY、SATURDAY就是一个枚举;
  • 性别:MALE(男)、FEMALE(女)也是一个枚举;
  • 订单的状态:PAIDED(已付款)、UNPAIDED(未付款)、FINISHED(已完成),CANCELED(已取消)。

安全性

若一个方法中要求入参为int类型,开发者传入任意的int类型值就行,但是如果是枚举类型的话,就只能传入枚举类中包含的对象。

命名空间

开发者要在命名的时候建议以枚举类的共有属性名称开头,例如定义季节的枚举类,在枚举类中定义四个季节,则枚举类以SEASON_开头,这样另外一个开发者再看这段代码的时候,才知道这四个常量分别代表季节。

枚举的特点

  • enum和class、interface的地位一样
  • 类的对象只有有限个,确定的。我们称此类为枚举类;
  • 当需要定义一组常量时,强烈建议使用枚举类;
  • 如果枚举类中只有一个对象,则可以作为单例模式的实现方式
  • switch参数可以使用Enum;
  • enum 允许为eunm 实例编写方法。所以可以为每个enum 实例赋予各自不同的行为
  • 使用enum定义的枚举类默认继承了java.lang.Enum,而不是继承Object类,由于Java是单继承机制,所以枚举类不可以被继承,枚举类可以实现一个或多个接口。
  • 枚举类的所有实例都必须放在第一行展示,不需使用new 关键字,不需显式调用构造器。自动添加public static final修饰。
  • 枚举类的构造器只能是私有的

枚举类应用

基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//1.最简单的枚举类
public enum Season {
//1.下面这几个就是枚举类的所有对象,多个对象之间用","隔开,末尾对象";"结束
spring,
SUMMER,
AUTUMN,
WINTER;
}

//可以通过 类名+. 的方式获取其实例化对象的引用
public static void main(String[] args) {
//获取枚举类的对象
Season season = Season.spring;
//输出结果是: spring
System.out.println(season);
//此时的 Season.spring可以当做一个常量
}

//2. enum类也可以定义构造器、属性、方法。
//其中,构造器只能是私有的(默认是private)
public enum Season {
spring("春天","绿色"),
SUMMER("夏天","橙色"),
AUTUMN("秋天","黄色"),
WINTER("冬天","白色");

private final String name;
private final String color;

Season(String name,String color) {
this.name = name;
this.color = color;
}

public String getName() {
return name;
}

public String getColor() {
return color;
}
}
//可以通过 类名+. 的方式获取其实例化对象的引用
public static void main(String[] args) {
Season season = Season.spring;
//输出结果是:绿色
System.out.println(season.getColor());
System.out.println(Season.spring.getColor());
}

常用方法

java.lang.Enum类 是 Java 语言枚举类型的公共基类,我们使用enum关键字定义的枚举类,是隐式继承自Enum类的,下面我们来看一下Enum类的常用方法:

  • **values()**:返回枚举类型的对象数组。该方法可以很方便的遍历所有的枚举值;
  • **valueOf()**:可以把一个字符串转换为对应的枚举类对象。要求字符串必须是枚举类对象的“名字”,如果不是,会抛出IllegalArguementException;
  • **toString()**:返回当前枚举类对象常量的名称。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Test2 {
public static void main(String[] args) {
Test test1 = Test.man;
System.out.println("调用 toString方法");
System.out.println(test1.toString());

System.out.println("调用values方法");
Test[] values = Test.values();
for (Test value : values){
System.out.println(value);
}

System.out.println("调用values方法");
Test test2 = Test.valueOf("man");
System.out.println(test2);
}

enum Test{
//使用enum关键字生成枚举类
//1.枚举类内部提供多个对象,这些对象用逗号分隔开来
//2.声明枚举类的属型
//3.编写构造方法,为属型赋值
//3.提供获得属型的Getter方法(封装里的知识)
man("男"),
woman("女"),
unknow("未知");

private final String sexName;

Test(String sexName){
this.sexName=sexName;
}
public String getSexName() {
return sexName;
}
}
}
1
2
3
4
5
6
7
8
调用 toString方法
man
调用values方法
man
woman
unknow
调用values方法
man

值得注意的是,当调用valuOf()方法时,我们传递的对象的“名字”,在枚举类中并不存在,此时会抛出运行时异常:****IllegalArgumentException,实例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Test2 {
public static void main(String[] args) {
Test test1 = Test.man;
System.out.println("调用values方法");
Test test2 = Test.valueOf("man1");
System.out.println(test2);
}

enum Test{
man("男"),
woman("女"),
unknow("未知");
private final String sexName;
Test(String sexName){
this.sexName=sexName;
}
public String getSexName() {
return sexName;
}
}
}
1
2
3
4
5
调用values方法
Exception in thread "main" java.lang.IllegalArgumentException: No enum constant com.caq.exception.Test2.Test.man1
at java.lang.Enum.valueOf(Enum.java:238)
at com.caq.exception.Test2$Test.valueOf(Test2.java:12)
at com.caq.exception.Test2.main(Test2.java:8)

枚举类对象实现接口的方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
interface Info{
void show();
}

//使用enum关键字枚举类
enum Season1 implements Info{
SPRING("春天","春暖花开"){
@Override
public void show() {
System.out.println("春天在哪里?");
}
},
SUMMER("夏天","夏日炎炎"){
@Override
public void show() {
System.out.println("宁夏");
}
},
AUTUMN("秋天","秋高气爽"){
@Override
public void show() {
System.out.println("秋天不回来");
}
},
WINTER("冬天","冰天雪地"){
@Override
public void show() {
System.out.println("大约在冬季");
}
};
}

单例模式

单例模式可以说是每个Java开发者必须掌握的一个设计模式了,通常我们说它的实现,有饱汉式和饿汉式,也有经常说的双重判断,今天我们介绍另外一种方式,借助枚举来实现:

1
2
3
4
5
6
7
8
9
10
11
public enum SingleEnum {
INSTANCE;

public void print(String word) {
System.out.println(word);
}
}
@Test
public void testSingle() {
SingleEnum.INSTANCE.print("hello world");
}

如上,用枚举实现单例模式真的非常简单,将类声明为枚举,内部只定义一个值即可。这样做主要是基于枚举的三个特性,即:枚举类不能new,因此保证单例、枚举类不能被继承、类不加载时,不会实例化

使用枚举类创建的单例还有一个好处,就是使用了反射也无法打破它的单例性质,这是相比较于其他的实现方式的一个优点。但是在实际的项目中这种写法并不常见,这是因为我们习惯了将枚举作为常量来使用,很少在枚举类类中,添加复杂的业务逻辑

策略模式

除了轻松实现上面的单例模式之外,枚举还可以非常简单的实现策略模式,比如下面这个例子:现有一个接口,通过接受的参数,来决定最终的数据存在什么地方,如果按照正常的写法,可能就是很多的if/else

1
2
3
4
5
6
7
8
9
10
11
12
public void save(String type, Object data) {
if ("mysql".equals(type) {
// 保存到mysql
saveMySQL(data);
} else if ("redis".equals(type))
// 保存在redis
saveRedis(data);
} else if ("file".eqauls(type)) {
// 保存在文件
saveFile(type);
}
}

以上写法虽说简单直观,但是当type类型多了之后,这个if/else的代码行数就会越来越多了,而且看起来也不美观。如果我们换成枚举,基于策略模式的思想来解决上面的if/else问题,就会好的多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public enum SaveStrategyEnum {
MYSQL("mysql") {
@Override
public void save(Object obj) {
System.out.println("save in mysql:" + obj);
}
},
REDIS("redis") {
@Override
public void save(Object obj) {
System.out.println("save in redis: " + obj);
}
},
FILE("file") {
@Override
public void save(Object obj) {
System.out.println("save in file: " + obj);
}
};

private String type;

SaveStrategyEnum(String type) {
this.type = type;
}

public abstract void save(Object obj);

public static SaveStrategyEnum typeOf(String type) {
for (SaveStrategyEnum strategyEnum: values()) {
// equalsIgnoreCase() 方法用于将字符串与指定的对象比较,不考虑大小写
if (strategyEnum.type.equalsIgnoreCase(type)) {
return strategyEnum;
}
}
return null;
}
}

public void save(String type, Object data) {
SaveStrategyEnum strategyEnum = SaveStrategyEnum.typeOf(type);
if (strategyEnum != null) {
strategyEnum.save(data);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
public class enumTest {
public static void main(String[] args) {
enumTest enumTest = new enumTest();
enumTest.save("MYSQL","我是MySQL");
}
public void save(String type, Object data) {
SaveStrategyEnum strategyEnum = SaveStrategyEnum.typeOf(type);
if (strategyEnum != null) {
strategyEnum.save(data);
}
}
}
1
save in mysql:我是MySQL

以上主要利用的是抽象类 + 枚举来完成不同的策略具体实现,这种实现方式体现了抽象方法的使用 (在模板设计模式中,更能体会抽象方法的使用妙处),利用枚举原生提供的values(),来实现遍历,找到目标

Stream流函数

初步认识

在JAVA中,涉及到对数组Collection等集合类中的元素进行操作的时候,通常会通过循环的方式进行逐个处理,或者使用Stream的方式进行处理。

例如,现在有这么一个需求:

从给定句子中返回单词长度大于7的单词列表,按长度倒序输出,最多返回5个:在JAVA7及之前的代码中,我们会可以照如下的方式进行实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public List<String> sortGetTop5LongWords(@NotNull String sentence) {
// 先切割句子,获取具体的单词信息
String[] words = sentence.split(" ");
List<String> wordList = new ArrayList<>();
// 循环判断单词的长度,先过滤出符合长度要求的单词
for (String word : words) {
if (word.length() > 7) {
wordList.add(word);
}
}
// 对符合条件的列表按照长度进行排序
wordList.sort((o1, o2) -> o2.length() - o1.length());
// 判断list结果长度,如果大于3则截取前三个数据的子list返回
if (wordList.size() > 5) {
wordList = wordList.subList(0, 5);
}
return wordList;
}

JAVA8及之后的版本中,引入了Stream流,我们可以更加优雅的写出如下代码:

1
2
3
4
5
6
7
8

public List<String> sortGetTop5LongWordsByStream(@NotNull String sentence) {
return Arrays.stream(sentence.split(" "))
.filter(word -> word.length() > 7)
.sorted((o1, o2) -> o2.length() - o1.length())
.limit(5)
.collect(Collectors.toList());
}

直观感受上,Stream的实现方式代码更加简洁、一气呵成。很多的同学在代码中也经常使用Stream流,但是对Stream流的认知往往也是仅限于会一些简单的filtermapcollect等操作,但JAVA的Stream可以适用的场景与能力远不止这些。

基本介绍

Stream流操作分为3种类型

  • 创建Stream
  • Stream中间处理
  • 终止Steam

每个Stream管道操作类型都包含若干API方法,先列举下各个API方法的功能介绍。

开始管道

主要负责新建一个Stream流,或者基于现有的数组、List、Set、Map等集合类型对象创建出新的Stream流。

stream():创建出一个新的stream串行流对象

parallelStream():创建出一个可并行执行的stream流对象

Stream.of():通过给定的一系列元素创建一个新的Stream串行流对象

中间管道

负责对Stream进行处理操作,并返回一个新的Stream对象,中间管道操作可以进行叠加

filter():按照条件过滤符合要求的元素返回新的stream流

map():将已有元素转换为另一个对象类型一对一逻辑返回新的stream流

flatMap():将已有元素转换为另一个对象类型一对多逻辑,即原来一个元素对象可能会转换为1个或者多个新类型的元素,返回新的stream流

limit()仅保留集合前面指定个数的元素,返回新的stream流

skip()跳过集合前面指定个数的元素,返回新的stream流

concat():将两个流的数据合并起来为一个新的流,返回新的stream流

distinct():对Stream中所有元素进行去重,返回新的stream流

sorted():对stream中所有的元素按照指定规则进行排序,返回新的stream流

终止管道

通过终止管道操作之后,Stream流将会结束,最后可能会执行某些逻辑处理,或者是按照要求返回某些执行后的结果数据。

count():返回stream处理后最终的元素个数

max():返回stream处理后的元素最大值

min():返回stream处理后的元素最小值

findFirst():找到第一个符合条件的元素时则终止流处理

findAny():找到任何一个符合条件的元素时则退出流处理,这个对于串行流时与findFirst相同,对于并行流时比较高效,任何分片中找到都会终止后续计算逻辑

anyMatch():返回一个boolean值,类似于isContains(),用于判断是否有符合条件的元素

allMatch():返回一个boolean值,用于判断是否所有元素都符合条件

noneMatch():返回一个boolean值, 用于判断是否所有元素都不符合条件

collect():将流转换为指定的类型,通过Collectors进行指定

toArray():将流转换为数组

iterator():将流转换为Iterator对象

foreach():无返回值,对元素进行逐个遍历,然后执行给定的处理逻辑

方法使用

map与flatMap

mapflatMap都是用于转换已有的元素为其它元素,区别点在于:

  • map 必须是一对一的,即每个元素都只能转换为1个新的元素
  • flatMap 可以是一对多的,即每个元素都可以转换为1个或者多个新的元素

比如:有一个字符串ID列表,现在需要将其转为User对象列表。可以使用map来实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

/**
* map函数使用:一对一转换
*/
public void stringToIntMap() {
List<String> ids = Arrays.asList("111", "222", "333", "444", "555", "666", "777");
// 使用流操作
List<User> results = ids.stream()
.map(id -> {
User user = new User();
user.setId(id);
return user;
})
.collect(Collectors.toList());
System.out.println(results);
}

执行之后,会发现每一个元素都被转换为对应新的元素,但是前后总元素个数是一致的:

1
2
3
4
5
6
7
8

[User{id='111'},
User{id='222'},
User{id='333'},
User{id='444'},
User{id='555'},
User{id='666'},
User{id='777'}]

再比如:现有一个句子列表,需要将句子中每个单词都提取出来得到一个所有单词列表。这种情况用map就无法满足需求了,需要使用flatMap的特性:

1
2
3
4
5
6
7
8
9

public void stringToIntFlatmap() {
List<String> sentences = Arrays.asList("hello world","your name is Chris");
// 使用流操作
List<String> results = sentences.stream()
.flatMap(sentence -> Arrays.stream(sentence.split(" ")))
.collect(Collectors.toList());
System.out.println(results);
}
1
[hello, world, your, name, is, Chris]

执行结果如下,可以看到结果列表中元素个数是比原始列表元素个数要多的,是将各个String类型的字符串句子以空格拆分,将结果加入同一个结果集进行返回。这里flatMap操作的时候其实是先每个元素处理并返回一个新的Stream,然后将多个Stream展开合并为了一个完整的新的Stream

peek和foreach方法

peek和foreach,都可以用于对元素进行遍历然后逐个的进行处理。

但根据前面的介绍,peek属于中间方法,而foreach属于终止方法。这也就意味着peek只能作为管道中途的一个处理步骤,而没法直接执行得到结果,其后面必须还要有其它终止操作的时候才会被执行;而foreach作为无返回值的终止方法,则可以直接执行相关操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

public void peekAndforeach() {
List<String> sentences = Arrays.asList("hello world","your name is Chris");
// 仅peek操作,最终不会执行
System.out.println("----before peek----");
sentences.stream().peek(sentence -> System.out.println(sentence));
System.out.println("----after peek----");
// 仅foreach操作,最终会执行
System.out.println("----before foreach----");
sentences.stream().forEach(sentence -> System.out.println(sentence));
System.out.println("----after foreach----");
// peek操作后面增加终止操作,peek会执行
System.out.println("----before peek and count----");
sentences.stream().peek(sentence -> System.out.println(sentence)).count();
System.out.println("----after peek and count----");
}
1
2
3
4
5
6
7
8
9
10
----before peek----
----after peek----
----before foreach----
hello world
your name is Chris
----after foreach----
----before peek and count----
hello world
your name is Chris
----after peek and count----

输出结果可以看出,peek独自调用时并没有被执行、但peek后面加上终止操作之后便可以被执行,而foreach可以直接被执行。

filter、sorted、distinct、limit

这几个都是常用的Stream的中间操作方法,具体的方法的含义在上面的表格里面有说明。具体使用的时候,可以根据需要选择一个或者多个进行组合使用,或者同时使用多个相同方法的组合

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Student {
Integer id;
String name;
Integer age;

public Student(Integer id) {
this.id = id;
this.name = "学生";
this.age = 20;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14

public void testGetTargetStudents() {
List<String> ids = Arrays.asList("111","222","333","444","555","666","777", "888");
// 使用流操作
List<Student> results = ids.stream()
.filter(s -> s.length() > 2)
.distinct()
.map(Integer::valueOf)
.sorted(Comparator.comparingInt(o -> o))
.limit(3)
.map(id -> new Student(id))
.collect(Collectors.toList());
System.out.println(results);
}
1
[Student(id=111, name=学生, age=20), Student(id=222, name=学生, age=20), Student(id=333, name=学生, age=20)]

上面的代码片段的处理逻辑很清晰:

  1. 使用filter过滤掉不符合条件的数据
  2. 通过distinct对存量元素进行去重操作
  3. 通过map操作将字符串转成整数类型
  4. 借助sorted指定按照数字大小正序排列
  5. 使用limit截取排在前3位的元素
  6. 又一次使用map将id转为Student对象类型
  7. 使用collect终止操作将最终处理后的数据收集到list

Optional.ofNullable()方法

**Optional.ofNullable()**是可以避免空指针异常的对入参进行封装的方法,举个例子:

1
2
3
4
public static void main(String[] args) {
List<String> list = null;
list.forEach(c -> System.out.println(c));
}

工作中经常会遇到,查询返回空,如果没有判空处理,一不小心就会空指针异常。加上if判断处理也可以,但是Java8更优雅的处理方式

1
2
3
4
5
public static void main(String[] args) {
List<String> list = null;
List<String> newList = Optional.ofNullable(list).orElse(Lists.newArrayList());
newList.forEach(c -> System.out.println(c));
}

代码含义:

  • 如果list集合不为空,将list集合赋值给newList
  • 如果list集合为空创建一个空对象集合赋值给newList,保证list集合永远不为空,也就避免了空指针异常

阅读源码查看如何实现的避免空指针异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//静态变量 empty
private static final Optional<?> EMPTY = new Optional<>();

//如果对象为空,执行empty()方法;不为空,执行of(value)方法
public static <T> Optional<T> ofNullable(T value) {
return value == null ? empty() : of(value);
}

public static<T> Optional<T> empty() {
@SuppressWarnings("unchecked")
Optional<T> t = (Optional<T>) EMPTY;
return t;
}

public static <T> Optional<T> of(T value) {
return new Optional<>(value);
}
  1. 首先执行ofNullable()方法,如果T对象为空,执行empty()方法不为空,执行of(value)方法
  2. empty()方法初始化一个空对象Optional(空对象和null不是一回事);
  3. of(value)方法将泛型对象T用于Optional构造方法的参数上,返回一个有值的对象
  4. 经过上面两步,从而保证了Optional不为null,避免了空指针

orElse、orElseGet、orElseThrow

orElse和orElseGet的用法如下所示,相当于value值为null时,给予一个默认值:

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void test() {
User user = null;
user = Optional.ofNullable(user).orElse(createUser());
user = Optional.ofNullable(user).orElseGet(() -> createUser());

}
public User createUser(){
User user = new User();
user.setName("zhangsan");
return user;
}

这两个函数的区别:

  • user值不为null时orElse函数依然会执行createUser()方法
  • orElseGet函数并不会执行createUser()方法

至于orElseThrow,就是value值为null时,直接抛一个异常出去,用法如下所示:

1
2
User user = null;
Optional.ofNullable(user).orElseThrow(()->new Exception("用户不存在"));

简单结果终止方法

按照前面介绍的,终止方法里面像count、max、min、findAny、findFirst、anyMatch、allMatch、nonneMatch等方法,均属于这里说的简单结果终止方法。所谓简单,指的是其结果形式是数字、布尔值或者Optional对象值等。

1
2
3
4
5
6
7
8
9
10
11
12

public void testSimpleStopOptions() {
List<String> ids = Arrays.asList("111","222","333","444","555","666","777", "888");
// 统计stream操作后剩余的元素个数
System.out.println(ids.stream().filter(s -> s.length() > 2).count());
// 判断是否有元素值等于222
System.out.println(ids.stream().filter(s -> s.length() > 2).anyMatch("222"::equals));
// findFirst操作
ids.stream().filter(s -> s.length() > 2)
.findFirst()
.ifPresent(s -> System.out.println("findFirst:" + s));
}
1
2
3
8
true
findFirst:111

避坑提醒

这里需要补充提醒下,一旦一个Stream被执行了终止操作之后,后续便不可以再读这个流执行其他的操作了,否则会报错,看下面示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

public void testHandleStreamAfterClosed() {
List<String> ids = Arrays.asList("111","222","333","444","555","666","777", "888");
Stream<String> stream = ids.stream().filter(s -> s.length() > 2);
// 统计stream操作后剩余的元素个数
System.out.println(stream.count());
System.out.println("-----下面会报错-----");
// 判断是否有元素值等于222
try {
System.out.println(stream.anyMatch("222"::equals));
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("-----上面会报错-----");
}
1
2
3
4
5
6
7
8
9

8
-----下面会报错-----
-----上面会报错-----
java.lang.IllegalStateException: stream has already been operated upon or closed
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
at java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:449)
at JavaKnowledge.StreamStudy.StreamFunction.testHandleStreamAfterClosed(StreamFunction.java:84)
at JavaKnowledge.StreamStudy.StreamFunction.main(StreamFunction.java:20)

因为stream已经被执行count()终止方法了,所以对stream再执行anyMatch方法的时候,就会报错stream has already been operated upon or closed,这一点在使用的时候需要特别注意。

结果收集终止方法

因为Stream主要用于对集合数据的处理场景,所以除了上面几种获取简单结果的终止方法之外,更多的场景是获取一个集合类的结果对象,比如List、Set或者HashMap等

这里就需要collect方法了,它可以支持生成如下类型的结果数据

  • 一个集合类,比如List、Set或者HashMap等
  • StringBuilder对象,支持将多个字符串进行拼接处理并输出拼接后结果
  • 一个可以记录个数或者计算总和的对象(数据批量运算统计)
生成集合
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void testCollectStopOptions() {
List<Student> ids = Arrays.asList(new Student(17), new Student(22), new Student(23));
// collect成list
List<Student> collectList = ids.stream().filter(dept -> dept.getId() > 20)
.collect(Collectors.toList());
System.out.println("collectList:" + collectList);
// collect成Set
Set<Student> collectSet = ids.stream().filter(dept -> dept.getId() > 20)
.collect(Collectors.toSet());
System.out.println("collectSet:" + collectSet);
// collect成HashMap,key为id,value为Student对象
Map<Integer, Student> collectMap = ids.stream().filter(dept -> dept.getId() > 20)
.collect(Collectors.toMap(Student::getId, dept -> dept));
System.out.println("collectMap:" + collectMap);
}
1
2
3
collectList:[Student(id=22, name=学生, age=20), Student(id=23, name=学生, age=20)]
collectSet:[Student(id=22, name=学生, age=20), Student(id=23, name=学生, age=20)]
collectMap:{22=Student(id=22, name=学生, age=20), 23=Student(id=23, name=学生, age=20)}
生成拼接字符串

将一个List或者数组中的值拼接到一个字符串里并以逗号分隔开,这个场景在日常使用中很常见,如果通过for循环和StringBuilder去循环拼接,还得考虑下最后一个逗号如何处理的问题,很繁琐:

1
2
3
4
5
6
7
8
9
10
public void testForJoinStrings() {
List<String> ids = Arrays.asList("111","222","333","444","555","666","777", "888");
StringBuilder builder = new StringBuilder();
for (String id : ids) {
builder.append(id).append(',');
}
// 去掉末尾多拼接的逗号
builder.deleteCharAt(builder.length() - 1);
System.out.println("拼接后:" + builder.toString());
}
1
拼接后:111,222,333,444,555,666,777,888

Java8提供的Stream使用collect可以轻而易举的实现,代码风格相对优雅:

1
2
3
4
5
public void testCollectJoinStrings() {
List<String> ids = Arrays.asList("111","222","333","444","555","666","777", "888");
String joinResult = ids.stream().collect(Collectors.joining(","));
System.out.println("拼接后:" + joinResult);
}
1
拼接后:111,222,333,444,555,666,777,888
数据批量数学运算

使用collect生成数字数据的总和信息

1
2
3
4
5
6
7
8
9
public void testNumberCalculate() {
List<Integer> ids = Arrays.asList(10, 20, 30, 40, 50);
// 计算平均值
Double average = ids.stream().collect(Collectors.averagingInt(value -> value));
System.out.println("平均值:" + average);
// 数据统计信息
IntSummaryStatistics summary = ids.stream().collect(Collectors.summarizingInt(value -> value));
System.out.println("数据统计信息: " + summary);
}
1
2
平均值:30.0
数据统计信息: IntSummaryStatistics{count=5, sum=150, min=10, average=30.000000, max=50}

并行Stream

机制说明

使用并行流,可以有效利用计算机的多CPU硬件,提升逻辑的执行速度。并行流通过将一整个stream划分为****多个片段,然后对各个分片流并行执行处理逻辑,最后将各个分片流的执行结果汇总为一个整体流

约束与限制

并行流类似于多线程在并行处理,所以与多线程场景相关的一些问题同样会存在,比如死锁等问题,所以在并行流终止执行的函数逻辑,必须要保证线程安全

Stream相较于传统的foreach的方式处理stream,有什么优势?

优点:

  • 代码更简洁:偏声明式的编码风格,更容易体现出代码的逻辑意图
  • 逻辑间解耦:一个stream中间处理逻辑,无需关注上游与下游的内容,只需要按约定实现自身逻辑即可
  • 并行流场景效率会比迭代器逐个循环更高
  • 函数式接口,延迟执行的特性,中间管道操作不管有多少步骤都不会立即执行,只有遇到终止操作的时候才会开始执行,可以避免一些中间不必要的操作消耗

缺点:

  • 代码调测debug不便
  • 程序员从历史写法切换到Stream时,需要一定的适应时间

Java时间类

Year

Java 8中,Year类是一个不可变、线程安全的类,用于表示和操作年份。它主要用于处理与年份相关的日期和时间操作,例如获取当前年份、创建指定年份的Date对象等。

核心方法

**now()**:该方法返回表示当前日期的Year对象。该方法使用系统默认的时区来获取当前年份。

**now(Clock clock)**:该方法返回表示指定时间戳的Year对象。该方法使用指定的Clock对象来获取当前年份。

**of(int year)**:该方法用于创建一个表示指定年份的Year对象。如果输入的年份参数不在范围内(公元1年-公元9999年),则会抛出IllegalArgumentException异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void test1() {
// 使用now()方法获取当前年份
Year year = Year.now();
int currentYear = year.getValue(); // 获取年份的值
System.out.println("当前年份:" + currentYear);
// 使用now(Clock clock)方法获取指定时间戳的年份
Clock clock = Clock.systemDefaultZone();
Year yearFromClock = Year.now(clock);
int yearFromClockValue = yearFromClock.getValue(); // 获取年份的值
System.out.println("指定时间戳的年份:" + yearFromClockValue);
// 使用of()方法创建指定年份的Year对象
Year customYear = Year.of(2024);
int customYearValue = customYear.getValue(); // 获取年份的值
System.out.println("自定义年份:" + customYearValue);
}
1
2
3
当前年份:2024
指定时间戳的年份:2024
自定义年份:2024

YearMonth

YearMonth类代表一个特定的年和月,可以表示任何合法的年和月组合,例如2020-02。它主要用于处理与年和月相关的日期和时间操作。

核心方法

**now()**:获取当前年份和月份。例如,YearMonth.now()将返回当前的年和月。

**String format(DateTimeFormatter formatter)**:使用特定的格式格式化year-month。

**int get(TemporalField field)**:返回日期中特定域的int类型

**of(int year, int month)**:创建一个表示特定年和月的YearMonth对象。例如,YearMonth.of(2024, 1)将创建一个表示2024年1月的YearMonth对象。

**lengthOfMonth()**:返回当前YearMonth实例有多少天。例如,YearMonth.of(2020, 2).lengthOfMonth()将返回29,因为2020年是闰年,2月有29天。

1
2
3
4
5
6
7
8
public void test2() {
// 获取当前年和月
YearMonth currentYearMonth = YearMonth.now();
System.out.printf("这个月的年月 %s 有 %d 天 %n", currentYearMonth, currentYearMonth.lengthOfMonth());
// 创建一个表示特定年和月的YearMonth对象
YearMonth creditCardExpiry = YearMonth.of(2025, Month.JULY);
System.out.printf("你输入的年月是 %s %n", creditCardExpiry);
}
1
2
这个月的年月 2024-0131
你输入的年月是 2025-07

这个示例中,YearMonth.now()方法获取了当前的年和月,并使用lengthOfMonth()方法获取了当前年月的天数。另外,YearMonth.of(2025, Month.JULY)方法创建了一个表示2025年7月的YearMonth对象,并输出了该对象。

格式化为字符串

1
2
3
4
5
6
7
8
public void test3(){
YearMonth ym = YearMonth.now();
String s = ym.format(DateTimeFormatter.ofPattern("MM yyyy"));
System.out.println(s);
YearMonth start = YearMonth.parse("2023-01-01", DateTimeFormatter.ofPattern("yyyy-MM-dd"));
boolean flag = ym.isAfter(start);
System.out.println(ym + "比" + start + "时间更晚吗?" + flag);
}
1
2
01 2024
2024-012023-01时间更晚吗?true

获取特定域

1
2
3
4
5
6
7
public void test4(){
YearMonth y = YearMonth.now();
long l1 = y.get(ChronoField.YEAR);
System.out.println(l1);
long l2 = y.get(ChronoField.MONTH_OF_YEAR);
System.out.println(l2);
}
1
2
2024
1

增加月份

1
2
3
4
5
6
public void test5(){
YearMonth ym1 = YearMonth.now();
System.out.println(ym1);
YearMonth ym2 = ym1.plus(Period.ofYears(2));
System.out.println(ym1 + "加两年后为" + ym2);
}
1
2
2024-01
2024-01加两年后为2026-01

YearMonth与LocalDate的转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 转换YearMonth为LocalDate
*/
public static LocalDate parseDateYearMonth(YearMonth yearMonth){
return LocalDate.of(yearMonth.getYear(),yearMonth.getMonthValue(),1);
}

/**
* 转换LocalDate为YearMonth
*/
public static YearMonth parseYearMonth(LocalDate localDate){
return YearMonth.of(localDate.getYear(),localDate.getMonthValue());
}

public void test6(){
YearMonth curYM = YearMonth.now();
System.out.println(curYM);
LocalDate curYMlocalDate = parseDateYearMonth(curYM);
System.out.println("LocalDate类型的当前年月为" + curYMlocalDate);
YearMonth curYearMonth = parseYearMonth(curYMlocalDate);
System.out.println("YearMonth类型的当前年月为" + curYearMonth);
}
1
2
3
2024-01
LocalDate类型的当前年月为2024-01-01
YearMonth类型的当前年月为2024-01

Java比较时间差几年/几个月

1
2
3
4
5
6
7
8
9
10
11
12
public static void test7() {
YearMonth yearMonthFirst = YearMonth.of(2022, 6);
YearMonth yearMonthSecond = YearMonth.of(2022, 1);
//当前年月
YearMonth yearMonthNow = YearMonth.now();
int FirstToSecond = yearMonthFirst.compareTo(yearMonthSecond);
int FirstToNow = yearMonthFirst.compareTo(yearMonthNow);
int NowToFirst = yearMonthNow.compareTo(yearMonthFirst);
System.out.println(FirstToSecond);
System.out.println(FirstToNow);
System.out.println(NowToFirst);
}
1
2
3
5
-2
2

MonthDay

在Java 8中,MonthDay是一个非常实用的类,它用于处理只有月和日的信息,而没有年和其他时间信息的情况。这可以用于处理生日、纪念日和星座等周期性问题。

核心方法

of(int month, int day): 创建一个表示特定月日的MonthDay对象。

from(LocalDate): 从给定的LocalDate对象中提取月和日的信息,创建一个新的MonthDay对象。

equals(Object): 比较两个MonthDay对象是否相等。

这些方法的具体使用示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void test8() {
// 创建一个表示6月18日的MonthDay对象
MonthDay monthDay = MonthDay.of(1, 15);
// 从当前日期创建一个MonthDay对象
LocalDate localDate = LocalDate.now();
MonthDay day = MonthDay.from(localDate);
// 检查两个MonthDay对象是否相等
if (monthDay.equals(day)) {
System.out.println("两个MonthDay对象相等");
} else {
System.out.println("两个MonthDay对象不相等");
}
LocalDate nowDate = LocalDate.now();
LocalDate date = LocalDate.of(2024, Month.JANUARY, 1);
long days = ChronoUnit.DAYS.between(date, nowDate);
System.out.println(nowDate + "距离" + date + "有" + days + "天");
}
1
2
两个MonthDay对象相等
2024-01-15距离2024-01-0114

此外,你还可以使用MonthDay对象来计算两个日期之间相差的天数、月数或年数。例如,要计算两个给定的日期之间包含多少天,多少周或者多少年,可以使用**ChronoUnit.DAYS.between()ChronoUnit.WEEKS.between()ChronoUnit.YEARS.between()**等方法。

DayOfWeek

Java 8中的DayOfWeek是一个不可变的、线程安全的枚举,表示一周中的一天,如MONDAY、TUESDAY等,适用于需要表示一周中某一天的场景。除了日期名称,DayOfWeek也有一个数字值。可以使用数字值来获取日期名称,也可以通过日期名称来获取数字值。

核心方法

**getValue()**:获取数字值(1-7)。

**toString()**:获取日期名称(如MONDAY、TUESDAY等)。

**of()**:通过数字值创建DayOfWeek对象。

1
2
3
4
5
6
7
8
9
10
11
public void test9() {
// 设置变量为周一
DayOfWeek dayOfWeek = DayOfWeek.MONDAY;
int dayNumber = dayOfWeek.getValue(); // 获取数字值(1-7)
System.out.println(dayNumber);//输出结果:1
dayOfWeek = DayOfWeek.MONDAY;
String dayName = dayOfWeek.toString(); // 获取日期名称(如MONDAY、TUESDAY等)
System.out.println(dayName);//输出结果:MONDAY
DayOfWeek dayOfWeekFromNumber = DayOfWeek.of(1); // 通过数字值创建DayOfWeek对象
System.out.println(dayOfWeekFromNumber);//输出结果:MONDAY
}
1
2
3
1
MONDAY
MONDAY

总结

Year、YearMonth、MonthDay、DayOfWeek是Java 8中新增的日期时间API的一部分,它们提供了更灵活和强大的日期和时间处理能力。这些类都是不可变的,意味着一旦创建了对象,其值就不能更改。这种设计使得对这些对象的并发操作更加安全和高效。同时,这些类都实现了Comparable接口,可以根据时间顺序进行比较。

ConcurrentHashMap和LongAdder()的配合使用

使用 ConcurrentHashMap 来统计,Key 的范围是 10。使用最多 10 个并发,循环操作 1000 万次,每次操作累加随机的 Key。如果 Key 不存在的话,首次设置值为 1。normaluse()方法直接使用synchronized加锁的方式保证线程安全,锁的粒度很大,虽然能够保证完成业务需求,且线程安全,但性能低下gooduse()方法使用 ConcurrentHashMap 的原子性方法 computeIfAbsent 来做复合逻辑操作,判断 Key 是否存在 Value,如果不存在则把 Lambda 表达式运行后的结果放入 Map 作为 Value,也就是新创建一个 LongAdder 对象,最后返回 Value。由于 computeIfAbsent 方法返回的 Value 是 LongAdder,是一个线程安全的累加器,因此可以直接调用其 increment 方法进行累加。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ConcurrentHashMapTest {

//循环次数
private static int LOOP_COUNT = 10000000;
//线程数量
private static int THREAD_COUNT = 10;
//元素数量
private static int ITEM_COUNT = 10;

/**
* 一共循环10000000次 10个线程共同完成这些次循环 一个线程最多创建10个Key
* @return
* @throws InterruptedException
*/
private Map<String, Long> normaluse() throws InterruptedException {
ConcurrentHashMap<String, Long> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
//获得一个随机的Key
String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
synchronized (freqs) {
if (freqs.containsKey(key)) {
//Key存在则+1
freqs.put(key, freqs.get(key) + 1);
} else {
//Key不存在则初始化为1
freqs.put(key, 1L);
}
}
}
));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
return freqs;
}

private Map<String, Long> gooduse() throws InterruptedException {
ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
// 利用computeIfAbsent()方法来实例化LongAdder,然后利用LongAdder来进行线程安全计数
// 存在类型为LongAdder的value则累加 不存在则新建
freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
}
));
forkJoinPool.shutdown();
forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
// 因为我们的Value是LongAdder而不是Long,所以需要做一次转换才能返回
return freqs.entrySet().stream()
.collect(Collectors.toMap(
e -> e.getKey(),
e -> e.getValue().longValue())
);
}

public static void main(String[] args) throws InterruptedException {
long startTime = System.currentTimeMillis(); // 获取当前时间的毫秒数
ConcurrentHashMapTest concurrentHashMapTest = new ConcurrentHashMapTest();
// concurrentHashMapTest.normaluse();
concurrentHashMapTest.gooduse();
long endTime = System.currentTimeMillis();
long durationInMilli = endTime - startTime;

System.out.println("方法执行时间(毫秒):" + durationInMilli);
}
}

normaluse()方法执行用时:

1
方法执行时间(毫秒):1822

gooduse()方法执行用时:

1
方法执行时间(毫秒):428

优化后的代码,相比使用锁来操作 ConcurrentHashMap 的方式,性能提升了 4 倍。Java 自带的 Unsafe 实现的 CAS,在虚拟机层面确保了写入数据的原子性,比加锁的效率高得多。

CopyOnWriteArrayList的合理使用

在 Java 中,CopyOnWriteArrayList 虽然是一个线程安全的 ArrayList,但其实现方式是,每次修改数据时都会复制一份数据出来,所以仅适用于读多写少或者说希望无锁读的场景。如果读写比例均衡或者有大量写操作的话,使用 CopyOnWriteArrayList 的性能会十分低下。下面我们写一个demo比较一下使用 CopyOnWriteArrayList 和使用普通加锁方式的 ArrayList 的读写性能,在这段代码中我们针对并发读和并发写分别写了一个测试方法,测试两者一定次数的写或读操作的耗时。

pom文件中加入依赖,才可以使用统计耗时的StopWatch工具类:

1
2
3
4
5
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.3.19</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81

import org.springframework.util.StopWatch;

import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CopyOnWriteTest {

//测试并发写的性能
public Map testWrite() {
List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
// ArrayList 不是线程安全,可以通过Collections.synchronizedList(new ArrayList())生成线程安全的list
// 该方法的本质是:在对原始ArrayList操作时,都会增加关键字synchronized 保证线程安全。
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
StopWatch stopWatch = new StopWatch();
// 循环次数100万
int loopCount = 100000;
stopWatch.start("写操作:copyOnWriteArrayList");
// 循环100万次并发往CopyOnWriteArrayList写入随机元素
IntStream.rangeClosed(1, loopCount).parallel().forEach(c -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
stopWatch.start("写操作:synchronizedList");
// 循环100万次并发往加锁的ArrayList写入随机元素
IntStream.rangeClosed(1, loopCount).parallel().forEach(c -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount)));
stopWatch.stop();
// 输出代码执行耗时,以及执行时间百分比
System.out.println("输出代码执行耗时,以及执行时间百分比:" + stopWatch.prettyPrint());
Map result = new HashMap();
result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
result.put("synchronizedList", synchronizedList.size());
return result;
}

// 帮助方法用来填充List
private void addAll(List<Integer> list) {
list.addAll(IntStream.rangeClosed(1, 1000000).boxed().collect(Collectors.toList()));
}

// 测试并发读的性能
public Map testRead() {
// 创建两个测试对象
List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
// 填充数据
addAll(copyOnWriteArrayList);
addAll(synchronizedList);
StopWatch stopWatch = new StopWatch();
int loopCount = 1000000;
int count = copyOnWriteArrayList.size();
// stopWatch 是 StopWatch 类的一个实例,用于测量代码块的执行时间。
// start("Read:copyOnWriteArrayList") 方法用于开始计时,并为当前计时任务命名为 "读操作:copyOnWriteArrayList",方便后续统计和区分不同的操作耗时情况。
stopWatch.start("读操作:copyOnWriteArrayList");
// 循环100万次并发从CopyOnWriteArrayList随机查询元素
// ThreadLocalRandom.current() 返回当前线程的 ThreadLocalRandom 实例,用于生成随机数。每个线程都有自己独立的 ThreadLocalRandom 实例,避免了多线程并发时的竞争。
// nextInt(100) 是 ThreadLocalRandom 类中的方法,用于生成一个小于指定上限(不包括上限值)的随机整数
// 在这里,参数 100 指定了随机数的上限为 100,即生成的随机数范围是 [0, 100)。
IntStream.rangeClosed(1, loopCount).parallel().forEach(c -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(count)));
stopWatch.stop();
stopWatch.start("读操作:synchronizedList");
//循环1000000次并发从加锁的ArrayList随机查询元素
IntStream.range(1, loopCount).parallel().forEach(c -> synchronizedList.get(ThreadLocalRandom.current().nextInt(count)));
stopWatch.stop();
System.out.println("输出代码执行耗时,以及执行时间百分比:" + stopWatch.prettyPrint());
Map result = new HashMap();
result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
result.put("synchronizedList", synchronizedList.size());
return result;
}

public static void main(String[] args) {
CopyOnWriteTest copyOnWriteTest = new CopyOnWriteTest();
Map map = copyOnWriteTest.testWrite();
Map map1 = copyOnWriteTest.testRead();
System.out.println(map.toString());
System.out.println(map1.toString());
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
输出代码执行耗时,以及执行时间百分比:StopWatch '': running time = 2943391700 ns
---------------------------------------------
ns % Task name
---------------------------------------------
2932055700 100% 写操作:copyOnWriteArrayList
011336000 000% 写操作:synchronizedList

输出代码执行耗时,以及执行时间百分比:StopWatch '': running time = 122014900 ns
---------------------------------------------
ns % Task name
---------------------------------------------
015497400 013% 读操作:copyOnWriteArrayList
106517500 087% 读操作:synchronizedList

{copyOnWriteArrayList=100000, synchronizedList=100000}
{copyOnWriteArrayList=1000000, synchronizedList=1000000}

从实践结果可以看出,大量写的场景(10 万次 add 操作),CopyOnWriteArray 比 ArrayList 慢一百倍不止,而读操作CopyOnWriteArray性能优于 ArrayList,是ArrayList性能的6.7倍。原因在于CopyOnWriteArrayList 中的 add 方法,在每次 add 时,都会用 Arrays.copyOf 创建一个新数组,频繁 add 时内存的申请释放消耗会很大:

1
2
3
4
5
6
7
8
9
10
public boolean add(E e) {
synchronized (lock) {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
}
}

综上所述,使用 CopyOnWriteArrayList 要注意使用场景是否为读多写少的场景或者说希望无锁读的场景

@Data使用注意事项

将@Data注解加在类上可以帮助我们根据类中的字段自动生成get()、set()、equals() 和 hashcode()方法,使用@Data注解需要导入lombok依赖:

1
2
3
4
5
6
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
<scope>provided</scope>
</dependency>

我们实践一下这个注解是否自动生成equals() 和 hashcode()方法,并能正确判断两个对象是否相同,创建一个Person类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import lombok.Data;

/**
* @author 不吃辣的Chris
* @create 2024-03-13-21:06
*/
@Data
class Person {
private String name;
private String identity;

public Person(String name, String identity) {
this.name = name;
this.identity = identity;
}
}
1
2
3
4
5
6
7
8
9
public class DataTest {

public static void main(String[] args) {
Person person1 = new Person("Chris","123456");
Person person2 = new Person("Marry","123456");
System.out.println(person1.equals(person2));
}

}
1
false

由于两个对象的name字段的值不同,所以两个对象不相同,name改成一致的,再测试:

1
2
3
4
5
6
7
8
9
public class DataTest {

public static void main(String[] args) {
Person person1 = new Person("Chris","123456");
Person person2 = new Person("Chris","123456");
System.out.println(person1.equals(person2));
}

}
1
true

此时两个name和identity两个字段都相同,返回true,符合我们的预期,已实践验证。如果你希望只要identity字段值一致就认为是同一个Person的话,可以使用 @EqualsAndHashCode.Exclude 注解来修饰 name 字段,从 equals 和 hashCode 的实现中排除 name 字段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import lombok.Data;
import lombok.EqualsAndHashCode;

/**
* @author 不吃辣的Chris
* @create 2024-03-13-21:06
*/
@Data
class Person {
@EqualsAndHashCode.Exclude
private String name;

private String identity;

public Person(String name, String identity) {
this.name = name;
this.identity = identity;
}
}
1
2
3
4
5
6
7
8
9
public class DataTest {

public static void main(String[] args) {
Person person1 = new Person("Chris","123456");
Person person2 = new Person("Marry","123456");
System.out.println(person1.equals(person2));
}

}
1
true

实践验证了此时只要identity值相同,则两个对象相同,在equals 和 hashCode 的实现中排除了name字段。

如果类之间有继承关系上述实践还会生效吗?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import lombok.Data;
import lombok.EqualsAndHashCode;

/**
* @author 不吃辣的Chris
* @create 2024-03-13-21:06
*/
@Data
class Person {
@EqualsAndHashCode.Exclude
private String name;

private String identity;

public Person(String name, String identity) {
this.name = name;
this.identity = identity;
}

public Person() {

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import lombok.Data;

/**
* @author 不吃辣的Chris
* @create 2024-03-13-21:20
*/
@Data
class Employee extends Person {

private String company;
public Employee(String name, String identity, String company) {
super(name, identity);
this.company = company;
}
}
1
2
3
4
5
6
7
8
9
public class DataTest {

public static void main(String[] args) {
Employee employee1 = new Employee("Chris","123456", "sky");
Employee employee2 = new Employee("Marry","666666", "sky");
System.out.println(employee1.equals(employee2));
}

}
1
true

实践得知,子类Employee在equals 和 hashCode 的实现中默认只考虑了自身特有的字段company,父类Person的identity字段没有考虑在内(name刚才已经通过注解排除了),为解决这个问题,我们可以手动设置@EqualsAndHashCode注解的配置 callSuper 开关为 true,来修改这种默认行为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import lombok.Data;
import lombok.EqualsAndHashCode;

/**
* @author 不吃辣的Chris
* @create 2024-03-13-21:20
*/
@Data
@EqualsAndHashCode(callSuper = true)
class Employee extends Person {

private String company;
public Employee(String name, String identity, String company) {
super(name, identity);
this.company = company;
}
}
1
2
3
4
5
6
7
8
9
public class DataTest {

public static void main(String[] args) {
Employee employee1 = new Employee("Chris","123456", "sky");
Employee employee2 = new Employee("Marry","666666", "sky");
System.out.println(employee1.equals(employee2));
}

}
1
false

修改后的代码,实现了同时以子类的属性 company 加上父类中的属性 identity,作为 equals 和 hashCode 方法的实现条件(实现上其实是调用了父类的 equals 和 hashCode)。

实现排序的 Comparable 和 Comparator

  • Comparable 接口有一个 compareTo(Object obj)方法用来排序
  • Comparator 接口有一个compare(Object obj1, Object obj2)方法用来排序

CompletableFuture使用实践

CompletableFuture 和 Future使用实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package org.example.ideaspringboottest.JavaTest.CompletableFutureTest;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {

private Long id;

private String name;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package org.example.ideaspringboottest.JavaTest.CompletableFutureTest;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Task {

private Long id;

private String name;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package org.example.ideaspringboottest.JavaTest.CompletableFutureTest;

public class TaskService {

public User getUserInfo(Long userId) throws InterruptedException {
// 模拟执行耗时
Thread.sleep(200);
// 模拟查询数据库中一条用户数据
return new User(1L, "Chris");
}

public Task getTaskInfo(Long taskId) throws InterruptedException {
// 模拟执行耗时
Thread.sleep(300);
// 模拟查询数据库中一条任务数据
return new Task(1L, "学习");
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package org.example.ideaspringboottest.JavaTest.CompletableFutureTest;

import org.example.ideaspringboottest.JavaTest.DesensitizationTest.UserInfo;

import java.util.concurrent.*;

public class FutureTest {

public static void main(String[] args) throws ExecutionException, InterruptedException, ExecutionException {

ExecutorService executorService = Executors.newFixedThreadPool(10);
TaskService taskService = new TaskService();
Long userId =1L;
Long taskId =1L;
Long startTime = System.currentTimeMillis();

// 调用用户服务获取用户基本信息
FutureTask<User> userInfoFutureTask = new FutureTask<>(new Callable<User>() {
@Override
public User call() throws Exception {
return taskService.getUserInfo(userId);
}
});
executorService.submit(userInfoFutureTask);
// 模拟主线程其它操作耗时
// Thread.sleep(200);

FutureTask<Task> taskInfoFutureTask = new FutureTask<>(new Callable<Task>() {
@Override
public Task call() throws Exception {
return taskService.getTaskInfo(taskId);
}
});
executorService.submit(taskInfoFutureTask);
// 获取用户信息
User user = userInfoFutureTask.get();
// 获取任务信息
Task task = taskInfoFutureTask.get();

System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");
}

}
1
总共用时304ms
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package org.example.ideaspringboottest.JavaTest.CompletableFutureTest;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CompletableFutureTest {

public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException{
TaskService taskService = new TaskService();
Long userId =1L;
Long taskId =1L;
Long startTime = System.currentTimeMillis();

// 获取用户信息
CompletableFuture<User> completableUserInfoFuture = CompletableFuture.supplyAsync(() -> {
try {
return taskService.getUserInfo(userId);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

});

// 模拟主线程其它操作耗时
// Thread.sleep(200);

CompletableFuture<Task> completableTaskInfoFuture = CompletableFuture.supplyAsync(() -> {
try {
return taskService.getTaskInfo(taskId);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});

User user = completableUserInfoFuture.get(2, TimeUnit.SECONDS);
Task task = completableTaskInfoFuture.get();
System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms");

}

}
1
总共用时306ms

CompletableFuture 和 Future 的区别

  1. 功能和灵活性

Future:

java.util.concurrent.Future 是 Java 5 引入的接口,用于表示一个异步操作的未来结果。它提供了基本的异步操作支持,可以用于检查是否完成、等待结果以及获取结果,但在处理结果、异常和组合等方面功能有限

CompletableFuture:

java.util.concurrent.CompletableFuture 是 Java 8 引入的类,扩展了 Future 的功能,提供了更强大和灵活的异步编程支持。它允许更容易地处理异步操作的结果、异常和组合,以及在操作完成时执行回调等

  1. 异步编程支持

Future:

Future 只能用于异步操作的基本管理,如提交任务和获取结果

CompletableFuture:

CompletableFuture 提供了丰富的操作符、方法和组合功能,可以在异步操作之间进行更复杂的处理,如转换、组合、串联、并行等

  1. 组合和链式操作

Future:

Future 没有提供内置的操作符来对多个异步操作进行链式组合

CompletableFuture:

CompletableFuture 支持在操作完成后进行链式操作,使多个异步操作可以依次执行,以及在其中一个或多个操作完成后执行其他操作

  1. 异常处理

Future:

Future 的异常处理相对有限,通常需要使用 try-catch 块来捕获操作过程中的异常

CompletableFuture:

CompletableFuture 具有更强大的异常处理机制,可以使用 exceptionally()、handle() 等方法来处理操作过程中的异常

  1. 回调执行

Future:

Future 不支持在操作完成时执行回调操作

CompletableFuture:

CompletableFuture 支持使用 thenApply()、thenCompose()、thenCombine() 等方法来在操作完成后执行回调

综上所述,CompletableFuture 提供了更丰富的异步编程支持和功能,使处理异步操作的结果、异常和组合变得更加灵活和便捷。如果使用的是 Java 8 或更高版本,通常建议使用 CompletableFuture 来实现更高级的异步编程需求。

CompletableFuture常用方法

创建异步任务

CompletableFuture创建异步任务,一般有supplyAsyncrunAsync两个方法

  • supplyAsync执行CompletableFuture任务,支持有返回值
  • runAsync执行CompletableFuture任务,没有返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RunAsyncTest {

public static void main(String[] args) {
// 自定义线程池
ExecutorService executor = Executors.newCachedThreadPool();
// runAsync的使用
CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("这是runAsync方法"), executor);
// supplyAsync的使用
CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("这是supplyAsync方法");
return "B站:不吃辣的Chris"; }, executor);
// runAsync的future没有返回值 输出null
System.out.println(runFuture.join());
// supplyAsync的future 有返回值
System.out.println(supplyFuture.join());
// 关闭线程池
executor.shutdown();
}
}
1
2
3
4
这是runAsync方法
null
这是supplyAsync方法
B站:不吃辣的Chris

任务异步回调

thenRun/thenRunAsync

CompletableFuture的thenRun方法负责做完第一个任务后,再做第二个任务某个任务执行完成后,执行回调方法;但是前后两个任务没有参数传递,第二个任务也没有返回值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenRunTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
()->{
System.out.println("当前线程名称:" + Thread.currentThread().getName());
System.out.println("这是第一个CompletableFuture方法任务");
return "B站:不吃辣的Chris";
}
);
CompletableFuture thenRunFuture = orgFuture.thenRun(() -> {
System.out.println("当前线程名称:" + Thread.currentThread().getName());
System.out.println("这是第二个CompletableFuture方法任务");
});
System.out.println(thenRunFuture.get());
}
}
1
2
3
4
5
当前线程名称:ForkJoinPool.commonPool-worker-1
这是第一个CompletableFuture方法任务
当前线程名称:ForkJoinPool.commonPool-worker-1
这是第二个CompletableFuture方法任务
null

thenRun 和thenRunAsync有什么区别

1
2
3
4
5
6
7
8
9
10
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}

如果你执行第一个任务的时候,传入了一个自定义线程池:

  • 调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池
  • 调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenRunAsyncTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 自定义线程池
ExecutorService executor = Executors.newCachedThreadPool();
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
()->{
System.out.println("当前线程名称:" + Thread.currentThread().getName());
System.out.println("这是第一个CompletableFuture方法任务");
return "B站:不吃辣的Chris";
},executor
);
CompletableFuture thenRunFuture = orgFuture.thenRunAsync(() -> {
System.out.println("当前线程名称:" + Thread.currentThread().getName());
System.out.println("这是第二个CompletableFuture方法任务");
});
System.out.println(thenRunFuture.get());
}
}
1
2
3
4
5
当前线程名称:pool-1-thread-1
这是第一个CompletableFuture方法任务
当前线程名称:ForkJoinPool.commonPool-worker-1
这是第二个CompletableFuture方法任务
null
thenAccept/thenAcceptAsync

CompletableFuture的thenAccept方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenAcceptTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
()->{
System.out.println("第一个CompletableFuture方法任务");
return "B站:不吃辣的Chris";
}
);
CompletableFuture thenAcceptFuture = orgFuture.thenAccept((a) -> {
if ("B站:不吃辣的Chris".equals(a)) {
System.out.println("第一个方法的结果是 " + a + "!");
}

System.out.println("一键三连加关注哦~");
});

System.out.println(thenAcceptFuture.get());
}
}
1
2
3
4
第一个CompletableFuture方法任务
第一个方法的结果是 B站:不吃辣的Chris!
一键三连加关注哦~
null
thenApply/thenApplyAsync

CompletableFuture的thenApply方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ThenApplyTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
()->{
System.out.println("第一个CompletableFuture方法任务");
return "B站:不吃辣的Chris";
}
);

CompletableFuture<String> thenApplyFuture = orgFuture.thenApply((a) -> {
if ("B站:不吃辣的Chris".equals(a)) {
return "第一个方法的结果是 " + a + " ,已一键三连加关注!";
}
return "不是 " + a + " 哦~";
});
System.out.println(thenApplyFuture.get());
}
}
1
2
第一个CompletableFuture方法任务
第一个方法的结果是 B站:不吃辣的Chris ,已一键三连加关注!
exceptionally

CompletableFuture的exceptionally方法表示,某个任务执行异常时,执行的回调方法,并且有抛出异常作为参数,传递到回调方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ExceptionallyTest {

public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
()->{
System.out.println("当前线程名称:" + Thread.currentThread().getName());
throw new RuntimeException();
}
);
CompletableFuture<String> exceptionFuture = orgFuture.exceptionally((e) -> {
e.printStackTrace();
return "出现异常,请及时处理!";
});
System.out.println(exceptionFuture.get());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
当前线程名称:ForkJoinPool.commonPool-worker-1
出现异常,请及时处理!
java.util.concurrent.CompletionException: java.lang.RuntimeException
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: java.lang.RuntimeException
at org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest.ExceptionallyTest.lambda$main$0(ExceptionallyTest.java:12)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
... 6 more
whenComplete方法

CompletableFuture的whenComplete方法表示,某个任务执行完成后,执行的回调方法,****无返回值;并且whenComplete方法返回的CompletableFuture的result是****上个任务的结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class WhenCompleteTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
()->{
System.out.println("当前线程名称:" + Thread.currentThread().getName());
System.out.println("这是第一个CompletableFuture任务");
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "B站:不吃辣的Chris";
}
);
CompletableFuture<String> rstFuture = orgFuture.whenComplete((a, throwable) -> {
System.out.println("当前线程名称:" + Thread.currentThread().getName());
System.out.println("第一个任务执行结束,执行结果为 " + a + " ");
if ("B站:不吃辣的Chris".equals(a)) {
System.out.println("第一个方法的结果是 " + a + " ,已一键三连加关注!");
}
System.out.println("耶~");
});
System.out.println(rstFuture.get());
}
}
1
2
3
4
5
6
7
当前线程名称:ForkJoinPool.commonPool-worker-1
这是第一个CompletableFuture任务
当前线程名称:ForkJoinPool.commonPool-worker-1
第一个任务执行结束,执行结果为 B站:不吃辣的Chris
第一个方法的结果是 B站:不吃辣的Chris ,已一键三连加关注!
耶~
B站:不吃辣的Chris
handle方法

CompletableFuture的handle方法表示,某个任务执行完成后,执行回调方法,并且是有返回值的,并且handle方法返回的CompletableFuture的result是****回调方法执行的结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class HandleTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
()->{
System.out.println("当前线程名称:" + Thread.currentThread().getName());
System.out.println("这是第一个CompletableFuture任务");
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "B站:不吃辣的Chris";
}
);
CompletableFuture<String> rstFuture = orgFuture.handle((a, throwable) -> {
System.out.println("第一个任务执行结束,执行结果为 " + a + " ");
if ("B站:不吃辣的Chris".equals(a)) {
System.out.println("第一个方法的结果是 " + a + " ,已一键三连加关注!");
return "已关注";
}
System.out.println("耶~");
return null;
});
System.out.println(rstFuture.get());
}
}
1
2
3
4
5
当前线程名称:ForkJoinPool.commonPool-worker-1
这是第一个CompletableFuture任务
第一个任务执行结束,执行结果为 B站:不吃辣的Chris
第一个方法的结果是 B站:不吃辣的Chris ,已一键三连加关注!
已关注

任务组合处理

AND组合处理

thenCombine / thenAcceptBoth / runAfterBoth都表示:将两个CompletableFuture组合起来,只有这两个都正常执行完了,才会执行某个任务

区别在于:

  • thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值
  • thenAcceptBoth: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值
  • runAfterBoth不会把执行结果当做方法入参,且没有返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;
import java.util.concurrent.*;

public class ThenCombineAsyncTest {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
CompletableFuture<String> first = CompletableFuture.completedFuture("这是第一个CompletableFuture任务");
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture
//第二个异步任务
.supplyAsync(() -> "这是第二个CompletableFuture任务", executor)
// 执行完前两个任务后的第三个任务
.thenCombineAsync(first, (s, w) -> {
System.out.println(w);
System.out.println(s);
return "这是第三个CompletableFuture任务,走到这里说明两个异步任务进行了组合";
}, executor);
System.out.println(future.join());
executor.shutdown();
}
}
1
2
3
这是第一个CompletableFuture任务
这是第二个CompletableFuture任务
这是第三个CompletableFuture任务,走到这里说明两个异步任务进行了组合

OR组合处理

applyToEither / acceptEither / runAfterEither 都表示:将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务

区别在于:

  • applyToEither:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值
  • acceptEither: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值
  • runAfterEither不会把执行结果当做方法入参,且没有返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AcceptEitherAsyncTest {
public static void main(String[] args) {
// 第一个异步任务,休眠1秒,保证它执行晚点
CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{
try{
Thread.sleep(1000L);
System.out.println("第一个任务执行结束");}
catch (Exception e){
return "第一个任务异常执行出现了异常";
}
return "任务1结果";
});
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = CompletableFuture
// 第二个异步任务
.supplyAsync(() -> {
System.out.println("第二个任务执行结束");
return "任务2结果";}
, executor)
// 第三个任务
.acceptEitherAsync(first, System.out::println, executor);
executor.shutdown();
}
}
1
2
第二个任务执行结束
任务2结果
AllOf

所有任务都执行完成后,才执行 allOf返回的CompletableFuture。如果任意一个任务异常,allOf的CompletableFuture,执行get方法,会抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AllOfTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {

CompletableFuture<Void> a = CompletableFuture.runAsync(()->{
System.out.println("执行第一个任务");
});
CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
System.out.println("执行第二个任务");
System.out.println(1/0);
System.out.println("第二个任务执行结束");
});
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(a, b).whenComplete((m,k)->{
System.out.println("两个任务都执行结束了");
});
System.out.println(allOfFuture.get());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
执行第一个任务
执行第二个任务
两个任务都执行结束了
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073)
at org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest.AllOfTest.main(AllOfTest.java:20)
Caused by: java.lang.ArithmeticException: / by zero
at org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest.AllOfTest.lambda$main$1(AllOfTest.java:14)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1796)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
AnyOf

任意一个任务执行完,就执行anyOf返回的CompletableFuture如果执行的任务异常,anyOf的CompletableFuture,执行get方法,会抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class AnyOfTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> a = CompletableFuture.runAsync(()->{
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("执行第一个任务");
});
CompletableFuture<Void> b = CompletableFuture.runAsync(() -> {
System.out.println("执行第二个任务");
});
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((m,k)->{
System.out.println("某一个任务执行结束了");
// return "B站:不吃辣的Chris";
});
anyOfFuture.join();
}
}
1
2
执行第二个任务
某一个任务执行结束了
thenCompose

thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法,该方法会返回一个新的CompletableFuture实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThenComposeAsyncTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("执行第一个任务");
// 第二个异步任务
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "执行第二个任务", executor)
.thenComposeAsync(data -> {
System.out.println(data);
// 使用第一个任务作为返回
return completedFuture;
}, executor);
System.out.println(future.join());
executor.shutdown();
}
}
1
2
执行第二个任务
执行第一个任务

Future使用实践

ThreadPoolExecutor的submit()方法

Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。3 个 submit() 方法如下:

1
2
3
4
5
6
// 提交Callable任务
<T> Future<T> submit(Callable<T> task);
// 提交Runnable任务及结果引用
<T> Future<T> submit(Runnable task, T result);
// 提交Runnable任务
Future<?> submit(Runnable task);

这 3 个 submit() 方法之间的区别在于方法参数不同。

  • 提交 Runnable 任务 submit(Runnable task) :这个方法的参数是一个 Runnable 接口,Runnable 接口的 run() 方法是没有返回值的,所以 submit(Runnable task) 这个方法返回的 Future 仅可以用来断言任务已经结束了,类似于 Thread.join()。
  • 提交 Callable 任务 submit(Callable task):这个方法的参数是一个 Callable 接口,它只有一个 call() 方法,并且这个方法是有返回值的,所以这个方法返回的 Future 对象可以通过调用其 get() 方法来获取任务的执行结果
  • 提交 Runnable 任务及结果引用 submit(Runnable task, T result):假设这个方法返回的 Future 对象是 f,f.get() 的返回值就是传给 submit() 方法的参数 result

下面示例一下** submit(Runnable task, T result)的使用,Runnable 接口的实现类 Task 需要声明一个有参构造函数 Task(Result r) ,创建 Task 对象的时候传入了 result 对象,这样就能在类 Task 的 run() 方法中对 result 进行各种操作了。result 相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据**。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Result {

private Long id;

private String name;

}
1
2
3
4
5
6
7
8
9
public class Task implements Runnable {
Result result;
Task(Result result){ this.result = result; }
public void run() {
Long resultId = result.getId();
System.out.println(resultId);
result.setName("修改后的任务执行结果名称");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SubmitTest {

public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(1);
// 创建Result对象result
Result result = new Result();
result.setId(1L);
result.setName("执行结果名称");
// 提交任务
Future<Result> future = executor.submit(new Task(result), result);
Result result1 = future.get();
System.out.println(result1.getId());
System.out.println(result1.getName());
executor.shutdown();
}
}
1
2
3
1
1
修改后的任务执行结果名称

Future 接口

submit(Runnable task, T result)的使用实践过了,同时还发现上述三个submit方法返回值都是 Future 接口,Future 接口有 5 个方法,分别是取消任务的方法 cancel()判断任务是否已取消的方法 isCancelled()判断任务是否已结束的方法 isDone()以及2 个获得任务执行结果的 get() 和 get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制。通过 Future 接口提交的任务不但能够获取任务执行结果,还可以取消任务。不过需要注意的是:这两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒

1
2
3
4
5
6
7
8
9
10
// 取消任务
boolean cancel(boolean mayInterruptIfRunning);
// 判断任务是否已取消
boolean isCancelled();
// 判断任务是否已结束
boolean isDone();
// 获得任务执行结果
get();
// 获得任务执行结果,支持超时
get(long timeout, TimeUnit unit);

FutureTask

FutureTask基本使用

Future 是一个接口,而 FutureTask 是一个工具类,有两个构造函数,它们的参数和前面介绍的 submit() 方法类似。

1
2
3
4
5
6
7
8
9
10
11
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

FutureTask 实现了 Runnable 和 Future 接口,由于实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;又因为实现了 Future 接口,所以也能用来获得任务的执行结果。下面的实践一下将 FutureTask 对象提交给 ThreadPoolExecutor 执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class FutureTaskTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建FutureTask
FutureTask<Integer> futureTask = new FutureTask<>(()-> 10+22);
// 创建线程池
ExecutorService executorService = Executors.newCachedThreadPool();
// 提交FutureTask
executorService.submit(futureTask);
// 获取方法返回值
Integer result = futureTask.get();
executorService.shutdown();
System.out.println(result);
}
}
1
32

实践一下 FutureTask 对象直接被 Thread 执行,利用 FutureTask 对象可以获取子线程的执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskThreadTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建FutureTask
FutureTask<Integer> futureTask = new FutureTask<>(()-> 10+22);
// 创建并启动线程
Thread t1 = new Thread(futureTask);
t1.start();
// 获取方法返回值
Integer result = futureTask.get();
System.out.println(result);
}
}
1
32

实践一个需要保证多个任务执行顺序的场景,例如煮茶这个过程可以拆分为:洗水壶、烧水和泡茶三个任务,在泡茶前需要洗茶壶、洗茶杯和准备茶叶三个任务,而洗水壶、烧水这两个任务可以和洗茶壶、洗茶杯和准备茶叶三个任务并行执行,在洗水壶、烧水这两个任务执行完后等待洗茶壶、洗茶杯和准备茶叶三个任务执行完成,然后就可以泡茶了。首先,我们创建了两个 FutureTask——futureTask1 和 futureTask2,futureTask1 完成洗水壶、烧开水、泡茶的任务,futureTask2 完成洗茶壶、洗茶杯、准备茶叶的任务;这里需要注意的是 futureTask1 这个任务在执行泡茶任务前,需要等待 futureTask2 把茶叶拿来,所以 futureTask1 内部需要引用 futureTask2,并在执行泡茶之前,调用 futureTask2 的 get() 方法实现等待。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

// T1Task需要执行的任务:
// 洗水壶、烧开水、泡茶
class T1Task implements Callable<String> {
FutureTask<String> futureTask2;
// T1任务需要T2任务的FutureTask
T1Task(FutureTask<String> futureTask2){
this.futureTask2 = futureTask2;
}
@Override
public String call() throws Exception {
System.out.println("T1:洗水壶...");
TimeUnit.SECONDS.sleep(1);

System.out.println("T1:烧开水...");
TimeUnit.SECONDS.sleep(2);
// 获取T2线程的茶叶
String futureTask2Result = futureTask2.get();
System.out.println("T1:准备茶叶:"+futureTask2Result);

System.out.println("T1:泡茶...");
return "上茶:" + futureTask2Result;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

// T2Task需要执行的任务:
// 洗茶壶、洗茶杯、拿茶叶
class T2Task implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("T2:洗茶壶...");
TimeUnit.SECONDS.sleep(1);

System.out.println("T2:洗茶杯...");
TimeUnit.SECONDS.sleep(2);

System.out.println("T2:准备茶叶...");
TimeUnit.SECONDS.sleep(5);
return "西湖龙井";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class FutureTaskTea {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建任务T2的FutureTask
FutureTask<String> futureTask2 = new FutureTask<>(new T2Task());
// 创建任务T1的FutureTask
FutureTask<String> futureTask1 = new FutureTask<>(new T1Task(futureTask2));
// 线程T1执行任务futureTask1
Thread T1 = new Thread(futureTask1);
T1.start();
// 线程T2执行任务futureTask2
Thread T2 = new Thread(futureTask2);
T2.start();
// 等待线程T1执行结果
System.out.println(futureTask1.get());
}
}
1
2
3
4
5
6
7
8
T1:洗水壶...
T2:洗茶壶...
T2:洗茶杯...
T1:烧开水...
T2:准备茶叶...
T1:准备茶叶:西湖龙井
T1:泡茶...
上茶:西湖龙井

FutureTask结合阻塞队列的使用

没有使用阻塞队列的实践:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.knowledgesharing.knowledgesharing.JavaUtils.CompletionServiceTest;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TaskIncomeTestBlock {

public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 计算任务1执行收益
Future<Integer> f1 = executor.submit(()-> {
Thread.sleep(3000);
return 1;
});
// 计算任务2执行收益
Future<Integer> f2 = executor.submit(()->{return 2;});
// 计算任务3执行收益
Future<Integer> f3 = executor.submit(()->{return 3;});
// 获取任务1执行收益计算结果并保存至数据库
Integer taskIncome1 = f1.get();
executor.execute(()-> {
System.out.println("任务1的收益为:" + taskIncome1 + ",现保存入库");
} );
// 获取任务2执行收益计算结果并保存至数据库
Integer taskIncome2 = f2.get();
executor.execute(()-> {
System.out.println("任务2的收益为:" + taskIncome2 + ",现保存入库");
} );
// 获取任务3执行收益计算结果并保存至数据库
Integer taskIncome3 = f3.get();
executor.execute(()-> {
System.out.println("任务3的收益为:" + taskIncome3 + ",现保存入库");
} );
executor.shutdown();
}
}
1
2
3
任务1的收益为:1,现保存入库
任务2的收益为:2,现保存入库
任务3的收益为:3,现保存入库

可以发现任务2和任务3都要等任务1的f1.get()执行结束才可以执行,也就是说调用f1.get()的主线程阻塞在了f1.get()处,只有执行结束后才能执行后续的逻辑,没有很好地利用异步执行,现结合阻塞队列进行优化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import java.util.concurrent.*;

public class TaskIncomeTestNotBlock {

public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建阻塞队列BlockingQueue
LinkedBlockingQueue<Object> blockingQueue = new LinkedBlockingQueue<>();
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 计算任务1执行收益
Future<Integer> f1 = executor.submit(()-> {
Thread.sleep(3000);
return 1;
});
// 计算任务2执行收益
Future<Integer> f2 = executor.submit(()->{return 2;});
// 计算任务3执行收益
Future<Integer> f3 = executor.submit(()->{return 3;});
// 计算任务1执行收益 进入阻塞队列
executor.execute(()-> {
try {
blockingQueue.put(f1.get());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
});
// 计算任务2执行收益 进入阻塞队列
executor.execute(()-> {
try {
blockingQueue.put(f2.get());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
});
// 计算任务3执行收益 进入阻塞队列
executor.execute(()-> {
try {
blockingQueue.put(f3.get());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
});
// 异步获取任务执行收益计算结果并保存至数据库
for (int i=0; i<3; i++) {
Integer r = (Integer) blockingQueue.take();
executor.execute(()-> {
System.out.println("任务的收益为:" + r + ",现保存入库");
});
}
executor.shutdown();
}
}
1
2
3
任务的收益为:2,现保存入库
任务的收益为:3,现保存入库
任务的收益为:1,现保存入库

此时任务1的执行不会阻塞任务2和任务3的执行。

CompletionService实践

CompletionService基本使用

CompletionService 的实现原理是内部维护了一个阻塞队列,当任务执行结束就把任务的执行结果加入到阻塞队列中,不同的是 CompletionService 是把任务执行结果的 Future 对象加入到阻塞队列中,Future中结合阻塞队列的实践是把任务最终的执行结果放入了阻塞队列中。

CompletionService 接口的实现类是 ExecutorCompletionService,这个实现类的构造方法有两个,分别是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}

这两个构造方法都需要传入一个线程池,如果不指定 completionQueue,那么默认会使用无界的 LinkedBlockingQueue。任务执行结果的 Future 对象就是加入到 completionQueue 中。下面实践优化一下Future中结合阻塞队列实现3个任务的收益的计算和入库处理。

通过 CompletionService 接口提供的 submit() 方法提交三个任务收益计算的任务,这三个任务将会被 CompletionService 异步执行。最后,我们通过 CompletionService 接口提供的 take() 方法获取一个 Future 对象(加入到阻塞队列中的是任务执行结果的 Future 对象),调用 Future 对象的 get() 方法就能返回任务执行结果了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.concurrent.*;

public class CompletionServiceTest {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建CompletionService
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
// 异步计算任务1执行收益
completionService.submit(()-> {
Thread.sleep(3000);
return 1;
});
// 异步计算任务2执行收益
completionService.submit(()->{return 2;});
// 异步计算任务3执行收益
completionService.submit(()->{return 3;});
// 异步获取任务执行收益计算结果并保存至数据库
for (int i = 0; i<3; i++) {
Integer result = completionService.take().get();
executor.execute(()->{
System.out.println("任务的收益为:" + result + ",现保存入库");
});
}
executor.shutdown();
}
}
1
2
3
任务的收益为:3,现保存入库
任务的收益为:2,现保存入库
任务的收益为:1,现保存入库

CompletionService 接口说明

CompletionService 接口提供的方法有** 5 个,submit() 相关的方法有两个一个方法参数是Callable另外一个方法有两个参数,分别是Runnable task和V result**,这个方法类似于 ThreadPoolExecutor 的 Future submit(Runnable task, T result) ,

CompletionService 接口其余的 3 个方法,都是和阻塞队列相关的,take()、poll() 都是从阻塞队列中获取并移除一个元素,区别在于如果阻塞队列是空的,那么调用 take() 方法的线程会被阻塞,而 poll() 方法会返回 null 值poll(long timeout, TimeUnit unit) 方法支持以超时的方式获取并移除阻塞队列头部的一个元素,如果等待了 timeout unit 时间,阻塞队列还是空的,那么该方法会返回 null 值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}

public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}

public Future<V> take() throws InterruptedException {
return completionQueue.take();
}

public Future<V> poll() {
return completionQueue.poll();
}

public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}

CompletionService实现方法高可用

在集群模式下,支持并行地调用多个查询服务,只要有一个成功返回结果,整个服务就可以返回了。比如你需要提供一个查询用户当前位置的服务,为了保证该服务的高可用和性能,你可以并行地调用 3 个地图服务商的 API,然后只要有 1 个正确返回了结果,就可以直接返回了,也就是说可以容错 2 个地图服务商服务异常,但缺点是消耗的资源偏多。

下面实践一下:首先创建一个线程池 executor 、一个 CompletionService 对象 completionService 和一个Future类型的列表 futures,每次通过调用 CompletionService 的 submit() 方法提交一个异步任务进行执行执行结束会返回一个 Future 对象到CompletionService的阻塞队列中,调用completionService.take()就能拿到并消费队列头部的一个Future,也就是最快返回的任务,调用completionService.take().get()就能拿到这个任务的执行结果。同时我们把这些 Future 对象保存在列表 futures 中,用于在finally中取消所有任务,因为我们想要的效果就是只要我们拿到一个正确返回的结果,就可以取消所有任务并且返回最终结果了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class HighAvailabilityTest {

public static void main(String[] args) throws InterruptedException {
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建CompletionService
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
// 用于保存Future对象
List<Future<String>> futures = new ArrayList<>(3);
// 提交异步任务,并保存future到futures
futures.add(completionService.submit(()-> {
Thread.sleep((long) (Math.random() * 1000) + 1);
return "服务商1提供的用户地址";})
);
futures.add(completionService.submit(()-> {
Thread.sleep((long) (Math.random() * 1000) + 1);
return "服务商2提供的用户地址";})
);
futures.add(completionService.submit(()-> {
Thread.sleep((long) (Math.random() * 1000) + 1);
return "服务商3提供的用户地址";})
);
// 获取最快返回的任务执行结果
String loaction = "";
try {
// 只要有一个成功返回结果 则break
for (int i = 0; i < 3; ++i) {
loaction = completionService.take().get();
// 简单地通过判空来检查是否成功返回
if (loaction != null) {
break;
}
}
} catch (ExecutionException e) {
throw new RuntimeException(e);
} finally {
// 取消所有任务
for(Future<String> future : futures)
future.cancel(true);
}
System.out.println(loaction);
executor.shutdown();
}
}
1
服务商1提供的用户地址

实践可知:每次运行程序都是不同服务商提供的用户地址,说明只要一个服务提供商提供了地址,则直接返回结果,保证了服务的高可用。

小结

当需要批量提交异步任务的时候建议使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单。除此之外,CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待。CompletionService 的实现类 ExecutorCompletionService,需要你自己创建线程池,虽然看起来麻烦,但好处是你可以让多个 ExecutorCompletionService 的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。

Fork/Join使用实践

分治任务模型

分治任务模型可分为两个阶段:

  • 任务分解:将任务迭代地分解为子任务,直至子任务可以直接计算出结果;
  • 结果合并:逐层合并子任务的执行结果,直至获得最终结果。

在这个分治任务模型里,任务和分解后的子任务具有相似性,这种相似性往往体现在任务和子任务的算法是相同的,但是计算的数据规模是不同的。具备这种相似性的问题,往往都采用递归算法。

Fork/Join 的使用

Fork/Join 是一个并行计算的框架,用于支持分治任务模型,这个计算框架里的 Fork 对应的是分治任务模型里的任务分解Join 对应的是结果合并。Fork/Join 计算框架主要包含两部分,一部分是分治任务的线程池 ForkJoinPool,另一部分是分治任务 ForkJoinTask这两部分的关系类似于 ThreadPoolExecutor 和 Runnable 的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特类型 ForkJoinTask

ForkJoinTask 是一个抽象类,它的核心方法是 fork() 方法和 join() 方法,其中 fork() 方法会异步地执行一个子任务,而 join() 方法则会阻塞当前线程来等待子任务的执行结果ForkJoinTask** 有两个子类——RecursiveAction** 和 RecursiveTask,它们都是用递归的方式来处理分治任务的。这两个子类都**定义了抽象方法 compute()**,不过区别是 RecursiveAction 定义的 compute() 没有返回值,而 RecursiveTask 定义的 compute() 方法是有返回值的。这两个子类也是抽象类,在使用的时候,需要你定义子类去扩展。

实践一下用 Fork/Join 计算斐波那契数列。首先需要创建一个分治任务线程池以及计算斐波那契数列的分治任务,之后通过调用分治任务线程池的 invoke() 方法来启动分治任务。由于计算斐波那契数列需要有返回值,所以 Fibonacci 继承自 RecursiveTask。分治任务 Fibonacci 需要实现 compute() 方法,这个方法里面的逻辑和普通计算斐波那契数列非常类似,区别之处在于计算 Fibonacci(n - 1) 使用了异步子任务,这是通过 f1.fork() 这条语句实现的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import java.util.concurrent.RecursiveTask;

public class Fibonacci extends RecursiveTask<Integer> {
final int n;
Fibonacci(int n){
this.n = n;
}
protected Integer compute(){
if (n <= 1) {
return n;
}
Fibonacci f1 = new Fibonacci(n - 1);
// 创建子任务
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
// 等待子任务结果,并合并结果
return f2.compute() + f1.join();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import java.util.concurrent.ForkJoinPool;

public class ForkJoinTest {
public static void main(String[] args) {
// 创建分治任务线程池
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
// 创建分治任务
Fibonacci fibonacci = new Fibonacci(30);
// 启动分治任务
Integer result = forkJoinPool.invoke(fibonacci);
// 打印结果
System.out.println(result);
}
}

ForkJoinPool 工作原理

ThreadPoolExecutor 本质上是一个生产者 - 消费者模式的实现,内部有一个任务队列,这个任务队列是生产者和消费者通信的媒介;ThreadPoolExecutor 可以有多个工作线程,但是这些工作线程共享一个任务队列。Fork/Join 并行计算的核心组件是 ForkJoinPool,ForkJoinPool 本质上也是一个生产者 - 消费者的实现

ThreadPoolExecutor 内部只有一个任务队列,而 ForkJoinPool 内部有多个任务队列,当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中

如果工作线程对应的任务队列空了,也是不会“躺平”的,ForkJoinPool 支持一种叫做“任务窃取”的机制,如果工作线程空闲了,那它可以“窃取”其他工作任务队列里的任务,比如线程 T2 对应的任务队列已经空了,它可以“窃取”线程 T1 对应的任务队列的任务。如此一来,所有的工作线程都不会闲下来了。ForkJoinPool 中的任务队列采用的是双端队列,工作线程正常获取任务和“窃取任务”分别是从任务队列不同的端消费,这样能避免很多不必要的数据竞争

统计单词数量实践

现在有统计一个文件里面每个单词的数量的场景,可以先用二分法递归地将一个文件拆分成更小的文件,直到文件里只有一行数据,然后统计这一行数据里单词的数量,最后再逐级汇总结果

开始实践一下,用一个字符串数组 String[] fc 来模拟文件内容,fc 里面的元素与文件里面的行数据一一对应。关键的代码在 compute() 这个方法里面,这是一个递归方法,前半部分数据 fork 一个递归任务去处理(关键代码 mr1.fork()),后半部分数据则在当前任务中递归处理(mr2.compute())

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class WordStatisticsTest {

public static void main(String[] args) {

String[] fc = {"hello world",
"I am Chris",
"I am fork",
"I am join",
"We are fork and join"};
// 创建ForkJoin线程池
ForkJoinPool fjp = new ForkJoinPool(3);
// 创建任务
MR mr = new MR(fc, 0, fc.length);
// 启动任务
Map<String, Long> result = fjp.invoke(mr);
// 输出结果
result.forEach((k, v)-> System.out.println(k+":"+v));
}
//MR模拟类
static class MR extends RecursiveTask<Map<String, Long>> {
private String[] fc;
private int start, end;

/**
* 构造函数
*/
MR(String[] fc, int fr, int to){
this.fc = fc;
this.start = fr;
this.end = to;
}
@Override
protected Map<String, Long> compute(){
if (end - start == 1) {
return calc(fc[start]);
} else {
int mid = (start + end)/2;
MR mr1 = new MR(fc, start, mid);
mr1.fork();
MR mr2 = new MR(fc, mid, end);
// 计算子任务,并返回合并的结果
return merge(mr2.compute(), mr1.join());
}
}

/**
* 合并Map中的结果
*/
private Map<String, Long> merge(Map<String, Long> r1, Map<String, Long> r2) {
Map<String, Long> result = new HashMap<>();
result.putAll(r1);
// 合并结果
r2.forEach((k, v) -> {
Long c = result.get(k);
if (c != null)
result.put(k, c+v);
else
result.put(k, v);
});
return result;
}

/**
* 统计单词数量
*/
private Map<String, Long> calc(String line) {
Map<String, Long> result = new HashMap<>();
// 分割单词
String [] words = line.split("\\s+");
// 统计单词数量
for (String w : words) {
Long v = result.get(w);
if (v != null)
result.put(w, v+1);
else
result.put(w, 1L);
}
return result;
}
}
}
1
2
3
4
5
6
7
8
9
10
fork:2
world:1
are:1
and:1
Chris:1
I:3
join:2
hello:1
am:3
We:1

小结

Fork/Join 并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个大的任务拆分成小的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果

Fork/Join 并行计算框架的核心组件是 ForkJoinPoolForkJoinPool 支持任务窃取机制,能够让所有线程的工作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。Java 1.8 提供的 Stream API 里面并行流也是以 ForkJoinPool 为基础的。不过需要你注意的是,默认情况下所有的并行流计算都共享一个 ForkJoinPool,这个共享的 ForkJoinPool 默认的线程数是 CPU 的核数;如果所有的并行流计算都是 CPU 密集型计算的话,完全没有问题,但是如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。所以建议用不同的 ForkJoinPool 执行不同类型的计算任务

线程池原理

自定义线程池实践

线程池的设计,普遍采用的都是生产者 - 消费者模式。线程池的使用方是生产者,线程池本身是消费者。实践创建一个简单的线程池 MyThreadPool,以辅助理解线程池的工作原理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

/**
* 自定义线程池
*/
class MyThreadPool {
// 利用阻塞队列实现生产者-消费者模式
BlockingQueue<Runnable> workQueue;
// 保存内部工作线程
List<WorkerThread> threads = new ArrayList<>();
// 构造方法
MyThreadPool(int poolSize, BlockingQueue<Runnable> workQueue){
this.workQueue = workQueue;
// 创建工作线程
for(int idx = 0; idx<poolSize; idx++){
WorkerThread work = new WorkerThread();
work.start();
threads.add(work);
}
}
// 提交任务
void execute(Runnable command) throws InterruptedException {
workQueue.put(command);
}
// 工作线程负责消费任务,并执行任务
class WorkerThread extends Thread {
public void run() {
// 循环取任务并执行
while(true){
Runnable task = null;
try {
task = workQueue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
task.run();
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
// 创建有界阻塞队列
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2);
// 创建线程池
MyThreadPool pool = new MyThreadPool(10, workQueue);
// 提交任务
pool.execute(()->{
System.out.println("I am Chris.");
});
}
}
1
I am Chris.

在 MyThreadPool 的内部,我们维护了一个阻塞队列 workQueue 和一组工作线程,工作线程的个数由构造函数中的 poolSize 来指定。用户通过调用 execute() 方法来提交 Runnable 任务,execute() 方法的内部实现仅仅是将任务加入到 workQueue 中MyThreadPool 内部维护的工作线程会消费 workQueue 中的任务并执行任务,相关的代码就是代码 while 循环部分

使用 Java 中的线程池方式

Java 提供的线程池相关的工具类中,最核心的是 ThreadPoolExecutor,ThreadPoolExecutor 的构造函数有 7 个参数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

可以把线程池类比为一个项目组,而线程就是项目组的成员。

  1. corePoolSize:表示线程池保有的最小线程数。有些项目很闲,但是也不能把人都撤了,至少要留 corePoolSize 个人坚守阵地。
  2. maximumPoolSize:表示线程池创建的最大线程数。当项目很忙时,就需要加人,但是也不能无限制地加,最多就加到 maximumPoolSize 个人。当项目闲下来时,就要撤人了,最多能撤到 corePoolSize 个人。
  3. keepAliveTime & unit:上面提到项目根据忙闲来增减人员,那在编程世界里,如何定义忙和闲呢?很简单,一个线程如果在一段时间内,都没有执行任务,说明很闲,keepAliveTime 和 unit 就是用来定义这个“一段时间”的参数。也就是说,如果一个线程空闲了keepAliveTime & unit这么久,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收了
  4. workQueue:工作队列,和上面示例代码的工作队列同义。
  5. threadFactory:通过这个参数你可以自定义如何创建线程,例如你可以给线程指定一个有意义的名字。
  6. handler:通过这个参数你可以自定义任务的拒绝策略如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,你可以通过 handler 这个参数来指定。ThreadPoolExecutor 已经提供了以下 4 种策略。
  • CallerRunsPolicy提交任务的线程自己去执行该任务
  • AbortPolicy:默认的拒绝策略,直接抛出异常 throws RejectedExecutionException。
  • DiscardPolicy直接丢弃任务,没有任何异常抛出
  • DiscardOldestPolicy丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列

使用线程池注意事项

Java 并发包里提供了一个线程池的静态工厂类 Executors,利用 Executors 你可以快速创建线程池。不过目前大厂的编码规范中基本上都不建议使用 Executors ,最重要的原因是:Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列。

使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略,因此默认拒绝策略要慎重使用。如果线程池处理的任务非常重要,建议自定义自己的拒绝策略,并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。

使用线程池,还要注意异常处理的问题,例如通过 ThreadPoolExecutor 对象的 execute() 方法提交任务时,如果任务在执行的过程中出现运行时异常,会导致执行任务的线程终止;不过,最致命的是任务虽然异常了,但是你却获取不到任何通知,这会让你误以为任务都执行得很正常。虽然线程池提供了很多用于异常处理的方法,但是最稳妥和简单的方案还是捕获所有异常并按需处理,你可以参考下面的示例代码。

1
2
3
4
5
6
7
try {
// 业务逻辑
} catch (RuntimeException x) {
// 按需处理
} catch (Throwable x) {
// 按需处理
}

CountDownLatch使用实践

有一个场景是网购平台一个订单对应一个运单,定期需要核对一批订单和运单,查询订单和查询运单的任务可以并行执行以提高效率,现在实践一下使用CountDownLatch优化串行执行查询订单和查询运单的逻辑

创建一个 CountDownLatch,计数器的初始值等于 2,之后在模拟从数据库中查询订单的操作orders.add(orderList.get(index));和模拟从数据库中查询运单的操作dispatches.add(dispatchList.get(index));对应的两条语句的后面对计数器执行减 1 操作,这个对计数器减 1 的操作是通过调用** latch.countDown(); 来实现的。在主线程中,我们通过调用 latch.await() **来实现对计数器等于 0 的等待。

1
2
3
4
5
6
7
8
9
10
11
12
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Dispatch {
private Long id;

private Integer money;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {

private Long id;

private Integer money;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class CountDownLatchTest {
// 订单队列
public static Vector<Order> orders = new Vector<>();
// 派送单队列
public static Vector<Dispatch> dispatches = new Vector<>();
// 执行回调的线程池
public static Executor executor = Executors.newFixedThreadPool(2);

public static void main(String[] args) throws InterruptedException {
List<Order> orderList = new ArrayList<>();
List<Dispatch> dispatchList = new ArrayList<>();
orderList.add(new Order(1L, 10));
orderList.add(new Order(2L, 12));
dispatchList.add(new Dispatch(1L, 6));
dispatchList.add(new Dispatch(2L, 8));

for (int i = 0; i < orderList.size();i++) {
// 计数器初始化为2
CountDownLatch latch = new CountDownLatch(2);
// 模拟查询订单
int index = i;
executor.execute(()-> {
orders.add(orderList.get(index));
latch.countDown();
});
// 模拟查询运单
executor.execute(()-> {
dispatches.add(dispatchList.get(index));
latch.countDown();
});
// 等待两个查询操作结束
latch.await();
// 执行对账操作
System.out.println("执行对账操作,order为:" + orderList.get(index) + ",dispatch为" + dispatchList.get(index));
// 对账结果写入数据库
System.out.println("对账结果写入数据库操作,order为:" + orderList.get(index) + ",dispatch为" + dispatchList.get(index));
}
}

}
1
2
3
4
执行对账操作,order为:Order(id=1, money=10),dispatch为Dispatch(id=1, money=6)
对账结果写入数据库操作,order为:Order(id=1, money=10),dispatch为Dispatch(id=1, money=6)
执行对账操作,order为:Order(id=2, money=12),dispatch为Dispatch(id=2, money=8)
对账结果写入数据库操作,order为:Order(id=2, money=12),dispatch为Dispatch(id=2, money=8)

CyclicBarrier使用实践

有一个场景是网购平台一个订单对应一个运单,定期需要核对一批订单和运单,查询订单和查询运单的任务可以并行执行以提高效率,现在实践一下使用CyclicBarrier优化串行执行查询订单和查询运单的逻辑

首先创建一个计数器初始值为 2 的 CyclicBarrier,创建 CyclicBarrier 的时候,还需要传入一个回调函数,这样的话当计数器减到 0 的时候,会调用这个回调函数。线程 T1 负责查询订单,当查出一条时,调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;线程 T2 负责查询运单,当查出一条时,也调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;当 T1 和 T2 都调用 barrier.await() 的时候,计数器会减到 0,此时 T1 和 T2 就可以执行下一条语句了,同时会调用 barrier 的回调函数来执行对账操作。比较有意思的是,CyclicBarrier 的计数器有自动重置的功能,当减到 0 的时候,会自动重置你设置的初始值,使用起来十分方便。

1
2
3
4
5
6
7
8
9
10
11
12
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Dispatch {
private Long id;

private Integer money;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {

private Long id;

private Integer money;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class CyclicBarrierTest {
// 订单队列
public static Vector<Order> orders = new Vector<>();
// 派送单队列
public static Vector<Dispatch> dispatches = new Vector<>();
// 执行回调的线程池
public static Executor executor = Executors.newFixedThreadPool(1);
public static final CyclicBarrier barrier = new CyclicBarrier(2, ()->{
executor.execute(()-> check() );
});
public static void main(String[] args) {
List<Order> orderList = new ArrayList<>();
List<Dispatch> dispatchList = new ArrayList<>();
orderList.add(new Order(1L, 10));
orderList.add(new Order(2L, 12));
dispatchList.add(new Dispatch(1L, 6));
dispatchList.add(new Dispatch(2L, 8));

// 循环查询订单库
Thread T1 = new Thread(()->{
for(Order order : orderList){
// 查询订单库
orders.add(order);
// 等待
try {
barrier.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
});
T1.start();
// 循环查询运单库
Thread T2 = new Thread(()->{
for (Dispatch dispatch : dispatchList) {
// 查询运单库
dispatches.add(dispatch);
// 等待
try {
barrier.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
});
T2.start();
}

public static void check(){
Order order = orders.remove(0);
Dispatch dispatch = dispatches.remove(0);
// 执行对账操作
System.out.println("执行对账操作,order为:" + order + ",dispatch为" + dispatch);
// 对账结果写入数据库
System.out.println("对账结果写入数据库,order为:" + order + ",dispatch为" + dispatch);
}
}
1
2
3
4
执行对账操作,order为:Order(id=1, money=10),dispatch为Dispatch(id=1, money=6)
对账结果写入数据库,order为:Order(id=1, money=10),dispatch为Dispatch(id=1, money=6)
执行对账操作,order为:Order(id=2, money=12),dispatch为Dispatch(id=2, money=8)
对账结果写入数据库,order为:Order(id=2, money=12),dispatch为Dispatch(id=2, money=8)

CountDownLatch 和 CyclicBarrier 是 Java 并发包提供的两个非常易用的线程同步工具类,这两个工具类用法的区别:CountDownLatch 主要用来解决一个线程等待多个线程的场景,而 CyclicBarrier 是一组线程之间互相等待。除此之外 CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。但** CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外,CyclicBarrier 还可以设置回调函数**。

ReadWriteLock实践

读多写少场景下,适用读写锁保证数据获取的安全和性能,例如缓存元数据、缓存基础数据等。

读写锁是什么

读写锁,某一个语言特有的,是一中设计的思想,所有的读写锁都遵守以下三条基本原则:

  1. 允许多个线程同时读共享变量
  2. 只允许一个线程写共享变量
  3. 如果一个写线程正在执行写操作,此时禁止读线程读共享变量

读写锁与互斥锁的一个重要区别就是读写锁允许多个线程同时读共享变量,而互斥锁是不允许,这是读写锁在读多写少场景下性能优于互斥锁的关键。但读写锁的写操作是互斥的,当一个线程在写共享变量的时候,是不允许其他线程执行写操作和读操作。

自定义一个缓存

实践一下,用 ReadWriteLock 实现一个缓存工具类。缓存的数据保存在 Cache 类内部的 HashMap 里面,HashMap 不是线程安全的,这里我们使用读写锁 ReadWriteLock 来保证其线程安全。ReadWriteLock 是一个接口,它的实现类是 ReentrantReadWriteLock,是支持可重入的。通过 rwl 创建了一把读锁和一把写锁。

Cache 这个工具类,提供了两个方法,一个是读缓存方法 get(),另一个是写缓存方法 put()。读缓存需要用到读锁,是 try{}finally{}这个格式。写缓存则需要用到写锁,写锁的使用和读锁是类似的。

假设缓存的源头是数据库,如果缓存中没有缓存目标对象,那么就需要从数据库中加载,然后写入缓存,写缓存需要用到写锁,可以调用 w.lock() 来获取写锁。需要注意的是,在获取写锁之后,不能直接去查询数据库,而是在重新验证了一次缓存中是否存在,因为有可能已经有线程从数据库把这条数据写入缓存了,再次验证如果还是不存在,当前线程再执行从数据库写入缓存的操作。比如高并发的场景下,有可能会有多线程竞争写锁。假设缓存是空的,没有缓存任何东西,如果此时有三个线程 T1、T2 和 T3 同时调用 get() 方法,并且参数 key 也是相同的。那么它们会同时执行到代码w.lock();处,但此时只有一个线程能够获得写锁,假设是线程 T1,线程 T1 获取写锁之后查询数据库并更新缓存,最终释放写锁。此时线程 T2 和 T3 会再有一个线程能够获取写锁,假设是 T2,如果不采用再次验证的方式,此时 T2 会再次查询数据库。T2 释放写锁之后,T3 也会再次查询一次数据库。而实际上线程 T1 已经把缓存的值设置好了,T2、T3 完全没有必要再次查询数据库。所以,再次验证的方式,能够避免高并发场景下重复查询数据的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {

private Long id;

private String name;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class Cache {
final Map<Long,User> m = new HashMap<>();
final ReadWriteLock rwl = new ReentrantReadWriteLock();
final Lock r = rwl.readLock();
final Lock w = rwl.writeLock();

public User get(Long key) {
User user = null;
// 读缓存
r.lock();
try {
user = m.get(key);
} finally{
r.unlock();
}
// 缓存中存在,返回
if(user != null) {
return user;
}
// 缓存中不存在,查询数据库
w.lock();
try {
// 再次验证
// 其他线程可能已经查询过数据库
user = m.get(key);
if(user == null){
// 模拟查询数据库
user = queryUserById(key);
m.put(key, user);
}
} finally{
w.unlock();
}
return user;
}

public User put(Long key, User user) {
w.lock();
try {
// 缓存中可能已经存在了
user = m.get(key);
if(user == null){
// 模拟查询数据库
user = queryUserById(key);
m.put(key, user);
}
} finally{
w.unlock();
}
return user;
}

public User queryUserById(Long id) {
return new User(id, "Chris");
}
}

StampedLock使用实践

使用实践

ReadWriteLock 支持两种模式:一种是读锁,一种是写锁。而 StampedLock 支持三种模式,分别是:写锁悲观读锁乐观读。其中,写锁、悲观读锁的语义和 ReadWriteLock 的写锁、读锁的语义非常类似,允许多个线程同时获取悲观读锁,但是只允许一个线程获取写锁,写锁和悲观读锁是互斥的。不同的是:StampedLock 里的写锁和悲观读锁加锁成功之后,都会返回一个 stamp;然后解锁的时候,需要传入这个 stamp。相关的示例代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
final StampedLock sl = new StampedLock();

// 获取/释放悲观读锁示意代码
long stamp = sl.readLock();
try {
// 具体业务逻辑
} finally {
sl.unlockRead(stamp);
}

// 获取/释放写锁示意代码
long stamp = sl.writeLock();
try {
// 具体业务逻辑
} finally {
sl.unlockWrite(stamp);
}

StampedLock 的性能之所以比 ReadWriteLock 还要好,其关键是 StampedLock 支持乐观读的方式。ReadWriteLock 支持多个线程同时读,但是当多个线程同时读的时候,所有的写操作会被阻塞;而 StampedLock 提供的乐观读,是允许一个线程获取写锁的,也就是说不是所有的写操作都被阻塞。这里是乐观读,而不是乐观读锁,乐观读这个操作是无锁的,所以相比较 ReadWriteLock 的读锁,乐观读的性能更好一些

下面模拟计算两点之间距离的场景实践一下:在 distanceFromOrigin() 这个方法中,首先通过调用 tryOptimisticRead() 获取了一个 stamp,这里的 tryOptimisticRead() 就是我们前面提到的乐观读。之后将共享变量 x 和 y 读入方法的局部变量中,不过需要注意的是,由于 tryOptimisticRead() 是无锁的,所以共享变量 x 和 y 读入方法局部变量时,x 和 y 有可能被其他线程修改了。因此最后读完之后,还需要再次验证一下是否存在写操作,这个验证操作是通过调用 validate(stamp) 来实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.util.concurrent.locks.StampedLock;

class Point {
private int x, y;
final StampedLock sl = new StampedLock();
/**
* 计算到原点的距离
*/
int distanceFromOrigin() {
// 乐观读
long stamp = sl.tryOptimisticRead();
// 读入局部变量 读的过程数据可能被修改
int curX = x;
int curY = y;
// 判断执行读操作期间 是否存在写操作 如果存在写操作 则sl.validate返回false
if (!sl.validate(stamp)){
// 升级为悲观读锁
stamp = sl.readLock();
try {
curX = x;
curY = y;
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}
return (int) Math.sqrt(curX * curX + curY * curY);
}
}

如果执行乐观读操作的期间,存在写操作,会把乐观读升级为悲观读锁。这个做法挺合理的,否则你就需要在一个循环里反复执行乐观读,直到执行乐观读操作的期间没有写操作(只有这样才能保证 x 和 y 的正确性和一致性),而循环读会浪费大量的 CPU。升级为悲观读锁,代码简练且不易出错,建议在具体实践时也采用这样的方法。

注意事项

StampedLock 在命名上并没有增加 Reentrant,所以表明 StampedLock 是不可重入的。StampedLock 的悲观读锁、写锁都不支持条件变量如果线程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上时,此时调用该阻塞线程的 interrupt() 方法,会导致 CPU 飙升。例如下面的代码中,线程 T1 获取写锁之后将自己阻塞,线程 T2 尝试获取悲观读锁,也会阻塞;如果此时调用线程 T2 的 interrupt() 方法来中断线程 T2 的话,你会发现线程 T2 所在 CPU 会飙升到 100%。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.StampedLock;

public class StampedLockInterruptTest {
public static void main(String[] args) throws InterruptedException {
final StampedLock lock = new StampedLock();
Thread T1 = new Thread(()->{
// 获取写锁
lock.writeLock();
// 永远阻塞在此处 不释放写锁
LockSupport.park();
});
T1.start();
// 保证T1获取写锁
Thread.sleep(100);
Thread T2 = new Thread(()->
// 阻塞在悲观读锁
lock.readLock()
);
T2.start();
// 保证T2阻塞在读锁
Thread.sleep(100);
// 中断线程T2会导致线程T2所在CPU飙升
T2.interrupt();
T2.join();
}
}

所以,使用 StampedLock 一定不要调用中断操作,如果需要支持中断功能,一定**使用可中断的悲观读锁 ****readLockInterruptibly() ****和写锁 ****writeLockInterruptibly()**。

Semaphore实践

基本概念

Semaphore是信号量,比如我们开车时在等的红绿灯,红灯停,绿灯行,根据红绿灯的信号,车和行人决定做出等待还是前行的行为。信号量模型可以简单概括为:一个计数器,一个等待队列,三个方法。在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问它们,这三个方法分别是:init()、down() 和 up()。

  • init():设置计数器的初始值。
  • down():计数器的值减 1;如果此时计数器的值小于 0,则当前线程将被阻塞,否则当前线程可以继续执行。
  • up():计数器的值加 1;如果此时计数器的值小于或者等于 0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。

上述三个方法都是原子性的,并且这个原子性是由信号量模型的实现方保证的。在 Java SDK 里面,信号量模型是由 java.util.concurrent.Semaphore 实现的,Semaphore 这个类能够保证这三个方法都是原子操作。

信号量使用

举一个累加器的例子,count+=1 操作是个临界区,只允许一个线程执行,也就是说要保证互斥。用信号量控制互斥的方法,就像我们用互斥锁一样,只需要在进入临界区之前执行一下 down() 操作,退出临界区之前执行一下 up() 操作。下面的实践中,acquire() 就是信号量里的 down() 操作,release() 就是信号量里的 up() 操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import java.util.concurrent.Semaphore;

public class SemaphoreBasicTest {
static int count = 0;

//初始化信号量
static final Semaphore s = new Semaphore(1);

public static void main(String[] args) throws InterruptedException {
addOne();
System.out.println(count);
addOne();
System.out.println(count);
}

// 用信号量保证互斥
static void addOne() throws InterruptedException {
s.acquire();
try {
count+=1;
} finally {
s.release();
}
}
}
1
2
1
2

假设两个线程 T1 和 T2 同时访问 addOne() 方法,当它们同时调用 acquire() 的时候,由于 acquire() 是一个原子操作,所以只能有一个线程(假设 T1)把信号量里的计数器减为 0另外一个线程(T2)则是将计数器减为 -1对于线程 T1,信号量里面的计数器的值是 0,大于等于 0,所以****线程 T1 会继续执行;对于线程 T2,信号量里面的计数器的值是 -1,小于 0(自己想要执行就得大于0),按照信号量模型里对 down() 操作的描述,****线程 T2 将被阻塞。所以此时只有线程 T1 会进入临界区执行count+=1;。当线程 T1 执行 release() 操作,也就是 up() 操作的时候,信号量里计数器的值是 -1,加 1 之后的值是 0,小于等于 0(想要唤醒其他线程就得小于等于0,按照信号量模型里对 up() 操作的描述,此时等待队列中的 T2 将会被唤醒。于是 T2 在 T1 执行完临界区代码之后才获得了进入临界区执行的机会,从而保证了互斥性。

实践实现一个限流器

有 Java SDK 里面提供了 Lock,还要提供一个 Semaphore 的原因在于,实现一个互斥锁,仅仅是 Semaphore 的部分功能,Semaphore 还有一个功能是 Lock 不容易实现的,那就是:Semaphore 可以允许多个线程访问一个临界区。比如各种池化资源,例如连接池、对象池、线程池等等,比如数据库连接池,在同一时刻,一定是允许多个线程同时使用连接池的,当然,每个连接在被释放前,是不允许其他线程使用的。

我们实践一下通过Semaphore创建一个对象池,也就是指的是一次性创建出多个对象,之后所有的线程重复利用这些对象,当然对象在被释放前,也是不允许其他线程使用的。对象池,可以用 List 保存实例对象,限流器的限流指的是不允许多于 N 个线程同时进入临界区,使用信号量解决这个问题。信号量的计数器,在上面的例子中,我们设置成了 1,这个 1 表示只允许一个线程进入临界区,但如果我们把计数器的值设置成对象池里对象的个数 N,就可以解决对象池的限流问题了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

class ObjPool<T, R> {
final List<T> pool;
// 用信号量实现限流器
final Semaphore semaphore;

/**
* 构造函数
* @param size 不允许多于 size 个线程同时进入临界区
* @param list 对象池
* @return
*/

ObjPool(int size, List<T> list){
pool = list;
semaphore = new Semaphore(size);
}

R exec(Function<T,R> func) throws InterruptedException {
T t = null;
semaphore.acquire();
try {
t = pool.remove(0);
return func.apply(t);
} finally {
// 用完再放回对象池
pool.add(t);
semaphore.release();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {

private Long id;

private String name;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class SemaphoreTest {

public static void main(String[] args) throws InterruptedException {
List<User> list = new ArrayList<>();
list.add(new User(1L, "Chris"));
list.add(new User(2L, "Zhangsan"));
list.add(new User(3L, "John"));
// 创建对象池 这里不要写成1 否则能进入临界区的只有一个线程 现在设置为3 则允许最多3个线程
// 同时进入临界区 即创建的Semaphore的值为3
ObjPool<User, String> pool = new ObjPool<User, String>(3, list);
// 创建一个固定大小的线程池,最多同时运行3个线程
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交任务
for (int i = 0; i < 3; i++) {
executor.execute(() -> {
try {
String result = pool.exec(param -> {
try {
Thread.sleep(3000);
return param.getName();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
System.out.println(result);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
}
}
1
2
3
John
Zhangsan
Chris

用一个 List来保存对象实例,用 Semaphore 实现限流器。关键的代码是 ObjPool 里面的 exec() 方法,这个方法里面实现了限流的功能。在这个方法里面,我们首先调用 acquire() 方法(与之匹配的是在 finally 里面调用 release() 方法),假设对象池的大小是 3,信号量的计数器初始化为 3,那么前 3 个线程调用 acquire() 方法,都能继续执行,相当于通过了信号灯,而其他线程则会阻塞在 acquire() 方法上。对于通过信号灯的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过 pool.remove(0) 实现的),分配完之后会执行一个回调函数 func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释放工作是通过 pool.add(t) 实现的),同时调用 release() 方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于 0,那么说明有线程在等待,此时会自动唤醒等待的线程。

不变性(Immutability)模式

不变性(Immutability)模式就是对象一旦被创建之后,状态就不再发生变化,即变量一旦被赋值,就不允许修改了(没有写操作),没有修改操作,也就是保持了不变性。

实现具备不可变性的类

将一个类所有的属性都设置成 final 的,并且只允许存在只读方法,那么这个类基本上就具备不可变性了。更严格的做法是这个类本身也是 final 的,也就是不允许继承。因为子类可以覆盖父类的方法,有可能改变不可变性。

Java SDK 里很多类都具备不可变性,例如经常用到的 String 和 Long、Integer、Double 等基础类型的包装类都具备不可变性这些对象的线程安全性都是靠不可变性来保证的,它们都严格遵守不可变类的三点要求:类和属性都是 final 的,所有方法均是只读的

例如String 这个类以及它的属性 value[]都是 final 的,而 replace() 方法的实现,就的确没有修改 value[],而是将替换后的字符串作为返回值返回了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public String replace(char oldChar, char newChar) {
if (oldChar != newChar) {
int len = value.length;
int i = -1;
char[] val = value; /* avoid getfield opcode */
// 找到第一个匹配的字符串的索引位置i
while (++i < len) {
if (val[i] == oldChar) {
break;
}
}
if (i < len) {
char buf[] = new char[len];
// i之前的字符串直接拷贝过来
for (int j = 0; j < i; j++) {
buf[j] = val[j];
}
// i及i之后的字符串判断是不是匹配 匹配则用替换后的字符串赋值到对应索引值位置
while (i < len) {
char c = val[i];
buf[i] = (c == oldChar) ? newChar : c;
i++;
}
// 返回一个替换后的新的字符串 而不是修改当前字符串
return new String(buf, true);
}
}
return this;
}

如果具备不可变性的类,需要提供类似修改的功能,具体就是创建一个新的不可变对象,这是与可变对象的一个重要区别,可变对象往往是修改自己的属性所有的修改操作都创建一个新的不可变对象,如果不利用缓存,则十分浪费内存,因此引出了缓存的设计

享元模式避免创建重复对象

利用享元模式可以减少创建对象的数量,从而减少内存占用。Java中 Long、Integer、Short、Byte 等这些基本数据类型的包装类都用到了享元模式。享元模式本质上其实就是一个对象池,利用享元模式创建对象的逻辑:创建之前,首先去对象池里看看是不是存在;如果已经存在,就利用对象池里的对象;如果不存在,就会新创建一个对象,并且把这个新创建出来的对象放进对象池里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
/**
* Java 在运行时会预先创建并缓存范围内的 Integer 对象 以提高性能和节省内存
* 这样 在代码中多次使用相同的整数值时 可以直接从缓存中获取对应的 Integer 对象
* 而无需每次都创建新的对象
**/
private static class IntegerCache {
// low 和 high分别定义了缓存范围的下限和上限
static final int low = -128;
static final int high;
// 用于存储 Integer 对象的数组。它的长度由 low 和 high 决定,即缓存范围的大小
static final Integer cache[];

// 这个块在类加载时执行 用于初始化 high 和 cache 数组
static {
// high value may be configured by property
int h = 127;
// 首先尝试获取系统属性 "java.lang.Integer.IntegerCache.high" 的值
// 该属性可以用来配置缓存的上限
String integerCacheHighPropValue =
sun.misc.VM.getSavedProperty("java.lang.Integer.IntegerCache.high");
if (integerCacheHighPropValue != null) {
try {
// 如果该属性存在且能够被解析为整数,则使用它作为缓存范围的上限 否则使用默认值127
int i = parseInt(integerCacheHighPropValue);
i = Math.max(i, 127);
// Maximum array size is Integer.MAX_VALUE
h = Math.min(i, Integer.MAX_VALUE - (-low) -1);
} catch( NumberFormatException nfe) {
// If the property cannot be parsed into an int, ignore it.
}
}
high = h;

cache = new Integer[(high - low) + 1];
int j = low;
// 根据计算得到的 high 值 初始化 cache 数组
// 为其中的每个元素赋值为从 low 到 high 的连续整数
for(int k = 0; k < cache.length; k++)
cache[k] = new Integer(j++);

// range [-128, 127] must be interned (JLS7 5.1.7)
assert IntegerCache.high >= 127;
}

private IntegerCache() {}
}

public static Integer valueOf(int i) {
// 先查询缓存数组中是否存在可用对象 不存在再创建新对象
if (i >= IntegerCache.low && i <= IntegerCache.high)
return IntegerCache.cache[i + (-IntegerCache.low)];
return new Integer(i);
}

需要注意的是:基本上所有的基础类型的包装类都不适合做锁,因为它们内部用到了享元模式,这会导致看上去私有的锁,其实是共有的,实践一下:

1
2
3
4
5
6
7
8
9
10
11
12
public class Student {

public Integer num = Integer.valueOf(1);

public void test() throws InterruptedException {

synchronized (num) {
Thread.sleep(3000);
System.out.println("我是Student~");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
public class User {

public Integer num = Integer.valueOf(1);

public void test() throws InterruptedException {

synchronized (num) {
Thread.sleep(3000);
System.out.println("我是User~");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import java.util.concurrent.*;

public class IntegerLockTest {

public static void main(String[] args) {
Student student = new Student();
User user = new User();
ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
// 任务1
pool.execute(() -> {
try {
student.test();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
// 任务2
pool.execute(() -> {
try {
user.test();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
pool.shutdown();
}
}
1
2
我是Student~
我是User~

执行了6s,说明同一时刻,只有一个线程获取到了Integer类型的对象1的锁。

使用不可变性注意事项

  1. 对象的所有属性都是 final 的,并不能保证不可变性
  2. 不可变对象也需要正确发布

Java 中final 修饰的属性一旦被赋值,就不可以再修改,但是如果属性的类型是普通对象,那么这个普通对象的属性是可以被修改的。

1
2
3
4
5
6
7
8
9
10
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Student02 {

private Long id;

private String name;

}
1
2
3
4
5
6
7
8
9
10
11
12
import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class School {

final Student02 student;

private String name;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class IntegerLockTest02 {

public static void main(String[] args) {
School school = new School(new Student02(1L, "Chris"), "阳光小学");
ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
// 任务1
pool.execute(() -> {
school.student.setId(2L);
});
// 任务2
pool.execute(() -> {
school.student.setId(3L);
});
pool.shutdown();
System.out.println(school.getStudent().getId());
}
}
1
2

实践证明:虽然School类中Student02为final类型,但Student02类型的成员变量student的值可以在多个线程中被访问和修改。所以,在使用 Immutability 模式的时候一定要确认保持不变性的边界在哪里,是否要求属性对象也具备不可变性

不可变对象虽然是线程安全的,但是并不意味着引用这些不可变对象的对象就是线程安全的。实践一下:Student03 具备不可变性,线程安全,但是类 School02 并不是线程安全的,类 School02 中持有对 Student03 的引用 student,对 student 这个引用的修改在多线程中并不能保证可见性和原子性。

1
2
3
4
5
6
7
8
9
10
11
12
import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
final public class Student03 {

final private Long id;

final private String name;

}
1
2
3
4
5
6
7
8
9
10
11
12
import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class School02 {

Student03 student;

private String name;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class IntegerLockTest03 {

public static void main(String[] args) {
School02 school = new School02(new Student03(1L, "Chris"), "阳光小学");
ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
// 任务1
pool.execute(() -> {
Student03 lucky = new Student03(2L, "Lucky");
school.setStudent(lucky);
});
// 任务2
pool.execute(() -> {
Student03 jack = new Student03(3L, "Jack");
school.setStudent(jack);
});
pool.shutdown();
System.out.println(school.getStudent());
}
}
1
Student03(id=2, name=Lucky)

实践可知:多线程会修改School02中对Student03 student的引用

如果你的程序仅仅需要 student 保持可见性,无需保证原子性,那么可以将 student 声明为 volatile 变量,这样就能保证可见性。如果你的程序需要保证原子性,那么可以通过原子类来实现,实践一下:

1
2
3
4
5
6
7
8
9
10
11
12
import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
final public class Student03 {

final private Long id;

final private String name;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
import lombok.AllArgsConstructor;
import lombok.Data;

import java.util.concurrent.atomic.AtomicReference;

@Data
@AllArgsConstructor
public class School02 {

final AtomicReference studentAtomicReference;
private String name;

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class IntegerLockTest03 {

public static void main(String[] args) throws InterruptedException {
School02 school = new School02(new AtomicReference<>(new Student03(1L,"Chris")), "阳光小学");
ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
Student03 preStudent = (Student03) school.getStudentAtomicReference().get();
// 任务1
pool.execute(() -> {
Student03 lucky = new Student03(2L, "Lucky");
boolean flag = school.getStudentAtomicReference().compareAndSet(preStudent, lucky);
System.out.println("执行任务1");
System.out.println(flag);
System.out.println(school.getStudentAtomicReference().get());
});
// 任务2
pool.execute(() -> {
Student03 jack = new Student03(3L, "Jack");
boolean flag = school.getStudentAtomicReference().compareAndSet(preStudent, jack);
System.out.println("执行任务2");
System.out.println(flag);
System.out.println(school.getStudentAtomicReference().get());
});
pool.shutdown();
}
}
1
2
3
4
5
6
执行任务1
true
Student03(id=2, name=Lucky)
执行任务2
false
Student03(id=2, name=Lucky)

实践证明:当两个线程都尝试更新studentAtomicReference时,一个线程使用compareAndSet函数更新该成员变量成功后,另一个线程调用compareAndSet函数会失败,因为此时预期的值不再是preStudent,而是首先更新的线程修改后的值

具备不变性的对象,只有一种状态,这个状态由对象内部所有的不变属性共同决定。其实还有一种更简单的不变性对象,那就是无状态。无状态对象内部没有属性,只有方法。除了无状态的对象,你可能还听说过无状态的服务、无状态的协议等等。无状态有很多好处,最核心的一点就是性能。在多线程领域,无状态对象没有线程安全问题,无需同步处理,自然性能很好;在分布式领域,无状态意味着可以无限地水平扩展,所以分布式领域里面性能的瓶颈一定不是出在无状态的服务节点上。

Guarded Suspension模式:等待唤醒机制实践

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import lombok.Data;
import java.util.Map;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

@Data
class GuardedObject<T>{
// 受保护的对象
T obj;
final Lock lock = new ReentrantLock();
final Condition done = lock.newCondition();
final int timeout=2;
// 保存所有GuardedObject
final static Map<Object, GuardedObject> gos = new ConcurrentHashMap<>();

final static BlockingDeque<GuardedObject> historyGos = new LinkedBlockingDeque<>();
// 静态方法创建GuardedObject
static <K> GuardedObject create(K key){
GuardedObject go = new GuardedObject();
gos.put(key, go);
return go;
}
static <K, T> void fireEvent(K key, T obj){
GuardedObject go = gos.remove(key);
if (go != null){
go.onChanged(obj);
}
}
// 获取受保护对象
T get(Predicate<T> p) {
lock.lock();
try {
// MESA管程推荐写法
while(!p.test(obj)) {
done.await(timeout, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}finally {
lock.unlock();
}
//返回非空的受保护对象
return obj;
}
// 事件通知方法
void onChanged(T obj) {
lock.lock();
try {
this.obj = obj;
// 将自身加入阻塞队列中待消费
historyGos.add(this);
done.signalAll();
} finally {
lock.unlock();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class GuardedSuspensionTest {

public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
// 任务1
pool.execute(() -> {
handleWebReq(1L, "第一个任务的消息");
});
// 任务2
pool.execute(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
handleWebReq(2L, "第二个任务的消息");
});
// 任务3
pool.execute(() -> {
handleWebReq(3L, "第三个任务的消息");
});
// 任务4
pool.execute(() -> {
Message message1 = null;
try {
while (true) {
// 等待MQ消息 GuardedObject对象中被保护对象不为空则返回该被保护对象 此处被保护对象为Message类的实例对象
GuardedObject guardedObject = GuardedObject.historyGos.poll(3, TimeUnit.SECONDS);
if (guardedObject == null) {
System.out.println("当前线程为:" + Thread.currentThread().getName() + ", 3s内没有消息,停止消费");
// 如果3秒内没有消息,跳出循环
break;
}
message1 = (Message) guardedObject.get(t -> t != null);
System.out.println("当前线程为:" + Thread.currentThread().getName() + ",接收到消息:" + message1.getContent());
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
// 任务5
pool.execute(() -> {
Message message1 = null;
try {
while (true) {
// 等待MQ消息 GuardedObject对象中被保护对象不为空则返回该被保护对象 此处被保护对象为Message类的实例对象
GuardedObject guardedObject = GuardedObject.historyGos.poll(3, TimeUnit.SECONDS);
if (guardedObject == null) {
System.out.println("当前线程为:" + Thread.currentThread().getName() + ", 3s内没有消息,停止消费");
// 如果3秒内没有消息,跳出循环
break;
}
message1 = (Message) guardedObject.get(t -> t != null);
System.out.println("当前线程为:" + Thread.currentThread().getName() + ",接收到消息:" + message1.getContent());
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
pool.shutdown();
}

// 处理浏览器发来的请求
public static void handleWebReq(Long id, String msgContent){
// 创建消息
Message msg = new Message(id, msgContent);
// 创建GuardedObject实例
GuardedObject<Message> go = GuardedObject.create(id);
// 发送消息
send(msg);
}
public static void onMessage(Message msg){
// 唤醒等待的线程
GuardedObject.fireEvent(msg.getId(), msg);
}

public static void send(Message msg) {
System.out.println("当前线程为:" + Thread.currentThread().getName() + "," + "发送消息:" + msg.getContent());
onMessage(msg);
}
}
1
2
3
4
5
6
7
8
当前线程为:pool-1-thread-1,发送消息:第一个任务的消息
当前线程为:pool-1-thread-3,发送消息:第三个任务的消息
当前线程为:pool-1-thread-5,接收到消息:第三个任务的消息
当前线程为:pool-1-thread-4,接收到消息:第一个任务的消息
当前线程为:pool-1-thread-2,发送消息:第二个任务的消息
当前线程为:pool-1-thread-5,接收到消息:第二个任务的消息
当前线程为:pool-1-thread-4, 3s内没有消息,停止消费
当前线程为:pool-1-thread-5, 3s内没有消息,停止消费

实践证明:3个线程生产消息,2个线程消费消息,保证了一个消息只有一个线程消费,不存在同一个消息被重复消费的情况。