BigDecimal BigDecimal 可以实现对浮点数的运算,不会造成精度丢失。通常情况下,大部分需要浮点数精确运算结果的业务场景(比如涉及到钱的场景)都是通过 BigDecimal 来做的。
BigDecimal 常见方法 创建 使用**BigDecimal(String val)构造方法或者 BigDecimal.valueOf(double val) 静态方法来创建对象。禁止使用 BigDecimal(double val)**方式把double值转化为BigDecimal对象,因为存在精度损失风险。
加减乘除 add 方法用于将两个 BigDecimal 对象相加,subtract 方法用于将两个 BigDecimal 对象相减。multiply 方法用于将两个 BigDecimal 对象相乘,divide 方法用于将两个 BigDecimal 对象相除。
1 2 3 4 5 6 7 BigDecimal a = new BigDecimal("1.0" ); BigDecimal b = new BigDecimal("0.9" ); System.out.println(a.add(b)); System.out.println(a.subtract(b)); System.out.println(a.multiply(b)); System.out.println(a.divide(b)); System.out.println(a.divide(b, 2 , RoundingMode.HALF_UP));
使用 divide 方法的时候尽量使用 3 个参数版本,并且RoundingMode 不要选择 UNNECESSARY,否则很可能会遇到 ArithmeticException(无法除尽出现无限循环小数的时候),其中 scale 表示要保留几位小数,roundingMode 代表保留规则,常见保留规则的枚举类型为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public enum RoundingMode { UP(BigDecimal.ROUND_UP), DOWN(BigDecimal.ROUND_DOWN), CEILING(BigDecimal.ROUND_CEILING), FLOOR(BigDecimal.ROUND_FLOOR), HALF_UP(BigDecimal.ROUND_HALF_UP), }
大小比较 a.compareTo(b) : 返回 -1 表示 a 小于 b,0 表示 a 等于 b , 1 表示 a 大于 b。
保留几位小数 通过 setScale方法设置保留几位小数以及保留规则。保留规则有挺多种,不需要记,IDEA 会提示。
1 2 3 BigDecimal m = new BigDecimal("1.255433" ); BigDecimal n = m.setScale(3 ,RoundingMode.HALF_DOWN); System.out.println(n);
等值比较 BigDecimal 使用 equals() 方法进行等值比较出现问题的代码示例:
1 2 3 BigDecimal a = new BigDecimal("1" ); BigDecimal b = new BigDecimal("1.0" ); System.out.println(a.equals(b));
这是因为 equals() 方法不仅仅会比较值的大小(value)还会比较精度(scale),而 compareTo() 方法比较的时候会忽略精度。1.0 的 scale 是 1,1 的 scale 是 0,因此 a.equals(b) 的结果是 false。
compareTo() ** 方法可以比较两个 BigDecimal 的值,如果相等就返回 0,如果第 1 个数比第 2 个数大则返回 1,反之返回-1。**
Bigdecimal一定不会丢失精度吗? 不是的,Bigdecimal也会丢失精度。精度丢失情况分两种:
第一种:不能分解为以(1/2)^n为单位的十进制小数,例如无限循环的小数未指定位数
Bigdecimal可以比double更能保证精度的原因在于:****十进制整数在转化成二进制数时不会有精度问题,那么把十进制小数扩大N倍让它在整数的维度上进行计算,并保留相应的精度信息就能保证精度。
但当出现无限循环的计算结果时,Bigdecimal也只能通过开发者指定保留位数和保留规则来计算结果,例如计算结果为1/3:
1 2 3 BigDecimal b1 = new BigDecimal("1" ); BigDecimal b2 = new BigDecimal("3" ); System.out.println(b1.divide(b2));
则会报错:
1 2 3 Exception in thread "main" java.lang.ArithmeticException: Non-terminating decimal expansion; no exact representable decimal result. at java.math.BigDecimal.divide(BigDecimal.java:1707 ) at test01.main(test01.java:26 )
需要指定保留位数和保留规则,如下:
1 2 3 BigDecimal b1 = new BigDecimal("1" ); BigDecimal b2 = new BigDecimal("3" ); System.out.println(b1.divide(b2,3 , RoundingMode.UP));
第二种:使用Bigdecimal(String s)之外的其他构造器
1 2 3 4 5 6 BigDecimal a = new BigDecimal(1.01 ); BigDecimal b = new BigDecimal(1.02 ); BigDecimal c = new BigDecimal("1.01" ); BigDecimal d = new BigDecimal("1.02" ); System.out.println(a.add(b)); System.out.println(c.add(d));
1 2 2.0300000000000000266453525910037569701671600341796875 2.03
double类型的变量在用二进制保存时本身就存在精度丢失问题,如果将其作为构造器的输入则无法正确保留精度。
枚举类 枚举的意义 我们在很多时候会拿枚举和常量来做对比,可实际我们在程序中大量使用枚举的地方就是为了代替常量 。因为相对于静态的常量,枚举类显得更加直观,类型也相对更加安全 。
枚举在生活中非常常见,列举如下:
表示星期几 :SUNDAY、MONDAY、TUESTDAY、WEDNESDAY、THURSDAY、FRIDAY、SATURDAY就是一个枚举;
性别 :MALE(男)、FEMALE(女)也是一个枚举;
订单的状态 :PAIDED(已付款)、UNPAIDED(未付款)、FINISHED(已完成),CANCELED(已取消)。
安全性 若一个方法中要求入参为int类型,开发者传入任意的int类型值就行,但是如果是枚举类型的话,就只能传入枚举类中包含的对象。
命名空间 开发者要在命名的时候建议以枚举类的共有属性名称开头 ,例如定义季节的枚举类,在枚举类中定义四个季节,则枚举类以SEASON_开头,这样另外一个开发者再看这段代码的时候,才知道这四个常量分别代表季节。
枚举的特点
enum和class、interface的地位一样
类的对象只有有限个,确定的 。我们称此类为枚举类;
当需要定义一组常量 时,强烈建议使用枚举类;
如果枚举类中只有一个对象,则可以作为单例模式的实现方式 ;
switch参数可以使用Enum;
enum 允许为eunm 实例编写方法 。所以可以为每个enum 实例赋予各自不同的行为 。
使用enum定义的枚举类默认继承了java.lang.Enum,而不是继承Object类,由于Java是单继承机制,所以枚举类不可以被继承 ,枚举类可以实现一个或多个接口。
枚举类的所有实例都必须放在第一行展示,不需使用new 关键字,不需显式调用构造器 。自动添加public static final修饰。
枚举类的构造器只能是私有的 。
枚举类应用 基本使用 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public enum Season { spring, SUMMER, AUTUMN, WINTER; } public static void main (String[] args) { Season season = Season.spring; System.out.println(season); } public enum Season { spring("春天" ,"绿色" ), SUMMER("夏天" ,"橙色" ), AUTUMN("秋天" ,"黄色" ), WINTER("冬天" ,"白色" ); private final String name; private final String color; Season(String name,String color) { this .name = name; this .color = color; } public String getName () { return name; } public String getColor () { return color; } } public static void main (String[] args) { Season season = Season.spring; System.out.println(season.getColor()); System.out.println(Season.spring.getColor()); }
常用方法 java.lang.Enum类 是 Java 语言枚举类型的公共基类 ,我们使用enum关键字定义的枚举类,是隐式继承自Enum类 的,下面我们来看一下Enum类的常用方法:
**values()**:返回枚举类型的对象数组。该方法可以很方便的遍历所有的枚举值;
**valueOf()**:可以把一个字符串转换为对应的枚举类对象。要求字符串必须是枚举类对象的“名字”,如果不是,会抛出IllegalArguementException;
**toString()**:返回当前枚举类对象常量的名称。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class Test2 { public static void main (String[] args) { Test test1 = Test.man; System.out.println("调用 toString方法" ); System.out.println(test1.toString()); System.out.println("调用values方法" ); Test[] values = Test.values(); for (Test value : values){ System.out.println(value); } System.out.println("调用values方法" ); Test test2 = Test.valueOf("man" ); System.out.println(test2); } enum Test { man("男" ), woman("女" ), unknow("未知" ); private final String sexName; Test(String sexName){ this .sexName=sexName; } public String getSexName () { return sexName; } } }
1 2 3 4 5 6 7 8 调用 toString方法 man 调用values方法 man woman unknow 调用values方法 man
值得注意的是,当调用valuOf()方法时,我们 传递的对象的“名字”,在枚举类中并不存在,此时会抛出运行时异常:****IllegalArgumentException ,实例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class Test2 { public static void main (String[] args) { Test test1 = Test.man; System.out.println("调用values方法" ); Test test2 = Test.valueOf("man1" ); System.out.println(test2); } enum Test { man("男" ), woman("女" ), unknow("未知" ); private final String sexName; Test(String sexName){ this .sexName=sexName; } public String getSexName () { return sexName; } } }
1 2 3 4 5 调用values方法 Exception in thread "main" java.lang.IllegalArgumentException: No enum constant com .caq .exception .Test2 .Test .man1 at java .lang .Enum .valueOf (Enum .java :238) at com .caq .exception .Test2 $Test .valueOf (Test2 .java :12) at com .caq .exception .Test2 .main (Test2 .java :8)
枚举类对象实现接口的方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 interface Info { void show () ; } enum Season1 implements Info { SPRING("春天" ,"春暖花开" ){ @Override public void show () { System.out.println("春天在哪里?" ); } }, SUMMER("夏天" ,"夏日炎炎" ){ @Override public void show () { System.out.println("宁夏" ); } }, AUTUMN("秋天" ,"秋高气爽" ){ @Override public void show () { System.out.println("秋天不回来" ); } }, WINTER("冬天" ,"冰天雪地" ){ @Override public void show () { System.out.println("大约在冬季" ); } }; }
单例模式 单例模式可以说是每个Java开发者必须掌握的一个设计模式了,通常我们说它的实现,有饱汉式和饿汉式,也有经常说的双重判断,今天我们介绍另外一种方式,借助枚举来实现:
1 2 3 4 5 6 7 8 9 10 11 public enum SingleEnum { INSTANCE; public void print (String word) { System.out.println(word); } } @Test public void testSingle () { SingleEnum.INSTANCE.print("hello world" ); }
如上,用枚举实现单例模式真的非常简单,将类声明为枚举,内部只定义一个值即可。这样做主要是基于枚举的三个特性,即:枚举类不能new,因此保证单例、枚举类不能被继承、类不加载时,不会实例化 。
使用枚举类创建的单例还有一个好处,就是使用了反射也无法打破它的单例性质,这是相比较于其他的实现方式的一个优点。但是在实际的项目中这种写法并不常见,这是因为我们习惯了将枚举作为常量来使用,很少在枚举类类中,添加复杂的业务逻辑 。
策略模式 除了轻松实现上面的单例模式之外,枚举还可以非常简单的实现策略模式,比如下面这个例子:现有一个接口,通过接受的参数,来决定最终的数据存在什么地方,如果按照正常的写法,可能就是很多的if/else
1 2 3 4 5 6 7 8 9 10 11 12 public void save (String type, Object data) { if ("mysql" .equals(type) { saveMySQL(data); } else if ("redis" .equals(type)) saveRedis(data); } else if ("file" .eqauls(type)) { saveFile(type); } }
以上写法虽说简单直观,但是当type类型多了之后,这个if/else的代码行数就会越来越多了,而且看起来也不美观 。如果我们换成枚举,基于策略模式的思想来解决上面的if/else问题 ,就会好的多。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public enum SaveStrategyEnum { MYSQL("mysql" ) { @Override public void save (Object obj) { System.out.println("save in mysql:" + obj); } }, REDIS("redis" ) { @Override public void save (Object obj) { System.out.println("save in redis: " + obj); } }, FILE("file" ) { @Override public void save (Object obj) { System.out.println("save in file: " + obj); } }; private String type; SaveStrategyEnum(String type) { this .type = type; } public abstract void save (Object obj) ; public static SaveStrategyEnum typeOf (String type) { for (SaveStrategyEnum strategyEnum: values()) { if (strategyEnum.type.equalsIgnoreCase(type)) { return strategyEnum; } } return null ; } } public void save (String type, Object data) { SaveStrategyEnum strategyEnum = SaveStrategyEnum.typeOf(type); if (strategyEnum != null ) { strategyEnum.save(data); } }
1 2 3 4 5 6 7 8 9 10 11 12 public class enumTest { public static void main (String[] args) { enumTest enumTest = new enumTest(); enumTest.save("MYSQL" ,"我是MySQL" ); } public void save (String type, Object data) { SaveStrategyEnum strategyEnum = SaveStrategyEnum.typeOf(type); if (strategyEnum != null ) { strategyEnum.save(data); } } }
以上主要利用的是抽象类 + 枚举来完成不同的策略具体实现 ,这种实现方式体现了抽象方法的使用 (在模板设计模式中,更能体会抽象方法的使用妙处),利用枚举原生提供的values(),来实现遍历,找到目标 。
Stream流函数 初步认识 在JAVA中,涉及到对数组 、Collection 等集合类中的元素进行操作的时候,通常会通过循环的方式 进行逐个处理,或者使用Stream 的方式进行处理。
例如,现在有这么一个需求:
从给定句子中返回单词长度大于7的单词列表,按长度倒序输出,最多返回5个:在JAVA7及之前 的代码中,我们会可以照如下的方式进行实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public List<String> sortGetTop5LongWords (@NotNull String sentence) { String[] words = sentence.split(" " ); List<String> wordList = new ArrayList<>(); for (String word : words) { if (word.length() > 7 ) { wordList.add(word); } } wordList.sort((o1, o2) -> o2.length() - o1.length()); if (wordList.size() > 5 ) { wordList = wordList.subList(0 , 5 ); } return wordList; }
在JAVA8及之后 的版本中,引入了Stream流,我们可以更加优雅的写出如下代码:
1 2 3 4 5 6 7 8 public List<String> sortGetTop5LongWordsByStream (@NotNull String sentence) { return Arrays.stream(sentence.split(" " )) .filter(word -> word.length() > 7 ) .sorted((o1, o2) -> o2.length() - o1.length()) .limit(5 ) .collect(Collectors.toList()); }
直观感受上,Stream 的实现方式代码更加简洁、一气呵成。很多的同学在代码中也经常使用Stream流,但是对Stream流的认知往往也是仅限于会一些简单的filter 、map 、collect 等操作,但JAVA的Stream可以适用的场景与能力远不止这些。
基本介绍 Stream流操作分为3种类型 :
创建Stream
Stream中间处理
终止Steam
每个Stream管道操作类型都包含若干API方法,先列举下各个API方法的功能介绍。
开始管道 主要负责新建一个Stream流,或者基于现有的数组、List、Set、Map等集合类型对象创建出新的Stream流。
stream():创建出一个新的stream 串行流对象
parallelStream():创建出一个可 并行执行的stream流对象
Stream.of():通过给定的一系列元素创建一个新的Stream 串行流对象
中间管道 负责对Stream进行处理操作,并返回一个新的Stream对象,中间管道操作可以进行叠加 。
filter():按照条件 过滤符合要求的元素 , 返回新的stream流
map():将已有元素 转换为另一个对象类型 ,一对一逻辑 ,返回新的stream流
flatMap():将已有元素 转换为另一个对象类型 ,一对多逻辑 ,即原来一个元素对象可能会转换为1个或者多个新类型的元素,返回新的stream流
limit(): 仅保留集合前面指定个数的元素 ,返回新的stream流
skip(): 跳过集合前面指定个数的元素 ,返回新的stream流
concat():将 两个流的数据合并起来为一个新的流 ,返回新的stream流
distinct():对Stream中所有元素进行 去重 ,返回新的stream流
sorted():对stream中 所有的元素按照指定规则进行排序 ,返回新的stream流
终止管道 通过终止管道操作之后,Stream流将会结束 ,最后可能会执行某些逻辑处理,或者是按照要求返回某些执行后的结果数据。
count():返回stream处理后 最终的元素个数
max():返回stream处理后的 元素最大值
min():返回stream处理后的 元素最小值
findFirst():找到 第一个符合条件的元素 时则终止流处理
findAny():找到 任何一个符合条件的元素 时则退出流处理 ,这个对于串行流时与findFirst相同,对于并行流时比较高效 ,任何分片中找到都会终止后续计算逻辑
anyMatch():返回一个boolean值,类似于isContains(),用于 判断是否有符合条件的元素
allMatch():返回一个boolean值,用于 判断是否所有元素都符合条件
noneMatch():返回一个boolean值, 用于 判断是否所有元素都不符合条件
collect():将 流转换为指定的类型,通过Collectors进行指定
toArray():将 流转换为数组
iterator():将 流转换为Iterator对象
foreach():无返回值, 对元素进行逐个遍历,然后执行给定的处理逻辑
方法使用 map与flatMap map 与flatMap 都是用于转换已有的元素为其它元素,区别点在于:
map 必须是一对一的 ,即每个元素都只能转换为1个新的元素
flatMap 可以是一对多的 ,即每个元素都可以转换为1个或者多个新的元素
比如:有一个字符串ID列表,现在需要将其转为User对象列表 。可以使用map来实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public void stringToIntMap () { List<String> ids = Arrays.asList("111" , "222" , "333" , "444" , "555" , "666" , "777" ); List<User> results = ids.stream() .map(id -> { User user = new User(); user.setId(id); return user; }) .collect(Collectors.toList()); System.out.println(results); }
执行之后,会发现每一个元素都被转换为对应新的元素,但是前后总元素个数是一致的:
1 2 3 4 5 6 7 8 [User{id='111' }, User{id='222' }, User{id='333' }, User{id='444' }, User{id='555' }, User{id='666' }, User{id='777' }]
再比如:现有一个句子列表,需要将句子中每个单词都提取出来得到一个所有单词列表 。这种情况用map就无法满足需求了,需要使用flatMap 的特性:
1 2 3 4 5 6 7 8 9 public void stringToIntFlatmap () { List<String> sentences = Arrays.asList("hello world" ,"your name is Chris" ); List<String> results = sentences.stream() .flatMap(sentence -> Arrays.stream(sentence.split(" " ))) .collect(Collectors.toList()); System.out.println(results); }
1 [hello, world, your, name, is, Chris]
执行结果如下,可以看到结果列表中元素个数是比原始列表元素个数要多的,是将各个String类型的字符串句子以空格拆分,将结果加入同一个结果集进行返回 。这里flatMap 操作的时候其实是先每个元素处理并返回一个新的Stream,然后将多个Stream展开合并为了一个完整的新的Stream 。
peek和foreach方法 peek和foreach,都可以用于对元素进行遍历然后逐个的进行处理。
但根据前面的介绍,peek属于中间方法 ,而foreach属于终止方法 。这也就意味着peek只能作为管道中途的一个处理步骤,而没法直接执行得到结果,其后面必须还要有其它终止操作的时候才会被执行 ;而foreach作为无返回值的终止方法,则可以直接执行相关操作 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public void peekAndforeach () { List<String> sentences = Arrays.asList("hello world" ,"your name is Chris" ); System.out.println("----before peek----" ); sentences.stream().peek(sentence -> System.out.println(sentence)); System.out.println("----after peek----" ); System.out.println("----before foreach----" ); sentences.stream().forEach(sentence -> System.out.println(sentence)); System.out.println("----after foreach----" ); System.out.println("----before peek and count----" ); sentences.stream().peek(sentence -> System.out.println(sentence)).count(); System.out.println("----after peek and count----" ); }
1 2 3 4 5 6 7 8 9 10 ----before peek---- ----after peek---- ----before foreach---- hello world your name is Chris ----after foreach---- ----before peek and count---- hello world your name is Chris ----after peek and count----
输出结果可以看出,peek 独自调用时并没有被执行、但peek后面加上终止操作之后便可以被执行,而foreach 可以直接被执行。
filter、sorted、distinct、limit 这几个都是常用的Stream的中间操作方法,具体的方法的含义在上面的表格里面有说明。具体使用的时候,可以根据需要选择一个或者多个进行组合使用,或者同时使用多个相同方法的组合 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @Data @AllArgsConstructor @NoArgsConstructor public class Student { Integer id; String name; Integer age; public Student (Integer id) { this .id = id; this .name = "学生" ; this .age = 20 ; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public void testGetTargetStudents () { List<String> ids = Arrays.asList("111" ,"222" ,"333" ,"444" ,"555" ,"666" ,"777" , "888" ); List<Student> results = ids.stream() .filter(s -> s.length() > 2 ) .distinct() .map(Integer::valueOf) .sorted(Comparator.comparingInt(o -> o)) .limit(3 ) .map(id -> new Student(id)) .collect(Collectors.toList()); System.out.println(results); }
1 [Student(id=111 , name=学生, age=20 ), Student(id=222 , name=学生, age=20 ), Student(id=333 , name=学生, age=20 )]
上面的代码片段的处理逻辑很清晰:
使用filter过滤掉不符合条件的数据
通过distinct 对存量元素进行去重 操作
通过map 操作将字符串转成整数类型
借助sorted 指定按照数字大小正序排列
使用limit截取 排在前3位的元素
又一次使用map将id转为Student对象类型
使用collect终止操作将最终处理后的数据收集到list 中
Optional.ofNullable()方法 **Optional.ofNullable()**是可以避免空指针异常的对入参进行封装的方法 ,举个例子:
1 2 3 4 public static void main (String[] args) { List<String> list = null ; list.forEach(c -> System.out.println(c)); }
工作中经常会遇到,查询返回空,如果没有判空处理,一不小心就会空指针异常 。加上if判断处理也可以,但是Java8 有更优雅的处理方式 。
1 2 3 4 5 public static void main (String[] args) { List<String> list = null ; List<String> newList = Optional.ofNullable(list).orElse(Lists.newArrayList()); newList.forEach(c -> System.out.println(c)); }
代码含义:
如果list集合不为空,将list集合赋值给newList
如果list集合为空创建一个空对象集合赋值给newList,保证list集合永远不为空,也就避免了空指针异常
阅读源码查看如何实现的避免空指针异常:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private static final Optional<?> EMPTY = new Optional<>(); public static <T> Optional<T> ofNullable (T value) { return value == null ? empty() : of(value); } public static <T> Optional<T> empty () { @SuppressWarnings("unchecked") Optional<T> t = (Optional<T>) EMPTY; return t; } public static <T> Optional<T> of (T value) { return new Optional<>(value); }
首先执行ofNullable()方法 ,如果T对象为空,执行empty()方法 ;不为空,执行of(value)方法 ;
empty()方法 ,初始化一个空对象Optional (空对象和null不是一回事);
of(value)方法 ,将泛型对象T用于Optional构造方法的参数上,返回一个有值的对象
经过上面两步,从而保证了Optional不为null,避免了空指针 ;
orElse、orElseGet、orElseThrow orElse和orElseGet的用法如下所示,相当于value值为null时,给予一个默认值:
1 2 3 4 5 6 7 8 9 10 11 12 @Test public void test () { User user = null ; user = Optional.ofNullable(user).orElse(createUser()); user = Optional.ofNullable(user).orElseGet(() -> createUser()); } public User createUser () { User user = new User(); user.setName("zhangsan" ); return user; }
这两个函数的区别:
当user值不为null时 ,orElse函数依然会执行createUser()方法
而orElseGet函数并不会执行createUser()方法
至于orElseThrow ,就是value值为null时,直接抛一个异常出去 ,用法如下所示:
1 2 User user = null ; Optional.ofNullable(user).orElseThrow(()->new Exception("用户不存在" ));
简单结果终止方法 按照前面介绍的,终止方法里面像count、max、min、findAny、findFirst、anyMatch、allMatch、nonneMatch 等方法,均属于这里说的简单结果终止方法。所谓简单,指的是其结果形式是数字、布尔值或者Optional对象值 等。
1 2 3 4 5 6 7 8 9 10 11 12 public void testSimpleStopOptions () { List<String> ids = Arrays.asList("111" ,"222" ,"333" ,"444" ,"555" ,"666" ,"777" , "888" ); System.out.println(ids.stream().filter(s -> s.length() > 2 ).count()); System.out.println(ids.stream().filter(s -> s.length() > 2 ).anyMatch("222" ::equals)); ids.stream().filter(s -> s.length() > 2 ) .findFirst() .ifPresent(s -> System.out.println("findFirst:" + s)); }
避坑提醒
这里需要补充提醒下,一旦一个Stream被执行了终止操作之后,后续便不可以再读这个流执行其他的操作 了,否则会报错,看下面示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void testHandleStreamAfterClosed () { List<String> ids = Arrays.asList("111" ,"222" ,"333" ,"444" ,"555" ,"666" ,"777" , "888" ); Stream<String> stream = ids.stream().filter(s -> s.length() > 2 ); System.out.println(stream.count()); System.out.println("-----下面会报错-----" ); try { System.out.println(stream.anyMatch("222" ::equals)); } catch (Exception e) { e.printStackTrace(); } System.out.println("-----上面会报错-----" ); }
1 2 3 4 5 6 7 8 9 8 -----下面会报错----- -----上面会报错----- java.lang.IllegalStateException: stream has already been operated upon or closed at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229 ) at java.util.stream.ReferencePipeline.anyMatch(ReferencePipeline.java:449 ) at JavaKnowledge.StreamStudy.StreamFunction.testHandleStreamAfterClosed(StreamFunction.java:84 ) at JavaKnowledge.StreamStudy.StreamFunction.main(StreamFunction.java:20 )
因为stream已经被执行count()终止方法了,所以对stream再执行 anyMatch 方法的时候,就会报错stream has already been operated upon or closed ,这一点在使用的时候需要特别注意。
结果收集终止方法 因为Stream主要用于对集合数据的处理场景,所以除了上面几种获取简单结果的终止方法之外,更多的场景是获取一个集合类的结果对象,比如List、Set或者HashMap等 。
这里就需要collect方法 了,它可以支持生成如下类型的结果数据 :
一个集合类 ,比如List、Set或者HashMap等
StringBuilder对象 ,支持将多个字符串进行拼接处理并输出拼接后结果
一个可以记录个数或者计算总和的对象 (数据批量运算统计)
生成集合 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void testCollectStopOptions () { List<Student> ids = Arrays.asList(new Student(17 ), new Student(22 ), new Student(23 )); List<Student> collectList = ids.stream().filter(dept -> dept.getId() > 20 ) .collect(Collectors.toList()); System.out.println("collectList:" + collectList); Set<Student> collectSet = ids.stream().filter(dept -> dept.getId() > 20 ) .collect(Collectors.toSet()); System.out.println("collectSet:" + collectSet); Map<Integer, Student> collectMap = ids.stream().filter(dept -> dept.getId() > 20 ) .collect(Collectors.toMap(Student::getId, dept -> dept)); System.out.println("collectMap:" + collectMap); }
1 2 3 collectList:[Student(id=22 , name=学生, age=20 ), Student(id=23 , name=学生, age=20 )] collectSet:[Student(id=22 , name=学生, age=20 ), Student(id=23 , name=学生, age=20 )] collectMap:{22 =Student(id=22 , name=学生, age=20 ), 23 =Student(id=23 , name=学生, age=20 )}
生成拼接字符串 将一个List或者数组中的值拼接到一个字符串里并以逗号分隔开 ,这个场景在日常使用中很常见,如果通过for 循环和StringBuilder 去循环拼接,还得考虑下最后一个逗号如何处理的问题,很繁琐:
1 2 3 4 5 6 7 8 9 10 public void testForJoinStrings () { List<String> ids = Arrays.asList("111" ,"222" ,"333" ,"444" ,"555" ,"666" ,"777" , "888" ); StringBuilder builder = new StringBuilder(); for (String id : ids) { builder.append(id).append(',' ); } builder.deleteCharAt(builder.length() - 1 ); System.out.println("拼接后:" + builder.toString()); }
1 拼接后:111 ,222 ,333 ,444 ,555 ,666 ,777 ,888
Java8 提供的Stream 使用collect 可以轻而易举的实现,代码风格相对优雅:
1 2 3 4 5 public void testCollectJoinStrings () { List<String> ids = Arrays.asList("111" ,"222" ,"333" ,"444" ,"555" ,"666" ,"777" , "888" ); String joinResult = ids.stream().collect(Collectors.joining("," )); System.out.println("拼接后:" + joinResult); }
1 拼接后:111 ,222 ,333 ,444 ,555 ,666 ,777 ,888
数据批量数学运算 使用collect生成数字数据的总和信息 :
1 2 3 4 5 6 7 8 9 public void testNumberCalculate () { List<Integer> ids = Arrays.asList(10 , 20 , 30 , 40 , 50 ); Double average = ids.stream().collect(Collectors.averagingInt(value -> value)); System.out.println("平均值:" + average); IntSummaryStatistics summary = ids.stream().collect(Collectors.summarizingInt(value -> value)); System.out.println("数据统计信息: " + summary); }
1 2 平均值:30.0 数据统计信息: IntSummaryStatistics{count=5 , sum=150 , min=10 , average=30.000000 , max=50 }
并行Stream 机制说明 使用并行流,可以有效利用计算机的多CPU硬件,提升逻辑的执行速度 。并行流通过将一整个stream划分为****多个片段 ,然后对各个分片流并行执行处理逻辑 ,最后将各个分片流的执行结果汇总为一个整体流 。
约束与限制 并行流类似于多线程在并行处理 ,所以与多线程场景相关的一些问题同样会存在,比如死锁等问题,所以在并行流终止执行的函数逻辑,必须要保证线程安全 。
Stream相较于传统的foreach的方式处理stream,有什么优势? 优点:
代码更简洁 :偏声明式的编码风格,更容易体现出代码的逻辑意图
逻辑间解耦 :一个stream中间处理逻辑,无需关注上游与下游的内容,只需要按约定实现自身逻辑即可
并行流场景效率会比迭代器逐个循环更高
函数式接口,延迟执行 的特性,中间管道操作不管有多少步骤都不会立即执行,只有遇到终止操作的时候才会开始执行,可以避免一些中间不必要的操作消耗
缺点:
代码调测debug不便
程序员从历史写法切换到Stream时,需要一定的适应时间
Java时间类 Year Java 8中,Year类是一个不可变、线程安全的类,用于表示和操作年份 。它主要用于处理与年份相关的日期和时间操作 ,例如获取当前年份、创建指定年份的Date对象等。
核心方法 :
**now()**:该方法返回表示当前日期的Year对象。该方法使用系统默认的时区来获取当前年份。
**now(Clock clock)**:该方法返回表示指定时间戳的Year对象。该方法使用指定的Clock对象来获取当前年份。
**of(int year)**:该方法用于创建一个表示指定年份的Year对象。如果输入的年份参数不在范围内(公元1年-公元9999年),则会抛出IllegalArgumentException异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void test1 () { Year year = Year.now(); int currentYear = year.getValue(); System.out.println("当前年份:" + currentYear); Clock clock = Clock.systemDefaultZone(); Year yearFromClock = Year.now(clock); int yearFromClockValue = yearFromClock.getValue(); System.out.println("指定时间戳的年份:" + yearFromClockValue); Year customYear = Year.of(2024 ); int customYearValue = customYear.getValue(); System.out.println("自定义年份:" + customYearValue); }
1 2 3 当前年份:2024 指定时间戳的年份:2024 自定义年份:2024
YearMonth YearMonth类代表一个特定的年和月,可以表示任何合法的年和月组合 ,例如2020-02。它主要用于处理与年和月相关的日期和时间操作。
核心方法 :
**now()**:获取当前年份和月份。例如,YearMonth.now()将返回当前的年和月。
**String format(DateTimeFormatter formatter)**:使用特定的格式格式化year-month。
**int get(TemporalField field)**:返回日期中特定域的int类型
**of(int year, int month)**:创建一个表示特定年和月的YearMonth对象。例如,YearMonth.of(2024, 1)将创建一个表示2024年1月的YearMonth对象。
**lengthOfMonth()**:返回当前YearMonth实例有多少天。例如,YearMonth.of(2020, 2).lengthOfMonth()将返回29,因为2020年是闰年,2月有29天。
1 2 3 4 5 6 7 8 public void test2 () { YearMonth currentYearMonth = YearMonth.now(); System.out.printf("这个月的年月 %s 有 %d 天 %n" , currentYearMonth, currentYearMonth.lengthOfMonth()); YearMonth creditCardExpiry = YearMonth.of(2025 , Month.JULY); System.out.printf("你输入的年月是 %s %n" , creditCardExpiry); }
1 2 这个月的年月 2024 -01 有 31 天 你输入的年月是 2025 -07
这个示例中,YearMonth.now()方法获取了当前的年和月,并使用lengthOfMonth()方法获取了当前年月的天数。另外,YearMonth.of(2025, Month.JULY)方法创建了一个表示2025年7月的YearMonth对象,并输出了该对象。
格式化为字符串
1 2 3 4 5 6 7 8 public void test3 () { YearMonth ym = YearMonth.now(); String s = ym.format(DateTimeFormatter.ofPattern("MM yyyy" )); System.out.println(s); YearMonth start = YearMonth.parse("2023-01-01" , DateTimeFormatter.ofPattern("yyyy-MM-dd" )); boolean flag = ym.isAfter(start); System.out.println(ym + "比" + start + "时间更晚吗?" + flag); }
1 2 01 2024 2024 -01 比2023 -01 时间更晚吗?true
获取特定域
1 2 3 4 5 6 7 public void test4 () { YearMonth y = YearMonth.now(); long l1 = y.get(ChronoField.YEAR); System.out.println(l1); long l2 = y.get(ChronoField.MONTH_OF_YEAR); System.out.println(l2); }
增加月份
1 2 3 4 5 6 public void test5 () { YearMonth ym1 = YearMonth.now(); System.out.println(ym1); YearMonth ym2 = ym1.plus(Period.ofYears(2 )); System.out.println(ym1 + "加两年后为" + ym2); }
1 2 2024 -01 2024 -01 加两年后为2026 -01
YearMonth与LocalDate的转换
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public static LocalDate parseDateYearMonth (YearMonth yearMonth) { return LocalDate.of(yearMonth.getYear(),yearMonth.getMonthValue(),1 ); } public static YearMonth parseYearMonth (LocalDate localDate) { return YearMonth.of(localDate.getYear(),localDate.getMonthValue()); } public void test6 () { YearMonth curYM = YearMonth.now(); System.out.println(curYM); LocalDate curYMlocalDate = parseDateYearMonth(curYM); System.out.println("LocalDate类型的当前年月为" + curYMlocalDate); YearMonth curYearMonth = parseYearMonth(curYMlocalDate); System.out.println("YearMonth类型的当前年月为" + curYearMonth); }
1 2 3 2024 -01 LocalDate类型的当前年月为2024 -01 -01 YearMonth类型的当前年月为2024 -01
Java比较时间差几年/几个月
1 2 3 4 5 6 7 8 9 10 11 12 public static void test7 () { YearMonth yearMonthFirst = YearMonth.of(2022 , 6 ); YearMonth yearMonthSecond = YearMonth.of(2022 , 1 ); YearMonth yearMonthNow = YearMonth.now(); int FirstToSecond = yearMonthFirst.compareTo(yearMonthSecond); int FirstToNow = yearMonthFirst.compareTo(yearMonthNow); int NowToFirst = yearMonthNow.compareTo(yearMonthFirst); System.out.println(FirstToSecond); System.out.println(FirstToNow); System.out.println(NowToFirst); }
MonthDay 在Java 8中,MonthDay是一个非常实用的类,它用于处理只有月和日的信息,而没有年和其他时间信息的情况 。这可以用于处理生日、纪念日和星座等周期性问题。
核心方法 :
of(int month, int day): 创建一个表示特定月日的MonthDay对象。
from(LocalDate) : 从给定的LocalDate对象中提取月和日的信息,创建一个新的MonthDay对象。
equals(Object): 比较两个MonthDay对象是否相等。
这些方法的具体使用示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public void test8 () { MonthDay monthDay = MonthDay.of(1 , 15 ); LocalDate localDate = LocalDate.now(); MonthDay day = MonthDay.from(localDate); if (monthDay.equals(day)) { System.out.println("两个MonthDay对象相等" ); } else { System.out.println("两个MonthDay对象不相等" ); } LocalDate nowDate = LocalDate.now(); LocalDate date = LocalDate.of(2024 , Month.JANUARY, 1 ); long days = ChronoUnit.DAYS.between(date, nowDate); System.out.println(nowDate + "距离" + date + "有" + days + "天" ); }
1 2 两个MonthDay对象相等 2024 -01 -15 距离2024 -01 -01 有14 天
此外,你还可以使用MonthDay对象来计算两个日期之间相差的天数、月数或年数 。例如,要计算两个给定的日期之间包含多少天,多少周或者多少年,可以使用**ChronoUnit.DAYS.between(), ChronoUnit.WEEKS.between(), ChronoUnit.YEARS.between()**等方法。
DayOfWeek Java 8中的DayOfWeek是一个不可变的、线程安全的枚举,表示一周中的一天,如MONDAY、TUESDAY等,适用于需要表示一周中某一天的场景 。除了日期名称,DayOfWeek也有一个数字值。可以使用数字值来获取日期名称,也可以通过日期名称来获取数字值。
核心方法 :
**getValue()**:获取数字值(1-7)。
**toString()**:获取日期名称(如MONDAY、TUESDAY等)。
**of()**:通过数字值创建DayOfWeek对象。
1 2 3 4 5 6 7 8 9 10 11 public void test9 () { DayOfWeek dayOfWeek = DayOfWeek.MONDAY; int dayNumber = dayOfWeek.getValue(); System.out.println(dayNumber); dayOfWeek = DayOfWeek.MONDAY; String dayName = dayOfWeek.toString(); System.out.println(dayName); DayOfWeek dayOfWeekFromNumber = DayOfWeek.of(1 ); System.out.println(dayOfWeekFromNumber); }
总结 Year、YearMonth、MonthDay、DayOfWeek是Java 8中新增的日期时间API的一部分 ,它们提供了更灵活和强大的日期和时间处理能力。这些类都是不可变的,意味着一旦创建了对象,其值就不能更改 。这种设计使得对这些对象的并发操作更加安全和高效。同时,这些类都实现了Comparable接口,可以根据时间顺序进行比较。
ConcurrentHashMap和LongAdder()的配合使用 使用 ConcurrentHashMap 来统计,Key 的范围是 10。使用最多 10 个并发,循环操作 1000 万次,每次操作累加随机的 Key。如果 Key 不存在的话,首次设置值为 1。normaluse()方法直接使用synchronized加锁的方式保证线程安全,锁的粒度很大,虽然能够保证完成业务需求,且线程安全,但性能低下 。gooduse()方法使用 ConcurrentHashMap 的原子性方法 computeIfAbsent 来做复合逻辑操作,判断 Key 是否存在 Value,如果不存在则把 Lambda 表达式运行后的结果放入 Map 作为 Value,也就是新创建一个 LongAdder 对象,最后返回 Value 。由于 computeIfAbsent 方法返回的 Value 是 LongAdder,是一个线程安全的累加器,因此可以直接调用其 increment 方法进行累加。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.ThreadLocalRandom;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.LongAdder;import java.util.stream.Collectors;import java.util.stream.IntStream;public class ConcurrentHashMapTest { private static int LOOP_COUNT = 10000000 ; private static int THREAD_COUNT = 10 ; private static int ITEM_COUNT = 10 ; private Map<String, Long> normaluse () throws InterruptedException { ConcurrentHashMap<String, Long> freqs = new ConcurrentHashMap<>(ITEM_COUNT); ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT); forkJoinPool.execute(() -> IntStream.rangeClosed(1 , LOOP_COUNT).parallel().forEach(i -> { String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT); synchronized (freqs) { if (freqs.containsKey(key)) { freqs.put(key, freqs.get(key) + 1 ); } else { freqs.put(key, 1L ); } } } )); forkJoinPool.shutdown(); forkJoinPool.awaitTermination(1 , TimeUnit.HOURS); return freqs; } private Map<String, Long> gooduse () throws InterruptedException { ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(ITEM_COUNT); ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT); forkJoinPool.execute(() -> IntStream.rangeClosed(1 , LOOP_COUNT).parallel().forEach(i -> { String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT); freqs.computeIfAbsent(key, k -> new LongAdder()).increment(); } )); forkJoinPool.shutdown(); forkJoinPool.awaitTermination(1 , TimeUnit.HOURS); return freqs.entrySet().stream() .collect(Collectors.toMap( e -> e.getKey(), e -> e.getValue().longValue()) ); } public static void main (String[] args) throws InterruptedException { long startTime = System.currentTimeMillis(); ConcurrentHashMapTest concurrentHashMapTest = new ConcurrentHashMapTest(); concurrentHashMapTest.gooduse(); long endTime = System.currentTimeMillis(); long durationInMilli = endTime - startTime; System.out.println("方法执行时间(毫秒):" + durationInMilli); } }
normaluse()方法执行用时:
gooduse()方法执行用时:
优化后的代码,相比使用锁来操作 ConcurrentHashMap 的方式,性能提升了 4 倍。Java 自带的 Unsafe 实现的 CAS,在虚拟机层面确保了写入数据的原子性,比加锁的效率高得多。
CopyOnWriteArrayList的合理使用 在 Java 中,CopyOnWriteArrayList 虽然是一个线程安全的 ArrayList ,但其实现方式是,每次修改数据时都会复制一份数据出来 ,所以仅适用于读多写少或者说希望无锁读的场景 。如果读写比例均衡或者有大量写操作的话,使用 CopyOnWriteArrayList 的性能会十分低下。下面我们写一个demo比较一下使用 CopyOnWriteArrayList 和使用普通加锁方式的 ArrayList 的读写性能,在这段代码中我们针对并发读和并发写分别写了一个测试方法,测试两者一定次数的写或读操作的耗时。
pom文件中加入依赖,才可以使用统计耗时的StopWatch工具类:
1 2 3 4 5 <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>5.3 .19 </version> </dependency>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 import org.springframework.util.StopWatch;import java.util.*;import java.util.concurrent.CopyOnWriteArrayList;import java.util.concurrent.ThreadLocalRandom;import java.util.stream.Collectors;import java.util.stream.IntStream;public class CopyOnWriteTest { public Map testWrite () { List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>(); List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>()); StopWatch stopWatch = new StopWatch(); int loopCount = 100000 ; stopWatch.start("写操作:copyOnWriteArrayList" ); IntStream.rangeClosed(1 , loopCount).parallel().forEach(c -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount))); stopWatch.stop(); stopWatch.start("写操作:synchronizedList" ); IntStream.rangeClosed(1 , loopCount).parallel().forEach(c -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount))); stopWatch.stop(); System.out.println("输出代码执行耗时,以及执行时间百分比:" + stopWatch.prettyPrint()); Map result = new HashMap(); result.put("copyOnWriteArrayList" , copyOnWriteArrayList.size()); result.put("synchronizedList" , synchronizedList.size()); return result; } private void addAll (List<Integer> list) { list.addAll(IntStream.rangeClosed(1 , 1000000 ).boxed().collect(Collectors.toList())); } public Map testRead () { List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>(); List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>()); addAll(copyOnWriteArrayList); addAll(synchronizedList); StopWatch stopWatch = new StopWatch(); int loopCount = 1000000 ; int count = copyOnWriteArrayList.size(); stopWatch.start("读操作:copyOnWriteArrayList" ); IntStream.rangeClosed(1 , loopCount).parallel().forEach(c -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(count))); stopWatch.stop(); stopWatch.start("读操作:synchronizedList" ); IntStream.range(1 , loopCount).parallel().forEach(c -> synchronizedList.get(ThreadLocalRandom.current().nextInt(count))); stopWatch.stop(); System.out.println("输出代码执行耗时,以及执行时间百分比:" + stopWatch.prettyPrint()); Map result = new HashMap(); result.put("copyOnWriteArrayList" , copyOnWriteArrayList.size()); result.put("synchronizedList" , synchronizedList.size()); return result; } public static void main (String[] args) { CopyOnWriteTest copyOnWriteTest = new CopyOnWriteTest(); Map map = copyOnWriteTest.testWrite(); Map map1 = copyOnWriteTest.testRead(); System.out.println(map.toString()); System.out.println(map1.toString()); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 输出代码执行耗时,以及执行时间百分比:StopWatch '' : running time = 2943391700 ns --------------------------------------------- ns % Task name --------------------------------------------- 2932055700 100 % 写操作:copyOnWriteArrayList011336000 000 % 写操作:synchronizedList输出代码执行耗时,以及执行时间百分比:StopWatch '' : running time = 122014900 ns --------------------------------------------- ns % Task name --------------------------------------------- 015497400 013 % 读操作:copyOnWriteArrayList 106517500 087% 读操作:synchronizedList{copyOnWriteArrayList=100000 , synchronizedList=100000 } {copyOnWriteArrayList=1000000 , synchronizedList=1000000 }
从实践结果可以看出,大量写的场景(10 万次 add 操作),CopyOnWriteArray 比 ArrayList 慢一百倍不止,而读操作CopyOnWriteArray性能优于 ArrayList,是ArrayList性能的6.7倍。原因在于CopyOnWriteArrayList 中的 add 方法,在每次 add 时,都会用 Arrays.copyOf 创建一个新数组,频繁 add 时内存的申请释放消耗会很大:
1 2 3 4 5 6 7 8 9 10 public boolean add (E e) { synchronized (lock) { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1 ); newElements[len] = e; setArray(newElements); return true ; } }
综上所述,使用 CopyOnWriteArrayList 要注意使用场景是否为读多写少的场景或者说希望无锁读的场景 。
@Data使用注意事项 将@Data注解加在类上可以帮助我们根据类中的字段自动生成get()、set()、equals() 和 hashcode()方法,使用@Data注解需要导入lombok依赖:
1 2 3 4 5 6 <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18 .20 </version> <scope>provided</scope> </dependency>
我们实践一下这个注解是否自动生成equals() 和 hashcode()方法,并能正确判断两个对象是否相同,创建一个Person类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import lombok.Data;@Data class Person { private String name; private String identity; public Person (String name, String identity) { this .name = name; this .identity = identity; } }
1 2 3 4 5 6 7 8 9 public class DataTest { public static void main (String[] args) { Person person1 = new Person("Chris" ,"123456" ); Person person2 = new Person("Marry" ,"123456" ); System.out.println(person1.equals(person2)); } }
由于两个对象的name字段的值不同,所以两个对象不相同,name改成一致的,再测试:
1 2 3 4 5 6 7 8 9 public class DataTest { public static void main (String[] args) { Person person1 = new Person("Chris" ,"123456" ); Person person2 = new Person("Chris" ,"123456" ); System.out.println(person1.equals(person2)); } }
此时两个name和identity两个字段都相同,返回true,符合我们的预期,已实践验证。如果你希望只要identity字段值一致就认为是同一个Person的话,可以使用 @EqualsAndHashCode.Exclude 注解来修饰 name 字段,从 equals 和 hashCode 的实现中排除 name 字段:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import lombok.Data;import lombok.EqualsAndHashCode;@Data class Person { @EqualsAndHashCode .Exclude private String name; private String identity; public Person (String name, String identity) { this .name = name; this .identity = identity; } }
1 2 3 4 5 6 7 8 9 public class DataTest { public static void main (String[] args) { Person person1 = new Person("Chris" ,"123456" ); Person person2 = new Person("Marry" ,"123456" ); System.out.println(person1.equals(person2)); } }
实践验证了此时只要identity值相同,则两个对象相同,在equals 和 hashCode 的实现中排除了name字段。
如果类之间有继承关系上述实践还会生效吗?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import lombok.Data;import lombok.EqualsAndHashCode;@Data class Person { @EqualsAndHashCode .Exclude private String name; private String identity; public Person (String name, String identity) { this .name = name; this .identity = identity; } public Person () { } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import lombok.Data;@Data class Employee extends Person { private String company; public Employee (String name, String identity, String company) { super (name, identity); this .company = company; } }
1 2 3 4 5 6 7 8 9 public class DataTest { public static void main (String[] args) { Employee employee1 = new Employee("Chris" ,"123456" , "sky" ); Employee employee2 = new Employee("Marry" ,"666666" , "sky" ); System.out.println(employee1.equals(employee2)); } }
实践得知,子类Employee在equals 和 hashCode 的实现中默认只考虑了自身特有的字段company,父类Person的identity字段没有考虑在内(name刚才已经通过注解排除了),为解决这个问题,我们可以手动设置@EqualsAndHashCode注解的配置 callSuper 开关为 true,来修改这种默认行为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import lombok.Data;import lombok.EqualsAndHashCode;@Data @EqualsAndHashCode(callSuper = true) class Employee extends Person { private String company; public Employee (String name, String identity, String company) { super (name, identity); this .company = company; } }
1 2 3 4 5 6 7 8 9 public class DataTest { public static void main (String[] args) { Employee employee1 = new Employee("Chris" ,"123456" , "sky" ); Employee employee2 = new Employee("Marry" ,"666666" , "sky" ); System.out.println(employee1.equals(employee2)); } }
修改后的代码,实现了同时以子类的属性 company 加上父类中的属性 identity,作为 equals 和 hashCode 方法的实现条件 (实现上其实是调用了父类的 equals 和 hashCode)。
实现排序的 Comparable 和 Comparator
Comparable 接口有一个 compareTo(Object obj)方法用来排序
Comparator 接口有一个compare(Object obj1, Object obj2)方法用来排序
CompletableFuture使用实践 CompletableFuture 和 Future使用实践 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data @AllArgsConstructor @NoArgsConstructor public class User { private Long id; private String name; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data @AllArgsConstructor @NoArgsConstructor public class Task { private Long id; private String name; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest;public class TaskService { public User getUserInfo (Long userId) throws InterruptedException { Thread.sleep(200 ); return new User(1L , "Chris" ); } public Task getTaskInfo (Long taskId) throws InterruptedException { Thread.sleep(300 ); return new Task(1L , "学习" ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest;import org.example.ideaspringboottest.JavaTest.DesensitizationTest.UserInfo;import java.util.concurrent.*;public class FutureTest { public static void main (String[] args) throws ExecutionException, InterruptedException, ExecutionException { ExecutorService executorService = Executors.newFixedThreadPool(10 ); TaskService taskService = new TaskService(); Long userId =1L ; Long taskId =1L ; Long startTime = System.currentTimeMillis(); FutureTask<User> userInfoFutureTask = new FutureTask<>(new Callable<User>() { @Override public User call () throws Exception { return taskService.getUserInfo(userId); } }); executorService.submit(userInfoFutureTask); FutureTask<Task> taskInfoFutureTask = new FutureTask<>(new Callable<Task>() { @Override public Task call () throws Exception { return taskService.getTaskInfo(taskId); } }); executorService.submit(taskInfoFutureTask); User user = userInfoFutureTask.get(); Task task = taskInfoFutureTask.get(); System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms" ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class CompletableFutureTest { public static void main (String[] args) throws InterruptedException, ExecutionException, TimeoutException { TaskService taskService = new TaskService(); Long userId =1L ; Long taskId =1L ; Long startTime = System.currentTimeMillis(); CompletableFuture<User> completableUserInfoFuture = CompletableFuture.supplyAsync(() -> { try { return taskService.getUserInfo(userId); } catch (InterruptedException e) { throw new RuntimeException(e); } }); CompletableFuture<Task> completableTaskInfoFuture = CompletableFuture.supplyAsync(() -> { try { return taskService.getTaskInfo(taskId); } catch (InterruptedException e) { throw new RuntimeException(e); } }); User user = completableUserInfoFuture.get(2 , TimeUnit.SECONDS); Task task = completableTaskInfoFuture.get(); System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms" ); } }
CompletableFuture 和 Future 的区别
功能和灵活性
Future:
java.util.concurrent.Future 是 Java 5 引入的接口,用于表示一个异步操作的未来结果。它提供了基本的异步操作支持,可以用于检查是否完成、等待结果以及获取结果,但在处理结果、异常和组合等方面功能有限 。
CompletableFuture:
java.util.concurrent.CompletableFuture 是 Java 8 引入的类,扩展了 Future 的功能,提供了更强大和灵活的异步编程支持。它允许更容易地处理异步操作的结果、异常和组合,以及在操作完成时执行回调等 。
异步编程支持
Future:
Future 只能用于异步操作的基本管理,如提交任务和获取结果 。
CompletableFuture:
CompletableFuture 提供了丰富的操作符、方法和组合功能,可以在异步操作之间进行更复杂的处理,如转换、组合、串联、并行等 。
组合和链式操作
Future:
Future 没有提供内置的操作符来对多个异步操作进行链式组合 。
CompletableFuture:
CompletableFuture 支持在操作完成后进行链式操作,使多个异步操作可以依次执行,以及在其中一个或多个操作完成后执行其他操作 。
异常处理
Future:
Future 的异常处理相对有限,通常需要使用 try-catch 块来捕获操作过程中的异常 。
CompletableFuture:
CompletableFuture 具有更强大的异常处理机制,可以使用 exceptionally()、handle() 等方法来处理操作过程中的异常 。
回调执行
Future:
Future 不支持在操作完成时执行回调操作 。
CompletableFuture:
CompletableFuture 支持使用 thenApply()、thenCompose()、thenCombine() 等方法来在操作完成后执行回调 。
综上所述,CompletableFuture 提供了更丰富的异步编程支持和功能,使处理异步操作的结果、异常和组合变得更加灵活和便捷 。如果使用的是 Java 8 或更高版本,通常建议使用 CompletableFuture 来实现更高级的异步编程需求。
CompletableFuture常用方法 创建异步任务 CompletableFuture创建异步任务,一般有supplyAsync 和runAsync 两个方法
supplyAsync 执行CompletableFuture任务,支持有返回值 。
runAsync 执行CompletableFuture任务,没有返回值 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class RunAsyncTest { public static void main (String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("这是runAsync方法" ), executor); CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> { System.out.println("这是supplyAsync方法" ); return "B站:不吃辣的Chris" ; }, executor); System.out.println(runFuture.join()); System.out.println(supplyFuture.join()); executor.shutdown(); } }
1 2 3 4 这是runAsync方法 null 这是supplyAsync方法 B站:不吃辣的Chris
任务异步回调 thenRun/thenRunAsync CompletableFuture的thenRun 方法负责做完第一个任务后,再做第二个任务 。某个任务执行完成后,执行回调方法 ;但是前后两个任务没有参数传递,第二个任务也没有返回值 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class ThenRunTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync( ()->{ System.out.println("当前线程名称:" + Thread.currentThread().getName()); System.out.println("这是第一个CompletableFuture方法任务" ); return "B站:不吃辣的Chris" ; } ); CompletableFuture thenRunFuture = orgFuture.thenRun(() -> { System.out.println("当前线程名称:" + Thread.currentThread().getName()); System.out.println("这是第二个CompletableFuture方法任务" ); }); System.out.println(thenRunFuture.get()); } }
1 2 3 4 5 当前线程名称:ForkJoinPool.commonPool-worker-1 这是第一个CompletableFuture方法任务 当前线程名称:ForkJoinPool.commonPool-worker-1 这是第二个CompletableFuture方法任务 null
thenRun 和thenRunAsync有什么区别
1 2 3 4 5 6 7 8 9 10 private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); public CompletableFuture<Void> thenRun (Runnable action) { return uniRunStage(null , action); } public CompletableFuture<Void> thenRunAsync (Runnable action) { return uniRunStage(asyncPool, action); }
如果你执行第一个任务的时候,传入了一个自定义线程池:
调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池 。
调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ThenRunAsyncTest { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(); CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync( ()->{ System.out.println("当前线程名称:" + Thread.currentThread().getName()); System.out.println("这是第一个CompletableFuture方法任务" ); return "B站:不吃辣的Chris" ; },executor ); CompletableFuture thenRunFuture = orgFuture.thenRunAsync(() -> { System.out.println("当前线程名称:" + Thread.currentThread().getName()); System.out.println("这是第二个CompletableFuture方法任务" ); }); System.out.println(thenRunFuture.get()); } }
1 2 3 4 5 当前线程名称:pool-1 -thread-1 这是第一个CompletableFuture方法任务 当前线程名称:ForkJoinPool.commonPool-worker-1 这是第二个CompletableFuture方法任务 null
thenAccept/thenAcceptAsync CompletableFuture的thenAccept 方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中 ,但是回调方法是没有返回值 的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class ThenAcceptTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync( ()->{ System.out.println("第一个CompletableFuture方法任务" ); return "B站:不吃辣的Chris" ; } ); CompletableFuture thenAcceptFuture = orgFuture.thenAccept((a) -> { if ("B站:不吃辣的Chris" .equals(a)) { System.out.println("第一个方法的结果是 " + a + "!" ); } System.out.println("一键三连加关注哦~" ); }); System.out.println(thenAcceptFuture.get()); } }
1 2 3 4 第一个CompletableFuture方法任务 第一个方法的结果是 B站:不吃辣的Chris! 一键三连加关注哦~ null
thenApply/thenApplyAsync CompletableFuture的thenApply 方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中 ,并且回调方法是有返回值的 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class ThenApplyTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync( ()->{ System.out.println("第一个CompletableFuture方法任务" ); return "B站:不吃辣的Chris" ; } ); CompletableFuture<String> thenApplyFuture = orgFuture.thenApply((a) -> { if ("B站:不吃辣的Chris" .equals(a)) { return "第一个方法的结果是 " + a + " ,已一键三连加关注!" ; } return "不是 " + a + " 哦~" ; }); System.out.println(thenApplyFuture.get()); } }
1 2 第一个CompletableFuture方法任务 第一个方法的结果是 B站:不吃辣的Chris ,已一键三连加关注!
exceptionally CompletableFuture的exceptionally 方法表示,某个任务执行异常时,执行的回调方法,并且有抛出异常作为参数,传递到回调方法 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class ExceptionallyTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync( ()->{ System.out.println("当前线程名称:" + Thread.currentThread().getName()); throw new RuntimeException(); } ); CompletableFuture<String> exceptionFuture = orgFuture.exceptionally((e) -> { e.printStackTrace(); return "出现异常,请及时处理!" ; }); System.out.println(exceptionFuture.get()); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 当前线程名称:ForkJoinPool.commonPool-worker-1 出现异常,请及时处理! java.util.concurrent.CompletionException: java.lang.RuntimeException at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315 ) at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320 ) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770 ) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760 ) at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373 ) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182 ) at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655 ) at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622 ) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165 ) Caused by: java.lang.RuntimeException at org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest.ExceptionallyTest.lambda$main$0 (ExceptionallyTest.java:12 ) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768 ) ... 6 more
whenComplete方法 CompletableFuture的whenComplete 方法表示,某个任务执行完成后,执行的回调方法,****无返回值 ;并且whenComplete方法返回的CompletableFuture的result是****上个任务的结果 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class WhenCompleteTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync( ()->{ System.out.println("当前线程名称:" + Thread.currentThread().getName()); System.out.println("这是第一个CompletableFuture任务" ); try { Thread.sleep(1000L ); } catch (InterruptedException e) { e.printStackTrace(); } return "B站:不吃辣的Chris" ; } ); CompletableFuture<String> rstFuture = orgFuture.whenComplete((a, throwable) -> { System.out.println("当前线程名称:" + Thread.currentThread().getName()); System.out.println("第一个任务执行结束,执行结果为 " + a + " " ); if ("B站:不吃辣的Chris" .equals(a)) { System.out.println("第一个方法的结果是 " + a + " ,已一键三连加关注!" ); } System.out.println("耶~" ); }); System.out.println(rstFuture.get()); } }
1 2 3 4 5 6 7 当前线程名称:ForkJoinPool.commonPool-worker-1 这是第一个CompletableFuture任务 当前线程名称:ForkJoinPool.commonPool-worker-1 第一个任务执行结束,执行结果为 B站:不吃辣的Chris 第一个方法的结果是 B站:不吃辣的Chris ,已一键三连加关注! 耶~ B站:不吃辣的Chris
handle方法 CompletableFuture的handle 方法表示,某个任务执行完成后,执行回调方法,并且是有返回值的 ,并且handle方法返回的CompletableFuture的result是****回调方法执行的结果 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class HandleTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync( ()->{ System.out.println("当前线程名称:" + Thread.currentThread().getName()); System.out.println("这是第一个CompletableFuture任务" ); try { Thread.sleep(1000L ); } catch (InterruptedException e) { e.printStackTrace(); } return "B站:不吃辣的Chris" ; } ); CompletableFuture<String> rstFuture = orgFuture.handle((a, throwable) -> { System.out.println("第一个任务执行结束,执行结果为 " + a + " " ); if ("B站:不吃辣的Chris" .equals(a)) { System.out.println("第一个方法的结果是 " + a + " ,已一键三连加关注!" ); return "已关注" ; } System.out.println("耶~" ); return null ; }); System.out.println(rstFuture.get()); } }
1 2 3 4 5 当前线程名称:ForkJoinPool.commonPool-worker-1 这是第一个CompletableFuture任务 第一个任务执行结束,执行结果为 B站:不吃辣的Chris 第一个方法的结果是 B站:不吃辣的Chris ,已一键三连加关注! 已关注
任务组合处理 AND组合处理 thenCombine / thenAcceptBoth / runAfterBoth都表示:将两个CompletableFuture组合起来,只有这两个都正常执行完了,才会执行某个任务 。
区别在于:
thenCombine :会将两个任务的执行结果作为方法入参,传递到指定方法中 ,且有返回值
thenAcceptBoth: 会将两个任务的执行结果作为方法入参,传递到指定方法中 ,且无返回值
runAfterBoth :不会把执行结果当做方法入参 ,且没有返回值 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.*;public class ThenCombineAsyncTest { public static void main (String[] args) throws InterruptedException, ExecutionException, TimeoutException { CompletableFuture<String> first = CompletableFuture.completedFuture("这是第一个CompletableFuture任务" ); ExecutorService executor = Executors.newFixedThreadPool(10 ); CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> "这是第二个CompletableFuture任务" , executor) .thenCombineAsync(first, (s, w) -> { System.out.println(w); System.out.println(s); return "这是第三个CompletableFuture任务,走到这里说明两个异步任务进行了组合" ; }, executor); System.out.println(future.join()); executor.shutdown(); } }
1 2 3 这是第一个CompletableFuture任务 这是第二个CompletableFuture任务 这是第三个CompletableFuture任务,走到这里说明两个异步任务进行了组合
OR组合处理 applyToEither / acceptEither / runAfterEither 都表示:将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务 。
区别在于:
applyToEither :会将已经执行完成的任务,作为方法入参,传递到指定方法中 ,且有返回值 。
acceptEither : 会将已经执行完成的任务,作为方法入参,传递到指定方法中 ,且无返回值 。
runAfterEither :不会把执行结果当做方法入参 ,且没有返回值 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class AcceptEitherAsyncTest { public static void main (String[] args) { CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(1000L ); System.out.println("第一个任务执行结束" );} catch (Exception e){ return "第一个任务异常执行出现了异常" ; } return "任务1结果" ; }); ExecutorService executor = Executors.newSingleThreadExecutor(); CompletableFuture<Void> future = CompletableFuture .supplyAsync(() -> { System.out.println("第二个任务执行结束" ); return "任务2结果" ;} , executor) .acceptEitherAsync(first, System.out::println, executor); executor.shutdown(); } }
AllOf 所有任务都执行完成后,才执行 allOf返回的CompletableFuture 。如果任意一个任务异常,allOf的CompletableFuture,执行get方法,会抛出异常 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class AllOfTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> a = CompletableFuture.runAsync(()->{ System.out.println("执行第一个任务" ); }); CompletableFuture<Void> b = CompletableFuture.runAsync(() -> { System.out.println("执行第二个任务" ); System.out.println(1 /0 ); System.out.println("第二个任务执行结束" ); }); CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(a, b).whenComplete((m,k)->{ System.out.println("两个任务都执行结束了" ); }); System.out.println(allOfFuture.get()); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 执行第一个任务 执行第二个任务 两个任务都执行结束了 Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396 ) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073 ) at org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest.AllOfTest.main(AllOfTest.java:20 ) Caused by: java.lang.ArithmeticException: / by zero at org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest.AllOfTest.lambda$main$1 (AllOfTest.java:14 ) at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804 ) at java.base/java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1796 ) at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373 ) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182 ) at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655 ) at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622 ) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165 )
AnyOf 任意一个任务执行完,就执行anyOf返回的CompletableFuture 。如果执行的任务异常,anyOf的CompletableFuture,执行get方法,会抛出异常 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class AnyOfTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> a = CompletableFuture.runAsync(()->{ try { Thread.sleep(2000L ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("执行第一个任务" ); }); CompletableFuture<Void> b = CompletableFuture.runAsync(() -> { System.out.println("执行第二个任务" ); }); CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((m,k)->{ System.out.println("某一个任务执行结束了" ); }); anyOfFuture.join(); } }
thenCompose thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法 ,该方法会返回一个新的CompletableFuture实例 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ThenComposeAsyncTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("执行第一个任务" ); ExecutorService executor = Executors.newSingleThreadExecutor(); CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> "执行第二个任务" , executor) .thenComposeAsync(data -> { System.out.println(data); return completedFuture; }, executor); System.out.println(future.join()); executor.shutdown(); } }
Future使用实践 ThreadPoolExecutor的submit()方法 Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。3 个 submit() 方法如下:
1 2 3 4 5 6 <T> Future<T> submit (Callable<T> task) ; <T> Future<T> submit (Runnable task, T result) ; Future<?> submit(Runnable task);
这 3 个 submit() 方法之间的区别在于方法参数不同。
提交 Runnable 任务 submit(Runnable task) :这个方法的参数是一个 Runnable 接口, Runnable 接口的 run() 方法是没有返回值的 ,所以 submit(Runnable task) 这个方法返回的 Future 仅可以用来断言任务已经结束了 ,类似于 Thread.join()。
提交 Callable 任务 submit(Callable task):这个方法的参数是一个 Callable 接口,它只有一个 call() 方法,并且 这个方法是有返回值的 ,所以这个方法返回的 Future 对象可以通过调用其 get() 方法来获取任务的执行结果 。
提交 Runnable 任务及结果引用 submit(Runnable task, T result):假设这个 方法返回的 Future 对象是 f,f.get() 的返回值就是传给 submit() 方法的参数 result 。
下面示例一下** submit(Runnable task, T result)的使用,Runnable 接口的实现类 Task 需要声明一个有参构造函数 Task(Result r) ,创建 Task 对象的时候传入了 result 对象,这样就能在类 Task 的 run() 方法中对 result 进行各种操作了。 result 相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据**。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data @AllArgsConstructor @NoArgsConstructor public class Result { private Long id; private String name; }
1 2 3 4 5 6 7 8 9 public class Task implements Runnable { Result result; Task(Result result){ this .result = result; } public void run () { Long resultId = result.getId(); System.out.println(resultId); result.setName("修改后的任务执行结果名称" ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class SubmitTest { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(1 ); Result result = new Result(); result.setId(1L ); result.setName("执行结果名称" ); Future<Result> future = executor.submit(new Task(result), result); Result result1 = future.get(); System.out.println(result1.getId()); System.out.println(result1.getName()); executor.shutdown(); } }
Future 接口 submit(Runnable task, T result)的使用实践过了,同时还发现上述三个submit方法返回值都是 Future 接口,Future 接口有 5 个方法,分别是取消任务的方法 cancel()、 判断任务是否已取消的方法 isCancelled()、 判断任务是否已结束的方法 isDone()以及 2 个获得任务执行结果的 get() 和 get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制 。通过 Future 接口提交的任务不但能够获取任务执行结果,还可以取消任务。不过需要注意的是: 这两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞 ,直到任务执行完才会被唤醒 。
1 2 3 4 5 6 7 8 9 10 boolean cancel (boolean mayInterruptIfRunning) ;boolean isCancelled () ;boolean isDone () ;get(); get(long timeout, TimeUnit unit);
FutureTask FutureTask基本使用 Future 是一个接口,而 FutureTask 是一个工具类,有两个构造函数,它们的参数和前面介绍的 submit() 方法类似。
1 2 3 4 5 6 7 8 9 10 11 public FutureTask (Callable<V> callable) { if (callable == null ) throw new NullPointerException(); this .callable = callable; this .state = NEW; } public FutureTask (Runnable runnable, V result) { this .callable = Executors.callable(runnable, result); this .state = NEW; }
FutureTask 实现了 Runnable 和 Future 接口,由于实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;又因为实现了 Future 接口,所以也能用来获得任务的执行结果。下面的实践一下将 FutureTask 对象提交给 ThreadPoolExecutor 执行 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.FutureTask;public class FutureTaskTest { public static void main (String[] args) throws ExecutionException, InterruptedException { FutureTask<Integer> futureTask = new FutureTask<>(()-> 10 +22 ); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.submit(futureTask); Integer result = futureTask.get(); executorService.shutdown(); System.out.println(result); } }
实践一下 FutureTask 对象直接被 Thread 执行,利用 FutureTask 对象可以获取子线程的执行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import java.util.concurrent.ExecutionException;import java.util.concurrent.FutureTask;public class FutureTaskThreadTest { public static void main (String[] args) throws ExecutionException, InterruptedException { FutureTask<Integer> futureTask = new FutureTask<>(()-> 10 +22 ); Thread t1 = new Thread(futureTask); t1.start(); Integer result = futureTask.get(); System.out.println(result); } }
实践一个需要保证多个任务执行顺序的场景,例如煮茶这个过程可以拆分为:洗水壶、烧水和泡茶三个任务,在泡茶前需要洗茶壶、洗茶杯和准备茶叶三个任务,而洗水壶、烧水这两个任务可以和洗茶壶、洗茶杯和准备茶叶三个任务并行执行,在洗水壶、烧水这两个任务执行完后等待洗茶壶、洗茶杯和准备茶叶三个任务执行完成,然后就可以泡茶了。首先,我们创建了两个 FutureTask——futureTask1 和 futureTask2,futureTask1 完成洗水壶、烧开水、泡茶的任务,futureTask2 完成洗茶壶、洗茶杯、准备茶叶的任务;这里需要注意的是 futureTask1 这个任务在执行泡茶任务前,需要等待 futureTask2 把茶叶拿来,所以 futureTask1 内部需要引用 futureTask2,并在执行泡茶之前,调用 futureTask2 的 get() 方法实现等待。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import java.util.concurrent.Callable;import java.util.concurrent.FutureTask;import java.util.concurrent.TimeUnit;class T1Task implements Callable <String > { FutureTask<String> futureTask2; T1Task(FutureTask<String> futureTask2){ this .futureTask2 = futureTask2; } @Override public String call () throws Exception { System.out.println("T1:洗水壶..." ); TimeUnit.SECONDS.sleep(1 ); System.out.println("T1:烧开水..." ); TimeUnit.SECONDS.sleep(2 ); String futureTask2Result = futureTask2.get(); System.out.println("T1:准备茶叶:" +futureTask2Result); System.out.println("T1:泡茶..." ); return "上茶:" + futureTask2Result; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import java.util.concurrent.Callable;import java.util.concurrent.TimeUnit;class T2Task implements Callable <String > { @Override public String call () throws Exception { System.out.println("T2:洗茶壶..." ); TimeUnit.SECONDS.sleep(1 ); System.out.println("T2:洗茶杯..." ); TimeUnit.SECONDS.sleep(2 ); System.out.println("T2:准备茶叶..." ); TimeUnit.SECONDS.sleep(5 ); return "西湖龙井" ; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import java.util.concurrent.ExecutionException;import java.util.concurrent.FutureTask;public class FutureTaskTea { public static void main (String[] args) throws ExecutionException, InterruptedException { FutureTask<String> futureTask2 = new FutureTask<>(new T2Task()); FutureTask<String> futureTask1 = new FutureTask<>(new T1Task(futureTask2)); Thread T1 = new Thread(futureTask1); T1.start(); Thread T2 = new Thread(futureTask2); T2.start(); System.out.println(futureTask1.get()); } }
1 2 3 4 5 6 7 8 T1:洗水壶... T2:洗茶壶... T2:洗茶杯... T1:烧开水... T2:准备茶叶... T1:准备茶叶:西湖龙井 T1:泡茶... 上茶:西湖龙井
FutureTask结合阻塞队列的使用 没有使用阻塞队列的实践:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 package com.knowledgesharing.knowledgesharing.JavaUtils.CompletionServiceTest;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class TaskIncomeTestBlock { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(3 ); Future<Integer> f1 = executor.submit(()-> { Thread.sleep(3000 ); return 1 ; }); Future<Integer> f2 = executor.submit(()->{return 2 ;}); Future<Integer> f3 = executor.submit(()->{return 3 ;}); Integer taskIncome1 = f1.get(); executor.execute(()-> { System.out.println("任务1的收益为:" + taskIncome1 + ",现保存入库" ); } ); Integer taskIncome2 = f2.get(); executor.execute(()-> { System.out.println("任务2的收益为:" + taskIncome2 + ",现保存入库" ); } ); Integer taskIncome3 = f3.get(); executor.execute(()-> { System.out.println("任务3的收益为:" + taskIncome3 + ",现保存入库" ); } ); executor.shutdown(); } }
1 2 3 任务1 的收益为:1 ,现保存入库 任务2 的收益为:2 ,现保存入库 任务3 的收益为:3 ,现保存入库
可以发现任务2和任务3都要等任务1的f1.get()执行结束才可以执行 ,也就是说调用f1.get()的主线程阻塞在了f1.get()处,只有执行结束后才能执行后续的逻辑,没有很好地利用异步执行 ,现结合阻塞队列进行优化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 import java.util.concurrent.*;public class TaskIncomeTestNotBlock { public static void main (String[] args) throws ExecutionException, InterruptedException { LinkedBlockingQueue<Object> blockingQueue = new LinkedBlockingQueue<>(); ExecutorService executor = Executors.newFixedThreadPool(3 ); Future<Integer> f1 = executor.submit(()-> { Thread.sleep(3000 ); return 1 ; }); Future<Integer> f2 = executor.submit(()->{return 2 ;}); Future<Integer> f3 = executor.submit(()->{return 3 ;}); executor.execute(()-> { try { blockingQueue.put(f1.get()); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } }); executor.execute(()-> { try { blockingQueue.put(f2.get()); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } }); executor.execute(()-> { try { blockingQueue.put(f3.get()); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } }); for (int i=0 ; i<3 ; i++) { Integer r = (Integer) blockingQueue.take(); executor.execute(()-> { System.out.println("任务的收益为:" + r + ",现保存入库" ); }); } executor.shutdown(); } }
1 2 3 任务的收益为:2 ,现保存入库 任务的收益为:3 ,现保存入库 任务的收益为:1 ,现保存入库
此时任务1的执行不会阻塞任务2和任务3的执行。
CompletionService实践 CompletionService基本使用 CompletionService 的实现原理是内部维护了一个阻塞队列,当任务执行结束就把任务的执行结果加入到阻塞队列中,不同的是 CompletionService 是把任务执行结果的 Future 对象加入到阻塞队列中,Future中结合阻塞队列的实践是把任务最终的执行结果放入了阻塞队列中。
CompletionService 接口的实现类是 ExecutorCompletionService,这个实现类的构造方法有两个,分别是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public ExecutorCompletionService (Executor executor) { if (executor == null ) throw new NullPointerException(); this .executor = executor; this .aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null ; this .completionQueue = new LinkedBlockingQueue<Future<V>>(); } public ExecutorCompletionService (Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null ) throw new NullPointerException(); this .executor = executor; this .aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null ; this .completionQueue = completionQueue; }
这两个构造方法都需要传入一个线程池,如果不指定 completionQueue,那么默认会使用无界的 LinkedBlockingQueue。任务执行结果的 Future 对象就是加入到 completionQueue 中。下面实践优化一下Future中结合阻塞队列实现3个任务的收益的计算和入库处理。
通过 CompletionService 接口提供的 submit() 方法提交三个任务收益计算的任务,这三个任务将会被 CompletionService 异步执行。最后,我们通过 CompletionService 接口提供的 take() 方法获取一个 Future 对象(加入到阻塞队列中的是任务执行结果的 Future 对象),调用 Future 对象的 get() 方法就能返回任务执行结果了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import java.util.concurrent.*;public class CompletionServiceTest { public static void main (String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newFixedThreadPool(3 ); CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor); completionService.submit(()-> { Thread.sleep(3000 ); return 1 ; }); completionService.submit(()->{return 2 ;}); completionService.submit(()->{return 3 ;}); for (int i = 0 ; i<3 ; i++) { Integer result = completionService.take().get(); executor.execute(()->{ System.out.println("任务的收益为:" + result + ",现保存入库" ); }); } executor.shutdown(); } }
1 2 3 任务的收益为:3 ,现保存入库 任务的收益为:2 ,现保存入库 任务的收益为:1 ,现保存入库
CompletionService 接口说明 CompletionService 接口提供的方法有** 5 个, submit() 相关的方法有两个, 一个方法参数是Callable, 另外一个方法有两个参数,分别是Runnable task和V result**,这个方法类似于 ThreadPoolExecutor 的 Future submit(Runnable task, T result) ,
CompletionService 接口其余的 3 个方法,都是和阻塞队列相关的,take()、poll() 都是从阻塞队列中获取并移除一个元素 ,区别在于如果阻塞队列是空的,那么调用 take() 方法的线程会被阻塞,而 poll() 方法会返回 null 值 。 poll(long timeout, TimeUnit unit) 方法支持以超时的方式获取并移除阻塞队列头部的一个元素,如果等待了 timeout unit 时间,阻塞队列还是空的,那么该方法会返回 null 值 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public Future<V> submit (Callable<V> task) { if (task == null ) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; } public Future<V> submit (Runnable task, V result) { if (task == null ) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; } public Future<V> take () throws InterruptedException { return completionQueue.take(); } public Future<V> poll () { return completionQueue.poll(); } public Future<V> poll (long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); }
CompletionService实现方法高可用 在集群模式下,支持并行地调用多个查询服务,只要有一个成功返回结果,整个服务就可以返回了。比如你需要提供一个查询用户当前位置的服务,为了保证该服务的高可用和性能,你可以并行地调用 3 个地图服务商的 API,然后只要有 1 个正确返回了结果,就可以直接返回了,也就是说可以容错 2 个地图服务商服务异常,但缺点是消耗的资源偏多。
下面实践一下:首先创建一个线程池 executor 、一个 CompletionService 对象 completionService 和一个Future类型的列表 futures ,每次通过调用 CompletionService 的 submit() 方法提交一个异步任务进行执行 ,执行结束会返回一个 Future 对象到CompletionService的阻塞队列中 ,调用completionService.take()就能拿到并消费队列头部的一个Future ,也就是最快返回的任务,调用completionService.take().get()就能拿到这个任务的执行结果 。同时我们把这些 Future 对象保存在列表 futures 中,用于在finally中取消所有任务,因为我们想要的效果就是只要我们拿到一个正确返回的结果,就可以取消所有任务并且返回最终结果了 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 import java.util.ArrayList;import java.util.List;import java.util.concurrent.*;public class HighAvailabilityTest { public static void main (String[] args) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(3 ); CompletionService<String> completionService = new ExecutorCompletionService<>(executor); List<Future<String>> futures = new ArrayList<>(3 ); futures.add(completionService.submit(()-> { Thread.sleep((long ) (Math.random() * 1000 ) + 1 ); return "服务商1提供的用户地址" ;}) ); futures.add(completionService.submit(()-> { Thread.sleep((long ) (Math.random() * 1000 ) + 1 ); return "服务商2提供的用户地址" ;}) ); futures.add(completionService.submit(()-> { Thread.sleep((long ) (Math.random() * 1000 ) + 1 ); return "服务商3提供的用户地址" ;}) ); String loaction = "" ; try { for (int i = 0 ; i < 3 ; ++i) { loaction = completionService.take().get(); if (loaction != null ) { break ; } } } catch (ExecutionException e) { throw new RuntimeException(e); } finally { for (Future<String> future : futures) future.cancel(true ); } System.out.println(loaction); executor.shutdown(); } }
实践可知:每次运行程序都是不同服务商提供的用户地址,说明只要一个服务提供商提供了地址,则直接返回结果,保证了服务的高可用。
小结 当需要批量提交异步任务的时候建议使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单 。除此之外,CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待 。CompletionService 的实现类 ExecutorCompletionService,需要你自己创建线程池,虽然看起来麻烦,但好处是你可以让多个 ExecutorCompletionService 的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。
Fork/Join使用实践 分治任务模型 分治任务模型可分为两个阶段:
任务分解:将任务迭代地分解为子任务,直至子任务可以直接计算出结果;
结果合并:逐层合并子任务的执行结果,直至获得最终结果。
在这个分治任务模型里,任务和分解后的子任务具有相似性,这种相似性往往体现在任务和子任务的算法是相同的,但是计算的数据规模是不同的。具备这种相似性的问题,往往都采用递归算法。
Fork/Join 的使用 Fork/Join 是一个并行计算的框架,用于支持分治任务模型 ,这个计算框架里的 Fork 对应的是分治任务模型里的任务分解 ,Join 对应的是结果合并 。Fork/Join 计算框架主要包含两部分,一部分是分治任务的线程池 ForkJoinPool ,另一部分是分治任务 ForkJoinTask 。这两部分的关系类似于 ThreadPoolExecutor 和 Runnable 的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特类型 ForkJoinTask 。
ForkJoinTask 是一个抽象类,它的核心方法是 fork() 方法和 join() 方法 ,其中 fork() 方法会异步地执行一个子任务 ,而 join() 方法则会阻塞当前线程来等待子任务的执行结果 。ForkJoinTask ** 有两个子类—— RecursiveAction** 和 RecursiveTask ,它们都是用递归的方式来处理分治任务的 。这两个子类都**定义了抽象方法 compute()**,不过区别是 RecursiveAction 定义的 compute() 没有返回值,而 RecursiveTask 定义的 compute() 方法是有返回值的 。这两个子类也是抽象类,在使用的时候,需要你定义子类去扩展。
实践一下用 Fork/Join 计算斐波那契数列。首先需要创建一个分治任务线程池 以及计算斐波那契数列的分治任务 ,之后通过调用分治任务线程池的 invoke() 方法来启动分治任务 。由于计算斐波那契数列需要有返回值,所以 Fibonacci 继承自 RecursiveTask。分治任务 Fibonacci 需要实现 compute() 方法 ,这个方法里面的逻辑和普通计算斐波那契数列非常类似,区别之处在于计算 Fibonacci(n - 1) 使用了异步子任务,这是通过 f1.fork() 这条语句实现的 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import java.util.concurrent.RecursiveTask;public class Fibonacci extends RecursiveTask <Integer > { final int n; Fibonacci(int n){ this .n = n; } protected Integer compute () { if (n <= 1 ) { return n; } Fibonacci f1 = new Fibonacci(n - 1 ); f1.fork(); Fibonacci f2 = new Fibonacci(n - 2 ); return f2.compute() + f1.join(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import java.util.concurrent.ForkJoinPool;public class ForkJoinTest { public static void main (String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(4 ); Fibonacci fibonacci = new Fibonacci(30 ); Integer result = forkJoinPool.invoke(fibonacci); System.out.println(result); } }
ForkJoinPool 工作原理 ThreadPoolExecutor 本质上是一个生产者 - 消费者模式的实现,内部有一个任务队列,这个任务队列是生产者和消费者通信的媒介 ;ThreadPoolExecutor 可以有多个工作线程,但是这些工作线程共享一个任务队列 。Fork/Join 并行计算的核心组件是 ForkJoinPool,ForkJoinPool 本质上也是一个生产者 - 消费者的实现 。
ThreadPoolExecutor 内部只有一个任务队列,而 ForkJoinPool 内部有多个任务队列 ,当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中 。
如果工作线程对应的任务队列空了,也是不会“躺平”的,ForkJoinPool 支持一种叫做“任务窃取 ”的机制,如果工作线程空闲了,那它可以“窃取”其他工作任务队列里的任务 ,比如线程 T2 对应的任务队列已经空了,它可以“窃取”线程 T1 对应的任务队列的任务 。如此一来,所有的工作线程都不会闲下来了。ForkJoinPool 中的任务队列采用的是双端队列,工作线程正常获取任务和“窃取任务”分别是从任务队列不同的端消费,这样能避免很多不必要的数据竞争 。
统计单词数量实践 现在有统计一个文件里面每个单词的数量的场景,可以先用二分法递归地将一个文件拆分成更小的文件,直到文件里只有一行数据,然后统计这一行数据里单词的数量,最后再逐级汇总结果 。
开始实践一下,用一个字符串数组 String[] fc 来模拟文件内容,fc 里面的元素与文件里面的行数据一一对应。关键的代码在 compute() 这个方法里面,这是一个递归方法,前半部分数据 fork 一个递归任务去处理(关键代码 mr1.fork()),后半部分数据则在当前任务中递归处理(mr2.compute()) 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 import java.util.HashMap;import java.util.Map;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.RecursiveTask;public class WordStatisticsTest { public static void main (String[] args) { String[] fc = {"hello world" , "I am Chris" , "I am fork" , "I am join" , "We are fork and join" }; ForkJoinPool fjp = new ForkJoinPool(3 ); MR mr = new MR(fc, 0 , fc.length); Map<String, Long> result = fjp.invoke(mr); result.forEach((k, v)-> System.out.println(k+":" +v)); } static class MR extends RecursiveTask <Map <String , Long >> { private String[] fc; private int start, end; MR(String[] fc, int fr, int to){ this .fc = fc; this .start = fr; this .end = to; } @Override protected Map<String, Long> compute () { if (end - start == 1 ) { return calc(fc[start]); } else { int mid = (start + end)/2 ; MR mr1 = new MR(fc, start, mid); mr1.fork(); MR mr2 = new MR(fc, mid, end); return merge(mr2.compute(), mr1.join()); } } private Map<String, Long> merge (Map<String, Long> r1, Map<String, Long> r2) { Map<String, Long> result = new HashMap<>(); result.putAll(r1); r2.forEach((k, v) -> { Long c = result.get(k); if (c != null ) result.put(k, c+v); else result.put(k, v); }); return result; } private Map<String, Long> calc (String line) { Map<String, Long> result = new HashMap<>(); String [] words = line.split("\\s+" ); for (String w : words) { Long v = result.get(w); if (v != null ) result.put(w, v+1 ); else result.put(w, 1L ); } return result; } } }
1 2 3 4 5 6 7 8 9 10 fork:2 world:1 are:1 and:1 Chris:1 I:3 join:2 hello:1 am:3 We:1
小结 Fork/Join 并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个大的任务拆分成小的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果 。
Fork/Join 并行计算框架的核心组件是 ForkJoinPool 。ForkJoinPool 支持任务窃取机制,能够让所有线程的工作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好 。Java 1.8 提供的 Stream API 里面并行流也是以 ForkJoinPool 为基础的 。不过需要你注意的是,默认情况下所有的并行流计算都共享一个 ForkJoinPool,这个共享的 ForkJoinPool 默认的线程数是 CPU 的核数 ;如果所有的并行流计算都是 CPU 密集型计算的话,完全没有问题 ,但是如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。所以建议用不同的 ForkJoinPool 执行不同类型的计算任务 。
线程池原理 自定义线程池实践 线程池的设计,普遍采用的都是生产者 - 消费者模式。线程池的使用方是生产者,线程池本身是消费者。实践创建一个简单的线程池 MyThreadPool,以辅助理解线程池的工作原理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import java.util.ArrayList;import java.util.List;import java.util.concurrent.BlockingQueue;class MyThreadPool { BlockingQueue<Runnable> workQueue; List<WorkerThread> threads = new ArrayList<>(); MyThreadPool(int poolSize, BlockingQueue<Runnable> workQueue){ this .workQueue = workQueue; for (int idx = 0 ; idx<poolSize; idx++){ WorkerThread work = new WorkerThread(); work.start(); threads.add(work); } } void execute (Runnable command) throws InterruptedException { workQueue.put(command); } class WorkerThread extends Thread { public void run () { while (true ){ Runnable task = null ; try { task = workQueue.take(); } catch (InterruptedException e) { throw new RuntimeException(e); } task.run(); } } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;public class ThreadPoolTest { public static void main (String[] args) throws InterruptedException { BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2 ); MyThreadPool pool = new MyThreadPool(10 , workQueue); pool.execute(()->{ System.out.println("I am Chris." ); }); } }
在 MyThreadPool 的内部,我们维护了一个阻塞队列 workQueue 和一组工作线程,工作线程的个数由构造函数中的 poolSize 来指定。用户通过调用 execute() 方法来提交 Runnable 任务,execute() 方法的内部实现仅仅是将任务加入到 workQueue 中 。MyThreadPool 内部维护的工作线程会消费 workQueue 中的任务并执行任务,相关的代码就是代码 while 循环部分 。
使用 Java 中的线程池方式 Java 提供的线程池相关的工具类中,最核心的是 ThreadPoolExecutor,ThreadPoolExecutor 的构造函数有 7 个参数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException(); this .acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; }
可以把线程池类比为一个项目组,而线程就是项目组的成员。
corePoolSize :表示线程池保有的最小线程数 。有些项目很闲,但是也不能把人都撤了,至少要留 corePoolSize 个人坚守阵地。
maximumPoolSize :表示线程池创建的最大线程数 。当项目很忙时,就需要加人,但是也不能无限制地加,最多就加到 maximumPoolSize 个人。当项目闲下来时,就要撤人了,最多能撤到 corePoolSize 个人。
keepAliveTime & unit :上面提到项目根据忙闲来增减人员,那在编程世界里,如何定义忙和闲呢?很简单,一个线程如果在一段时间内,都没有执行任务,说明很闲,keepAliveTime 和 unit 就是用来定义这个“一段时间”的参数。也就是说,如果一个线程空闲了keepAliveTime & unit这么久,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收了 。
workQueue :工作队列,和上面示例代码的工作队列同义。
threadFactory :通过这个参数你可以自定义如何创建线程 ,例如你可以给线程指定一个有意义的名字。
handler :通过这个参数你可以自定义任务的拒绝策略 。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收 。至于拒绝的策略,你可以通过 handler 这个参数来指定 。ThreadPoolExecutor 已经提供了以下 4 种策略。
CallerRunsPolicy :提交任务的线程自己去执行该任务 。
AbortPolicy :默认的拒绝策略,直接抛出异常 throws RejectedExecutionException。
DiscardPolicy :直接丢弃任务,没有任何异常抛出 。
DiscardOldestPolicy :丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列 。
使用线程池注意事项 Java 并发包里提供了一个线程池的静态工厂类 Executors,利用 Executors 你可以快速创建线程池。不过目前大厂的编码规范中基本上都不建议使用 Executors ,最重要的原因是: Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题 。所以强烈建议使用有界队列。
使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略,因此默认拒绝策略要慎重使用 。如果线程池处理的任务非常重要,建议自定义自己的拒绝策略,并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。
使用线程池,还要注意异常处理的问题 ,例如通过 ThreadPoolExecutor 对象的 execute() 方法提交任务时,如果任务在执行的过程中出现运行时异常,会导致执行任务的线程终止;不过,最致命的是任务虽然异常了,但是你却获取不到任何通知,这会让你误以为任务都执行得很正常 。虽然线程池提供了很多用于异常处理的方法,但是最稳妥和简单的方案还是捕获所有异常并按需处理 ,你可以参考下面的示例代码。
1 2 3 4 5 6 7 try { } catch (RuntimeException x) { } catch (Throwable x) { }
CountDownLatch使用实践 有一个场景是网购平台一个订单对应一个运单,定期需要核对一批订单和运单,查询订单和查询运单的任务可以并行执行以提高效率,现在实践一下使用CountDownLatch优化串行执行查询订单和查询运单的逻辑 。
创建一个 CountDownLatch,计数器的初始值等于 2,之后在模拟从数据库中查询订单的操作orders.add(orderList.get(index));和模拟从数据库中查询运单的操作dispatches.add(dispatchList.get(index));对应的两条语句的后面对计数器执行减 1 操作,这个对计数器减 1 的操作是通过调用** latch.countDown(); 来实现的。在主线程中,我们通过调用 latch.await() **来实现对计数器等于 0 的等待。
1 2 3 4 5 6 7 8 9 10 11 12 import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data @AllArgsConstructor @NoArgsConstructor public class Dispatch { private Long id; private Integer money; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data @AllArgsConstructor @NoArgsConstructor public class Order { private Long id; private Integer money; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import java.util.ArrayList;import java.util.List;import java.util.Vector;import java.util.concurrent.CountDownLatch;import java.util.concurrent.Executor;import java.util.concurrent.Executors;public class CountDownLatchTest { public static Vector<Order> orders = new Vector<>(); public static Vector<Dispatch> dispatches = new Vector<>(); public static Executor executor = Executors.newFixedThreadPool(2 ); public static void main (String[] args) throws InterruptedException { List<Order> orderList = new ArrayList<>(); List<Dispatch> dispatchList = new ArrayList<>(); orderList.add(new Order(1L , 10 )); orderList.add(new Order(2L , 12 )); dispatchList.add(new Dispatch(1L , 6 )); dispatchList.add(new Dispatch(2L , 8 )); for (int i = 0 ; i < orderList.size();i++) { CountDownLatch latch = new CountDownLatch(2 ); int index = i; executor.execute(()-> { orders.add(orderList.get(index)); latch.countDown(); }); executor.execute(()-> { dispatches.add(dispatchList.get(index)); latch.countDown(); }); latch.await(); System.out.println("执行对账操作,order为:" + orderList.get(index) + ",dispatch为" + dispatchList.get(index)); System.out.println("对账结果写入数据库操作,order为:" + orderList.get(index) + ",dispatch为" + dispatchList.get(index)); } } }
1 2 3 4 执行对账操作,order为:Order(id=1 , money=10 ),dispatch为Dispatch(id=1 , money=6 ) 对账结果写入数据库操作,order为:Order(id=1 , money=10 ),dispatch为Dispatch(id=1 , money=6 ) 执行对账操作,order为:Order(id=2 , money=12 ),dispatch为Dispatch(id=2 , money=8 ) 对账结果写入数据库操作,order为:Order(id=2 , money=12 ),dispatch为Dispatch(id=2 , money=8 )
CyclicBarrier使用实践 有一个场景是网购平台一个订单对应一个运单,定期需要核对一批订单和运单,查询订单和查询运单的任务可以并行执行以提高效率,现在实践一下使用CyclicBarrier优化串行执行查询订单和查询运单的逻辑 。
首先创建一个计数器初始值为 2 的 CyclicBarrier,创建 CyclicBarrier 的时候,还需要传入一个回调函数,这样的话当计数器减到 0 的时候,会调用这个回调函数。线程 T1 负责查询订单,当查出一条时,调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;线程 T2 负责查询运单,当查出一条时,也调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;当 T1 和 T2 都调用 barrier.await() 的时候,计数器会减到 0,此时 T1 和 T2 就可以执行下一条语句了,同时会调用 barrier 的回调函数来执行对账操作。比较有意思的是,CyclicBarrier 的计数器有自动重置的功能,当减到 0 的时候,会自动重置你设置的初始值,使用起来十分方便。
1 2 3 4 5 6 7 8 9 10 11 12 import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data @AllArgsConstructor @NoArgsConstructor public class Dispatch { private Long id; private Integer money; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data @AllArgsConstructor @NoArgsConstructor public class Order { private Long id; private Integer money; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 import java.util.ArrayList;import java.util.List;import java.util.Vector;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;import java.util.concurrent.Executor;import java.util.concurrent.Executors;public class CyclicBarrierTest { public static Vector<Order> orders = new Vector<>(); public static Vector<Dispatch> dispatches = new Vector<>(); public static Executor executor = Executors.newFixedThreadPool(1 ); public static final CyclicBarrier barrier = new CyclicBarrier(2 , ()->{ executor.execute(()-> check() ); }); public static void main (String[] args) { List<Order> orderList = new ArrayList<>(); List<Dispatch> dispatchList = new ArrayList<>(); orderList.add(new Order(1L , 10 )); orderList.add(new Order(2L , 12 )); dispatchList.add(new Dispatch(1L , 6 )); dispatchList.add(new Dispatch(2L , 8 )); Thread T1 = new Thread(()->{ for (Order order : orderList){ orders.add(order); try { barrier.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (BrokenBarrierException e) { throw new RuntimeException(e); } } }); T1.start(); Thread T2 = new Thread(()->{ for (Dispatch dispatch : dispatchList) { dispatches.add(dispatch); try { barrier.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (BrokenBarrierException e) { throw new RuntimeException(e); } } }); T2.start(); } public static void check () { Order order = orders.remove(0 ); Dispatch dispatch = dispatches.remove(0 ); System.out.println("执行对账操作,order为:" + order + ",dispatch为" + dispatch); System.out.println("对账结果写入数据库,order为:" + order + ",dispatch为" + dispatch); } }
1 2 3 4 执行对账操作,order为:Order(id=1 , money=10 ),dispatch为Dispatch(id=1 , money=6 ) 对账结果写入数据库,order为:Order(id=1 , money=10 ),dispatch为Dispatch(id=1 , money=6 ) 执行对账操作,order为:Order(id=2 , money=12 ),dispatch为Dispatch(id=2 , money=8 ) 对账结果写入数据库,order为:Order(id=2 , money=12 ),dispatch为Dispatch(id=2 , money=8 )
CountDownLatch 和 CyclicBarrier 是 Java 并发包提供的两个非常易用的线程同步工具类,这两个工具类用法的区别:CountDownLatch 主要用来解决一个线程等待多个线程的场景 ,而 CyclicBarrier 是一组线程之间互相等待 。除此之外 CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过 。但** CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外, CyclicBarrier 还可以设置回调函数**。
ReadWriteLock实践 读多写少场景下,适用读写锁保证数据获取的安全和性能,例如缓存元数据、缓存基础数据等。
读写锁是什么 读写锁,某一个语言特有的,是一中设计的思想,所有的读写锁都遵守以下三条基本原则:
允许多个线程同时读共享变量
只允许一个线程写共享变量
如果一个写线程正在执行写操作,此时禁止读线程读共享变量
读写锁与互斥锁的一个重要区别就是读写锁允许多个线程同时读共享变量 ,而互斥锁是不允许 的 ,这是读写锁在读多写少场景下性能优于互斥锁的关键。但读写锁的写操作是互斥的,当一个线程在写共享变量的时候,是不允许其他线程执行写操作和读操作。
自定义一个缓存 实践一下,用 ReadWriteLock 实现一个缓存工具类。缓存的数据保存在 Cache 类内部的 HashMap 里面,HashMap 不是线程安全的,这里我们使用读写锁 ReadWriteLock 来保证其线程安全。ReadWriteLock 是一个接口,它的实现类是 ReentrantReadWriteLock,是支持可重入的。通过 rwl 创建了一把读锁和一把写锁。
Cache 这个工具类,提供了两个方法,一个是读缓存方法 get(),另一个是写缓存方法 put()。读缓存需要用到读锁,是 try{}finally{}这个格式。写缓存则需要用到写锁,写锁的使用和读锁是类似的。
假设缓存的源头是数据库,如果缓存中没有缓存目标对象,那么就需要从数据库中加载,然后写入缓存,写缓存需要用到写锁,可以调用 w.lock() 来获取写锁。需要注意的是,在获取写锁之后,不能直接去查询数据库,而是在重新验证了一次缓存中是否存在,因为有可能已经有线程从数据库把这条数据写入缓存了,再次验证如果还是不存在,当前线程再执行从数据库写入缓存的操作 。比如高并发的场景下,有可能会有多线程竞争写锁。假设缓存是空的,没有缓存任何东西,如果此时有三个线程 T1、T2 和 T3 同时调用 get() 方法,并且参数 key 也是相同的。那么它们会同时执行到代码w.lock();处,但此时只有一个线程能够获得写锁,假设是线程 T1,线程 T1 获取写锁之后查询数据库并更新缓存,最终释放写锁。此时线程 T2 和 T3 会再有一个线程能够获取写锁,假设是 T2,如果不采用再次验证的方式,此时 T2 会再次查询数据库。T2 释放写锁之后,T3 也会再次查询一次数据库。而实际上线程 T1 已经把缓存的值设置好了,T2、T3 完全没有必要再次查询数据库。所以,再次验证的方式,能够避免高并发场景下重复查询数据的问题 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data @AllArgsConstructor @NoArgsConstructor public class User { private Long id; private String name; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 import java.util.HashMap;import java.util.Map;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock;class Cache { final Map<Long,User> m = new HashMap<>(); final ReadWriteLock rwl = new ReentrantReadWriteLock(); final Lock r = rwl.readLock(); final Lock w = rwl.writeLock(); public User get (Long key) { User user = null ; r.lock(); try { user = m.get(key); } finally { r.unlock(); } if (user != null ) { return user; } w.lock(); try { user = m.get(key); if (user == null ){ user = queryUserById(key); m.put(key, user); } } finally { w.unlock(); } return user; } public User put (Long key, User user) { w.lock(); try { user = m.get(key); if (user == null ){ user = queryUserById(key); m.put(key, user); } } finally { w.unlock(); } return user; } public User queryUserById (Long id) { return new User(id, "Chris" ); } }
StampedLock使用实践 使用实践 ReadWriteLock 支持两种模式:一种是读锁,一种是写锁。而 StampedLock 支持三种模式,分别是:写锁 、悲观读锁 和乐观读 。其中,写锁、悲观读锁的语义和 ReadWriteLock 的写锁、读锁的语义非常类似,允许多个线程同时获取悲观读锁,但是只允许一个线程获取写锁,写锁和悲观读锁是互斥的 。不同的是:StampedLock 里的写锁和悲观读锁加锁成功之后,都会返回一个 stamp;然后解锁的时候,需要传入这个 stamp 。相关的示例代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 final StampedLock sl = new StampedLock(); long stamp = sl.readLock();try { } finally { sl.unlockRead(stamp); } long stamp = sl.writeLock();try { } finally { sl.unlockWrite(stamp); }
StampedLock 的性能之所以比 ReadWriteLock 还要好,其关键是 StampedLock 支持乐观读的方式。ReadWriteLock 支持多个线程同时读,但是当多个线程同时读的时候,所有的写操作会被阻塞 ;而 StampedLock 提供的乐观读,是允许一个线程获取写锁的,也就是说不是所有的写操作都被阻塞 。这里是乐观读,而不是乐观读锁,乐观读这个操作是无锁的,所以相比较 ReadWriteLock 的读锁,乐观读的性能更好一些 。
下面模拟计算两点之间距离的场景实践一下:在 distanceFromOrigin() 这个方法中,首先通过调用 tryOptimisticRead() 获取了一个 stamp,这里的 tryOptimisticRead() 就是我们前面提到的乐观读。之后将共享变量 x 和 y 读入方法的局部变量中,不过需要注意的是,由于 tryOptimisticRead() 是无锁的,所以共享变量 x 和 y 读入方法局部变量时,x 和 y 有可能被其他线程修改了。因此最后读完之后,还需要再次验证一下是否存在写操作,这个验证操作是通过调用 validate(stamp) 来实现的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 import java.util.concurrent.locks.StampedLock;class Point { private int x, y; final StampedLock sl = new StampedLock(); int distanceFromOrigin () { long stamp = sl.tryOptimisticRead(); int curX = x; int curY = y; if (!sl.validate(stamp)){ stamp = sl.readLock(); try { curX = x; curY = y; } finally { sl.unlockRead(stamp); } } return (int ) Math.sqrt(curX * curX + curY * curY); } }
如果执行乐观读操作的期间,存在写操作,会把乐观读升级为悲观读锁。这个做法挺合理的,否则你就需要在一个循环里反复执行乐观读,直到执行乐观读操作的期间没有写操作(只有这样才能保证 x 和 y 的正确性和一致性),而循环读会浪费大量的 CPU。升级为悲观读锁,代码简练且不易出错,建议在具体实践时也采用这样的方法。
注意事项 StampedLock 在命名上并没有增加 Reentrant,所以表明 StampedLock 是不可重入的 。StampedLock 的悲观读锁、写锁都不支持条件变量 。如果线程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上时,此时调用该阻塞线程的 interrupt() 方法,会导致 CPU 飙升 。例如下面的代码中,线程 T1 获取写锁之后将自己阻塞,线程 T2 尝试获取悲观读锁,也会阻塞;如果此时调用线程 T2 的 interrupt() 方法来中断线程 T2 的话,你会发现线程 T2 所在 CPU 会飙升到 100%。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import java.util.concurrent.locks.LockSupport;import java.util.concurrent.locks.StampedLock;public class StampedLockInterruptTest { public static void main (String[] args) throws InterruptedException { final StampedLock lock = new StampedLock(); Thread T1 = new Thread(()->{ lock.writeLock(); LockSupport.park(); }); T1.start(); Thread.sleep(100 ); Thread T2 = new Thread(()-> lock.readLock() ); T2.start(); Thread.sleep(100 ); T2.interrupt(); T2.join(); } }
所以,使用 StampedLock 一定不要调用中断操作 ,如果需要支持中断功能,一定**使用可中断的悲观读锁 ****readLockInterruptibly() ****和写锁 ****writeLockInterruptibly()**。
Semaphore实践 基本概念 Semaphore是信号量,比如我们开车时在等的红绿灯,红灯停,绿灯行,根据红绿灯的信号,车和行人决定做出等待还是前行的行为。信号量模型可以简单概括为:一个计数器,一个等待队列,三个方法。在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问它们,这三个方法分别是:init()、down() 和 up()。
init():设置计数器的初始值。
down():计数器的值减 1;如果此时计数器的值小于 0,则当前线程将被阻塞,否则当前线程可以继续执行。
up():计数器的值加 1;如果此时计数器的值小于或者等于 0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。
上述三个方法都是原子性的,并且这个原子性是由信号量模型的实现方保证的。在 Java SDK 里面,信号量模型是由 java.util.concurrent.Semaphore 实现的,Semaphore 这个类能够保证这三个方法都是原子操作。
信号量使用 举一个累加器的例子,count+=1 操作是个临界区,只允许一个线程执行,也就是说要保证互斥。用信号量控制互斥的方法,就像我们用互斥锁一样,只需要在进入临界区之前执行一下 down() 操作,退出临界区之前执行一下 up() 操作。下面的实践中,acquire() 就是信号量里的 down() 操作,release() 就是信号量里的 up() 操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import java.util.concurrent.Semaphore;public class SemaphoreBasicTest { static int count = 0 ; static final Semaphore s = new Semaphore(1 ); public static void main (String[] args) throws InterruptedException { addOne(); System.out.println(count); addOne(); System.out.println(count); } static void addOne () throws InterruptedException { s.acquire(); try { count+=1 ; } finally { s.release(); } } }
假设两个线程 T1 和 T2 同时访问 addOne() 方法,当它们同时调用 acquire() 的时候,由于 acquire() 是一个原子操作,所以只能有一个线程(假设 T1)把信号量里的计数器减为 0 ,另外一个线程(T2)则是将计数器减为 -1 。对于线程 T1,信号量里面的计数器的值是 0,大于等于 0,所以****线程 T1 会继续执行 ;对于线程 T2,信号量里面的计数器的值是 -1,小于 0(自己想要执行就得大于0 ),按照信号量模型里对 down() 操作的描述,****线程 T2 将被阻塞 。所以此时只有线程 T1 会进入临界区执行count+=1 ;。当线程 T1 执行 release() 操作,也就是 up() 操作的时候,信号量里计数器的值是 -1,加 1 之后的值是 0,小于等于 0(想要唤醒其他线程就得小于等于0 ) ,按照信号量模型里对 up() 操作的描述,此时等待队列中的 T2 将会被唤醒 。于是 T2 在 T1 执行完临界区代码之后才获得了进入临界区执行的机会,从而保证了互斥性。
实践实现一个限流器 有 Java SDK 里面提供了 Lock,还要提供一个 Semaphore 的原因在于,实现一个互斥锁,仅仅是 Semaphore 的部分功能,Semaphore 还有一个功能是 Lock 不容易实现的,那就是:Semaphore 可以允许多个线程访问一个临界区。比如各种池化资源,例如连接池、对象池、线程池等等,比如数据库连接池,在同一时刻,一定是允许多个线程同时使用连接池的,当然,每个连接在被释放前,是不允许其他线程使用的。
我们实践一下通过Semaphore 创建一个对象池 ,也就是指的是一次性创建出多个对象,之后所有的线程重复利用这些对象,当然对象在被释放前,也是不允许其他线程使用的 。对象池,可以用 List 保存实例对象,限流器的限流指的是不允许多于 N 个线程同时进入临界区,使用信号量解决这个问题。信号量的计数器,在上面的例子中,我们设置成了 1,这个 1 表示只允许一个线程进入临界区,但如果我们把计数器的值设置成对象池里对象的个数 N,就可以解决对象池的限流问题了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import java.util.List;import java.util.concurrent.Semaphore;import java.util.function.Function;class ObjPool <T , R > { final List<T> pool; final Semaphore semaphore; ObjPool(int size, List<T> list){ pool = list; semaphore = new Semaphore(size); } R exec (Function<T,R> func) throws InterruptedException { T t = null ; semaphore.acquire(); try { t = pool.remove(0 ); return func.apply(t); } finally { pool.add(t); semaphore.release(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data @AllArgsConstructor @NoArgsConstructor public class User { private Long id; private String name; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import java.util.ArrayList;import java.util.List;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class SemaphoreTest { public static void main (String[] args) throws InterruptedException { List<User> list = new ArrayList<>(); list.add(new User(1L , "Chris" )); list.add(new User(2L , "Zhangsan" )); list.add(new User(3L , "John" )); ObjPool<User, String> pool = new ObjPool<User, String>(3 , list); ExecutorService executor = Executors.newFixedThreadPool(3 ); for (int i = 0 ; i < 3 ; i++) { executor.execute(() -> { try { String result = pool.exec(param -> { try { Thread.sleep(3000 ); return param.getName(); } catch (InterruptedException e) { throw new RuntimeException(e); } }); System.out.println(result); } catch (InterruptedException e) { throw new RuntimeException(e); } }); } } }
用一个 List来保存对象实例,用 Semaphore 实现限流器。关键的代码是 ObjPool 里面的 exec() 方法,这个方法里面实现了限流的功能。在这个方法里面,我们首先调用 acquire() 方法(与之匹配的是在 finally 里面调用 release() 方法),假设对象池的大小是 3,信号量的计数器初始化为 3 ,那么前 3 个线程调用 acquire() 方法,都能继续执行,相当于通过了信号灯,而其他线程则会阻塞在 acquire() 方法上 。对于通过信号灯的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过 pool.remove(0) 实现的),分配完之后会执行一个回调函数 func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释放工作是通过 pool.add(t) 实现的),同时调用 release() 方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于 0,那么说明有线程在等待,此时会自动唤醒等待的线程。
不变性(Immutability)模式 不变性(Immutability)模式就是对象一旦被创建之后,状态就不再发生变化,即变量一旦被赋值,就不允许修改了(没有写操作),没有修改操作,也就是保持了不变性。
实现具备不可变性的类 将一个类所有的属性都设置成 final 的,并且只允许存在只读方法,那么这个类基本上就具备不可变性了 。更严格的做法是这个类本身也是 final 的,也就是不允许继承 。因为子类可以覆盖父类的方法,有可能改变不可变性。
Java SDK 里很多类都具备不可变性,例如经常用到的 String 和 Long、Integer、Double 等基础类型的包装类都具备不可变性 ,这些对象的线程安全性都是靠不可变性来保证的 ,它们都严格遵守不可变类的三点要求:类和属性都是 final 的,所有方法均是只读的 。
例如String 这个类以及它的属性 value[]都是 final 的,而 replace() 方法的实现,就的确没有修改 value[],而是将替换后的字符串作为返回值返回了 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public String replace (char oldChar, char newChar) { if (oldChar != newChar) { int len = value.length; int i = -1 ; char [] val = value; while (++i < len) { if (val[i] == oldChar) { break ; } } if (i < len) { char buf[] = new char [len]; for (int j = 0 ; j < i; j++) { buf[j] = val[j]; } while (i < len) { char c = val[i]; buf[i] = (c == oldChar) ? newChar : c; i++; } return new String(buf, true ); } } return this ; }
如果具备不可变性的类,需要提供类似修改的功能,具体就是创建一个新的不可变对象 ,这是与可变对象的一个重要区别,可变对象往往是修改自己的属性 。所有的修改操作都创建一个新的不可变对象,如果不利用缓存,则十分浪费内存,因此引出了缓存的设计 。
享元模式避免创建重复对象 利用享元模式可以减少创建对象的数量,从而减少内存占用。Java中 Long、Integer、Short、Byte 等这些基本数据类型的包装类都用到了享元模式 。享元模式本质上其实就是一个对象池,利用享元模式创建对象的逻辑:创建之前,首先去对象池里看看是不是存在;如果已经存在,就利用对象池里的对象;如果不存在,就会新创建一个对象,并且把这个新创建出来的对象放进对象池里 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 private static class IntegerCache { static final int low = -128 ; static final int high; static final Integer cache[]; static { int h = 127 ; String integerCacheHighPropValue = sun.misc.VM.getSavedProperty("java.lang.Integer.IntegerCache.high" ); if (integerCacheHighPropValue != null ) { try { int i = parseInt(integerCacheHighPropValue); i = Math.max(i, 127 ); h = Math.min(i, Integer.MAX_VALUE - (-low) -1 ); } catch ( NumberFormatException nfe) { } } high = h; cache = new Integer[(high - low) + 1 ]; int j = low; for (int k = 0 ; k < cache.length; k++) cache[k] = new Integer(j++); assert IntegerCache.high >= 127 ; } private IntegerCache () {} } public static Integer valueOf (int i) { if (i >= IntegerCache.low && i <= IntegerCache.high) return IntegerCache.cache[i + (-IntegerCache.low)]; return new Integer(i); }
需要注意的是:基本上所有的基础类型的包装类都不适合做锁,因为它们内部用到了享元模式,这会导致看上去私有的锁,其实是共有的 ,实践一下:
1 2 3 4 5 6 7 8 9 10 11 12 public class Student { public Integer num = Integer.valueOf(1 ); public void test () throws InterruptedException { synchronized (num) { Thread.sleep(3000 ); System.out.println("我是Student~" ); } } }
1 2 3 4 5 6 7 8 9 10 11 12 public class User { public Integer num = Integer.valueOf(1 ); public void test () throws InterruptedException { synchronized (num) { Thread.sleep(3000 ); System.out.println("我是User~" ); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import java.util.concurrent.*;public class IntegerLockTest { public static void main (String[] args) { Student student = new Student(); User user = new User(); ThreadPoolExecutor pool = new ThreadPoolExecutor(2 , 3 , 60L , TimeUnit.SECONDS, new LinkedBlockingQueue<>(1 )); pool.execute(() -> { try { student.test(); } catch (InterruptedException e) { throw new RuntimeException(e); } }); pool.execute(() -> { try { user.test(); } catch (InterruptedException e) { throw new RuntimeException(e); } }); pool.shutdown(); } }
执行了6s,说明同一时刻,只有一个线程获取到了Integer类型的对象1的锁。
使用不可变性注意事项
对象的所有属性都是 final 的,并不能保证不可变性
不可变对象也需要正确发布
Java 中final 修饰的属性一旦被赋值,就不可以再修改,但是如果属性的类型是普通对象,那么这个普通对象的属性是可以被修改的。
1 2 3 4 5 6 7 8 9 10 @Data @AllArgsConstructor @NoArgsConstructor public class Student02 { private Long id; private String name; }
1 2 3 4 5 6 7 8 9 10 11 12 import lombok.AllArgsConstructor;import lombok.Data;@Data @AllArgsConstructor public class School { final Student02 student; private String name; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class IntegerLockTest02 { public static void main (String[] args) { School school = new School(new Student02(1L , "Chris" ), "阳光小学" ); ThreadPoolExecutor pool = new ThreadPoolExecutor(2 , 3 , 60L , TimeUnit.SECONDS, new LinkedBlockingQueue<>(1 )); pool.execute(() -> { school.student.setId(2L ); }); pool.execute(() -> { school.student.setId(3L ); }); pool.shutdown(); System.out.println(school.getStudent().getId()); } }
实践证明:虽然School类中Student02为final类型,但Student02类型的成员变量student的值可以在多个线程中被访问和修改 。所以,在使用 Immutability 模式的时候一定要确认保持不变性的边界在哪里,是否要求属性对象也具备不可变性 。
不可变对象虽然是线程安全的,但是并不意味着引用这些不可变对象的对象就是线程安全的。实践一下:Student03 具备不可变性,线程安全,但是类 School02 并不是线程安全的,类 School02 中持有对 Student03 的引用 student,对 student 这个引用的修改在多线程中并不能保证可见性和原子性。
1 2 3 4 5 6 7 8 9 10 11 12 import lombok.AllArgsConstructor;import lombok.Data;@Data @AllArgsConstructor final public class Student03 { final private Long id; final private String name; }
1 2 3 4 5 6 7 8 9 10 11 12 import lombok.AllArgsConstructor;import lombok.Data;@Data @AllArgsConstructor public class School02 { Student03 student; private String name; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class IntegerLockTest03 { public static void main (String[] args) { School02 school = new School02(new Student03(1L , "Chris" ), "阳光小学" ); ThreadPoolExecutor pool = new ThreadPoolExecutor(2 , 3 , 60L , TimeUnit.SECONDS, new LinkedBlockingQueue<>(1 )); pool.execute(() -> { Student03 lucky = new Student03(2L , "Lucky" ); school.setStudent(lucky); }); pool.execute(() -> { Student03 jack = new Student03(3L , "Jack" ); school.setStudent(jack); }); pool.shutdown(); System.out.println(school.getStudent()); } }
1 Student03(id=2 , name=Lucky)
实践可知:多线程会修改School02中对Student03 student的引用 。
如果你的程序仅仅需要 student 保持可见性,无需保证原子性,那么可以将 student 声明为 volatile 变量,这样就能保证可见性。如果你的程序需要保证原子性,那么可以通过原子类来实现,实践一下:
1 2 3 4 5 6 7 8 9 10 11 12 import lombok.AllArgsConstructor;import lombok.Data;@Data @AllArgsConstructor final public class Student03 { final private Long id; final private String name; }
1 2 3 4 5 6 7 8 9 10 11 12 13 import lombok.AllArgsConstructor;import lombok.Data;import java.util.concurrent.atomic.AtomicReference;@Data @AllArgsConstructor public class School02 { final AtomicReference studentAtomicReference; private String name; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicReference;public class IntegerLockTest03 { public static void main (String[] args) throws InterruptedException { School02 school = new School02(new AtomicReference<>(new Student03(1L ,"Chris" )), "阳光小学" ); ThreadPoolExecutor pool = new ThreadPoolExecutor(2 , 3 , 60L , TimeUnit.SECONDS, new LinkedBlockingQueue<>(1 )); Student03 preStudent = (Student03) school.getStudentAtomicReference().get(); pool.execute(() -> { Student03 lucky = new Student03(2L , "Lucky" ); boolean flag = school.getStudentAtomicReference().compareAndSet(preStudent, lucky); System.out.println("执行任务1" ); System.out.println(flag); System.out.println(school.getStudentAtomicReference().get()); }); pool.execute(() -> { Student03 jack = new Student03(3L , "Jack" ); boolean flag = school.getStudentAtomicReference().compareAndSet(preStudent, jack); System.out.println("执行任务2" ); System.out.println(flag); System.out.println(school.getStudentAtomicReference().get()); }); pool.shutdown(); } }
1 2 3 4 5 6 执行任务1 true Student03 (id=2 , name=Lucky) 执行任务2 false Student03 (id=2 , name=Lucky)
实践证明:当两个线程都尝试更新studentAtomicReference时,一个线程使用compareAndSet函数更新该成员变量成功后,另一个线程调用compareAndSet函数会失败,因为此时预期的值不再是preStudent,而是首先更新的线程修改后的值 。
具备不变性的对象,只有一种状态,这个状态由对象内部所有的不变属性共同决定。其实还有一种更简单的不变性对象,那就是无状态。无状态对象内部没有属性,只有方法 。除了无状态的对象,你可能还听说过无状态的服务、无状态的协议等等。无状态有很多好处,最核心的一点就是性能。在多线程领域,无状态对象没有线程安全问题,无需同步处理,自然性能很好;在分布式领域,无状态意味着可以无限地水平扩展,所以分布式领域里面性能的瓶颈一定不是出在无状态的服务节点上。
Guarded Suspension模式:等待唤醒机制实践 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 import lombok.Data;import java.util.Map;import java.util.concurrent.BlockingDeque;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.LinkedBlockingDeque;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;import java.util.function.Predicate;@Data class GuardedObject <T > { T obj; final Lock lock = new ReentrantLock(); final Condition done = lock.newCondition(); final int timeout=2 ; final static Map<Object, GuardedObject> gos = new ConcurrentHashMap<>(); final static BlockingDeque<GuardedObject> historyGos = new LinkedBlockingDeque<>(); static <K> GuardedObject create (K key) { GuardedObject go = new GuardedObject(); gos.put(key, go); return go; } static <K, T> void fireEvent (K key, T obj) { GuardedObject go = gos.remove(key); if (go != null ){ go.onChanged(obj); } } T get (Predicate<T> p) { lock.lock(); try { while (!p.test(obj)) { done.await(timeout, TimeUnit.SECONDS); } } catch (InterruptedException e) { throw new RuntimeException(e); }finally { lock.unlock(); } return obj; } void onChanged (T obj) { lock.lock(); try { this .obj = obj; historyGos.add(this ); done.signalAll(); } finally { lock.unlock(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class GuardedSuspensionTest { public static void main (String[] args) { ThreadPoolExecutor pool = new ThreadPoolExecutor(5 , 10 , 60L , TimeUnit.SECONDS, new LinkedBlockingQueue<>(1 )); pool.execute(() -> { handleWebReq(1L , "第一个任务的消息" ); }); pool.execute(() -> { try { Thread.sleep(2000 ); } catch (InterruptedException e) { throw new RuntimeException(e); } handleWebReq(2L , "第二个任务的消息" ); }); pool.execute(() -> { handleWebReq(3L , "第三个任务的消息" ); }); pool.execute(() -> { Message message1 = null ; try { while (true ) { GuardedObject guardedObject = GuardedObject.historyGos.poll(3 , TimeUnit.SECONDS); if (guardedObject == null ) { System.out.println("当前线程为:" + Thread.currentThread().getName() + ", 3s内没有消息,停止消费" ); break ; } message1 = (Message) guardedObject.get(t -> t != null ); System.out.println("当前线程为:" + Thread.currentThread().getName() + ",接收到消息:" + message1.getContent()); } } catch (InterruptedException e) { throw new RuntimeException(e); } }); pool.execute(() -> { Message message1 = null ; try { while (true ) { GuardedObject guardedObject = GuardedObject.historyGos.poll(3 , TimeUnit.SECONDS); if (guardedObject == null ) { System.out.println("当前线程为:" + Thread.currentThread().getName() + ", 3s内没有消息,停止消费" ); break ; } message1 = (Message) guardedObject.get(t -> t != null ); System.out.println("当前线程为:" + Thread.currentThread().getName() + ",接收到消息:" + message1.getContent()); } } catch (InterruptedException e) { throw new RuntimeException(e); } }); pool.shutdown(); } public static void handleWebReq (Long id, String msgContent) { Message msg = new Message(id, msgContent); GuardedObject<Message> go = GuardedObject.create(id); send(msg); } public static void onMessage (Message msg) { GuardedObject.fireEvent(msg.getId(), msg); } public static void send (Message msg) { System.out.println("当前线程为:" + Thread.currentThread().getName() + "," + "发送消息:" + msg.getContent()); onMessage(msg); } }
1 2 3 4 5 6 7 8 当前线程为:pool-1 -thread-1 ,发送消息:第一个任务的消息 当前线程为:pool-1 -thread-3 ,发送消息:第三个任务的消息 当前线程为:pool-1 -thread-5 ,接收到消息:第三个任务的消息 当前线程为:pool-1 -thread-4 ,接收到消息:第一个任务的消息 当前线程为:pool-1 -thread-2 ,发送消息:第二个任务的消息 当前线程为:pool-1 -thread-5 ,接收到消息:第二个任务的消息 当前线程为:pool-1 -thread-4 , 3s内没有消息,停止消费 当前线程为:pool-1 -thread-5 , 3s内没有消息,停止消费
实践证明:3个线程生产消息,2个线程消费消息,保证了一个消息只有一个线程消费,不存在同一个消息被重复消费的情况。