本文共 5549 字,大约阅读时间需要 18 分钟。
CountDownLatch
是一组线程等待其他的线程完成工作以后在执行,加强版join
await用来等待,countDown负责计数器的减一
package com.caojiulu;import java.util.concurrent.CountDownLatch;import com.caojiulu.SleepTools;/** *@author caojiulu * *类说明:演示CountDownLatch,有5个初始化的线程,6个扣除点, *扣除完毕以后,主线程和业务线程才能继续自己的工作 */public class UseCountDownLatch { static CountDownLatch latch = new CountDownLatch(6); //初始化线程(只有一步,有4个) private static class InitThread implements Runnable{ @Override public void run() { System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work......"); latch.countDown();//初始化线程完成工作了,countDown方法只扣减一次; for(int i =0;i<2;i++) { System.out.println("Thread_"+Thread.currentThread().getId() +" ........continue do its work"); } } } //业务线程 private static class BusiThread implements Runnable{ @Override public void run() { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } for(int i =0;i<3;i++) { System.out.println("BusiThread_"+Thread.currentThread().getId() +" do business-----"); } } } public static void main(String[] args) throws InterruptedException { //单独的初始化线程,初始化分为2步,需要扣减两次 new Thread(new Runnable() { @Override public void run() { SleepTools.ms(1); System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work step 1st......"); latch.countDown();//每完成一步初始化工作,扣减一次 System.out.println("begin step 2nd......."); SleepTools.ms(1); System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work step 2nd......"); latch.countDown();//每完成一步初始化工作,扣减一次 } }).start(); new Thread(new BusiThread()).start(); for(int i=0;i<=3;i++){ Thread thread = new Thread(new InitThread()); thread.start(); } latch.await(); System.out.println("Main do ites work........"); }}
CyclicBarrier
让一组线程达到某个屏障,被阻塞,一直到组内最后一个线程达到屏障时,屏障开放,所有被阻塞的线程会继续运行CyclicBarrier(int parties)
CyclicBarrier(int parties, Runnable barrierAction),屏障开放,barrierAction定义的任务会执行
package com.caojiulu;import java.util.Map;import java.util.Random;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.CyclicBarrier;/** *@author caojiulu * *类说明:CyclicBarrier的使用 */public class UseCyclicBarrier { private static CyclicBarrier barrier = new CyclicBarrier(5,new CollectThread()); private static ConcurrentHashMapresultMap = new ConcurrentHashMap<>();//存放子线程工作结果的容器 public static void main(String[] args) { for(int i=0;i<=4;i++){ Thread thread = new Thread(new SubThread()); thread.start(); } } //负责屏障开放以后的工作 private static class CollectThread implements Runnable{ @Override public void run() { StringBuilder result = new StringBuilder(); for(Map.Entry workResult:resultMap.entrySet()){ result.append("["+workResult.getValue()+"]"); } System.out.println(" the result = "+ result); System.out.println("do other business........"); } } //工作线程 private static class SubThread implements Runnable{ @Override public void run() { long id = Thread.currentThread().getId();//线程本身的处理结果 resultMap.put(Thread.currentThread().getId()+"",id); Random r = new Random();//随机决定工作线程的是否睡眠 try { if(r.nextBoolean()) { Thread.sleep(2000+id); System.out.println("Thread_"+id+" ....do something "); } System.out.println(id+"....is await"); barrier.await(); Thread.sleep(1000+id); System.out.println("Thread_"+id+" ....do its business "); } catch (Exception e) { e.printStackTrace(); } } }}
CountDownLatch和CyclicBarrier的区别
1、countdownlatch放行由第三者控制,CyclicBarrier放行由一组线程本身控制
2、countdownlatch放行条件》=线程数,CyclicBarrier放行条件=线程数
Semaphore
控制同时访问某个特定资源的线程数量,用在流量控制
以 前面数据库连接池的例子,改为Semaphore版本
package com.caojiulu;import java.sql.Connection;import java.util.LinkedList;import java.util.concurrent.Semaphore;/** *@author caojiulu * *类说明:演示Semaphore用法,一个数据库连接池的实现 */public class DBPoolSemaphore { private final static int POOL_SIZE = 10; private final Semaphore useful,useless;//useful表示可用的数据库连接,useless表示已用的数据库连接 public DBPoolSemaphore() { this. useful = new Semaphore(POOL_SIZE); this.useless = new Semaphore(0); } //存放数据库连接的容器 private static LinkedListpool = new LinkedList (); //初始化池 static { for (int i = 0; i < POOL_SIZE; i++) { pool.addLast(SqlConnectImpl.fetchConnection()); } } /*归还连接*/ public void returnConnect(Connection connection) throws InterruptedException { if(connection!=null) { System.out.println("当前有"+useful.getQueueLength()+"个线程等待数据库连接!!" +"可用连接数:"+useful.availablePermits()); useless.acquire(); synchronized (pool) { pool.addLast(connection); } useful.release(); } } /*从池子拿连接*/ public Connection takeConnect() throws InterruptedException { useful.acquire(); Connection conn; synchronized (pool) { conn = pool.removeFirst(); } useless.release(); return conn; } }
Exchange
两个线程间的数据交换
这个用得比较少,有需要了解的请再去查查例子,谢谢。
转载地址:http://axyyb.baihongyu.com/