ConcurrentHashMap和LongAdder()的配合使用 使用 ConcurrentHashMap 来统计,Key 的范围是 10。使用最多 10 个并发,循环操作 1000 万次,每次操作累加随机的 Key。如果 Key 不存在的话,首次设置值为 1。
normaluse()方法直接使用synchronized加锁的方式保证线程安全,锁的粒度很大,虽然能够保证完成业务需求,且线程安全,但性能低下 。
gooduse()方法使用 ConcurrentHashMap 的原子性方法 computeIfAbsent 来做复合逻辑操作,判断 Key 是否存在 Value,如果不存在则把 Lambda 表达式运行后的结果放入 Map 作为 Value,也就是新创建一个 LongAdder 对象,最后返回 Value 。
由于 computeIfAbsent 方法返回的 Value 是 LongAdder,是一个线程安全的累加器,因此可以直接调用其 increment 方法进行累加。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.ThreadLocalRandom;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.LongAdder;import java.util.stream.Collectors;import java.util.stream.IntStream;public class ConcurrentHashMapTest { private static int LOOP_COUNT = 10000000 ; private static int THREAD_COUNT = 10 ; private static int ITEM_COUNT = 10 ; private Map<String, Long> normaluse () throws InterruptedException { ConcurrentHashMap<String, Long> freqs = new ConcurrentHashMap<>(ITEM_COUNT); ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT); forkJoinPool.execute(() -> IntStream.rangeClosed(1 , LOOP_COUNT).parallel().forEach(i -> { String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT); synchronized (freqs) { if (freqs.containsKey(key)) { freqs.put(key, freqs.get(key) + 1 ); } else { freqs.put(key, 1L ); } } } )); forkJoinPool.shutdown(); forkJoinPool.awaitTermination(1 , TimeUnit.HOURS); return freqs; } private Map<String, Long> gooduse () throws InterruptedException { ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(ITEM_COUNT); ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT); forkJoinPool.execute(() -> IntStream.rangeClosed(1 , LOOP_COUNT).parallel().forEach(i -> { String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT); freqs.computeIfAbsent(key, k -> new LongAdder()).increment(); } )); forkJoinPool.shutdown(); forkJoinPool.awaitTermination(1 , TimeUnit.HOURS); return freqs.entrySet().stream() .collect(Collectors.toMap( e -> e.getKey(), e -> e.getValue().longValue()) ); } public static void main (String[] args) throws InterruptedException { long startTime = System.currentTimeMillis(); ConcurrentHashMapTest concurrentHashMapTest = new ConcurrentHashMapTest(); concurrentHashMapTest.gooduse(); long endTime = System.currentTimeMillis(); long durationInMilli = endTime - startTime; System.out.println("方法执行时间(毫秒):" + durationInMilli); } }
normaluse()方法执行用时:
gooduse()方法执行用时:
优化后的代码,相比使用锁来操作 ConcurrentHashMap 的方式,性能提升了 4 倍。Java 自带的 Unsafe 实现的 CAS,在虚拟机层面确保了写入数据的原子性,比加锁的效率高得多。
CopyOnWriteArrayList的合理使用 在 Java 中,CopyOnWriteArrayList 虽然是一个线程安全的 ArrayList ,但其实现方式是,每次修改数据时都会复制一份数据出来 ,所以仅适用于读多写少或者说希望无锁读的场景 。
如果读写比例均衡或者有大量写操作的话,使用 CopyOnWriteArrayList 的性能会十分低下。
下面我们写一个demo比较一下使用 CopyOnWriteArrayList 和使用普通加锁方式的 ArrayList 的读写性能,
在这段代码中我们针对并发读和并发写分别写了一个测试方法,测试两者一定次数的写或读操作的耗时。
pom文件中加入依赖,才可以使用统计耗时的StopWatch工具类:
1 2 3 4 5 <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>5.3 .19 </version> </dependency>
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 import org.springframework.util.StopWatch;import java.util.*;import java.util.concurrent.CopyOnWriteArrayList;import java.util.concurrent.ThreadLocalRandom;import java.util.stream.Collectors;import java.util.stream.IntStream;public class CopyOnWriteTest { public Map testWrite () { List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>(); List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>()); StopWatch stopWatch = new StopWatch(); int loopCount = 100000 ; stopWatch.start("写操作:copyOnWriteArrayList" ); IntStream.rangeClosed(1 , loopCount).parallel().forEach(c -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount))); stopWatch.stop(); stopWatch.start("写操作:synchronizedList" ); IntStream.rangeClosed(1 , loopCount).parallel().forEach(c -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount))); stopWatch.stop(); System.out.println("输出代码执行耗时,以及执行时间百分比:" + stopWatch.prettyPrint()); Map result = new HashMap(); result.put("copyOnWriteArrayList" , copyOnWriteArrayList.size()); result.put("synchronizedList" , synchronizedList.size()); return result; } private void addAll (List<Integer> list) { list.addAll(IntStream.rangeClosed(1 , 1000000 ).boxed().collect(Collectors.toList())); } public Map testRead () { List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>(); List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>()); addAll(copyOnWriteArrayList); addAll(synchronizedList); StopWatch stopWatch = new StopWatch(); int loopCount = 1000000 ; int count = copyOnWriteArrayList.size(); stopWatch.start("读操作:copyOnWriteArrayList" ); IntStream.rangeClosed(1 , loopCount).parallel().forEach(c -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(count))); stopWatch.stop(); stopWatch.start("读操作:synchronizedList" ); IntStream.range(1 , loopCount).parallel().forEach(c -> synchronizedList.get(ThreadLocalRandom.current().nextInt(count))); stopWatch.stop(); System.out.println("输出代码执行耗时,以及执行时间百分比:" + stopWatch.prettyPrint()); Map result = new HashMap(); result.put("copyOnWriteArrayList" , copyOnWriteArrayList.size()); result.put("synchronizedList" , synchronizedList.size()); return result; } public static void main (String[] args) { CopyOnWriteTest copyOnWriteTest = new CopyOnWriteTest(); Map map = copyOnWriteTest.testWrite(); Map map1 = copyOnWriteTest.testRead(); System.out.println(map.toString()); System.out.println(map1.toString()); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 输出代码执行耗时,以及执行时间百分比:StopWatch '' : running time = 2943391700 ns --------------------------------------------- ns % Task name --------------------------------------------- 2932055700 100 % 写操作:copyOnWriteArrayList011336000 000 % 写操作:synchronizedList输出代码执行耗时,以及执行时间百分比:StopWatch '' : running time = 122014900 ns --------------------------------------------- ns % Task name --------------------------------------------- 015497400 013 % 读操作:copyOnWriteArrayList 106517500 087% 读操作:synchronizedList{copyOnWriteArrayList=100000 , synchronizedList=100000 } {copyOnWriteArrayList=1000000 , synchronizedList=1000000 }
从实践结果可以看出,
大量写的场景(10 万次 add 操作),CopyOnWriteArray 比 ArrayList 慢一百倍不止,
而读操作CopyOnWriteArray性能优于 ArrayList,是ArrayList性能的6.7倍。
原因在于CopyOnWriteArrayList 中的 add 方法,在每次 add 时,都会用 Arrays.copyOf 创建一个新数组,频繁 add 时内存的申请释放消耗会很大:
1 2 3 4 5 6 7 8 9 10 public boolean add (E e) { synchronized (lock) { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1 ); newElements[len] = e; setArray(newElements); return true ; } }
综上所述,使用 CopyOnWriteArrayList 要注意使用场景是否为读多写少的场景或者说希望无锁读的场景 。
@Data使用注意事项 将@Data注解加在类上可以帮助我们根据类中的字段自动生成get()、set()、equals() 和 hashcode()方法,使用@Data注解需要导入lombok依赖:
1 2 3 4 5 6 <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18 .20 </version> <scope>provided</scope> </dependency>
我们实践一下这个注解是否自动生成equals() 和 hashcode()方法,并能正确判断两个对象是否相同,创建一个Person类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import lombok.Data;@Data class Person { private String name; private String identity; public Person (String name, String identity) { this .name = name; this .identity = identity; } }
1 2 3 4 5 6 7 8 9 public class DataTest { public static void main (String[] args) { Person person1 = new Person("Chris" ,"123456" ); Person person2 = new Person("Marry" ,"123456" ); System.out.println(person1.equals(person2)); } }
由于两个对象的name字段的值不同,所以两个对象不相同,name改成一致的,再测试:
1 2 3 4 5 6 7 8 9 public class DataTest { public static void main (String[] args) { Person person1 = new Person("Chris" ,"123456" ); Person person2 = new Person("Chris" ,"123456" ); System.out.println(person1.equals(person2)); } }
此时两个name和identity两个字段都相同,返回true,符合我们的预期,已实践验证。
如果你希望只要identity字段值一致就认为是同一个Person的话,可以使用 @EqualsAndHashCode.Exclude 注解来修饰 name 字段,从 equals 和 hashCode 的实现中排除 name 字段:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import lombok.Data;import lombok.EqualsAndHashCode;@Data class Person { @EqualsAndHashCode .Exclude private String name; private String identity; public Person (String name, String identity) { this .name = name; this .identity = identity; } }
1 2 3 4 5 6 7 8 9 public class DataTest { public static void main (String[] args) { Person person1 = new Person("Chris" ,"123456" ); Person person2 = new Person("Marry" ,"123456" ); System.out.println(person1.equals(person2)); } }
实践验证了此时只要identity值相同,则两个对象相同,在equals 和 hashCode 的实现中排除了name字段。
如果类之间有继承关系上述实践还会生效吗?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import lombok.Data;import lombok.EqualsAndHashCode;@Data class Person { @EqualsAndHashCode .Exclude private String name; private String identity; public Person (String name, String identity) { this .name = name; this .identity = identity; } public Person () { } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import lombok.Data;@Data class Employee extends Person { private String company; public Employee (String name, String identity, String company) { super (name, identity); this .company = company; } }
1 2 3 4 5 6 7 8 9 public class DataTest { public static void main (String[] args) { Employee employee1 = new Employee("Chris" ,"123456" , "sky" ); Employee employee2 = new Employee("Marry" ,"666666" , "sky" ); System.out.println(employee1.equals(employee2)); } }
实践得知,子类Employee在equals 和 hashCode 的实现中默认只考虑了自身特有的字段company,父类Person的identity字段没有考虑在内(name刚才已经通过注解排除了),
为解决这个问题,我们可以手动设置@EqualsAndHashCode注解的配置 callSuper 开关为 true,来修改这种默认行为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import lombok.Data;import lombok.EqualsAndHashCode;@Data @EqualsAndHashCode(callSuper = true) class Employee extends Person { private String company; public Employee (String name, String identity, String company) { super (name, identity); this .company = company; } }
1 2 3 4 5 6 7 8 9 public class DataTest { public static void main (String[] args) { Employee employee1 = new Employee("Chris" ,"123456" , "sky" ); Employee employee2 = new Employee("Marry" ,"666666" , "sky" ); System.out.println(employee1.equals(employee2)); } }
修改后的代码,实现了同时以子类的属性 company 加上父类中的属性 identity,作为 equals 和 hashCode 方法的实现条件 (实现上其实是调用了父类的 equals 和 hashCode)。
实现排序的 Comparable 和 Comparator
Comparable 接口有一个 compareTo(Object obj)方法用来排序
Comparator 接口有一个compare(Object obj1, Object obj2)方法用来排序
CompletableFuture使用实践 CompletableFuture 和 Future使用实践 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data @AllArgsConstructor @NoArgsConstructor public class User { private Long id; private String name; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest;import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data @AllArgsConstructor @NoArgsConstructor public class Task { private Long id; private String name; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest;public class TaskService { public User getUserInfo (Long userId) throws InterruptedException { Thread.sleep(200 ); return new User(1L , "Chris" ); } public Task getTaskInfo (Long taskId) throws InterruptedException { Thread.sleep(300 ); return new Task(1L , "学习" ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest;import org.example.ideaspringboottest.JavaTest.DesensitizationTest.UserInfo;import java.util.concurrent.*;public class FutureTest { public static void main (String[] args) throws ExecutionException, InterruptedException, ExecutionException { ExecutorService executorService = Executors.newFixedThreadPool(10 ); TaskService taskService = new TaskService(); Long userId =1L ; Long taskId =1L ; Long startTime = System.currentTimeMillis(); FutureTask<User> userInfoFutureTask = new FutureTask<>(new Callable<User>() { @Override public User call () throws Exception { return taskService.getUserInfo(userId); } }); executorService.submit(userInfoFutureTask); FutureTask<Task> taskInfoFutureTask = new FutureTask<>(new Callable<Task>() { @Override public Task call () throws Exception { return taskService.getTaskInfo(taskId); } }); executorService.submit(taskInfoFutureTask); User user = userInfoFutureTask.get(); Task task = taskInfoFutureTask.get(); System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms" ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.TimeUnit;import java.util.concurrent.TimeoutException;public class CompletableFutureTest { public static void main (String[] args) throws InterruptedException, ExecutionException, TimeoutException { TaskService taskService = new TaskService(); Long userId =1L ; Long taskId =1L ; Long startTime = System.currentTimeMillis(); CompletableFuture<User> completableUserInfoFuture = CompletableFuture.supplyAsync(() -> { try { return taskService.getUserInfo(userId); } catch (InterruptedException e) { throw new RuntimeException(e); } }); CompletableFuture<Task> completableTaskInfoFuture = CompletableFuture.supplyAsync(() -> { try { return taskService.getTaskInfo(taskId); } catch (InterruptedException e) { throw new RuntimeException(e); } }); User user = completableUserInfoFuture.get(2 , TimeUnit.SECONDS); Task task = completableTaskInfoFuture.get(); System.out.println("总共用时" + (System.currentTimeMillis() - startTime) + "ms" ); } }
CompletableFuture 和 Future 的区别
功能和灵活性
Future:
java.util.concurrent.Future 是 Java 5 引入的接口,
用于表示一个异步操作的未来结果。它提供了基本的异步操作支持,可以用于检查是否完成、等待结果以及获取结果,但在处理结果、异常和组合等方面功能有限 。
CompletableFuture:
java.util.concurrent.CompletableFuture 是 Java 8 引入的类,
扩展了 Future 的功能,提供了更强大和灵活的异步编程支持。它允许更容易地处理异步操作的结果、异常和组合,以及在操作完成时执行回调等 。
异步编程支持
Future:
Future 只能用于异步操作的基本管理,如提交任务和获取结果 。
CompletableFuture:
CompletableFuture 提供了丰富的操作符、方法和组合功能,可以在异步操作之间进行更复杂的处理,如转换、组合、串联、并行等 。
组合和链式操作
Future:
Future 没有提供内置的操作符来对多个异步操作进行链式组合 。
CompletableFuture:
CompletableFuture 支持在操作完成后进行链式操作,使多个异步操作可以依次执行,以及在其中一个或多个操作完成后执行其他操作 。
异常处理
Future:
Future 的异常处理相对有限,通常需要使用 try-catch 块来捕获操作过程中的异常 。
CompletableFuture:
CompletableFuture 具有更强大的异常处理机制,可以使用 exceptionally()、handle() 等方法来处理操作过程中的异常 。
回调执行
Future:
Future 不支持在操作完成时执行回调操作 。
CompletableFuture:
CompletableFuture 支持使用 thenApply()、thenCompose()、thenCombine() 等方法来在操作完成后执行回调 。
综上所述,CompletableFuture 提供了更丰富的异步编程支持和功能,使处理异步操作的结果、异常和组合变得更加灵活和便捷 。
如果使用的是 Java 8 或更高版本,通常建议使用 CompletableFuture 来实现更高级的异步编程需求。
CompletableFuture常用方法 创建异步任务 CompletableFuture创建异步任务,一般有supplyAsync 和runAsync 两个方法
supplyAsync 执行CompletableFuture任务,支持有返回值 。
runAsync 执行CompletableFuture任务,没有返回值 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class RunAsyncTest { public static void main (String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("这是runAsync方法" ), executor); CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> { System.out.println("这是supplyAsync方法" ); return "B站:不吃辣的Chris" ; }, executor); System.out.println(runFuture.join()); System.out.println(supplyFuture.join()); executor.shutdown(); } }
1 2 3 4 这是runAsync方法 null 这是supplyAsync方法 B站:不吃辣的Chris
任务异步回调 thenRun/thenRunAsync CompletableFuture的thenRun 方法负责做完第一个任务后,再做第二个任务 。某个任务执行完成后,执行回调方法 ;但是前后两个任务没有参数传递,第二个任务也没有返回值 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class ThenRunTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync( ()->{ System.out.println("当前线程名称:" + Thread.currentThread().getName()); System.out.println("这是第一个CompletableFuture方法任务" ); return "B站:不吃辣的Chris" ; } ); CompletableFuture thenRunFuture = orgFuture.thenRun(() -> { System.out.println("当前线程名称:" + Thread.currentThread().getName()); System.out.println("这是第二个CompletableFuture方法任务" ); }); System.out.println(thenRunFuture.get()); } }
1 2 3 4 5 当前线程名称:ForkJoinPool.commonPool-worker-1 这是第一个CompletableFuture方法任务 当前线程名称:ForkJoinPool.commonPool-worker-1 这是第二个CompletableFuture方法任务 null
thenRun 和thenRunAsync有什么区别
1 2 3 4 5 6 7 8 9 10 private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); public CompletableFuture<Void> thenRun (Runnable action) { return uniRunStage(null , action); } public CompletableFuture<Void> thenRunAsync (Runnable action) { return uniRunStage(asyncPool, action); }
如果你执行第一个任务的时候,传入了一个自定义线程池:
调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池 。
调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ThenRunAsyncTest { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(); CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync( ()->{ System.out.println("当前线程名称:" + Thread.currentThread().getName()); System.out.println("这是第一个CompletableFuture方法任务" ); return "B站:不吃辣的Chris" ; },executor ); CompletableFuture thenRunFuture = orgFuture.thenRunAsync(() -> { System.out.println("当前线程名称:" + Thread.currentThread().getName()); System.out.println("这是第二个CompletableFuture方法任务" ); }); System.out.println(thenRunFuture.get()); } }
1 2 3 4 5 当前线程名称:pool-1 -thread-1 这是第一个CompletableFuture方法任务 当前线程名称:ForkJoinPool.commonPool-worker-1 这是第二个CompletableFuture方法任务 null
thenAccept/thenAcceptAsync CompletableFuture的thenAccept 方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中 ,但是回调方法是没有返回值 的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class ThenAcceptTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync( ()->{ System.out.println("第一个CompletableFuture方法任务" ); return "B站:不吃辣的Chris" ; } ); CompletableFuture thenAcceptFuture = orgFuture.thenAccept((a) -> { if ("B站:不吃辣的Chris" .equals(a)) { System.out.println("第一个方法的结果是 " + a + "!" ); } System.out.println("一键三连加关注哦~" ); }); System.out.println(thenAcceptFuture.get()); } }
1 2 3 4 第一个CompletableFuture方法任务 第一个方法的结果是 B站:不吃辣的Chris! 一键三连加关注哦~ null
thenApply/thenApplyAsync CompletableFuture的thenApply 方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中 ,并且回调方法是有返回值的 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class ThenApplyTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync( ()->{ System.out.println("第一个CompletableFuture方法任务" ); return "B站:不吃辣的Chris" ; } ); CompletableFuture<String> thenApplyFuture = orgFuture.thenApply((a) -> { if ("B站:不吃辣的Chris" .equals(a)) { return "第一个方法的结果是 " + a + " ,已一键三连加关注!" ; } return "不是 " + a + " 哦~" ; }); System.out.println(thenApplyFuture.get()); } }
1 2 第一个CompletableFuture方法任务 第一个方法的结果是 B站:不吃辣的Chris ,已一键三连加关注!
exceptionally CompletableFuture的exceptionally 方法表示,某个任务执行异常时,执行的回调方法,并且有抛出异常作为参数,传递到回调方法 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class ExceptionallyTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync( ()->{ System.out.println("当前线程名称:" + Thread.currentThread().getName()); throw new RuntimeException(); } ); CompletableFuture<String> exceptionFuture = orgFuture.exceptionally((e) -> { e.printStackTrace(); return "出现异常,请及时处理!" ; }); System.out.println(exceptionFuture.get()); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 当前线程名称:ForkJoinPool.commonPool-worker-1 出现异常,请及时处理! java.util.concurrent.CompletionException: java.lang.RuntimeException at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315 ) at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320 ) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770 ) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1760 ) at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373 ) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182 ) at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655 ) at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622 ) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165 ) Caused by: java.lang.RuntimeException at org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest.ExceptionallyTest.lambda$main$0 (ExceptionallyTest.java:12 ) at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768 ) ... 6 more
whenComplete方法 CompletableFuture的whenComplete 方法表示,某个任务执行完成后,执行的回调方法,****无返回值 ;并且whenComplete方法返回的CompletableFuture的result是****上个任务的结果 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class WhenCompleteTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync( ()->{ System.out.println("当前线程名称:" + Thread.currentThread().getName()); System.out.println("这是第一个CompletableFuture任务" ); try { Thread.sleep(1000L ); } catch (InterruptedException e) { e.printStackTrace(); } return "B站:不吃辣的Chris" ; } ); CompletableFuture<String> rstFuture = orgFuture.whenComplete((a, throwable) -> { System.out.println("当前线程名称:" + Thread.currentThread().getName()); System.out.println("第一个任务执行结束,执行结果为 " + a + " " ); if ("B站:不吃辣的Chris" .equals(a)) { System.out.println("第一个方法的结果是 " + a + " ,已一键三连加关注!" ); } System.out.println("耶~" ); }); System.out.println(rstFuture.get()); } }
1 2 3 4 5 6 7 当前线程名称:ForkJoinPool.commonPool-worker-1 这是第一个CompletableFuture任务 当前线程名称:ForkJoinPool.commonPool-worker-1 第一个任务执行结束,执行结果为 B站:不吃辣的Chris 第一个方法的结果是 B站:不吃辣的Chris ,已一键三连加关注! 耶~ B站:不吃辣的Chris
handle方法 CompletableFuture的handle 方法表示,某个任务执行完成后,执行回调方法,并且是有返回值的 ,并且handle方法返回的CompletableFuture的result是****回调方法执行的结果 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class HandleTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync( ()->{ System.out.println("当前线程名称:" + Thread.currentThread().getName()); System.out.println("这是第一个CompletableFuture任务" ); try { Thread.sleep(1000L ); } catch (InterruptedException e) { e.printStackTrace(); } return "B站:不吃辣的Chris" ; } ); CompletableFuture<String> rstFuture = orgFuture.handle((a, throwable) -> { System.out.println("第一个任务执行结束,执行结果为 " + a + " " ); if ("B站:不吃辣的Chris" .equals(a)) { System.out.println("第一个方法的结果是 " + a + " ,已一键三连加关注!" ); return "已关注" ; } System.out.println("耶~" ); return null ; }); System.out.println(rstFuture.get()); } }
1 2 3 4 5 当前线程名称:ForkJoinPool.commonPool-worker-1 这是第一个CompletableFuture任务 第一个任务执行结束,执行结果为 B站:不吃辣的Chris 第一个方法的结果是 B站:不吃辣的Chris ,已一键三连加关注! 已关注
任务组合处理 AND组合处理 thenCombine / thenAcceptBoth / runAfterBoth都表示:将两个CompletableFuture组合起来,只有这两个都正常执行完了,才会执行某个任务 。
区别在于:
thenCombine :会将两个任务的执行结果作为方法入参,传递到指定方法中 ,且有返回值
thenAcceptBoth: 会将两个任务的执行结果作为方法入参,传递到指定方法中 ,且无返回值
runAfterBoth :不会把执行结果当做方法入参 ,且没有返回值 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.*;public class ThenCombineAsyncTest { public static void main (String[] args) throws InterruptedException, ExecutionException, TimeoutException { CompletableFuture<String> first = CompletableFuture.completedFuture("这是第一个CompletableFuture任务" ); ExecutorService executor = Executors.newFixedThreadPool(10 ); CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> "这是第二个CompletableFuture任务" , executor) .thenCombineAsync(first, (s, w) -> { System.out.println(w); System.out.println(s); return "这是第三个CompletableFuture任务,走到这里说明两个异步任务进行了组合" ; }, executor); System.out.println(future.join()); executor.shutdown(); } }
1 2 3 这是第一个CompletableFuture任务 这是第二个CompletableFuture任务 这是第三个CompletableFuture任务,走到这里说明两个异步任务进行了组合
OR组合处理 applyToEither / acceptEither / runAfterEither 都表示:将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务 。
区别在于:
applyToEither :会将已经执行完成的任务,作为方法入参,传递到指定方法中 ,且有返回值 。
acceptEither : 会将已经执行完成的任务,作为方法入参,传递到指定方法中 ,且无返回值 。
runAfterEither :不会把执行结果当做方法入参 ,且没有返回值 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class AcceptEitherAsyncTest { public static void main (String[] args) { CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{ try { Thread.sleep(1000L ); System.out.println("第一个任务执行结束" );} catch (Exception e){ return "第一个任务异常执行出现了异常" ; } return "任务1结果" ; }); ExecutorService executor = Executors.newSingleThreadExecutor(); CompletableFuture<Void> future = CompletableFuture .supplyAsync(() -> { System.out.println("第二个任务执行结束" ); return "任务2结果" ;} , executor) .acceptEitherAsync(first, System.out::println, executor); executor.shutdown(); } }
AllOf 所有任务都执行完成后,才执行 allOf返回的CompletableFuture 。如果任意一个任务异常,allOf的CompletableFuture,执行get方法,会抛出异常 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class AllOfTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> a = CompletableFuture.runAsync(()->{ System.out.println("执行第一个任务" ); }); CompletableFuture<Void> b = CompletableFuture.runAsync(() -> { System.out.println("执行第二个任务" ); System.out.println(1 /0 ); System.out.println("第二个任务执行结束" ); }); CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(a, b).whenComplete((m,k)->{ System.out.println("两个任务都执行结束了" ); }); System.out.println(allOfFuture.get()); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 执行第一个任务 执行第二个任务 两个任务都执行结束了 Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396 ) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2073 ) at org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest.AllOfTest.main(AllOfTest.java:20 ) Caused by: java.lang.ArithmeticException: / by zero at org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest.AllOfTest.lambda$main$1 (AllOfTest.java:14 ) at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804 ) at java.base/java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1796 ) at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373 ) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182 ) at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655 ) at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622 ) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165 )
AnyOf 任意一个任务执行完,就执行anyOf返回的CompletableFuture 。如果执行的任务异常,anyOf的CompletableFuture,执行get方法,会抛出异常 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class AnyOfTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> a = CompletableFuture.runAsync(()->{ try { Thread.sleep(2000L ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("执行第一个任务" ); }); CompletableFuture<Void> b = CompletableFuture.runAsync(() -> { System.out.println("执行第二个任务" ); }); CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((m,k)->{ System.out.println("某一个任务执行结束了" ); }); anyOfFuture.join(); } }
thenCompose thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法 ,该方法会返回一个新的CompletableFuture实例 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 package org.example.ideaspringboottest.JavaTest.CompletableFutureTest.FunctionTest;import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ThenComposeAsyncTest { public static void main (String[] args) throws ExecutionException, InterruptedException { CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("执行第一个任务" ); ExecutorService executor = Executors.newSingleThreadExecutor(); CompletableFuture<String> future = CompletableFuture .supplyAsync(() -> "执行第二个任务" , executor) .thenComposeAsync(data -> { System.out.println(data); return completedFuture; }, executor); System.out.println(future.join()); executor.shutdown(); } }
Future使用实践 ThreadPoolExecutor的submit()方法 Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。3 个 submit() 方法如下:
1 2 3 4 5 6 <T> Future<T> submit (Callable<T> task) ; <T> Future<T> submit (Runnable task, T result) ; Future<?> submit(Runnable task);
这 3 个 submit() 方法之间的区别在于方法参数不同。
提交 Runnable 任务 submit(Runnable task) :这个方法的参数是一个 Runnable 接口, Runnable 接口的 run() 方法是没有返回值的 ,所以 submit(Runnable task) 这个方法返回的 Future 仅可以用来断言任务已经结束了 ,类似于 Thread.join()。
提交 Callable 任务 submit(Callable task):这个方法的参数是一个 Callable 接口,它只有一个 call() 方法,并且 这个方法是有返回值的 ,所以这个方法返回的 Future 对象可以通过调用其 get() 方法来获取任务的执行结果 。
提交 Runnable 任务及结果引用 submit(Runnable task, T result):假设这个 方法返回的 Future 对象是 f,f.get() 的返回值就是传给 submit() 方法的参数 result 。
下面示例一下** submit(Runnable task, T result)的使用,Runnable 接口的实现类 Task 需要声明一个有参构造函数 Task(Result r) ,创建 Task 对象的时候传入了 result 对象,这样就能在类 Task 的 run() 方法中对 result 进行各种操作了。 result 相当于主线程和子线程之间的桥梁,通过它主子线程可以共享数据**。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data @AllArgsConstructor @NoArgsConstructor public class Result { private Long id; private String name; }
1 2 3 4 5 6 7 8 9 public class Task implements Runnable { Result result; Task(Result result){ this .result = result; } public void run () { Long resultId = result.getId(); System.out.println(resultId); result.setName("修改后的任务执行结果名称" ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class SubmitTest { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(1 ); Result result = new Result(); result.setId(1L ); result.setName("执行结果名称" ); Future<Result> future = executor.submit(new Task(result), result); Result result1 = future.get(); System.out.println(result1.getId()); System.out.println(result1.getName()); executor.shutdown(); } }
Future 接口 submit(Runnable task, T result)的使用实践过了,同时还发现上述三个submit方法返回值都是 Future 接口,Future 接口有 5 个方法,分别是取消任务的方法 cancel()、 判断任务是否已取消的方法 isCancelled()、 判断任务是否已结束的方法 isDone()以及 2 个获得任务执行结果的 get() 和 get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制 。通过 Future 接口提交的任务不但能够获取任务执行结果,还可以取消任务。不过需要注意的是: 这两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞 ,直到任务执行完才会被唤醒 。
1 2 3 4 5 6 7 8 9 10 boolean cancel (boolean mayInterruptIfRunning) ;boolean isCancelled () ;boolean isDone () ;get(); get(long timeout, TimeUnit unit);
FutureTask FutureTask基本使用 Future 是一个接口,而 FutureTask 是一个工具类,有两个构造函数,它们的参数和前面介绍的 submit() 方法类似。
1 2 3 4 5 6 7 8 9 10 11 public FutureTask (Callable<V> callable) { if (callable == null ) throw new NullPointerException(); this .callable = callable; this .state = NEW; } public FutureTask (Runnable runnable, V result) { this .callable = Executors.callable(runnable, result); this .state = NEW; }
FutureTask 实现了 Runnable 和 Future 接口,由于实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;又因为实现了 Future 接口,所以也能用来获得任务的执行结果。下面的实践一下将 FutureTask 对象提交给 ThreadPoolExecutor 执行 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.FutureTask;public class FutureTaskTest { public static void main (String[] args) throws ExecutionException, InterruptedException { FutureTask<Integer> futureTask = new FutureTask<>(()-> 10 +22 ); ExecutorService executorService = Executors.newCachedThreadPool(); executorService.submit(futureTask); Integer result = futureTask.get(); executorService.shutdown(); System.out.println(result); } }
实践一下 FutureTask 对象直接被 Thread 执行,利用 FutureTask 对象可以获取子线程的执行结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import java.util.concurrent.ExecutionException;import java.util.concurrent.FutureTask;public class FutureTaskThreadTest { public static void main (String[] args) throws ExecutionException, InterruptedException { FutureTask<Integer> futureTask = new FutureTask<>(()-> 10 +22 ); Thread t1 = new Thread(futureTask); t1.start(); Integer result = futureTask.get(); System.out.println(result); } }
实践一个需要保证多个任务执行顺序的场景,例如煮茶这个过程可以拆分为:洗水壶、烧水和泡茶三个任务,在泡茶前需要洗茶壶、洗茶杯和准备茶叶三个任务,而洗水壶、烧水这两个任务可以和洗茶壶、洗茶杯和准备茶叶三个任务并行执行,在洗水壶、烧水这两个任务执行完后等待洗茶壶、洗茶杯和准备茶叶三个任务执行完成,然后就可以泡茶了。首先,我们创建了两个 FutureTask——futureTask1 和 futureTask2,futureTask1 完成洗水壶、烧开水、泡茶的任务,futureTask2 完成洗茶壶、洗茶杯、准备茶叶的任务;这里需要注意的是 futureTask1 这个任务在执行泡茶任务前,需要等待 futureTask2 把茶叶拿来,所以 futureTask1 内部需要引用 futureTask2,并在执行泡茶之前,调用 futureTask2 的 get() 方法实现等待。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import java.util.concurrent.Callable;import java.util.concurrent.FutureTask;import java.util.concurrent.TimeUnit;class T1Task implements Callable <String > { FutureTask<String> futureTask2; T1Task(FutureTask<String> futureTask2){ this .futureTask2 = futureTask2; } @Override public String call () throws Exception { System.out.println("T1:洗水壶..." ); TimeUnit.SECONDS.sleep(1 ); System.out.println("T1:烧开水..." ); TimeUnit.SECONDS.sleep(2 ); String futureTask2Result = futureTask2.get(); System.out.println("T1:准备茶叶:" +futureTask2Result); System.out.println("T1:泡茶..." ); return "上茶:" + futureTask2Result; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import java.util.concurrent.Callable;import java.util.concurrent.TimeUnit;class T2Task implements Callable <String > { @Override public String call () throws Exception { System.out.println("T2:洗茶壶..." ); TimeUnit.SECONDS.sleep(1 ); System.out.println("T2:洗茶杯..." ); TimeUnit.SECONDS.sleep(2 ); System.out.println("T2:准备茶叶..." ); TimeUnit.SECONDS.sleep(5 ); return "西湖龙井" ; } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import java.util.concurrent.ExecutionException;import java.util.concurrent.FutureTask;public class FutureTaskTea { public static void main (String[] args) throws ExecutionException, InterruptedException { FutureTask<String> futureTask2 = new FutureTask<>(new T2Task()); FutureTask<String> futureTask1 = new FutureTask<>(new T1Task(futureTask2)); Thread T1 = new Thread(futureTask1); T1.start(); Thread T2 = new Thread(futureTask2); T2.start(); System.out.println(futureTask1.get()); } }
1 2 3 4 5 6 7 8 T1:洗水壶... T2:洗茶壶... T2:洗茶杯... T1:烧开水... T2:准备茶叶... T1:准备茶叶:西湖龙井 T1:泡茶... 上茶:西湖龙井
FutureTask结合阻塞队列的使用 没有使用阻塞队列的实践:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 package com.knowledgesharing.knowledgesharing.JavaUtils.CompletionServiceTest;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class TaskIncomeTestBlock { public static void main (String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(3 ); Future<Integer> f1 = executor.submit(()-> { Thread.sleep(3000 ); return 1 ; }); Future<Integer> f2 = executor.submit(()->{return 2 ;}); Future<Integer> f3 = executor.submit(()->{return 3 ;}); Integer taskIncome1 = f1.get(); executor.execute(()-> { System.out.println("任务1的收益为:" + taskIncome1 + ",现保存入库" ); } ); Integer taskIncome2 = f2.get(); executor.execute(()-> { System.out.println("任务2的收益为:" + taskIncome2 + ",现保存入库" ); } ); Integer taskIncome3 = f3.get(); executor.execute(()-> { System.out.println("任务3的收益为:" + taskIncome3 + ",现保存入库" ); } ); executor.shutdown(); } }
1 2 3 任务1 的收益为:1 ,现保存入库 任务2 的收益为:2 ,现保存入库 任务3 的收益为:3 ,现保存入库
可以发现任务2和任务3都要等任务1的f1.get()执行结束才可以执行 ,也就是说调用f1.get()的主线程阻塞在了f1.get()处,只有执行结束后才能执行后续的逻辑,没有很好地利用异步执行 ,现结合阻塞队列进行优化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 import java.util.concurrent.*;public class TaskIncomeTestNotBlock { public static void main (String[] args) throws ExecutionException, InterruptedException { LinkedBlockingQueue<Object> blockingQueue = new LinkedBlockingQueue<>(); ExecutorService executor = Executors.newFixedThreadPool(3 ); Future<Integer> f1 = executor.submit(()-> { Thread.sleep(3000 ); return 1 ; }); Future<Integer> f2 = executor.submit(()->{return 2 ;}); Future<Integer> f3 = executor.submit(()->{return 3 ;}); executor.execute(()-> { try { blockingQueue.put(f1.get()); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } }); executor.execute(()-> { try { blockingQueue.put(f2.get()); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } }); executor.execute(()-> { try { blockingQueue.put(f3.get()); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } }); for (int i=0 ; i<3 ; i++) { Integer r = (Integer) blockingQueue.take(); executor.execute(()-> { System.out.println("任务的收益为:" + r + ",现保存入库" ); }); } executor.shutdown(); } }
1 2 3 任务的收益为:2 ,现保存入库 任务的收益为:3 ,现保存入库 任务的收益为:1 ,现保存入库
此时任务1的执行不会阻塞任务2和任务3的执行。
CompletionService实践 CompletionService基本使用 CompletionService 的实现原理是内部维护了一个阻塞队列,当任务执行结束就把任务的执行结果加入到阻塞队列中,不同的是 CompletionService 是把任务执行结果的 Future 对象加入到阻塞队列中,Future中结合阻塞队列的实践是把任务最终的执行结果放入了阻塞队列中。
CompletionService 接口的实现类是 ExecutorCompletionService,这个实现类的构造方法有两个,分别是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public ExecutorCompletionService (Executor executor) { if (executor == null ) throw new NullPointerException(); this .executor = executor; this .aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null ; this .completionQueue = new LinkedBlockingQueue<Future<V>>(); } public ExecutorCompletionService (Executor executor, BlockingQueue<Future<V>> completionQueue) { if (executor == null || completionQueue == null ) throw new NullPointerException(); this .executor = executor; this .aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null ; this .completionQueue = completionQueue; }
这两个构造方法都需要传入一个线程池,如果不指定 completionQueue,那么默认会使用无界的 LinkedBlockingQueue。任务执行结果的 Future 对象就是加入到 completionQueue 中。下面实践优化一下Future中结合阻塞队列实现3个任务的收益的计算和入库处理。
通过 CompletionService 接口提供的 submit() 方法提交三个任务收益计算的任务,这三个任务将会被 CompletionService 异步执行。最后,我们通过 CompletionService 接口提供的 take() 方法获取一个 Future 对象(加入到阻塞队列中的是任务执行结果的 Future 对象),调用 Future 对象的 get() 方法就能返回任务执行结果了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import java.util.concurrent.*;public class CompletionServiceTest { public static void main (String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newFixedThreadPool(3 ); CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor); completionService.submit(()-> { Thread.sleep(3000 ); return 1 ; }); completionService.submit(()->{return 2 ;}); completionService.submit(()->{return 3 ;}); for (int i = 0 ; i<3 ; i++) { Integer result = completionService.take().get(); executor.execute(()->{ System.out.println("任务的收益为:" + result + ",现保存入库" ); }); } executor.shutdown(); } }
1 2 3 任务的收益为:3 ,现保存入库 任务的收益为:2 ,现保存入库 任务的收益为:1 ,现保存入库
CompletionService 接口说明 CompletionService 接口提供的方法有** 5 个, submit() 相关的方法有两个, 一个方法参数是Callable, 另外一个方法有两个参数,分别是Runnable task和V result**,这个方法类似于 ThreadPoolExecutor 的 Future submit(Runnable task, T result) ,
CompletionService 接口其余的 3 个方法,都是和阻塞队列相关的,take()、poll() 都是从阻塞队列中获取并移除一个元素 ,区别在于如果阻塞队列是空的,那么调用 take() 方法的线程会被阻塞,而 poll() 方法会返回 null 值 。 poll(long timeout, TimeUnit unit) 方法支持以超时的方式获取并移除阻塞队列头部的一个元素,如果等待了 timeout unit 时间,阻塞队列还是空的,那么该方法会返回 null 值 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public Future<V> submit (Callable<V> task) { if (task == null ) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; } public Future<V> submit (Runnable task, V result) { if (task == null ) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; } public Future<V> take () throws InterruptedException { return completionQueue.take(); } public Future<V> poll () { return completionQueue.poll(); } public Future<V> poll (long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); }
CompletionService实现方法高可用 在集群模式下,支持并行地调用多个查询服务,只要有一个成功返回结果,整个服务就可以返回了。比如你需要提供一个查询用户当前位置的服务,为了保证该服务的高可用和性能,你可以并行地调用 3 个地图服务商的 API,然后只要有 1 个正确返回了结果,就可以直接返回了,也就是说可以容错 2 个地图服务商服务异常,但缺点是消耗的资源偏多。
下面实践一下:首先创建一个线程池 executor 、一个 CompletionService 对象 completionService 和一个Future类型的列表 futures ,每次通过调用 CompletionService 的 submit() 方法提交一个异步任务进行执行 ,执行结束会返回一个 Future 对象到CompletionService的阻塞队列中 ,调用completionService.take()就能拿到并消费队列头部的一个Future ,也就是最快返回的任务,调用completionService.take().get()就能拿到这个任务的执行结果 。同时我们把这些 Future 对象保存在列表 futures 中,用于在finally中取消所有任务,因为我们想要的效果就是只要我们拿到一个正确返回的结果,就可以取消所有任务并且返回最终结果了 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 import java.util.ArrayList;import java.util.List;import java.util.concurrent.*;public class HighAvailabilityTest { public static void main (String[] args) throws InterruptedException { ExecutorService executor = Executors.newFixedThreadPool(3 ); CompletionService<String> completionService = new ExecutorCompletionService<>(executor); List<Future<String>> futures = new ArrayList<>(3 ); futures.add(completionService.submit(()-> { Thread.sleep((long ) (Math.random() * 1000 ) + 1 ); return "服务商1提供的用户地址" ;}) ); futures.add(completionService.submit(()-> { Thread.sleep((long ) (Math.random() * 1000 ) + 1 ); return "服务商2提供的用户地址" ;}) ); futures.add(completionService.submit(()-> { Thread.sleep((long ) (Math.random() * 1000 ) + 1 ); return "服务商3提供的用户地址" ;}) ); String loaction = "" ; try { for (int i = 0 ; i < 3 ; ++i) { loaction = completionService.take().get(); if (loaction != null ) { break ; } } } catch (ExecutionException e) { throw new RuntimeException(e); } finally { for (Future<String> future : futures) future.cancel(true ); } System.out.println(loaction); executor.shutdown(); } }
实践可知:每次运行程序都是不同服务商提供的用户地址,说明只要一个服务提供商提供了地址,则直接返回结果,保证了服务的高可用。
小结 当需要批量提交异步任务的时候建议使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单 。除此之外,CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待 。CompletionService 的实现类 ExecutorCompletionService,需要你自己创建线程池,虽然看起来麻烦,但好处是你可以让多个 ExecutorCompletionService 的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。
Fork/Join使用实践 分治任务模型 分治任务模型可分为两个阶段:
任务分解:将任务迭代地分解为子任务,直至子任务可以直接计算出结果;
结果合并:逐层合并子任务的执行结果,直至获得最终结果。
在这个分治任务模型里,任务和分解后的子任务具有相似性,这种相似性往往体现在任务和子任务的算法是相同的,但是计算的数据规模是不同的。具备这种相似性的问题,往往都采用递归算法。
Fork/Join 的使用 Fork/Join 是一个并行计算的框架,用于支持分治任务模型 ,这个计算框架里的 Fork 对应的是分治任务模型里的任务分解 ,Join 对应的是结果合并 。Fork/Join 计算框架主要包含两部分,一部分是分治任务的线程池 ForkJoinPool ,另一部分是分治任务 ForkJoinTask 。这两部分的关系类似于 ThreadPoolExecutor 和 Runnable 的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特类型 ForkJoinTask 。
ForkJoinTask 是一个抽象类,它的核心方法是 fork() 方法和 join() 方法 ,其中 fork() 方法会异步地执行一个子任务 ,而 join() 方法则会阻塞当前线程来等待子任务的执行结果 。ForkJoinTask ** 有两个子类—— RecursiveAction** 和 RecursiveTask ,它们都是用递归的方式来处理分治任务的 。这两个子类都**定义了抽象方法 compute()**,不过区别是 RecursiveAction 定义的 compute() 没有返回值,而 RecursiveTask 定义的 compute() 方法是有返回值的 。这两个子类也是抽象类,在使用的时候,需要你定义子类去扩展。
实践一下用 Fork/Join 计算斐波那契数列。首先需要创建一个分治任务线程池 以及计算斐波那契数列的分治任务 ,之后通过调用分治任务线程池的 invoke() 方法来启动分治任务 。由于计算斐波那契数列需要有返回值,所以 Fibonacci 继承自 RecursiveTask。分治任务 Fibonacci 需要实现 compute() 方法 ,这个方法里面的逻辑和普通计算斐波那契数列非常类似,区别之处在于计算 Fibonacci(n - 1) 使用了异步子任务,这是通过 f1.fork() 这条语句实现的 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import java.util.concurrent.RecursiveTask;public class Fibonacci extends RecursiveTask <Integer > { final int n; Fibonacci(int n){ this .n = n; } protected Integer compute () { if (n <= 1 ) { return n; } Fibonacci f1 = new Fibonacci(n - 1 ); f1.fork(); Fibonacci f2 = new Fibonacci(n - 2 ); return f2.compute() + f1.join(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import java.util.concurrent.ForkJoinPool;public class ForkJoinTest { public static void main (String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(4 ); Fibonacci fibonacci = new Fibonacci(30 ); Integer result = forkJoinPool.invoke(fibonacci); System.out.println(result); } }
ForkJoinPool 工作原理 ThreadPoolExecutor 本质上是一个生产者 - 消费者模式的实现,内部有一个任务队列,这个任务队列是生产者和消费者通信的媒介 ;ThreadPoolExecutor 可以有多个工作线程,但是这些工作线程共享一个任务队列 。Fork/Join 并行计算的核心组件是 ForkJoinPool,ForkJoinPool 本质上也是一个生产者 - 消费者的实现 。
ThreadPoolExecutor 内部只有一个任务队列,而 ForkJoinPool 内部有多个任务队列 ,当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中 。
如果工作线程对应的任务队列空了,也是不会“躺平”的,ForkJoinPool 支持一种叫做“任务窃取 ”的机制,如果工作线程空闲了,那它可以“窃取”其他工作任务队列里的任务 ,比如线程 T2 对应的任务队列已经空了,它可以“窃取”线程 T1 对应的任务队列的任务 。如此一来,所有的工作线程都不会闲下来了。ForkJoinPool 中的任务队列采用的是双端队列,工作线程正常获取任务和“窃取任务”分别是从任务队列不同的端消费,这样能避免很多不必要的数据竞争 。
统计单词数量实践 现在有统计一个文件里面每个单词的数量的场景,可以先用二分法递归地将一个文件拆分成更小的文件,直到文件里只有一行数据,然后统计这一行数据里单词的数量,最后再逐级汇总结果 。
开始实践一下,用一个字符串数组 String[] fc 来模拟文件内容,fc 里面的元素与文件里面的行数据一一对应。关键的代码在 compute() 这个方法里面,这是一个递归方法,前半部分数据 fork 一个递归任务去处理(关键代码 mr1.fork()),后半部分数据则在当前任务中递归处理(mr2.compute()) 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 import java.util.HashMap;import java.util.Map;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.RecursiveTask;public class WordStatisticsTest { public static void main (String[] args) { String[] fc = {"hello world" , "I am Chris" , "I am fork" , "I am join" , "We are fork and join" }; ForkJoinPool fjp = new ForkJoinPool(3 ); MR mr = new MR(fc, 0 , fc.length); Map<String, Long> result = fjp.invoke(mr); result.forEach((k, v)-> System.out.println(k+":" +v)); } static class MR extends RecursiveTask <Map <String , Long >> { private String[] fc; private int start, end; MR(String[] fc, int fr, int to){ this .fc = fc; this .start = fr; this .end = to; } @Override protected Map<String, Long> compute () { if (end - start == 1 ) { return calc(fc[start]); } else { int mid = (start + end)/2 ; MR mr1 = new MR(fc, start, mid); mr1.fork(); MR mr2 = new MR(fc, mid, end); return merge(mr2.compute(), mr1.join()); } } private Map<String, Long> merge (Map<String, Long> r1, Map<String, Long> r2) { Map<String, Long> result = new HashMap<>(); result.putAll(r1); r2.forEach((k, v) -> { Long c = result.get(k); if (c != null ) result.put(k, c+v); else result.put(k, v); }); return result; } private Map<String, Long> calc (String line) { Map<String, Long> result = new HashMap<>(); String [] words = line.split("\\s+" ); for (String w : words) { Long v = result.get(w); if (v != null ) result.put(w, v+1 ); else result.put(w, 1L ); } return result; } } }
1 2 3 4 5 6 7 8 9 10 fork:2 world:1 are:1 and:1 Chris:1 I:3 join:2 hello:1 am:3 We:1
小结 Fork/Join 并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个大的任务拆分成小的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果 。
Fork/Join 并行计算框架的核心组件是 ForkJoinPool 。ForkJoinPool 支持任务窃取机制,能够让所有线程的工作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好 。Java 1.8 提供的 Stream API 里面并行流也是以 ForkJoinPool 为基础的 。不过需要你注意的是,默认情况下所有的并行流计算都共享一个 ForkJoinPool,这个共享的 ForkJoinPool 默认的线程数是 CPU 的核数 ;如果所有的并行流计算都是 CPU 密集型计算的话,完全没有问题 ,但是如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。所以建议用不同的 ForkJoinPool 执行不同类型的计算任务 。
线程池原理 自定义线程池实践 线程池的设计,普遍采用的都是生产者 - 消费者模式。线程池的使用方是生产者,线程池本身是消费者。实践创建一个简单的线程池 MyThreadPool,以辅助理解线程池的工作原理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import java.util.ArrayList;import java.util.List;import java.util.concurrent.BlockingQueue;class MyThreadPool { BlockingQueue<Runnable> workQueue; List<WorkerThread> threads = new ArrayList<>(); MyThreadPool(int poolSize, BlockingQueue<Runnable> workQueue){ this .workQueue = workQueue; for (int idx = 0 ; idx<poolSize; idx++){ WorkerThread work = new WorkerThread(); work.start(); threads.add(work); } } void execute (Runnable command) throws InterruptedException { workQueue.put(command); } class WorkerThread extends Thread { public void run () { while (true ){ Runnable task = null ; try { task = workQueue.take(); } catch (InterruptedException e) { throw new RuntimeException(e); } task.run(); } } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;public class ThreadPoolTest { public static void main (String[] args) throws InterruptedException { BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2 ); MyThreadPool pool = new MyThreadPool(10 , workQueue); pool.execute(()->{ System.out.println("I am Chris." ); }); } }
在 MyThreadPool 的内部,我们维护了一个阻塞队列 workQueue 和一组工作线程,工作线程的个数由构造函数中的 poolSize 来指定。用户通过调用 execute() 方法来提交 Runnable 任务,execute() 方法的内部实现仅仅是将任务加入到 workQueue 中 。MyThreadPool 内部维护的工作线程会消费 workQueue 中的任务并执行任务,相关的代码就是代码 while 循环部分 。
使用 Java 中的线程池方式 Java 提供的线程池相关的工具类中,最核心的是 ThreadPoolExecutor,ThreadPoolExecutor 的构造函数有 7 个参数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0 ) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null ) throw new NullPointerException(); this .acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this .corePoolSize = corePoolSize; this .maximumPoolSize = maximumPoolSize; this .workQueue = workQueue; this .keepAliveTime = unit.toNanos(keepAliveTime); this .threadFactory = threadFactory; this .handler = handler; }
可以把线程池类比为一个项目组,而线程就是项目组的成员。
corePoolSize :表示线程池保有的最小线程数 。有些项目很闲,但是也不能把人都撤了,至少要留 corePoolSize 个人坚守阵地。
maximumPoolSize :表示线程池创建的最大线程数 。当项目很忙时,就需要加人,但是也不能无限制地加,最多就加到 maximumPoolSize 个人。当项目闲下来时,就要撤人了,最多能撤到 corePoolSize 个人。
keepAliveTime & unit :上面提到项目根据忙闲来增减人员,那在编程世界里,如何定义忙和闲呢?很简单,一个线程如果在一段时间内,都没有执行任务,说明很闲,keepAliveTime 和 unit 就是用来定义这个“一段时间”的参数。也就是说,如果一个线程空闲了keepAliveTime & unit这么久,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收了 。
workQueue :工作队列,和上面示例代码的工作队列同义。
threadFactory :通过这个参数你可以自定义如何创建线程 ,例如你可以给线程指定一个有意义的名字。
handler :通过这个参数你可以自定义任务的拒绝策略 。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收 。至于拒绝的策略,你可以通过 handler 这个参数来指定 。ThreadPoolExecutor 已经提供了以下 4 种策略。
CallerRunsPolicy :提交任务的线程自己去执行该任务 。
AbortPolicy :默认的拒绝策略,直接抛出异常 throws RejectedExecutionException。
DiscardPolicy :直接丢弃任务,没有任何异常抛出 。
DiscardOldestPolicy :丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列 。
使用线程池注意事项 Java 并发包里提供了一个线程池的静态工厂类 Executors,利用 Executors 你可以快速创建线程池。不过目前大厂的编码规范中基本上都不建议使用 Executors ,最重要的原因是: Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题 。所以强烈建议使用有界队列。
使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略,因此默认拒绝策略要慎重使用 。如果线程池处理的任务非常重要,建议自定义自己的拒绝策略,并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。
使用线程池,还要注意异常处理的问题 ,例如通过 ThreadPoolExecutor 对象的 execute() 方法提交任务时,如果任务在执行的过程中出现运行时异常,会导致执行任务的线程终止;不过,最致命的是任务虽然异常了,但是你却获取不到任何通知,这会让你误以为任务都执行得很正常 。虽然线程池提供了很多用于异常处理的方法,但是最稳妥和简单的方案还是捕获所有异常并按需处理 ,你可以参考下面的示例代码。
1 2 3 4 5 6 7 try { } catch (RuntimeException x) { } catch (Throwable x) { }
CountDownLatch使用实践 有一个场景是网购平台一个订单对应一个运单,定期需要核对一批订单和运单,查询订单和查询运单的任务可以并行执行以提高效率,现在实践一下使用CountDownLatch优化串行执行查询订单和查询运单的逻辑 。
创建一个 CountDownLatch,计数器的初始值等于 2,之后在模拟从数据库中查询订单的操作orders.add(orderList.get(index));和模拟从数据库中查询运单的操作dispatches.add(dispatchList.get(index));对应的两条语句的后面对计数器执行减 1 操作,这个对计数器减 1 的操作是通过调用** latch.countDown(); 来实现的。在主线程中,我们通过调用 latch.await() **来实现对计数器等于 0 的等待。
1 2 3 4 5 6 7 8 9 10 11 12 import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data @AllArgsConstructor @NoArgsConstructor public class Dispatch { private Long id; private Integer money; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data @AllArgsConstructor @NoArgsConstructor public class Order { private Long id; private Integer money; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 import java.util.ArrayList;import java.util.List;import java.util.Vector;import java.util.concurrent.CountDownLatch;import java.util.concurrent.Executor;import java.util.concurrent.Executors;public class CountDownLatchTest { public static Vector<Order> orders = new Vector<>(); public static Vector<Dispatch> dispatches = new Vector<>(); public static Executor executor = Executors.newFixedThreadPool(2 ); public static void main (String[] args) throws InterruptedException { List<Order> orderList = new ArrayList<>(); List<Dispatch> dispatchList = new ArrayList<>(); orderList.add(new Order(1L , 10 )); orderList.add(new Order(2L , 12 )); dispatchList.add(new Dispatch(1L , 6 )); dispatchList.add(new Dispatch(2L , 8 )); for (int i = 0 ; i < orderList.size();i++) { CountDownLatch latch = new CountDownLatch(2 ); int index = i; executor.execute(()-> { orders.add(orderList.get(index)); latch.countDown(); }); executor.execute(()-> { dispatches.add(dispatchList.get(index)); latch.countDown(); }); latch.await(); System.out.println("执行对账操作,order为:" + orderList.get(index) + ",dispatch为" + dispatchList.get(index)); System.out.println("对账结果写入数据库操作,order为:" + orderList.get(index) + ",dispatch为" + dispatchList.get(index)); } } }
1 2 3 4 执行对账操作,order为:Order(id=1 , money=10 ),dispatch为Dispatch(id=1 , money=6 ) 对账结果写入数据库操作,order为:Order(id=1 , money=10 ),dispatch为Dispatch(id=1 , money=6 ) 执行对账操作,order为:Order(id=2 , money=12 ),dispatch为Dispatch(id=2 , money=8 ) 对账结果写入数据库操作,order为:Order(id=2 , money=12 ),dispatch为Dispatch(id=2 , money=8 )
CyclicBarrier使用实践 有一个场景是网购平台一个订单对应一个运单,定期需要核对一批订单和运单,查询订单和查询运单的任务可以并行执行以提高效率,现在实践一下使用CyclicBarrier优化串行执行查询订单和查询运单的逻辑 。
首先创建一个计数器初始值为 2 的 CyclicBarrier,创建 CyclicBarrier 的时候,还需要传入一个回调函数,这样的话当计数器减到 0 的时候,会调用这个回调函数。线程 T1 负责查询订单,当查出一条时,调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;线程 T2 负责查询运单,当查出一条时,也调用 barrier.await() 来将计数器减 1,同时等待计数器变成 0;当 T1 和 T2 都调用 barrier.await() 的时候,计数器会减到 0,此时 T1 和 T2 就可以执行下一条语句了,同时会调用 barrier 的回调函数来执行对账操作。比较有意思的是,CyclicBarrier 的计数器有自动重置的功能,当减到 0 的时候,会自动重置你设置的初始值,使用起来十分方便。
1 2 3 4 5 6 7 8 9 10 11 12 import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data @AllArgsConstructor @NoArgsConstructor public class Dispatch { private Long id; private Integer money; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data @AllArgsConstructor @NoArgsConstructor public class Order { private Long id; private Integer money; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 import java.util.ArrayList;import java.util.List;import java.util.Vector;import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;import java.util.concurrent.Executor;import java.util.concurrent.Executors;public class CyclicBarrierTest { public static Vector<Order> orders = new Vector<>(); public static Vector<Dispatch> dispatches = new Vector<>(); public static Executor executor = Executors.newFixedThreadPool(1 ); public static final CyclicBarrier barrier = new CyclicBarrier(2 , ()->{ executor.execute(()-> check() ); }); public static void main (String[] args) { List<Order> orderList = new ArrayList<>(); List<Dispatch> dispatchList = new ArrayList<>(); orderList.add(new Order(1L , 10 )); orderList.add(new Order(2L , 12 )); dispatchList.add(new Dispatch(1L , 6 )); dispatchList.add(new Dispatch(2L , 8 )); Thread T1 = new Thread(()->{ for (Order order : orderList){ orders.add(order); try { barrier.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (BrokenBarrierException e) { throw new RuntimeException(e); } } }); T1.start(); Thread T2 = new Thread(()->{ for (Dispatch dispatch : dispatchList) { dispatches.add(dispatch); try { barrier.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (BrokenBarrierException e) { throw new RuntimeException(e); } } }); T2.start(); } public static void check () { Order order = orders.remove(0 ); Dispatch dispatch = dispatches.remove(0 ); System.out.println("执行对账操作,order为:" + order + ",dispatch为" + dispatch); System.out.println("对账结果写入数据库,order为:" + order + ",dispatch为" + dispatch); } }
1 2 3 4 执行对账操作,order为:Order(id=1 , money=10 ),dispatch为Dispatch(id=1 , money=6 ) 对账结果写入数据库,order为:Order(id=1 , money=10 ),dispatch为Dispatch(id=1 , money=6 ) 执行对账操作,order为:Order(id=2 , money=12 ),dispatch为Dispatch(id=2 , money=8 ) 对账结果写入数据库,order为:Order(id=2 , money=12 ),dispatch为Dispatch(id=2 , money=8 )
CountDownLatch 和 CyclicBarrier 是 Java 并发包提供的两个非常易用的线程同步工具类,这两个工具类用法的区别:CountDownLatch 主要用来解决一个线程等待多个线程的场景 ,而 CyclicBarrier 是一组线程之间互相等待 。除此之外 CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过 。但** CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外, CyclicBarrier 还可以设置回调函数**。
ReadWriteLock实践 读多写少场景下,适用读写锁保证数据获取的安全和性能,例如缓存元数据、缓存基础数据等。
读写锁是什么 读写锁,某一个语言特有的,是一中设计的思想,所有的读写锁都遵守以下三条基本原则:
允许多个线程同时读共享变量
只允许一个线程写共享变量
如果一个写线程正在执行写操作,此时禁止读线程读共享变量
读写锁与互斥锁的一个重要区别就是读写锁允许多个线程同时读共享变量 ,而互斥锁是不允许 的 ,这是读写锁在读多写少场景下性能优于互斥锁的关键。但读写锁的写操作是互斥的,当一个线程在写共享变量的时候,是不允许其他线程执行写操作和读操作。
自定义一个缓存 实践一下,用 ReadWriteLock 实现一个缓存工具类。缓存的数据保存在 Cache 类内部的 HashMap 里面,HashMap 不是线程安全的,这里我们使用读写锁 ReadWriteLock 来保证其线程安全。ReadWriteLock 是一个接口,它的实现类是 ReentrantReadWriteLock,是支持可重入的。通过 rwl 创建了一把读锁和一把写锁。
Cache 这个工具类,提供了两个方法,一个是读缓存方法 get(),另一个是写缓存方法 put()。读缓存需要用到读锁,是 try{}finally{}这个格式。写缓存则需要用到写锁,写锁的使用和读锁是类似的。
假设缓存的源头是数据库,如果缓存中没有缓存目标对象,那么就需要从数据库中加载,然后写入缓存,写缓存需要用到写锁,可以调用 w.lock() 来获取写锁。需要注意的是,在获取写锁之后,不能直接去查询数据库,而是在重新验证了一次缓存中是否存在,因为有可能已经有线程从数据库把这条数据写入缓存了,再次验证如果还是不存在,当前线程再执行从数据库写入缓存的操作 。比如高并发的场景下,有可能会有多线程竞争写锁。假设缓存是空的,没有缓存任何东西,如果此时有三个线程 T1、T2 和 T3 同时调用 get() 方法,并且参数 key 也是相同的。那么它们会同时执行到代码w.lock();处,但此时只有一个线程能够获得写锁,假设是线程 T1,线程 T1 获取写锁之后查询数据库并更新缓存,最终释放写锁。此时线程 T2 和 T3 会再有一个线程能够获取写锁,假设是 T2,如果不采用再次验证的方式,此时 T2 会再次查询数据库。T2 释放写锁之后,T3 也会再次查询一次数据库。而实际上线程 T1 已经把缓存的值设置好了,T2、T3 完全没有必要再次查询数据库。所以,再次验证的方式,能够避免高并发场景下重复查询数据的问题 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data @AllArgsConstructor @NoArgsConstructor public class User { private Long id; private String name; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 import java.util.HashMap;import java.util.Map;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock;class Cache { final Map<Long,User> m = new HashMap<>(); final ReadWriteLock rwl = new ReentrantReadWriteLock(); final Lock r = rwl.readLock(); final Lock w = rwl.writeLock(); public User get (Long key) { User user = null ; r.lock(); try { user = m.get(key); } finally { r.unlock(); } if (user != null ) { return user; } w.lock(); try { user = m.get(key); if (user == null ){ user = queryUserById(key); m.put(key, user); } } finally { w.unlock(); } return user; } public User put (Long key, User user) { w.lock(); try { user = m.get(key); if (user == null ){ user = queryUserById(key); m.put(key, user); } } finally { w.unlock(); } return user; } public User queryUserById (Long id) { return new User(id, "Chris" ); } }
StampedLock使用实践 使用实践 ReadWriteLock 支持两种模式:一种是读锁,一种是写锁。而 StampedLock 支持三种模式,分别是:写锁 、悲观读锁 和乐观读 。其中,写锁、悲观读锁的语义和 ReadWriteLock 的写锁、读锁的语义非常类似,允许多个线程同时获取悲观读锁,但是只允许一个线程获取写锁,写锁和悲观读锁是互斥的 。不同的是:StampedLock 里的写锁和悲观读锁加锁成功之后,都会返回一个 stamp;然后解锁的时候,需要传入这个 stamp 。相关的示例代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 final StampedLock sl = new StampedLock(); long stamp = sl.readLock();try { } finally { sl.unlockRead(stamp); } long stamp = sl.writeLock();try { } finally { sl.unlockWrite(stamp); }
StampedLock 的性能之所以比 ReadWriteLock 还要好,其关键是 StampedLock 支持乐观读的方式。ReadWriteLock 支持多个线程同时读,但是当多个线程同时读的时候,所有的写操作会被阻塞 ;而 StampedLock 提供的乐观读,是允许一个线程获取写锁的,也就是说不是所有的写操作都被阻塞 。这里是乐观读,而不是乐观读锁,乐观读这个操作是无锁的,所以相比较 ReadWriteLock 的读锁,乐观读的性能更好一些 。
下面模拟计算两点之间距离的场景实践一下:在 distanceFromOrigin() 这个方法中,首先通过调用 tryOptimisticRead() 获取了一个 stamp,这里的 tryOptimisticRead() 就是我们前面提到的乐观读。之后将共享变量 x 和 y 读入方法的局部变量中,不过需要注意的是,由于 tryOptimisticRead() 是无锁的,所以共享变量 x 和 y 读入方法局部变量时,x 和 y 有可能被其他线程修改了。因此最后读完之后,还需要再次验证一下是否存在写操作,这个验证操作是通过调用 validate(stamp) 来实现的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 import java.util.concurrent.locks.StampedLock;class Point { private int x, y; final StampedLock sl = new StampedLock(); int distanceFromOrigin () { long stamp = sl.tryOptimisticRead(); int curX = x; int curY = y; if (!sl.validate(stamp)){ stamp = sl.readLock(); try { curX = x; curY = y; } finally { sl.unlockRead(stamp); } } return (int ) Math.sqrt(curX * curX + curY * curY); } }
如果执行乐观读操作的期间,存在写操作,会把乐观读升级为悲观读锁。这个做法挺合理的,否则你就需要在一个循环里反复执行乐观读,直到执行乐观读操作的期间没有写操作(只有这样才能保证 x 和 y 的正确性和一致性),而循环读会浪费大量的 CPU。升级为悲观读锁,代码简练且不易出错,建议在具体实践时也采用这样的方法。
注意事项 StampedLock 在命名上并没有增加 Reentrant,所以表明 StampedLock 是不可重入的 。StampedLock 的悲观读锁、写锁都不支持条件变量 。如果线程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上时,此时调用该阻塞线程的 interrupt() 方法,会导致 CPU 飙升 。例如下面的代码中,线程 T1 获取写锁之后将自己阻塞,线程 T2 尝试获取悲观读锁,也会阻塞;如果此时调用线程 T2 的 interrupt() 方法来中断线程 T2 的话,你会发现线程 T2 所在 CPU 会飙升到 100%。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import java.util.concurrent.locks.LockSupport;import java.util.concurrent.locks.StampedLock;public class StampedLockInterruptTest { public static void main (String[] args) throws InterruptedException { final StampedLock lock = new StampedLock(); Thread T1 = new Thread(()->{ lock.writeLock(); LockSupport.park(); }); T1.start(); Thread.sleep(100 ); Thread T2 = new Thread(()-> lock.readLock() ); T2.start(); Thread.sleep(100 ); T2.interrupt(); T2.join(); } }
所以,使用 StampedLock 一定不要调用中断操作 ,如果需要支持中断功能,一定**使用可中断的悲观读锁 ****readLockInterruptibly() ****和写锁 ****writeLockInterruptibly()**。
Semaphore实践 基本概念 Semaphore是信号量,比如我们开车时在等的红绿灯,红灯停,绿灯行,根据红绿灯的信号,车和行人决定做出等待还是前行的行为。信号量模型可以简单概括为:一个计数器,一个等待队列,三个方法。在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问它们,这三个方法分别是:init()、down() 和 up()。
init():设置计数器的初始值。
down():计数器的值减 1;如果此时计数器的值小于 0,则当前线程将被阻塞,否则当前线程可以继续执行。
up():计数器的值加 1;如果此时计数器的值小于或者等于 0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。
上述三个方法都是原子性的,并且这个原子性是由信号量模型的实现方保证的。在 Java SDK 里面,信号量模型是由 java.util.concurrent.Semaphore 实现的,Semaphore 这个类能够保证这三个方法都是原子操作。
信号量使用 举一个累加器的例子,count+=1 操作是个临界区,只允许一个线程执行,也就是说要保证互斥。用信号量控制互斥的方法,就像我们用互斥锁一样,只需要在进入临界区之前执行一下 down() 操作,退出临界区之前执行一下 up() 操作。下面的实践中,acquire() 就是信号量里的 down() 操作,release() 就是信号量里的 up() 操作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import java.util.concurrent.Semaphore;public class SemaphoreBasicTest { static int count = 0 ; static final Semaphore s = new Semaphore(1 ); public static void main (String[] args) throws InterruptedException { addOne(); System.out.println(count); addOne(); System.out.println(count); } static void addOne () throws InterruptedException { s.acquire(); try { count+=1 ; } finally { s.release(); } } }
假设两个线程 T1 和 T2 同时访问 addOne() 方法,当它们同时调用 acquire() 的时候,由于 acquire() 是一个原子操作,所以只能有一个线程(假设 T1)把信号量里的计数器减为 0 ,另外一个线程(T2)则是将计数器减为 -1 。对于线程 T1,信号量里面的计数器的值是 0,大于等于 0,所以****线程 T1 会继续执行 ;对于线程 T2,信号量里面的计数器的值是 -1,小于 0(自己想要执行就得大于0 ),按照信号量模型里对 down() 操作的描述,****线程 T2 将被阻塞 。所以此时只有线程 T1 会进入临界区执行count+=1 ;。当线程 T1 执行 release() 操作,也就是 up() 操作的时候,信号量里计数器的值是 -1,加 1 之后的值是 0,小于等于 0(想要唤醒其他线程就得小于等于0 ) ,按照信号量模型里对 up() 操作的描述,此时等待队列中的 T2 将会被唤醒 。于是 T2 在 T1 执行完临界区代码之后才获得了进入临界区执行的机会,从而保证了互斥性。
实践实现一个限流器 有 Java SDK 里面提供了 Lock,还要提供一个 Semaphore 的原因在于,实现一个互斥锁,仅仅是 Semaphore 的部分功能,Semaphore 还有一个功能是 Lock 不容易实现的,那就是:Semaphore 可以允许多个线程访问一个临界区。比如各种池化资源,例如连接池、对象池、线程池等等,比如数据库连接池,在同一时刻,一定是允许多个线程同时使用连接池的,当然,每个连接在被释放前,是不允许其他线程使用的。
我们实践一下通过Semaphore 创建一个对象池 ,也就是指的是一次性创建出多个对象,之后所有的线程重复利用这些对象,当然对象在被释放前,也是不允许其他线程使用的 。对象池,可以用 List 保存实例对象,限流器的限流指的是不允许多于 N 个线程同时进入临界区,使用信号量解决这个问题。信号量的计数器,在上面的例子中,我们设置成了 1,这个 1 表示只允许一个线程进入临界区,但如果我们把计数器的值设置成对象池里对象的个数 N,就可以解决对象池的限流问题了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import java.util.List;import java.util.concurrent.Semaphore;import java.util.function.Function;class ObjPool <T , R > { final List<T> pool; final Semaphore semaphore; ObjPool(int size, List<T> list){ pool = list; semaphore = new Semaphore(size); } R exec (Function<T,R> func) throws InterruptedException { T t = null ; semaphore.acquire(); try { t = pool.remove(0 ); return func.apply(t); } finally { pool.add(t); semaphore.release(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 import lombok.AllArgsConstructor;import lombok.Data;import lombok.NoArgsConstructor;@Data @AllArgsConstructor @NoArgsConstructor public class User { private Long id; private String name; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import java.util.ArrayList;import java.util.List;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class SemaphoreTest { public static void main (String[] args) throws InterruptedException { List<User> list = new ArrayList<>(); list.add(new User(1L , "Chris" )); list.add(new User(2L , "Zhangsan" )); list.add(new User(3L , "John" )); ObjPool<User, String> pool = new ObjPool<User, String>(3 , list); ExecutorService executor = Executors.newFixedThreadPool(3 ); for (int i = 0 ; i < 3 ; i++) { executor.execute(() -> { try { String result = pool.exec(param -> { try { Thread.sleep(3000 ); return param.getName(); } catch (InterruptedException e) { throw new RuntimeException(e); } }); System.out.println(result); } catch (InterruptedException e) { throw new RuntimeException(e); } }); } } }
用一个 List来保存对象实例,用 Semaphore 实现限流器。关键的代码是 ObjPool 里面的 exec() 方法,这个方法里面实现了限流的功能。在这个方法里面,我们首先调用 acquire() 方法(与之匹配的是在 finally 里面调用 release() 方法),假设对象池的大小是 3,信号量的计数器初始化为 3 ,那么前 3 个线程调用 acquire() 方法,都能继续执行,相当于通过了信号灯,而其他线程则会阻塞在 acquire() 方法上 。对于通过信号灯的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过 pool.remove(0) 实现的),分配完之后会执行一个回调函数 func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释放工作是通过 pool.add(t) 实现的),同时调用 release() 方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于 0,那么说明有线程在等待,此时会自动唤醒等待的线程。
不变性(Immutability)模式 不变性(Immutability)模式就是对象一旦被创建之后,状态就不再发生变化,即变量一旦被赋值,就不允许修改了(没有写操作),没有修改操作,也就是保持了不变性。
实现具备不可变性的类 将一个类所有的属性都设置成 final 的,并且只允许存在只读方法,那么这个类基本上就具备不可变性了 。更严格的做法是这个类本身也是 final 的,也就是不允许继承 。因为子类可以覆盖父类的方法,有可能改变不可变性。
Java SDK 里很多类都具备不可变性,例如经常用到的 String 和 Long、Integer、Double 等基础类型的包装类都具备不可变性 ,这些对象的线程安全性都是靠不可变性来保证的 ,它们都严格遵守不可变类的三点要求:类和属性都是 final 的,所有方法均是只读的 。
例如String 这个类以及它的属性 value[]都是 final 的,而 replace() 方法的实现,就的确没有修改 value[],而是将替换后的字符串作为返回值返回了 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public String replace (char oldChar, char newChar) { if (oldChar != newChar) { int len = value.length; int i = -1 ; char [] val = value; while (++i < len) { if (val[i] == oldChar) { break ; } } if (i < len) { char buf[] = new char [len]; for (int j = 0 ; j < i; j++) { buf[j] = val[j]; } while (i < len) { char c = val[i]; buf[i] = (c == oldChar) ? newChar : c; i++; } return new String(buf, true ); } } return this ; }
如果具备不可变性的类,需要提供类似修改的功能,具体就是创建一个新的不可变对象 ,这是与可变对象的一个重要区别,可变对象往往是修改自己的属性 。所有的修改操作都创建一个新的不可变对象,如果不利用缓存,则十分浪费内存,因此引出了缓存的设计 。
享元模式避免创建重复对象 利用享元模式可以减少创建对象的数量,从而减少内存占用。Java中 Long、Integer、Short、Byte 等这些基本数据类型的包装类都用到了享元模式 。享元模式本质上其实就是一个对象池,利用享元模式创建对象的逻辑:创建之前,首先去对象池里看看是不是存在;如果已经存在,就利用对象池里的对象;如果不存在,就会新创建一个对象,并且把这个新创建出来的对象放进对象池里 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 private static class IntegerCache { static final int low = -128 ; static final int high; static final Integer cache[]; static { int h = 127 ; String integerCacheHighPropValue = sun.misc.VM.getSavedProperty("java.lang.Integer.IntegerCache.high" ); if (integerCacheHighPropValue != null ) { try { int i = parseInt(integerCacheHighPropValue); i = Math.max(i, 127 ); h = Math.min(i, Integer.MAX_VALUE - (-low) -1 ); } catch ( NumberFormatException nfe) { } } high = h; cache = new Integer[(high - low) + 1 ]; int j = low; for (int k = 0 ; k < cache.length; k++) cache[k] = new Integer(j++); assert IntegerCache.high >= 127 ; } private IntegerCache () {} } public static Integer valueOf (int i) { if (i >= IntegerCache.low && i <= IntegerCache.high) return IntegerCache.cache[i + (-IntegerCache.low)]; return new Integer(i); }
需要注意的是:基本上所有的基础类型的包装类都不适合做锁,因为它们内部用到了享元模式,这会导致看上去私有的锁,其实是共有的 ,实践一下:
1 2 3 4 5 6 7 8 9 10 11 12 public class Student { public Integer num = Integer.valueOf(1 ); public void test () throws InterruptedException { synchronized (num) { Thread.sleep(3000 ); System.out.println("我是Student~" ); } } }
1 2 3 4 5 6 7 8 9 10 11 12 public class User { public Integer num = Integer.valueOf(1 ); public void test () throws InterruptedException { synchronized (num) { Thread.sleep(3000 ); System.out.println("我是User~" ); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import java.util.concurrent.*;public class IntegerLockTest { public static void main (String[] args) { Student student = new Student(); User user = new User(); ThreadPoolExecutor pool = new ThreadPoolExecutor(2 , 3 , 60L , TimeUnit.SECONDS, new LinkedBlockingQueue<>(1 )); pool.execute(() -> { try { student.test(); } catch (InterruptedException e) { throw new RuntimeException(e); } }); pool.execute(() -> { try { user.test(); } catch (InterruptedException e) { throw new RuntimeException(e); } }); pool.shutdown(); } }
执行了6s,说明同一时刻,只有一个线程获取到了Integer类型的对象1的锁。
使用不可变性注意事项
对象的所有属性都是 final 的,并不能保证不可变性
不可变对象也需要正确发布
Java 中final 修饰的属性一旦被赋值,就不可以再修改,但是如果属性的类型是普通对象,那么这个普通对象的属性是可以被修改的。
1 2 3 4 5 6 7 8 9 10 @Data @AllArgsConstructor @NoArgsConstructor public class Student02 { private Long id; private String name; }
1 2 3 4 5 6 7 8 9 10 11 12 import lombok.AllArgsConstructor;import lombok.Data;@Data @AllArgsConstructor public class School { final Student02 student; private String name; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class IntegerLockTest02 { public static void main (String[] args) { School school = new School(new Student02(1L , "Chris" ), "阳光小学" ); ThreadPoolExecutor pool = new ThreadPoolExecutor(2 , 3 , 60L , TimeUnit.SECONDS, new LinkedBlockingQueue<>(1 )); pool.execute(() -> { school.student.setId(2L ); }); pool.execute(() -> { school.student.setId(3L ); }); pool.shutdown(); System.out.println(school.getStudent().getId()); } }
实践证明:虽然School类中Student02为final类型,但Student02类型的成员变量student的值可以在多个线程中被访问和修改 。所以,在使用 Immutability 模式的时候一定要确认保持不变性的边界在哪里,是否要求属性对象也具备不可变性 。
不可变对象虽然是线程安全的,但是并不意味着引用这些不可变对象的对象就是线程安全的。实践一下:Student03 具备不可变性,线程安全,但是类 School02 并不是线程安全的,类 School02 中持有对 Student03 的引用 student,对 student 这个引用的修改在多线程中并不能保证可见性和原子性。
1 2 3 4 5 6 7 8 9 10 11 12 import lombok.AllArgsConstructor;import lombok.Data;@Data @AllArgsConstructor final public class Student03 { final private Long id; final private String name; }
1 2 3 4 5 6 7 8 9 10 11 12 import lombok.AllArgsConstructor;import lombok.Data;@Data @AllArgsConstructor public class School02 { Student03 student; private String name; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class IntegerLockTest03 { public static void main (String[] args) { School02 school = new School02(new Student03(1L , "Chris" ), "阳光小学" ); ThreadPoolExecutor pool = new ThreadPoolExecutor(2 , 3 , 60L , TimeUnit.SECONDS, new LinkedBlockingQueue<>(1 )); pool.execute(() -> { Student03 lucky = new Student03(2L , "Lucky" ); school.setStudent(lucky); }); pool.execute(() -> { Student03 jack = new Student03(3L , "Jack" ); school.setStudent(jack); }); pool.shutdown(); System.out.println(school.getStudent()); } }
1 Student03(id=2 , name=Lucky)
实践可知:多线程会修改School02中对Student03 student的引用 。
如果你的程序仅仅需要 student 保持可见性,无需保证原子性,那么可以将 student 声明为 volatile 变量,这样就能保证可见性。如果你的程序需要保证原子性,那么可以通过原子类来实现,实践一下:
1 2 3 4 5 6 7 8 9 10 11 12 import lombok.AllArgsConstructor;import lombok.Data;@Data @AllArgsConstructor final public class Student03 { final private Long id; final private String name; }
1 2 3 4 5 6 7 8 9 10 11 12 13 import lombok.AllArgsConstructor;import lombok.Data;import java.util.concurrent.atomic.AtomicReference;@Data @AllArgsConstructor public class School02 { final AtomicReference studentAtomicReference; private String name; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicReference;public class IntegerLockTest03 { public static void main (String[] args) throws InterruptedException { School02 school = new School02(new AtomicReference<>(new Student03(1L ,"Chris" )), "阳光小学" ); ThreadPoolExecutor pool = new ThreadPoolExecutor(2 , 3 , 60L , TimeUnit.SECONDS, new LinkedBlockingQueue<>(1 )); Student03 preStudent = (Student03) school.getStudentAtomicReference().get(); pool.execute(() -> { Student03 lucky = new Student03(2L , "Lucky" ); boolean flag = school.getStudentAtomicReference().compareAndSet(preStudent, lucky); System.out.println("执行任务1" ); System.out.println(flag); System.out.println(school.getStudentAtomicReference().get()); }); pool.execute(() -> { Student03 jack = new Student03(3L , "Jack" ); boolean flag = school.getStudentAtomicReference().compareAndSet(preStudent, jack); System.out.println("执行任务2" ); System.out.println(flag); System.out.println(school.getStudentAtomicReference().get()); }); pool.shutdown(); } }
1 2 3 4 5 6 执行任务1 true Student03 (id=2 , name=Lucky) 执行任务2 false Student03 (id=2 , name=Lucky)
实践证明:当两个线程都尝试更新studentAtomicReference时,一个线程使用compareAndSet函数更新该成员变量成功后,另一个线程调用compareAndSet函数会失败,因为此时预期的值不再是preStudent,而是首先更新的线程修改后的值 。
具备不变性的对象,只有一种状态,这个状态由对象内部所有的不变属性共同决定。其实还有一种更简单的不变性对象,那就是无状态。无状态对象内部没有属性,只有方法 。除了无状态的对象,你可能还听说过无状态的服务、无状态的协议等等。无状态有很多好处,最核心的一点就是性能。在多线程领域,无状态对象没有线程安全问题,无需同步处理,自然性能很好;在分布式领域,无状态意味着可以无限地水平扩展,所以分布式领域里面性能的瓶颈一定不是出在无状态的服务节点上。
Guarded Suspension模式:等待唤醒机制实践 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 import lombok.Data;import java.util.Map;import java.util.concurrent.BlockingDeque;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.LinkedBlockingDeque;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;import java.util.function.Predicate;@Data class GuardedObject <T > { T obj; final Lock lock = new ReentrantLock(); final Condition done = lock.newCondition(); final int timeout=2 ; final static Map<Object, GuardedObject> gos = new ConcurrentHashMap<>(); final static BlockingDeque<GuardedObject> historyGos = new LinkedBlockingDeque<>(); static <K> GuardedObject create (K key) { GuardedObject go = new GuardedObject(); gos.put(key, go); return go; } static <K, T> void fireEvent (K key, T obj) { GuardedObject go = gos.remove(key); if (go != null ){ go.onChanged(obj); } } T get (Predicate<T> p) { lock.lock(); try { while (!p.test(obj)) { done.await(timeout, TimeUnit.SECONDS); } } catch (InterruptedException e) { throw new RuntimeException(e); }finally { lock.unlock(); } return obj; } void onChanged (T obj) { lock.lock(); try { this .obj = obj; historyGos.add(this ); done.signalAll(); } finally { lock.unlock(); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;public class GuardedSuspensionTest { public static void main (String[] args) { ThreadPoolExecutor pool = new ThreadPoolExecutor(5 , 10 , 60L , TimeUnit.SECONDS, new LinkedBlockingQueue<>(1 )); pool.execute(() -> { handleWebReq(1L , "第一个任务的消息" ); }); pool.execute(() -> { try { Thread.sleep(2000 ); } catch (InterruptedException e) { throw new RuntimeException(e); } handleWebReq(2L , "第二个任务的消息" ); }); pool.execute(() -> { handleWebReq(3L , "第三个任务的消息" ); }); pool.execute(() -> { Message message1 = null ; try { while (true ) { GuardedObject guardedObject = GuardedObject.historyGos.poll(3 , TimeUnit.SECONDS); if (guardedObject == null ) { System.out.println("当前线程为:" + Thread.currentThread().getName() + ", 3s内没有消息,停止消费" ); break ; } message1 = (Message) guardedObject.get(t -> t != null ); System.out.println("当前线程为:" + Thread.currentThread().getName() + ",接收到消息:" + message1.getContent()); } } catch (InterruptedException e) { throw new RuntimeException(e); } }); pool.execute(() -> { Message message1 = null ; try { while (true ) { GuardedObject guardedObject = GuardedObject.historyGos.poll(3 , TimeUnit.SECONDS); if (guardedObject == null ) { System.out.println("当前线程为:" + Thread.currentThread().getName() + ", 3s内没有消息,停止消费" ); break ; } message1 = (Message) guardedObject.get(t -> t != null ); System.out.println("当前线程为:" + Thread.currentThread().getName() + ",接收到消息:" + message1.getContent()); } } catch (InterruptedException e) { throw new RuntimeException(e); } }); pool.shutdown(); } public static void handleWebReq (Long id, String msgContent) { Message msg = new Message(id, msgContent); GuardedObject<Message> go = GuardedObject.create(id); send(msg); } public static void onMessage (Message msg) { GuardedObject.fireEvent(msg.getId(), msg); } public static void send (Message msg) { System.out.println("当前线程为:" + Thread.currentThread().getName() + "," + "发送消息:" + msg.getContent()); onMessage(msg); } }
1 2 3 4 5 6 7 8 当前线程为:pool-1 -thread-1 ,发送消息:第一个任务的消息 当前线程为:pool-1 -thread-3 ,发送消息:第三个任务的消息 当前线程为:pool-1 -thread-5 ,接收到消息:第三个任务的消息 当前线程为:pool-1 -thread-4 ,接收到消息:第一个任务的消息 当前线程为:pool-1 -thread-2 ,发送消息:第二个任务的消息 当前线程为:pool-1 -thread-5 ,接收到消息:第二个任务的消息 当前线程为:pool-1 -thread-4 , 3s内没有消息,停止消费 当前线程为:pool-1 -thread-5 , 3s内没有消息,停止消费
实践证明:3个线程生产消息,2个线程消费消息,保证了一个消息只有一个线程消费,不存在同一个消息被重复消费的情况。