专业编程基础技术教程

网站首页 > 基础教程 正文

Java多线程六—JDK提供的线程同步的四个工具类

ccvgpt 2025-03-25 11:25:56 基础教程 19 ℃

同步工具类可以是任何一个对象。

阻塞队列可以用作同步工具类。

Java多线程六—JDK提供的线程同步的四个工具类

生产者现场往队列中存入任务,消费者线程从阻塞队列中获取任务。可以在某种程度上对生产者线程和消费者线程进行解耦。存入任务和获取任务速率不一致的时候,不会导致效率下降。

其他的同步工具类包括:信号量(Semaphore)、栅栏(Barrier、CyclicBarrier)、FutureTask以及闭锁(Latch、CountDownLatch)。

信号量 Semaphore

Semaphore 可以用来控制某些访问资源的操作数量,可以用作限流器,可以用来实现资源池。

Semaphore 管理一组虚拟的许可(permit),可以通过Semaphore构造函数指定。

Semaphore semp = new Semaphore(5);

执行操作时获得许可,使用以后释放许可。如果没有许可,则一直阻塞到有许可为止。通过 acquire() 可以获得一个许可,通过 release() 方法可以释放一个许可。

Semaphore 可以用作数据库连接池。可以构造一个固定长度的资源池,当池为空时,请求资源会失败。

Semaphore 也可以用来实现有界阻塞容器。比如以下代码。添加元素时,先获得一个许可,如果没有许可,则会阻塞直到有许可之后再进行元素添加;删除元素时,释放一个许可。

public class Test0514BoundedHashSet {

    private final Set set;

    private final Semaphore sem;

    public Test0514BoundedHashSet(int bound) {
        this.set = Collections.synchronizedSet(new HashSet());
        sem = new Semaphore(bound);
    }

    public boolean add(T o) throws InterruptedException {
        // 获取一个许可,只有获取到许可才能添加元素
        sem.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        } finally {
            if (!wasAdded) {
                sem.release();
            }
        }
    }

    public boolean remove(Object o) {
        boolean wasRemoved = set.remove(o);
        if (wasRemoved) {
            // 删除元素,则释放一个许可
            sem.release();
        }
        return wasRemoved;
    }
}

栅栏 Barrier

栅栏的作用时,等待所有线程到达栅栏位置,然后才继续执行。

例如:泡茶,要等待洗茶壶、洗茶杯以及拿茶叶都好了之后,才能执行泡茶动作。(也可以使用CompletableFuture实现)

另一个例子,几个家庭决定在某个地方集合:“所有人6:00在麦当劳碰头,到了以后要等其他人,之后再进行讨论”。

CyclicBarrier 可以让一定数量的参与方反复地在栅栏位置汇集,在并行迭代算法中非常有用。可以参考:王宝令老师的《Java并发编程实战》的“19 CountDownLatch和CyclicBarrier:如何让多线程步调一致?”的对账系统的例子。

而 CyclicBarrier 是一组线程之间互相等待,更像是几个驴友之间不离不弃。除此之外 CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。但 CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外,CyclicBarrier 还可以设置回调函数,可以说是功能丰富。

当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有的线程都到达了栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏将被重置(parties数量重置为初始值)以便下次使用。

/**
 * 循环执行任务
 * 测试 CyclicBarrier
 * 两个任务比较耗时:查询运单库、查询订单哭
 * 需要执行这两个任务获取返回数据后执行check方法;
 * 

* 这里的优化点:执行check的时候,查询下一次的两个耗时任务:查询运单库、查询订单哭 * */ public class TestCyclicBarrier { // 执行回调的线程池 Executor executor = Executors.newFixedThreadPool(1); final CyclicBarrier barrier = new CyclicBarrier(2, () -> { executor.execute(() -> check()); }); public static void main(String[] args) { TestCyclicBarrier barrier = new TestCyclicBarrier(); barrier.checkAll(); } void check() { System.out.println(">>>> check"); } void checkAll() { // 循环查询订单库 Thread T1 = new Thread(() -> { while (true) { try { // 查询订单库 sleep(1000); System.out.println("查询订单哭"); // 等待 barrier.await(); System.out.println(">> count = " + barrier.getNumberWaiting()); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }); T1.start(); // 循环查询运单库 Thread T2 = new Thread(() -> { while (true) { try { // 查询订单库 System.out.println("查询运单库"); // 查询订单库 sleep(2000); // 等待 barrier.await(); System.out.println(">> count = " + barrier.getNumberWaiting()); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } } }); T2.start(); } }

FutureTask

FutureTask 表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable。当 FutureTask 进入完成状态后,会永远停留在这个状态上。

FutureTask 经常用于一些时间比较长的计算,计算结果在稍后使用,通过提前计算,减少等待结果需要的时间。

/**
 * 使用 FutureTask 来提前加载稍后需要的数据
 * 

* 提前加载数据 * future.get() 的异常处理需要注意 * */ public class Test0512Preloader { private final FutureTask future = new FutureTask(new Callable() { @Override public ProductInfo call() throws DataLoadException { return loadProductInfo(); } }); private final Thread thread = new Thread(future); public void start() { thread.start(); } public ProductInfo get() throws DataLoadException, InterruptedException { try { return future.get(); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof DataLoadException) { throw (DataLoadException) cause; } else { throw launderThrowable(cause); } } } /** * 异常处理封装方法 * * @param t * @return */ private RuntimeException launderThrowable(Throwable t) { if (t instanceof RuntimeException) { return (RuntimeException) t; } else if (t instanceof Error) { throw (Error) t; } else { throw new IllegalStateException("Not unchecked", t); } } public ProductInfo loadProductInfo() { return new ProductInfo(); } }

Latch 闭锁

闭锁相当于一扇门,在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门会打开并允许所有的线程通过。

同样,可以参考:王宝令老师的《Java并发编程实战》的“19 CountDownLatch和CyclicBarrier:如何让多线程步调一致?”的对账系统的例子。

另外一个例子:

public class TestCountDownLatch {

	public static void main(String[] args) {
		final CountDownLatch latch = new CountDownLatch(2);
		new Thread() {
			public void run() {
				try {
					System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
					Thread.sleep(3000);
					System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
					latch.countDown();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}

			;
		}.start();

		new Thread() {
			public void run() {
				try {
					System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
					Thread.sleep(3000);
					System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
					latch.countDown();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}

			;
		}.start();

		try {
			System.out.println("等待2个子线程执行完毕...");
			latch.await();
			System.out.println("2个子线程已经执行完毕");
			System.out.println("继续执行主线程");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

输出

子线程Thread-0正在执行
等待2个子线程执行完毕...
子线程Thread-1正在执行
子线程Thread-0执行完毕
子线程Thread-1执行完毕
2个子线程已经执行完毕
继续执行主线程

栅栏用于等待其他线程,闭锁用于等待事件。

代码地址:
https://github.com/prepared48/Java-learning.git

说明:文中内容不完全整理自《Java并发编程实战》和王宝令老师的《Java并发编程实战》

Tags:

最近发表
标签列表