001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.util.concurrent;
018
019 import java.util.concurrent.CountDownLatch;
020 import java.util.concurrent.TimeUnit;
021 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
022
/**
 * An alternative to a {@link CountDownLatch} -- this implementation also supports incrementing
 * the latch count while counting down. It can also be used to count up to 0 from a negative integer.
 * <p>
 * A new latch starts with a count of 0. Threads blocked in {@link #await()} or
 * {@link #await(long, TimeUnit)} are released when an {@link #increment()} or
 * {@link #decrement()} moves the count to exactly 0; any other count keeps them waiting.
 */
public class CountingLatch {

    /**
     * Synchronizer that stores the latch count in the {@link AbstractQueuedSynchronizer} state.
     * <p>
     * Declared {@code static} because it never touches the enclosing {@code CountingLatch},
     * so it does not need a reference to it.
     */
    @SuppressWarnings("serial")
    private static final class Sync extends AbstractQueuedSynchronizer {

        /**
         * @return the current count (the raw synchronizer state)
         */
        int getCount() {
            return getState();
        }

        /**
         * Shared acquisition succeeds only while the count is exactly 0.
         *
         * @param acquires ignored; the outcome depends solely on the current count
         * @return 1 (acquired, further shared acquires may also succeed) when the count is 0,
         *         -1 (failed) otherwise
         */
        @Override
        protected int tryAcquireShared(int acquires) {
            return getState() == 0 ? 1 : -1;
        }

        /**
         * Adds {@code delta} (which may be negative) to the count.
         *
         * @param delta the signed amount to add to the count
         * @return {@code true} if the count became 0 as a result, which signals waiting threads
         */
        @Override
        protected boolean tryReleaseShared(int delta) {
            // Retry the compare-and-set until no other thread changed the count
            // between reading it and writing the updated value.
            for (;;) {
                int current = getState();
                int updated = current + delta;
                if (compareAndSetState(current, updated)) {
                    // Only a transition to exactly zero releases waiters.
                    return updated == 0;
                }
            }
        }
    }

    private final Sync sync;

    /**
     * Create a new counting latch (starting count is 0)
     */
    public CountingLatch() {
        this.sync = new Sync();
    }

    /**
     * Get the current count
     *
     * @return the current count, which may be negative
     */
    public int getCount() {
        return sync.getCount();
    }

    /**
     * Increment the count by 1
     */
    public void increment() {
        sync.releaseShared(+1);
    }

    /**
     * Decrement the count by 1
     */
    public void decrement() {
        sync.releaseShared(-1);
    }

    /**
     * Await the latch reaching the count of 0
     *
     * @throws InterruptedException if the thread gets interrupted while waiting
     */
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * Wait for a given timeout while checking if the latch reached the count of 0
     *
     * @param timeout the value of the timeout
     * @param unit the unit in which the timeout is expressed
     * @return <code>true</code> if the latch has reached the count of 0 in the given time
     * @throws InterruptedException if the thread gets interrupted while waiting
     */
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

}