Java JUC (java.util.concurrent) 2 - Atomic Variables and Concurrent Collection

Atomic variables are an important part of the Java concurrency API. They allow multiple threads to safely access the same variable. The java.util.concurrent.atomic classes rely heavily on the compare-and-swap (CAS) technique. CAS happens at the CPU instruction level and is much more lightweight than synchronizing through locks or the synchronized keyword.

AtomicInteger, AtomicLong, LongAdder, AtomicBoolean, and AtomicReference are some of the most commonly used atomic classes.

AtomicInteger

// Total number of requests, i.e. how many tasks main() submits to the executor.
public static int clientTotal = 5000;
// Number of semaphore permits: the maximum number of tasks allowed to run add() at the same time.
public static int threadTotal = 200;
// Shared counter incremented by every task; AtomicInteger makes each increment atomic.
public static AtomicInteger count = new AtomicInteger(0);

/**
 * Submits {@code clientTotal} tasks to a cached thread pool. Each task acquires a permit from a
 * semaphore holding {@code threadTotal} permits, calls {@link #add()}, and releases the permit.
 * The method waits on a latch until every task has finished and then logs the final counter value.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the main thread is interrupted while waiting for the tasks to finish
 */
public static void main(String[] args) throws Exception {
		ExecutorService executorService = Executors.newCachedThreadPool();
		final Semaphore semaphore = new Semaphore(threadTotal);
		final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
		try {
				for (int i = 0; i < clientTotal; i++) {
						executorService.execute(() -> {
								try {
										semaphore.acquire();
										try {
												add();
										} finally {
												// Always hand the permit back, even if add() throws.
												semaphore.release();
										}
								} catch (InterruptedException e) {
										// Restore the interrupt flag so the pool can observe the interruption.
										Thread.currentThread().interrupt();
										log.error("exception", e);
								} catch (Exception e) {
										log.error("exception", e);
								} finally {
										// Count down unconditionally so await() can never hang on a failed task.
										countDownLatch.countDown();
								}
						});
				}
				countDownLatch.await();
		} finally {
				// Shut the pool down even if await() is interrupted, so its threads do not linger.
				executorService.shutdown();
		}
		log.info("count:{}", count.get());
}

/**
 * Atomically adds one to the shared counter.
 * The returned value is discarded, so getAndIncrement() and incrementAndGet()
 * are interchangeable here.
 */
private static void add() {
		count.getAndIncrement();
}

Another way to achieve the correct count, as above, is to use the synchronized keyword.

// Total number of requests, i.e. how many tasks main() submits to the executor.
public static int clientTotal = 5000;
// Number of semaphore permits: the maximum number of tasks allowed to run add() at the same time.
public static int threadTotal = 200;
// Plain shared counter; in this example it is only modified inside the synchronized add() method.
public static int count = 0;

/**
 * Submits {@code clientTotal} tasks to a cached thread pool. Each task acquires a permit from a
 * semaphore holding {@code threadTotal} permits, calls the synchronized {@link #add()}, and
 * releases the permit. The method waits on a latch until every task has finished and then logs
 * the final counter value.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the main thread is interrupted while waiting for the tasks to finish
 */
public static void main(String[] args) throws Exception {
		ExecutorService executorService = Executors.newCachedThreadPool();
		final Semaphore semaphore = new Semaphore(threadTotal);
		final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
		try {
				for (int i = 0; i < clientTotal; i++) {
						executorService.execute(() -> {
								try {
										semaphore.acquire();
										try {
												add();
										} finally {
												// Always hand the permit back, even if add() throws.
												semaphore.release();
										}
								} catch (InterruptedException e) {
										// Restore the interrupt flag so the pool can observe the interruption.
										Thread.currentThread().interrupt();
										log.error("exception", e);
								} catch (Exception e) {
										log.error("exception", e);
								} finally {
										// Count down unconditionally so await() can never hang on a failed task.
										countDownLatch.countDown();
								}
						});
				}
				countDownLatch.await();
		} finally {
				// Shut the pool down even if await() is interrupted, so its threads do not linger.
				executorService.shutdown();
		}
		log.info("count:{}", count);
}

/**
 * Adds one to the shared counter. The method is static and synchronized,
 * so only one thread at a time can execute the increment.
 */
private synchronized static void add() {
		count += 1;
}

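However, the volatile keyword, as in public static volatile int count = 0;, is not enough to ensure thread safety here. volatile only guarantees that writes are visible to other threads; count++ is still a separate read, add, and write, so two threads can read the same value and one of the increments is lost.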

ConcurrentHashMap, ConcurrentSkipListMap, ConcurrentSkipListSet, CopyOnWriteArrayList, and CopyOnWriteArraySet are some typical concurrent collection classes in Java 8.

CopyOnWriteArrayList

// Total number of requests, i.e. how many tasks main() submits to the executor.
public static int clientTotal = 5000;
// Number of semaphore permits: the maximum number of tasks allowed to run update() at the same time.
public static int threadTotal = 200;
// Shared list that every task appends to; CopyOnWriteArrayList is a thread-safe List implementation.
private static List<Integer> list = new CopyOnWriteArrayList<>();

/**
 * Submits {@code clientTotal} tasks to a cached thread pool. Each task acquires a permit from a
 * semaphore holding {@code threadTotal} permits, appends its loop index to the shared list via
 * {@link #update(int)}, and releases the permit. The method waits on a latch until every task
 * has finished and then logs the final list size.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the main thread is interrupted while waiting for the tasks to finish
 */
public static void main(String[] args) throws Exception {
		ExecutorService executorService = Executors.newCachedThreadPool();
		final Semaphore semaphore = new Semaphore(threadTotal);
		final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
		try {
				for (int i = 0; i < clientTotal; i++) {
						// Lambdas can only capture effectively final locals, so copy the loop index.
						final int value = i;
						executorService.execute(() -> {
								try {
										semaphore.acquire();
										try {
												update(value);
										} finally {
												// Always hand the permit back, even if update() throws.
												semaphore.release();
										}
								} catch (InterruptedException e) {
										// Restore the interrupt flag so the pool can observe the interruption.
										Thread.currentThread().interrupt();
										log.error("exception", e);
								} catch (Exception e) {
										log.error("exception", e);
								} finally {
										// Count down unconditionally so await() can never hang on a failed task.
										countDownLatch.countDown();
								}
						});
				}
				countDownLatch.await();
		} finally {
				// Shut the pool down even if await() is interrupted, so its threads do not linger.
				executorService.shutdown();
		}
		log.info("size:{}", list.size());
}

/**
 * Appends the given value to the shared list.
 *
 * @param i the value to append
 */
private static void update(int i) {
		// Explicit boxing; equivalent to the autoboxing performed by list.add(i).
		list.add(Integer.valueOf(i));
}