Java JUC (java.util.concurrent) 1 - AQS:AbstractQueuedSynchronizer

AbstractQueuedSynchronizer (AQS) lays the foundation of Java JUC programming. Many concurrency classes rely on an internal Sync class that extends AQS, as shown below.

Example code snippets for CountDownLatch, CyclicBarrier, Semaphore, ForkJoinTask, Future, and FutureTask are shown below.

CountDownLatch

/**
 * Submits {@code threadCount} tasks to a cached thread pool and waits on a
 * {@link CountDownLatch} with a 10 ms timeout before logging "finish".
 *
 * <p>Each task calls {@code test(threadNum)} and counts the latch down in a
 * {@code finally} block, so the count is decremented even when the task fails.
 * The timed {@code await} returns once the timeout elapses whether or not the
 * count has reached zero, so "finish" may be logged before the tasks complete.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the main thread is interrupted while waiting on the latch
 */
public static void main(String[] args) throws Exception {
		ExecutorService exec = Executors.newCachedThreadPool();
		try {
				final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
				for (int i = 0; i < threadCount; i++) {
						final int threadNum = i;
						exec.execute(() -> {
								try {
										test(threadNum);
								} catch (Exception e) {
										log.error("exception", e);
								} finally {
										// Count down even on failure so waiters are not blocked by a failed task.
										countDownLatch.countDown();
								}
						});
				}
				// Timed wait: gives up after 10 ms even if the count is still above zero.
				countDownLatch.await(10, TimeUnit.MILLISECONDS);
				log.info("finish");
		} finally {
				// Shut the pool down even if await() is interrupted; already-submitted tasks still run.
				exec.shutdown();
		}
}

/**
 * Pauses the calling thread for 100 ms and then logs the given task number.
 *
 * @param threadNum number of the task, written to the log
 * @throws Exception if the thread is interrupted while sleeping
 */
private static void test(int threadNum) throws Exception {
		TimeUnit.MILLISECONDS.sleep(100);
		log.info("{}", threadNum);
}

CyclicBarrier

// Barrier for 5 parties: threads calling barrier.await() block until 5 of them have arrived.
private static CyclicBarrier barrier = new CyclicBarrier(5);

/**
 * Submits 10 tasks to a cached thread pool, one per second; each task runs
 * {@code race(threadNum)}.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the main thread is interrupted while sleeping between submissions
 */
public static void main(String[] args) throws Exception {
		ExecutorService executor = Executors.newCachedThreadPool();
		try {
				for (int i = 0; i < 10; i++) {
						final int threadNum = i;
						// Stagger the submissions by one second.
						Thread.sleep(1000);
						executor.execute(() -> {
								try {
										race(threadNum);
								} catch (Exception e) {
										log.error("exception", e);
								}
						});
				}
		} finally {
				// Shut the pool down even if sleep() is interrupted; already-submitted tasks still run.
				executor.shutdown();
		}
}

/**
 * Sleeps for one second, logs that the task is ready, waits on the shared
 * barrier, and logs again once the barrier lets the thread through.
 *
 * @param threadNum number of the task, written to the log
 * @throws Exception if the thread is interrupted or the barrier wait fails
 */
private static void race(int threadNum) throws Exception {
		Thread.sleep(1000);
		log.info("{} is ready", threadNum);
		// Blocks here until the barrier's required number of threads have all called await().
		barrier.await();
		log.info("{} continue", threadNum);
}

Semaphore

ExecutorService exec = Executors.newCachedThreadPool();
// At most 3 tasks can hold a permit at the same time.
final Semaphore semaphore = new Semaphore(3);

for (int i = 0; i < threadCount; i++) {
		final int threadNum = i;
		exec.execute(() -> {
				try {
						// Try to acquire one permit, waiting up to 5 seconds; skip the work on timeout.
						if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) {
								try {
										test(threadNum);
								} finally {
										// Release the permit even if test() throws, so it is never leaked.
										semaphore.release();
								}
						}
				} catch (Exception e) {
						log.error("exception", e);
				}
		});
}
exec.shutdown();
}

/**
 * Logs the given task number and then pauses the calling thread for one second.
 *
 * @param threadNum number of the task, written to the log
 * @throws Exception if the thread is interrupted while sleeping
 */
private static void test(int threadNum) throws Exception {
		log.info("{}", threadNum);
		TimeUnit.MILLISECONDS.sleep(1000);
}

ForkJoinTask

/**
 * Fork/join task that sums the integers in the inclusive range
 * {@code [start, end]} by recursively splitting the range in half until a
 * piece is small enough to be summed directly.
 */
public class ForkJoinTaskExample extends RecursiveTask<Integer> {

    /** Largest value of {@code end - start} that is summed directly instead of being split. */
    public static final int threshold = 2;
    private final int start;
    private final int end;

    /**
     * Creates a task that sums the integers from {@code start} to {@code end}, both inclusive.
     *
     * @param start first value of the range
     * @param end   last value of the range
     */
    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }

    /**
     * Sums the range directly when it is small enough; otherwise splits it
     * into two halves, sums them as subtasks and adds the partial results.
     *
     * @return the sum of all integers in {@code [start, end]}
     */
    @Override
    protected Integer compute() {
        // The task is small enough: compute it directly.
        if ((end - start) <= threshold) {
            int sum = 0;
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }

        // The task is larger than the threshold: split it into two subtasks.
        // The midpoint is computed in long so that start + end cannot overflow.
        int middle = (int) (((long) start + end) / 2);
        ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
        ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

        // Fork only one half and compute the other in the current thread, so
        // this worker does useful work instead of just waiting on two joins.
        leftTask.fork();
        int rightResult = rightTask.compute();
        int leftResult = leftTask.join();

        // Merge the results of the two subtasks.
        return leftResult + rightResult;
    }

    /**
     * Sums 1 through 100 on a {@link ForkJoinPool} and logs the result.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();

        // Create a task that computes 1 + 2 + ... + 100.
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

        // Submit the task for execution.
        Future<Integer> result = forkjoinPool.submit(task);

        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        } finally {
            // The pool is no longer needed once the result has been read.
            forkjoinPool.shutdown();
        }
    }
}

Future

/**
 * Callable that logs a message, sleeps for 5 seconds and then returns "Done".
 */
static class MyCallable implements Callable<String> {

        /**
         * Logs a message, sleeps for 5 seconds and returns the result string.
         *
         * @return the string "Done"
         * @throws Exception if the thread is interrupted while sleeping
         */
        @Override
        public String call() throws Exception {
            log.info("do something in callable");
            Thread.sleep(5000);
            return "Done";
        }
    }

    /**
     * Submits a {@code MyCallable} to a cached thread pool, does some work on
     * the main thread, then blocks on the returned {@link Future} and logs
     * its result.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the main thread is interrupted or the callable fails
     */
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        try {
            Future<String> future = executorService.submit(new MyCallable());
            log.info("do something in main");
            Thread.sleep(1000);
            // Blocks until the callable has finished and produced its value.
            String result = future.get();
            log.info("result:{}", result);
        } finally {
            // Without shutdown() the pool's idle worker thread keeps the JVM
            // alive after main returns, until the thread times out.
            executorService.shutdown();
        }
    }

FutureTask

/**
 * Runs a {@link FutureTask} on a dedicated thread, does some work on the
 * main thread, then blocks on the task and logs its result.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the main thread is interrupted or the task fails
 */
public static void main(String[] args) throws Exception {
		// The task body: log, sleep for 5 seconds, then produce "Done".
		FutureTask<String> futureTask = new FutureTask<>(() -> {
				log.info("do something in callable");
				Thread.sleep(5000);
				return "Done";
		});

		Thread worker = new Thread(futureTask);
		worker.start();

		log.info("do something in main");
		Thread.sleep(1000);

		// get() blocks until the task has completed.
		String result = futureTask.get();
		log.info("result:{}", result);
}