并发实战题(一)

作者:一粟

实现一个流控程序。控制客户端每秒调用某个远程服务不超过N次,客户端是会多线程并发调用,需要一个轻量简洁的实现,大家看看下面的一个实现,然后可以自己写一个实现。

import java.util.Date;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Semaphore;

import java.util.concurrent.TimeUnit;

public class Test {

    final static int MAX_QPS = 10;

    final static Semaphore semaphore = new Semaphore(MAX_QPS);

    public static void main (String ... args) throws Exception {

        Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {

            @Override

            public void run() {

                semaphore.release(MAX_QPS/2);

            }

        }, 1000, 500, TimeUnit.MILLISECONDS);

        //lots of concurrent calls:100 * 1000
        ExecutorService pool = Executors.newFixedThreadPool(100);

        for (int i=100;i>0;i--) {

            final int x = i;

            pool.submit(new Runnable() {

                @Override

                public void run() {

                    for (int j=1000;j>0;j--) {

                        semaphore.acquireUninterruptibly(1);
                        remoteCall(x, j);

                    }

                }

            });

        }

        pool.shutdown();

        pool.awaitTermination(1, TimeUnit.HOURS);

        System.out.println("DONE");
    }

    private static void remoteCall(int i, int j) {
        System.out.println(String.format("%s - %s: %d %d",new Date(),
            Thread.currentThread(), i, j));
    }

}

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 并发实战题(一)

hugozhu

花名一粟,淘宝资深架构师。

Latest posts by hugozhu (see all)

FavoriteLoading添加本文到我的收藏
  • Trackback are closed
  • Comments (27)
  1. 这个更实际的例子甚好,值得思考是否有更好的实现。

      • Yole
      • 03/07. 2013 10:46am

      同意,有实例代码效果更好,无论例子简单复杂

    • 这个代码中,如果有一秒的请求很少或请求数根本就是0,那么下一秒的许可就可能因为定时release导致许可超过MAX_QPS个,下一秒的请求数就可以超过MAX_QPS次。

        • 杰iter
        • 03/09. 2013 10:47am

        不会把,在Semaphore初始化的时候许可的数量就已经固定了

        • 这只是初始的一个许可,并没有说是最大限,不断的release,就可以不断的累积许可。看API描述可知,这个值甚至可以是负数。

          • 对的,结果测试会出现大于max的调用次数。

    • Snway
    • 03/07. 2013 10:26am

    不错的例子,学习了!

    • Anonymous
    • 03/07. 2013 1:49pm

    why use MAX_QPS/2 instead of MAX_QPS?

    • nicky
    • 03/07. 2013 9:49pm

    semaphore.acquireUninterruptibly(1);这个是代表的将信号量-1么?

    • Snway
    • 03/08. 2013 10:18am

    前辈,有其他的示例吗?

    • 杰iter
    • 03/09. 2013 11:01am

    各位看这个是否可行?

     private static int MAX_EXE_COUNT = 10;
    
        private static AtomicInteger count = new AtomicInteger(MAX_EXE_COUNT);
    
        public static void main(String[] args) throws InterruptedException,
                IOException
        {
            new Thread()
            {
                @Override
                public void run()
                {
                    while (true)
                    {
                        try
                        {
                            Thread.sleep(1000);
                        }
                        catch (InterruptedException e)
                        {
                        }
                        System.out.println("1秒过去了");
                        count.set(MAX_EXE_COUNT);
                    }
                }
            }.start();
            Executor e = Executors.newFixedThreadPool(100);
            for (int i = 0; i < 100; i++)
            {
                e.execute(new Runnable()
                {
                    public void run()
                    {
                        while (count.getAndDecrement() <= 0)
                        {
                        }
                        System.out.println(Thread.currentThread().getName()
                                + "我调用了一次");
                    }
                });
            }
        }
    
    • Snway
    • 03/10. 2013 10:38pm

    丁 一 :
    这只是初始的一个许可,并没有说是最大限,不断的release,就可以不断的累积许可。看API描述可知,这个值甚至可以是负数。

    前辈,那该示例程序该如何调整一下,确保线程安全。

    • yl.w
    • 03/24. 2013 11:41am

    蛮好。
    尝试加了个闭锁,如下

    import java.util.Date;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    
    public class TestConcrrent2 {
    
    	final static int MAX_QPS = 10;
    	final static int MAX_TASK_NUM = 100;
    	final static int MAX_THREAD_NUM = 100;
    
    	final static Semaphore semaphore = new Semaphore(MAX_QPS);	
    	final static CountDownLatch overLatch = new CountDownLatch(MAX_TASK_NUM);
    
    	public static void main(String... args) throws Exception {
    		Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
    			@Override
    			public void run() {
    				semaphore.release(MAX_QPS / 2);
    			}
    		}, 1000, 500, TimeUnit.MILLISECONDS);
    
    		// lots of concurrent calls:100 * 1000
    		ExecutorService pool = Executors.newFixedThreadPool(MAX_THREAD_NUM);
    		for (int i = MAX_TASK_NUM; i &gt; 0; i--) {
    			final int x = i;
    			pool.submit(new Runnable() {
    				@Override
    				public void run() {
    					for (int j = 5; j &gt; 0; j--) {
    						semaphore.acquireUninterruptibly(1);
    						remoteCall(x, j);
    					}
    					overLatch.countDown();
    				}
    			});
    		}
    		pool.shutdown();		
    //		pool.awaitTermination(1, TimeUnit.HOURS);
    		overLatch.await();
    		System.out.println("DONE");
    
    	}
    
    	private static void remoteCall(int i, int j) {
    		System.out.println(String.format("%s - %s: %d %d", new Date(),Thread.currentThread(), i, j));
    	}
    
    }
    
  2. 稍微优化了下,但无法做到非常精确。
    public class FlowConcurrentController {

    final static int MAX_QPS = 10;

    final static Semaphore semaphore = new Semaphore(MAX_QPS);

    final static AtomicInteger accessCount = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {

    // release semaphore thread
    Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
    semaphore.release(accessCount.get());
    accessCount.set(0);
    }
    }, 1000, 1000, TimeUnit.MILLISECONDS);

    // lots of concurrent calls: 100 * 1000
    ExecutorService pool = Executors.newFixedThreadPool(100);
    for (int i=100;i>0;i–) {
    final int x = i;
    pool.submit(new Runnable() {
    @Override
    public void run() {
    for (int j=1000;j>0;j–) {
    try {
    Thread.sleep(5);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    semaphore.acquireUninterruptibly(1);
    accessCount.incrementAndGet();

    remoteCall(x, j);
    }
    }
    });
    }

    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.HOURS);
    System.out.println(“DONE”);
    }

    private static void remoteCall(int i, int j) {
    System.out.println(String.format(“%s – %s: %d %d”,new Date(),
    Thread.currentThread(), i, j));
    }
    }

  3. 如大家所说,给出的例子有个小问题;
    ‘杰iter’给出的方案比较漂亮了,不过对于当前没获得许可的请求线程,他采取的是在一个while循环里忙等,这可能不太符合需求;
    ‘淘宝天水’的方案,如果仔细分析一下,其实还是会出现例子中类似的问题,只不过是许可数越来越小,semaphore.release(accessCount.get());accessCount.set(0);这两条语句中间,如果accessCount增大了,那么semaphore的最大值将会减小,因为下一次释放的许可个数将小于请求的许可个数。如果要解决此处的问题,应该改成semaphore.release(accessCount.getAndSet(0));
    如果想让未获取到许可的线程暂阻塞起来,实现一个AbstractQueuedSynchronizer会是一个很漂亮的解决方案,用state来表示当前剩余许可数,重写tryAcquireShared方法,如果state已经小于等于0,则返回-1,如果用compareAndSetState方法对state减1执行成功,则返回1(此处应循环尝试),调用远程方法前,先调用该方法,另外,定时调用setState(MAX_QPS)来重置许可数
    PS:因为目前没有开发环境,没能一一验证,若分析得不对,欢迎指正。

  4. 上面的回复有个小错误,调用远程方法前,不是调用tryAcquireShared,而是调用acquireShared

  5. 有个细节没交待,setState并不会唤醒阻塞者,所以还需要借用releaseShared来完成唤醒,重写tryReleaseShared方法,返回true,调用setState之后再调用releaseShared,也可直接将setState写在tryReleaseShared方法里,直接调用releaseShared就可以了。

    AbstractQueuedSynchronizer是个很强大的工具,同步包里的大多数同步工具都是用它来处理同步问题的。

    PS:话说,怎么没人交流?

    • bobo
    • 05/04. 2014 3:27pm

    一粟的例子里面,将那个重置信号量的线程池,改成1000ms允许一次,里面释放的代码
    int available = semaphore.availablePermits();
    //只释放用掉的许可证数量
    semaphore.release(MAX_QPS-available);
    是不是就可以了。

    另外有个疑问:
    多线程调度下,能精确保证那个重置信号量的线程,每秒钟都得到运行吗?

      • zhonglin
      • 10/16. 2014 12:18pm

      Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {

      @Override
      public void run() {
      // /**
      // * 这里存在问题,即如果每秒访问量没有超过设定数的话,那么信号量的许可数会累积
      // */
      // semaphore.release(MAX_QPS / 2);
      int available = semaphore.availablePermits();
      //只释放用掉的许可证数量
      semaphore.release(MAX_QPS-available);
      }

      }, 1000, 1000, TimeUnit.MILLISECONDS);
      把计时的这个也改下,要不然就成没半秒允许10个了

    • 冬日阳光
    • 08/18. 2015 10:37am

    我也写了一个例子,不过没有采用锁来实现,只使用了一个原子类AtomicInteger来计数,不过在业务上是每秒调用超过10次会采用拒绝的方式来处理 。代码如下:

    import java.util.Date;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    /**
    * 实现一个流控程序。控制客户端每秒调用某个远程服务不超过N次,客户端是会多线程并发调用
    *
    * @version : Ver 1.0
    * @author : wenxing
    * @date : 2015年8月18日 上午10:31:47
    */
    public class RestrictedCallTest {
    /**
    * 每秒QPS限制
    */
    final static int MAX_QPS = 10;
    /**
    * 调用计数
    */
    final static AtomicInteger CALL_SIZE = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {

    // 如果是限制集群中的方法每秒调用次数,这种每秒定时重置的方法就需要独立到单独的应用中去,或者采用redis的过期机制
    ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(1);
    scheduledService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
    CALL_SIZE.getAndSet(0);
    }

    }, 1000, 1000, TimeUnit.MILLISECONDS);

    // 模拟每1秒调用100次,连续模拟5次,但是只有10次可以正确调用,其它的会被拒绝
    ExecutorService pool = Executors.newFixedThreadPool(100);
    for (int size = 0; size 0; i–) {
    final int x = i;
    pool.submit(new Runnable() {
    @Override
    public void run() {
    // 判断每秒的调用次数,在实际业务当中,不外就是一个拦截来实现
    if (getCallSize() >= MAX_QPS) {
    System.out.println(String.format(“每秒最多调用%s次该方法,第%d轮拒绝”, MAX_QPS, j));
    } else {
    incCall();
    // 调用业务方法
    remoteCall(x, j);
    }
    }

    });

    }
    // 休息一秒
    TimeUnit.MILLISECONDS.sleep(1000);
    }
    pool.shutdown();
    scheduledService.shutdown();
    }

    /**
    * 业务方法
    *
    * @author : wenxing 2015年8月18日 上午10:28:39
    * @param i
    * @param j
    */
    private static void remoteCall(int i, int j) {
    System.out.println(String.format(“%s – %s: 第%d轮调用,第%d次执行 “, new Date(), Thread.currentThread(), j, i));
    }

    /**
    * 增加方法调用次数
    * 如果是限制集群中的方法每秒调用次数,可以采用redis进行集中存储,不过采用redis集中存储,限制的调用次数,可能会超出调用次数最大限制
    *
    * @author : wenxing 2015年8月18日 上午10:28:39
    * @return 返回最新的调用次数
    */
    private static int incCall() {
    return CALL_SIZE.incrementAndGet();
    }

    /**
    * 获取方法的调用次数 如果是限制集群中的方法每秒调用次数,可以采用redis进行集中存储
    *
    * @author :wenxing 2015年8月18日 上午10:30:01
    * @return 返回当前的调用次数
    */
    private static int getCallSize() {
    return CALL_SIZE.get();
    }
    }

    • 冬日阳光
    • 08/18. 2015 11:03am

    继续写了一个例子,控制业务方法每秒最多MAX_QPS = 10,超过的访问请求等待,下面的代码可以看到,每秒最多只有10次的业务方法调用。
    import java.util.Date;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    /**
    * TODO wenxing 类描述.
    *
    * @version : Ver 1.0
    * @author : wenxing
    * @date : 2015年8月18日 上午10:54:09
    */
    public class SemaphoreTest {

    /**
    * 每秒QPS限制
    */
    final static int MAX_QPS = 10;
    /**
    * 调用计数
    */
    final static Semaphore CALL_SIZE = new Semaphore(MAX_QPS);

    public static void main(String[] args) throws InterruptedException {

    // 如果是限制集群中的方法每秒调用次数,这种每秒定时重置的方法就需要独立到单独的应用中去,或者采用redis的过期机制
    ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(1);

    scheduledService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
    //释放最多MAX_QPS个许可
    CALL_SIZE.release(MAX_QPS-CALL_SIZE.availablePermits());
    }

    }, 1000, 1000, TimeUnit.MILLISECONDS);//执行间隔为1秒,初次延时1秒执行

    // 模拟每1秒调用100次,连续模拟5次,但是只有10次可以正确调用,其它的等待下一秒的许可发放
    ExecutorService pool = Executors.newFixedThreadPool(100);
    for (int size = 0; size 0; i–) {
    final int x = i;
    pool.submit(new Runnable() {
    @Override
    public void run() {
    // 调用业务方法
    try {
    //获取许可,最久等待2秒,可以避免等待线程过多
    //CALL_SIZE.tryAcquire(2000, TimeUnit.MILLISECONDS);
    CALL_SIZE.acquire();
    remoteCall(x, j);
    } catch (InterruptedException e) {
    System.out.println(e.getMessage());
    }

    }

    });

    }
    // 休息一秒
    TimeUnit.MILLISECONDS.sleep(1000);
    }
    pool.shutdown();
    //scheduledService.shutdown();
    }

    /**
    * 业务方法
    *
    * @author : wenxing 2015年8月18日 上午10:28:39
    * @param i
    * @param j
    */
    private static void remoteCall(int i, int j) {
    System.out.println(String.format(“%s – %s: 第%d轮调用,第%d次执行 “, new Date(), Thread.currentThread(), j, i));
    }

    }

    • 当猪真快活
    • 07/12. 2016 12:21pm

    如果将semaphore.release(MAX_QPS / 2);替换成semaphore.release(MAXQPS – semaphore.availablePermits());会有什么问题吗?

You must be logged in to post a comment.

return top