并发实战题(一)
作者:一粟
实现一个流控程序。控制客户端每秒调用某个远程服务不超过N次,客户端是会多线程并发调用,需要一个轻量简洁的实现,大家看看下面的一个实现,然后可以自己写一个实现。
[code lang=”java”]
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class Test {
final static int MAX_QPS = 10;
final static Semaphore semaphore = new Semaphore(MAX_QPS);
public static void main (String … args) throws Exception {
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
semaphore.release(MAX_QPS/2);
}
}, 1000, 500, TimeUnit.MILLISECONDS);
//lots of concurrent calls:100 * 1000
ExecutorService pool = Executors.newFixedThreadPool(100);
for (int i=100;i>0;i–) {
final int x = i;
pool.submit(new Runnable() {
@Override
public void run() {
for (int j=1000;j>0;j–) {
semaphore.acquireUninterruptibly(1);
remoteCall(x, j);
}
}
});
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.HOURS);
System.out.println("DONE");
}
private static void remoteCall(int i, int j) {
System.out.println(String.format("%s – %s: %d %d",new Date(),
Thread.currentThread(), i, j));
}
}
[/code]
原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 并发实战题(一)
这个更实际的例子甚好,值得思考是否有更好的实现。
同意,有实例代码效果更好,无论例子简单复杂
这个代码中,如果有一秒的请求很少或请求数根本就是0,那么下一秒的许可就可能因为定时release导致许可超过MAX_QPS个,下一秒的请求数就可以超过MAX_QPS次。
不会把,在Semaphore初始化的时候许可的数量就已经固定了
这只是初始的一个许可,并没有说是最大限,不断的release,就可以不断的累积许可。看API描述可知,这个值甚至可以是负数。
对的,结果测试会出现大于max的调用次数。
是的,有不断堆积许可的问题,导致后续周期有可能超过设定的最大请求数,可以通过semaphore.drainPermits(); 先进行重置再release颁发许可, 重整了下例子,可以通过注释掉重置代码来重现上述问题。
/**
* 每秒最大请求数10,发起100个请求.
*/
public class Test {
final static int MAX_QPS = 10;
final static Semaphore semaphore = new Semaphore(0);
final static AtomicInteger UNIT_CALL_COUNT = new AtomicInteger(0);
final static AtomicInteger TOTAL_TASK_COUNT = new AtomicInteger(0);
public static void main(String… args) throws Exception {
ScheduledExecutorService poolSchedule = Executors.newScheduledThreadPool(1);
poolSchedule.scheduleAtFixedRate(new Runnable() {
public void run() {
semaphore.drainPermits();//重置许可为0
semaphore.release(MAX_QPS);
System.out.println(“*****UNIT_CALL_COUNT=” + UNIT_CALL_COUNT.getAndSet(0));
}
}, 0, 1000, TimeUnit.MILLISECONDS);
long startTime = System.currentTimeMillis();
ExecutorService pool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 100; i++) {
if (i == 5) {
Thread.sleep(1000); //模拟请求量不饱和的周期
}
final int n = i;
pool.submit(new Runnable() {
public void run() {
semaphore.acquireUninterruptibly(1);
remoteCall(n);
}
});
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.HOURS);
System.out.println(String.format("Done: TOTAL_TASK_COUNT=%d TOTAL_TIME=%d", TOTAL_TASK_COUNT.get(), (System.currentTimeMillis() – startTime)));
poolSchedule.shutdown();
}
private static void remoteCall(int i) {
UNIT_CALL_COUNT.incrementAndGet();
TOTAL_TASK_COUNT.incrementAndGet();
}
}
不错的例子,学习了!
why use MAX_QPS/2 instead of MAX_QPS?
you can use it . the post just a demo。
MAX_QPS/2 因为period的值是500,如果改成1000,可以设成MAX_QPS。
semaphore.acquireUninterruptibly(1);这个是代表的将信号量-1么?
是的,就是从当前Semaphore可用的许可中减一。
前辈,有其他的示例吗?
可以尝试自己写一个,锻炼下 :)
各位看这个是否可行?
[code lang=”java”]
private static int MAX_EXE_COUNT = 10;
private static AtomicInteger count = new AtomicInteger(MAX_EXE_COUNT);
public static void main(String[] args) throws InterruptedException,
IOException
{
new Thread()
{
@Override
public void run()
{
while (true)
{
try
{
Thread.sleep(1000);
}
catch (InterruptedException e)
{
}
System.out.println("1秒过去了");
count.set(MAX_EXE_COUNT);
}
}
}.start();
Executor e = Executors.newFixedThreadPool(100);
for (int i = 0; i < 100; i++)
{
e.execute(new Runnable()
{
public void run()
{
while (count.getAndDecrement() <= 0)
{
}
System.out.println(Thread.currentThread().getName()
+ "我调用了一次");
}
});
}
}
[/code]
可以尝试用Semaphore来实现,然后比较下两者之间的性能。
前辈,那该示例程序该如何调整一下,确保线程安全。
蛮好。
尝试加了个闭锁,如下
[code lang=”java”]
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class TestConcrrent2 {
final static int MAX_QPS = 10;
final static int MAX_TASK_NUM = 100;
final static int MAX_THREAD_NUM = 100;
final static Semaphore semaphore = new Semaphore(MAX_QPS);
final static CountDownLatch overLatch = new CountDownLatch(MAX_TASK_NUM);
public static void main(String… args) throws Exception {
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
semaphore.release(MAX_QPS / 2);
}
}, 1000, 500, TimeUnit.MILLISECONDS);
// lots of concurrent calls:100 * 1000
ExecutorService pool = Executors.newFixedThreadPool(MAX_THREAD_NUM);
for (int i = MAX_TASK_NUM; i > 0; i–) {
final int x = i;
pool.submit(new Runnable() {
@Override
public void run() {
for (int j = 5; j > 0; j–) {
semaphore.acquireUninterruptibly(1);
remoteCall(x, j);
}
overLatch.countDown();
}
});
}
pool.shutdown();
// pool.awaitTermination(1, TimeUnit.HOURS);
overLatch.await();
System.out.println("DONE");
}
private static void remoteCall(int i, int j) {
System.out.println(String.format("%s – %s: %d %d", new Date(),Thread.currentThread(), i, j));
}
}
[/code]
稍微优化了下,但无法做到非常精确。
public class FlowConcurrentController {
final static int MAX_QPS = 10;
final static Semaphore semaphore = new Semaphore(MAX_QPS);
final static AtomicInteger accessCount = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
// release semaphore thread
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
semaphore.release(accessCount.get());
accessCount.set(0);
}
}, 1000, 1000, TimeUnit.MILLISECONDS);
// lots of concurrent calls: 100 * 1000
ExecutorService pool = Executors.newFixedThreadPool(100);
for (int i=100;i>0;i–) {
final int x = i;
pool.submit(new Runnable() {
@Override
public void run() {
for (int j=1000;j>0;j–) {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
semaphore.acquireUninterruptibly(1);
accessCount.incrementAndGet();
remoteCall(x, j);
}
}
});
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.HOURS);
System.out.println(“DONE”);
}
private static void remoteCall(int i, int j) {
System.out.println(String.format(“%s – %s: %d %d”,new Date(),
Thread.currentThread(), i, j));
}
}
如大家所说,给出的例子有个小问题;
‘杰iter’给出的方案比较漂亮了,不过对于当前没获得许可的请求线程,他采取的是在一个while循环里忙等,这可能不太符合需求;
‘淘宝天水’的方案,如果仔细分析一下,其实还是会出现例子中类似的问题,只不过是许可数越来越小,semaphore.release(accessCount.get());accessCount.set(0);这两条语句中间,如果accessCount增大了,那么semaphore的最大值将会减小,因为下一次释放的许可个数将小于请求的许可个数。如果要解决此处的问题,应该改成semaphore.release(accessCount.getAndSet(0));
如果想让未获取到许可的线程暂阻塞起来,实现一个AbstractQueuedSynchronizer会是一个很漂亮的解决方案,用state来表示当前剩余许可数,重写tryAcquireShared方法,如果state已经小于等于0,则返回-1,如果用compareAndSetState方法对state减1执行成功,则返回1(此处应循环尝试),调用远程方法前,先调用该方法,另外,定时调用setState(MAX_QPS)来重置许可数
PS:因为目前没有开发环境,没能一一验证,若分析得不对,欢迎指正。
上面的回复有个小错误,调用远程方法前,不是调用tryAcquireShared,而是调用acquireShared
有个细节没交待,setState并不会唤醒阻塞者,所以还需要借用releaseShared来完成唤醒,重写tryReleaseShared方法,返回true,调用setState之后再调用releaseShared,也可直接将setState写在tryReleaseShared方法里,直接调用releaseShared就可以了。
AbstractQueuedSynchronizer是个很强大的工具,同步包里的大多数同步工具都是用它来处理同步问题的。
PS:话说,怎么没人交流?
一粟的例子里面,将那个重置信号量的线程池,改成1000ms允许一次,里面释放的代码
int available = semaphore.availablePermits();
//只释放用掉的许可证数量
semaphore.release(MAX_QPS-available);
是不是就可以了。
另外有个疑问:
多线程调度下,能精确保证那个重置信号量的线程,每秒钟都得到运行吗?
Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// /**
// * 这里存在问题,即如果每秒访问量没有超过设定数的话,那么信号量的许可数会累积
// */
// semaphore.release(MAX_QPS / 2);
int available = semaphore.availablePermits();
//只释放用掉的许可证数量
semaphore.release(MAX_QPS-available);
}
}, 1000, 1000, TimeUnit.MILLISECONDS);
把计时的这个也改下,要不然就成没半秒允许10个了
我也写了一个例子,不过没有采用锁来实现,只使用了一个原子类AtomicInteger来计数,不过在业务上是每秒调用超过10次会采用拒绝的方式来处理 。代码如下:
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 实现一个流控程序。控制客户端每秒调用某个远程服务不超过N次,客户端是会多线程并发调用
*
* @version : Ver 1.0
* @author : wenxing
* @date : 2015年8月18日 上午10:31:47
*/
public class RestrictedCallTest {
/**
* 每秒QPS限制
*/
final static int MAX_QPS = 10;
/**
* 调用计数
*/
final static AtomicInteger CALL_SIZE = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
// 如果是限制集群中的方法每秒调用次数,这种每秒定时重置的方法就需要独立到单独的应用中去,或者采用redis的过期机制
ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(1);
scheduledService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
CALL_SIZE.getAndSet(0);
}
}, 1000, 1000, TimeUnit.MILLISECONDS);
// 模拟每1秒调用100次,连续模拟5次,但是只有10次可以正确调用,其它的会被拒绝
ExecutorService pool = Executors.newFixedThreadPool(100);
for (int size = 0; size 0; i–) {
final int x = i;
pool.submit(new Runnable() {
@Override
public void run() {
// 判断每秒的调用次数,在实际业务当中,不外就是一个拦截来实现
if (getCallSize() >= MAX_QPS) {
System.out.println(String.format(“每秒最多调用%s次该方法,第%d轮拒绝”, MAX_QPS, j));
} else {
incCall();
// 调用业务方法
remoteCall(x, j);
}
}
});
}
// 休息一秒
TimeUnit.MILLISECONDS.sleep(1000);
}
pool.shutdown();
scheduledService.shutdown();
}
/**
* 业务方法
*
* @author : wenxing 2015年8月18日 上午10:28:39
* @param i
* @param j
*/
private static void remoteCall(int i, int j) {
System.out.println(String.format(“%s – %s: 第%d轮调用,第%d次执行 “, new Date(), Thread.currentThread(), j, i));
}
/**
* 增加方法调用次数
* 如果是限制集群中的方法每秒调用次数,可以采用redis进行集中存储,不过采用redis集中存储,限制的调用次数,可能会超出调用次数最大限制
*
* @author : wenxing 2015年8月18日 上午10:28:39
* @return 返回最新的调用次数
*/
private static int incCall() {
return CALL_SIZE.incrementAndGet();
}
/**
* 获取方法的调用次数 如果是限制集群中的方法每秒调用次数,可以采用redis进行集中存储
*
* @author :wenxing 2015年8月18日 上午10:30:01
* @return 返回当前的调用次数
*/
private static int getCallSize() {
return CALL_SIZE.get();
}
}
继续写了一个例子,控制业务方法每秒最多MAX_QPS = 10,超过的访问请求等待,下面的代码可以看到,每秒最多只有10次的业务方法调用。
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* TODO wenxing 类描述.
*
* @version : Ver 1.0
* @author : wenxing
* @date : 2015年8月18日 上午10:54:09
*/
public class SemaphoreTest {
/**
* 每秒QPS限制
*/
final static int MAX_QPS = 10;
/**
* 调用计数
*/
final static Semaphore CALL_SIZE = new Semaphore(MAX_QPS);
public static void main(String[] args) throws InterruptedException {
// 如果是限制集群中的方法每秒调用次数,这种每秒定时重置的方法就需要独立到单独的应用中去,或者采用redis的过期机制
ScheduledExecutorService scheduledService = Executors.newScheduledThreadPool(1);
scheduledService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
//释放最多MAX_QPS个许可
CALL_SIZE.release(MAX_QPS-CALL_SIZE.availablePermits());
}
}, 1000, 1000, TimeUnit.MILLISECONDS);//执行间隔为1秒,初次延时1秒执行
// 模拟每1秒调用100次,连续模拟5次,但是只有10次可以正确调用,其它的等待下一秒的许可发放
ExecutorService pool = Executors.newFixedThreadPool(100);
for (int size = 0; size 0; i–) {
final int x = i;
pool.submit(new Runnable() {
@Override
public void run() {
// 调用业务方法
try {
//获取许可,最久等待2秒,可以避免等待线程过多
//CALL_SIZE.tryAcquire(2000, TimeUnit.MILLISECONDS);
CALL_SIZE.acquire();
remoteCall(x, j);
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}
});
}
// 休息一秒
TimeUnit.MILLISECONDS.sleep(1000);
}
pool.shutdown();
//scheduledService.shutdown();
}
/**
* 业务方法
*
* @author : wenxing 2015年8月18日 上午10:28:39
* @param i
* @param j
*/
private static void remoteCall(int i, int j) {
System.out.println(String.format(“%s – %s: 第%d轮调用,第%d次执行 “, new Date(), Thread.currentThread(), j, i));
}
}
如果将semaphore.release(MAX_QPS / 2);替换成semaphore.release(MAXQPS – semaphore.availablePermits());会有什么问题吗?