并发实战题(一)

作者:一粟

实现一个流控程序。控制客户端每秒调用某个远程服务不超过N次,客户端是会多线程并发调用,需要一个轻量简洁的实现,大家看看下面的一个实现,然后可以自己写一个实现。

[code lang=”java”]
import java.util.Date;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Semaphore;

import java.util.concurrent.TimeUnit;

public class Test {

final static int MAX_QPS = 10;

final static Semaphore semaphore = new Semaphore(MAX_QPS);

public static void main (String … args) throws Exception {

Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

semaphore.release(MAX_QPS/2);

}

}, 1000, 500, TimeUnit.MILLISECONDS);

//lots of concurrent calls:100 * 1000
ExecutorService pool = Executors.newFixedThreadPool(100);

for (int i=100;i>0;i–) {

final int x = i;

pool.submit(new Runnable() {

@Override

public void run() {

for (int j=1000;j>0;j–) {

semaphore.acquireUninterruptibly(1);
remoteCall(x, j);

}

}

});

}

pool.shutdown();

pool.awaitTermination(1, TimeUnit.HOURS);

System.out.println("DONE");
}

private static void remoteCall(int i, int j) {
System.out.println(String.format("%s – %s: %d %d",new Date(),
Thread.currentThread(), i, j));
}

}
[/code]

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 并发实战题(一)

  • Trackback 关闭
  • 评论 (28)
  1. 这个更实际的例子甚好,值得思考是否有更好的实现。

      • Yole
      • 2013/03/07 10:46上午

      同意,有实例代码效果更好,无论例子简单复杂

    • 这个代码中,如果有一秒的请求很少或请求数根本就是0,那么下一秒的许可就可能因为定时release导致许可超过MAX_QPS个,下一秒的请求数就可以超过MAX_QPS次。

        • 杰iter
        • 2013/03/09 10:47上午

        不会把,在Semaphore初始化的时候许可的数量就已经固定了

      • 这只是初始的一个许可,并没有说是最大限,不断的release,就可以不断的累积许可。看API描述可知,这个值甚至可以是负数。

      • 对的,结果测试会出现大于max的调用次数。

        • Phoenix.
        • 2019/03/04 12:10下午

        是的,有不断堆积许可的问题,导致后续周期有可能超过设定的最大请求数,可以通过semaphore.drainPermits(); 先进行重置再release颁发许可, 重整了下例子,可以通过注释掉重置代码来重现上述问题。

        /**
        * 每秒最大请求数10,发起100个请求.
        */
        public class Test {

        final static int MAX_QPS = 10;

        final static Semaphore semaphore = new Semaphore(0);

        final static AtomicInteger UNIT_CALL_COUNT = new AtomicInteger(0);

        final static AtomicInteger TOTAL_TASK_COUNT = new AtomicInteger(0);

        public static void main(String… args) throws Exception {
        ScheduledExecutorService poolSchedule = Executors.newScheduledThreadPool(1);
        poolSchedule.scheduleAtFixedRate(new Runnable() {
        public void run() {
        semaphore.drainPermits();//重置许可为0
        semaphore.release(MAX_QPS);
        System.out.println(“*****UNIT_CALL_COUNT=” + UNIT_CALL_COUNT.getAndSet(0));
        }
        }, 0, 1000, TimeUnit.MILLISECONDS);

        long startTime = System.currentTimeMillis();
        ExecutorService pool = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 100; i++) {
        if (i == 5) {
        Thread.sleep(1000); //模拟请求量不饱和的周期
        }
        final int n = i;
        pool.submit(new Runnable() {
        public void run() {
        semaphore.acquireUninterruptibly(1);
        remoteCall(n);
        }
        });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.HOURS);
        System.out.println(String.format("Done: TOTAL_TASK_COUNT=%d TOTAL_TIME=%d", TOTAL_TASK_COUNT.get(), (System.currentTimeMillis() – startTime)));

        poolSchedule.shutdown();
        }

        private static void remoteCall(int i) {
        UNIT_CALL_COUNT.incrementAndGet();
        TOTAL_TASK_COUNT.incrementAndGet();
        }

        }

    • Snway
    • 2013/03/07 10:26上午

    不错的例子,学习了!

    • 匿名
    • 2013/03/07 1:49下午

    why use MAX_QPS/2 instead of MAX_QPS?

    • you can use it . the post just a demo。

    • MAX_QPS/2 因为period的值是500,如果改成1000,可以设成MAX_QPS。

    • nicky
    • 2013/03/07 9:49下午

    semaphore.acquireUninterruptibly(1);这个是代表的将信号量-1么?

    • 是的,就是从当前Semaphore可用的许可中减一。

    • Snway
    • 2013/03/08 10:18上午

    前辈,有其他的示例吗?

    • 杰iter
    • 2013/03/09 11:01上午

    各位看这个是否可行?

    [code lang=”java”]
    private static int MAX_EXE_COUNT = 10;

    private static AtomicInteger count = new AtomicInteger(MAX_EXE_COUNT);

    public static void main(String[] args) throws InterruptedException,
    IOException
    {
    new Thread()
    {
    @Override
    public void run()
    {
    while (true)
    {
    try
    {
    Thread.sleep(1000);
    }
    catch (InterruptedException e)
    {
    }
    System.out.println("1秒过去了");
    count.set(MAX_EXE_COUNT);
    }
    }
    }.start();
    Executor e = Executors.newFixedThreadPool(100);
    for (int i = 0; i < 100; i++)
    {
    e.execute(new Runnable()
    {
    public void run()
    {
    while (count.getAndDecrement() <= 0)
    {
    }
    System.out.println(Thread.currentThread().getName()
    + "我调用了一次");
    }
    });
    }
    }
    [/code]

    • 可以尝试用Semaphore来实现,然后比较下两者之间的性能。

    • Snway
    • 2013/03/10 10:38下午

    丁 一 :
    这只是初始的一个许可,并没有说是最大限,不断的release,就可以不断的累积许可。看API描述可知,这个值甚至可以是负数。

    前辈,那该示例程序该如何调整一下,确保线程安全。

    • yl.w
    • 2013/03/24 11:41上午

    蛮好。
    尝试加了个闭锁,如下
    [code lang=”java”]
    import java.util.Date;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;

    public class TestConcrrent2 {

    final static int MAX_QPS = 10;
    final static int MAX_TASK_NUM = 100;
    final static int MAX_THREAD_NUM = 100;

    final static Semaphore semaphore = new Semaphore(MAX_QPS);
    final static CountDownLatch overLatch = new CountDownLatch(MAX_TASK_NUM);

    public static void main(String… args) throws Exception {
    Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
    semaphore.release(MAX_QPS / 2);
    }
    }, 1000, 500, TimeUnit.MILLISECONDS);

    // lots of concurrent calls:100 * 1000
    ExecutorService pool = Executors.newFixedThreadPool(MAX_THREAD_NUM);
    for (int i = MAX_TASK_NUM; i &gt; 0; i–) {
    final int x = i;
    pool.submit(new Runnable() {
    @Override
    public void run() {
    for (int j = 5; j &gt; 0; j–) {
    semaphore.acquireUninterruptibly(1);
    remoteCall(x, j);
    }
    overLatch.countDown();
    }
    });
    }
    pool.shutdown();
    // pool.awaitTermination(1, TimeUnit.HOURS);
    overLatch.await();
    System.out.println("DONE");

    }

    private static void remoteCall(int i, int j) {
    System.out.println(String.format("%s – %s: %d %d", new Date(),Thread.currentThread(), i, j));
    }

    }
    [/code]

  2. 稍微优化了下,但无法做到非常精确。
    public class FlowConcurrentController {

    final static int MAX_QPS = 10;

    final static Semaphore semaphore = new Semaphore(MAX_QPS);

    final static AtomicInteger accessCount = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {

    // release semaphore thread
    Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
    semaphore.release(accessCount.get());
    accessCount.set(0);
    }
    }, 1000, 1000, TimeUnit.MILLISECONDS);

    // lots of concurrent calls: 100 * 1000
    ExecutorService pool = Executors.newFixedThreadPool(100);
    for (int i=100;i>0;i–) {
    final int x = i;
    pool.submit(new Runnable() {
    @Override
    public void run() {
    for (int j=1000;j>0;j–) {
    try {
    Thread.sleep(5);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    semaphore.acquireUninterruptibly(1);
    accessCount.incrementAndGet();

    remoteCall(x, j);
    }
    }
    });
    }

    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.HOURS);
    System.out.println(“DONE”);
    }

    private static void remoteCall(int i, int j) {
    System.out.println(String.format(“%s – %s: %d %d”,new Date(),
    Thread.currentThread(), i, j));
    }
    }

  3. 如大家所说,给出的例子有个小问题;
    ‘杰iter’给出的方案比较漂亮了,不过对于当前没获得许可的请求线程,他采取的是在一个while循环里忙等,这可能不太符合需求;
    ‘淘宝天水’的方案,如果仔细分析一下,其实还是会出现例子中类似的问题,只不过是许可数越来越小,semaphore.release(accessCount.get());accessCount.set(0);这两条语句中间,如果accessCount增大了,那么semaphore的最大值将会减小,因为下一次释放的许可个数将小于请求的许可个数。如果要解决此处的问题,应该改成semaphore.release(accessCount.getAndSet(0));
    如果想让未获取到许可的线程暂阻塞起来,实现一个AbstractQueuedSynchronizer会是一个很漂亮的解决方案,用state来表示当前剩余许可数,重写tryAcquireShared方法,如果state已经小于等于0,则返回-1,如果用compareAndSetState方法对state减1执行成功,则返回1(此处应循环尝试),调用远程方法前,先调用该方法,另外,定时调用setState(MAX_QPS)来重置许可数
    PS:因为目前没有开发环境,没能一一验证,若分析得不对,欢迎指正。

  4. 上面的回复有个小错误,调用远程方法前,不是调用tryAcquireShared,而是调用acquireShared

  5. 有个细节没交待,setState并不会唤醒阻塞者,所以还需要借用releaseShared来完成唤醒,重写tryReleaseShared方法,返回true,调用setState之后再调用releaseShared,也可直接将setState写在tryReleaseShared方法里,直接调用releaseShared就可以了。

    AbstractQueuedSynchronizer是个很强大的工具,同步包里的大多数同步工具都是用它来处理同步问题的。

    PS:话说,怎么没人交流?

    • bobo
    • 2014/05/04 3:27下午

    一粟的例子里面,将那个重置信号量的线程池,改成1000ms允许一次,里面释放的代码
    int available = semaphore.availablePermits();
    //只释放用掉的许可证数量
    semaphore.release(MAX_QPS-available);
    是不是就可以了。

    另外有个疑问:
    多线程调度下,能精确保证那个重置信号量的线程,每秒钟都得到运行吗?

      • zhonglin
      • 2014/10/16 12:18下午

      Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {

      @Override
      public void run() {
      // /**
      // * 这里存在问题,即如果每秒访问量没有超过设定数的话,那么信号量的许可数会累积
      // */
      // semaphore.release(MAX_QPS / 2);
      int available = semaphore.availablePermits();
      //只释放用掉的许可证数量
      semaphore.release(MAX_QPS-available);
      }

      }, 1000, 1000, TimeUnit.MILLISECONDS);
      把计时的这个也改下,要不然就成没半秒允许10个了

    • 冬日阳光
    • 2015/08/18 10:37上午

    我也写了一个例子,不过没有采用锁来实现,只使用了一个原子类AtomicInteger来计数,不过在业务上是每秒调用超过10次会采用拒绝的方式来处理 。代码如下:

    import java.util.Date;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    /**
    * 实现一个流控程序。控制客户端每秒调用某个远程服务不超过N次,客户端是会多线程并发调用
    *
    * @version : Ver 1.0
    * @author : wenxing
    * @date : 2015年8月18日 上午10:31:47
    */
    public class RestrictedCallTest {
    /**
    * 每秒QPS限制
    */
    final static int MAX_QPS = 10;
    /**
    * 调用计数
    */
    final static AtomicInteger CALL_SIZE = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {

    // 如果是限制集群中的方法每秒调用次数,这种每秒定时重置的方法就需要独立到单独的应用中去,或者采用redis的过期机制
    ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(1);
    scheduledService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
    CALL_SIZE.getAndSet(0);
    }

    }, 1000, 1000, TimeUnit.MILLISECONDS);

    // 模拟每1秒调用100次,连续模拟5次,但是只有10次可以正确调用,其它的会被拒绝
    ExecutorService pool = Executors.newFixedThreadPool(100);
    for (int size = 0; size 0; i–) {
    final int x = i;
    pool.submit(new Runnable() {
    @Override
    public void run() {
    // 判断每秒的调用次数,在实际业务当中,不外就是一个拦截来实现
    if (getCallSize() >= MAX_QPS) {
    System.out.println(String.format(“每秒最多调用%s次该方法,第%d轮拒绝”, MAX_QPS, j));
    } else {
    incCall();
    // 调用业务方法
    remoteCall(x, j);
    }
    }

    });

    }
    // 休息一秒
    TimeUnit.MILLISECONDS.sleep(1000);
    }
    pool.shutdown();
    scheduledService.shutdown();
    }

    /**
    * 业务方法
    *
    * @author : wenxing 2015年8月18日 上午10:28:39
    * @param i
    * @param j
    */
    private static void remoteCall(int i, int j) {
    System.out.println(String.format(“%s – %s: 第%d轮调用,第%d次执行 “, new Date(), Thread.currentThread(), j, i));
    }

    /**
    * 增加方法调用次数
    * 如果是限制集群中的方法每秒调用次数,可以采用redis进行集中存储,不过采用redis集中存储,限制的调用次数,可能会超出调用次数最大限制
    *
    * @author : wenxing 2015年8月18日 上午10:28:39
    * @return 返回最新的调用次数
    */
    private static int incCall() {
    return CALL_SIZE.incrementAndGet();
    }

    /**
    * 获取方法的调用次数 如果是限制集群中的方法每秒调用次数,可以采用redis进行集中存储
    *
    * @author :wenxing 2015年8月18日 上午10:30:01
    * @return 返回当前的调用次数
    */
    private static int getCallSize() {
    return CALL_SIZE.get();
    }
    }

    • 冬日阳光
    • 2015/08/18 11:03上午

    继续写了一个例子,控制业务方法每秒最多MAX_QPS = 10,超过的访问请求等待,下面的代码可以看到,每秒最多只有10次的业务方法调用。
    import java.util.Date;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    /**
    * TODO wenxing 类描述.
    *
    * @version : Ver 1.0
    * @author : wenxing
    * @date : 2015年8月18日 上午10:54:09
    */
    public class SemaphoreTest {

    /**
    * 每秒QPS限制
    */
    final static int MAX_QPS = 10;
    /**
    * 调用计数
    */
    final static Semaphore CALL_SIZE = new Semaphore(MAX_QPS);

    public static void main(String[] args) throws InterruptedException {

    // 如果是限制集群中的方法每秒调用次数,这种每秒定时重置的方法就需要独立到单独的应用中去,或者采用redis的过期机制
    ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(1);

    scheduledService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
    //释放最多MAX_QPS个许可
    CALL_SIZE.release(MAX_QPS-CALL_SIZE.availablePermits());
    }

    }, 1000, 1000, TimeUnit.MILLISECONDS);//执行间隔为1秒,初次延时1秒执行

    // 模拟每1秒调用100次,连续模拟5次,但是只有10次可以正确调用,其它的等待下一秒的许可发放
    ExecutorService pool = Executors.newFixedThreadPool(100);
    for (int size = 0; size 0; i–) {
    final int x = i;
    pool.submit(new Runnable() {
    @Override
    public void run() {
    // 调用业务方法
    try {
    //获取许可,最久等待2秒,可以避免等待线程过多
    //CALL_SIZE.tryAcquire(2000, TimeUnit.MILLISECONDS);
    CALL_SIZE.acquire();
    remoteCall(x, j);
    } catch (InterruptedException e) {
    System.out.println(e.getMessage());
    }

    }

    });

    }
    // 休息一秒
    TimeUnit.MILLISECONDS.sleep(1000);
    }
    pool.shutdown();
    //scheduledService.shutdown();
    }

    /**
    * 业务方法
    *
    * @author : wenxing 2015年8月18日 上午10:28:39
    * @param i
    * @param j
    */
    private static void remoteCall(int i, int j) {
    System.out.println(String.format(“%s – %s: 第%d轮调用,第%d次执行 “, new Date(), Thread.currentThread(), j, i));
    }

    }

    • 当猪真快活
    • 2016/07/12 12:21下午

    如果将semaphore.release(MAX_QPS / 2);替换成semaphore.release(MAXQPS – semaphore.availablePermits());会有什么问题吗?

return top