java并发编程2.3并发工具类——Semaphore、Exchange、FutureTask

2022-10-21 13:57:56

环境:

jdk1.8

摘要说明:

上一章节主要讲述两个工具类CountDownLatch和CyclicBarrier使用及比较;

本章节主要讲述其他几个并发工具类的使用:
Semaphore:主要用来控制同时访问某个特定资源的线程数量,用在流量控制;

Exchange:主要用来进行两个线程间的数据交换;

FutureTask:前面已经介绍过,主要结合Callable、Future来进行线程结果返回;

步骤:

1.Semaphore的使用

Semaphore也被称作计数信号量。从概念上讲,信号量维护一组许可证。如果需要,每个acquire()方法,直到获得许可,然后获取许可。每个release()都添加一个许可证,潜在地释放一个阻塞的等待者。但是,没有使用实际的许可证对象;这个信号量只保留一个可用数字的计数,并相应地进行操作。

信号量通常用于限制线程的数量,而不是访问某些(物理或逻辑)资源。下面这个线程池的示例就可以很好的讲述这一点:

 class Pool {
   private static final int MAX_AVAILABLE = 100;
   private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

   public Object getItem() throws InterruptedException {
     available.acquire();
     return getNextAvailableItem();
   }

   public void putItem(Object x) {
     if (markAsUnused(x))
       available.release();
   }

   // Not a particularly efficient data structure; just for demo

   protected Object[] items = ... whatever kinds of items being managed
   protected boolean[] used = new boolean[MAX_AVAILABLE];

   protected synchronized Object getNextAvailableItem() {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (!used[i]) {
          used[i] = true;
          return items[i];
       }
     }
     return null; // not reached
   }

   protected synchronized boolean markAsUnused(Object item) {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (item == items[i]) {
          if (used[i]) {
            used[i] = false;
            return true;
          } else
            return false;
       }
     }
     return false;
   }
 }

上述示例在获取一个连接之前,每个线程必须从信号量获得一个许可证,以确保线程池中有连接是可用的。当线程处理完业务后后,连接被返回到池中,并向信号量返回一个许可证,允许另一个线程获取该连接。

请注意,在调用acquire()时没有持有同步锁,因为这会阻止将一个连接返回到池中。信号量封装了限制对池的访问所需的同步,与维护池本身的一致性所需的同步分开。

注:Semaphore的构造函数可选地接受公平性参数。当设置为false时,Semaphore不保证线程获得许可的顺序。特别地,barging(插队)是允许的,也就是说,一个调用acquire()的线程可以在一个已经在等待的线程之前被分配一个许可权——从逻辑上讲,这个新线程把自己放在等待线程队列的前端。当公平性设置为true时,信号量保证调用任何获取方法的线程都被选中,以按照它们对这些方法的调用被处理的顺序(先进先出;先进先出)。请注意,FIFO排序必须应用于这些方法中的特定内部执行点。因此,一个线程可能在另一个线程之前调用acquire,但是在另一个线程之后到达排序点,并且在从方法返回时也是如此。还要注意的是,untimetryacquire方法不遵守公平性设置,但是会接受任何可用的许可。
通常,用于控制资源访问的信号量应该被初始化为fair,以确保没有线程在访问资源时被耗尽。当将信号量用于其他类型的同步控制时,非公平排序的吞吐量优势常常超过公平性考虑。
Semaphore还提供了同时获取和发布多个许可证的便利方法。在使用这些方法时,如果没有将公平性设置为true,就会增加无限期延迟的风险。
内存一致性影响:在调用“release”方法(如release())之前的线程中的操作发生在成功的“acquire”方法(如acquire()之后的操作发生在另一个线程中)。

二进制信号量:一个初始化为1的信号量,它的使用使得它最多只有一个可用的许可,可以作为互斥锁。这通常称为二进制信号量,因为它只有两种状态:一种是可用的许可证,另一种是零可用的许可证。当以这种方式使用时,二进制信号量具有这样的属性(与许多锁实现不同),即“锁”可以由非所有者的线程释放(因为信号量没有所有权的概念)。这在某些特定上下文中非常有用,比如死锁恢复。

示例如下:

package pers.cc.curriculum2.semaphore;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import pers.cc.tools.SleepTools;

/**
 * 主要用来测试计数信号量的使用
 * 
 * @author cc
 *
 */
public class SemaphoreTest {
    /**
     * 定义数量为10
     */
    private static final int MAX_AVAILABLE = 5;

    /**
     * 设置计数信号量获取顺序为false,即不保证线程获得许可的顺序
     */
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, false);

    /**
     * 定义数量为1,即为互斥锁
     */
    private static final int MAX_AVAILABLE_2 = 1;

    private final Semaphore availableLock = new Semaphore(MAX_AVAILABLE_2, true);

    /**
     * 等待获取许可证,可获取多个
     * 
     * @throws InterruptedException
     */
    public void faliExcepleam() throws InterruptedException {
        for (int i = 0; i < 8; i++) {
            if (i == 5) {
                new Thread() {
                    public void run() {
                        try {
                            System.out.println(Thread.currentThread().getId()
                                    + "准备完成");
                            // 获取3个许可证
                            available.acquire(3);
                            System.out.println(Thread.currentThread().getId()
                                    + "获取许可证");
                            SleepTools.ms(200);
                        }
                        catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        // 释放3个许可证
                        available.release(3);
                    }
                }.start();
            }
            else {
                new Thread() {
                    public void run() {
                        try {
                            System.out.println(Thread.currentThread().getId()
                                    + "准备完成");
                            available.acquire();
                            System.out.println(Thread.currentThread().getId()
                                    + "获取许可证");
                            SleepTools.ms(250);
                        }
                        catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        available.release();
                        System.out.println(Thread.currentThread().getId()
                                + "释放许可证");
                    }
                }.start();
            }
            // 保证线程按序等待
            SleepTools.ms(10);
        }
        SleepTools.ms(500);

    }

    /**
     * 互斥
     * 
     * @throws InterruptedException
     */
    public void lock() throws InterruptedException {
        new Thread() {
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getId() + "准备完成");
                    availableLock.acquire();
                    System.out
                            .println(Thread.currentThread().getId() + "获取许可证");
                    SleepTools.ms(500);
                    System.out.println(Thread.currentThread().getId() + "线程结束"
                            + System.currentTimeMillis());
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();
        new Thread() {
            public void run() {
                try {
                    System.out.println(Thread.currentThread().getId()
                            + "准备获取许可证" + System.currentTimeMillis());
                    // 线程在给定的等待时间内可用且当前线程未被中断,则从该信号量获得一个许可或者超时直接执行。
                    availableLock.tryAcquire(300, TimeUnit.MILLISECONDS);
                    System.out.println(Thread.currentThread().getId() + "获取许可证"
                            + System.currentTimeMillis());
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();

    }

    public static void main(String[] args) throws InterruptedException {
        SemaphoreTest semaphoreTest = new SemaphoreTest();
        System.out
                .println("—————————————————————测试faliExcepleam—————————————————————");
        semaphoreTest.faliExcepleam();
        SleepTools.ms(1000);
        System.out.println("—————————————————————测试lock—————————————————————");
        semaphoreTest.lock();
    }
}

执行结果如下:

—————————————————————测试faliExcepleam—————————————————————
10准备完成
10获取许可证
11准备完成
11获取许可证
12准备完成
12获取许可证
13准备完成
13获取许可证
14准备完成
14获取许可证
15准备完成
16准备完成
17准备完成
10释放许可证
11释放许可证
12释放许可证
15获取许可证
13释放许可证
16获取许可证
14释放许可证
17获取许可证
16释放许可证
17释放许可证
—————————————————————测试lock—————————————————————
18准备完成
18获取许可证
19准备获取许可证1550471785194
19获取许可证1550471785495
18线程结束1550471785694

总结:

构造方法:

Semaphore(int permits):创建具有给定数量的许可和非公平公平设置的信号量。

Semaphore(int permits, boolean fair):创建具有给定数量的许可和给定公平性设置的信号量。

常用方法:

  • void    acquire():从这个信号量获取许可,阻塞,直到一个信号量可用,或者线程被中断。
  • void    acquire(int permits) :从这个信号量获取许可,阻塞,直到permits个信号量可用,或者线程被中断。
  • void    release():释放一个许可证,将其返回给信号量。
  • void    release(int permits):释放permits个许可证,将其返回给信号量。
  • boolean    tryAcquire(long timeout, TimeUnit unit):如果一个信号量在给定的等待时间内可用且当前线程未被中断,则从该信号量获得许可;若超时将取消堵塞;

2.Exchange的使用

Exchange:一个同步点,在该点上线程可以成对地交换元素。每个线程在进入exchange方法时显示一些对象,与伙伴线程匹配,并在返回时接收伙伴的对象。交换器可以看作是同步队列的双向形式。交换器可用于遗传算法和管道设计等应用。

package pers.cc.curriculum2.exchanger;

import java.util.concurrent.Exchanger;

public class ExchangerTest {
    // 创建交换器
    private static Exchanger < Long > exchanger = new Exchanger < Long >();

    /**
     * 将当前线程id塞入交换器,同时获取交换器中的线程id
     */
    public void update() {
        new Thread() {
            public void run() {
                try {
                    Long i = exchanger.exchange(Thread.currentThread().getId());
                    System.out.println(Thread.currentThread().getId()
                            + "进行交换的线程:" + i);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();

    }

    public static void main(String[] args) {
        ExchangerTest exchangerTest = new ExchangerTest();
        for (int i = 0; i < 5; i++) {
            exchangerTest.update();
        }
    }
}

上述运行的结果如下:

10进行交换的线程:11
12进行交换的线程:13
13进行交换的线程:12
11进行交换的线程:10

从结果我们可以看出;交换器是两两线程同步交换结果,若等不到交换线程则堵塞;

总结:

构造方法:

Exchanger():创建一个交换器;

常用方法:

  • V    exchange(V x):等待另一个线程到达这个交换点(除非当前线程被中断),然后将给定的对象传输给它,作为回报接收它的对象。
  • V    exchange(V x, long timeout, TimeUnit unit):等待另一个线程到达此交换点(除非当前线程被中断或指定的等待时间已过),然后将给定的对象传输给它,并接收其返回的对象。

3.FutureTask

首先我们看下下面这个类图:

FutureTask 可取消的异步计算。这个类提供了Future的基本实现,提供了启动和取消计算、查询计算是否完成以及检索计算结果的方法。计算完成后才能检索结果;如果计算尚未完成,get方法将阻塞。一旦计算完成,就不能重新启动或取消计算(除非使用runAndReset()调用计算)。

FutureTask可用于包装可调用或可运行的对象。因为FutureTask实现Runnable,所以可以将FutureTask提交给执行程序执行。

除了作为独立类使用外,该类还提供了在创建自定义任务类时可能有用的受保护功能。

总结:

构造方法:

FutureTask(Callable<V> callable):创建一个FutureTask,它将在运行时执行给定的可调用项。

FutureTask(Runnable runnable, V result):创建一个FutureTask,它将在运行时执行给定的Runnable,并安排get在成功完成时返回给定的结果。

常用方法:

  • boolean    cancel(boolean mayInterruptIfRunning):试图取消此任务的执行。

    如果任务已经完成、已经取消或由于其他原因无法取消,则此尝试将失败。如果成功,并且在调用cancel时此任务尚未启动,则此任务永远不会运行。如果任务已经启动,那么mayInterruptIfRunning参数确定执行该任务的线程是否应该在尝试停止该任务时被中断。

    在此方法返回后,对Future.isDone()的后续调用将始终返回true。如果这个方法返回true,那么对future.iscancel()的后续调用将始终返回true。

  • boolean    isDone():如果该任务完成,返回true。完成可能是由于正常终止、异常或取消——在所有这些情况下,该方法将返回true。
  • V    get():如果需要,等待计算完成,然后检索其结果。
  • boolean    isCancelled():如果该任务在正常完成之前被取消,则返回true。

示例:

package pers.cc.curriculum2.futureTask;

import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

import pers.cc.tools.SleepTools;

/**
 * futureTask使用
 * 
 * @author cc
 *
 */
public class FutureTaskTest {

    /* 实现Callable接口,允许有返回值 */
    private static class UseCallable implements Callable < Integer > {

        private int sum;

        @Override
        public Integer call() throws Exception {
            System.out.println("Callable子线程开始计算");
            Thread.sleep(2000);
            for (int i = 0; i < 5000; i++) {
                sum = sum + i;
            }
            System.out.println("Callable子线程计算完成,结果=" + sum);
            return sum;
        }

    }

    public static void main(String[] args) throws InterruptedException,
            ExecutionException {

        UseCallable useCallable = new UseCallable();
        FutureTask < Integer > futureTask = new FutureTask < Integer >(
                useCallable);
        new Thread(futureTask).start();
        Random r = new Random();
        SleepTools.second(1);
        if (r.nextBoolean()) {// 随机决定是获得结果还是终止任务
            System.out.println("Get UseCallable result = " + futureTask.get());
        }
        else {
            System.out.println("中断计算");
            futureTask.cancel(true);
        }

    }
}

4.源码地址:

https://github.com/cc6688211/concurrent-study.git

  • 作者:叶落自飘零
  • 原文链接:https://blog.csdn.net/u010904188/article/details/87602836
    更新时间:2022-10-21 13:57:56