001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019 import org.apache.camel.Exchange;
020 import org.apache.camel.Processor;
021
022 /**
023 * A <a href="http://activemq.apache.org/camel/throttler.html">Throttler</a>
024 * will set a limit on the maximum number of message exchanges which can be sent
025 * to a processor within a specific time period. <p/> This pattern can be
026 * extremely useful if you have some external system which meters access; such
027 * as only allowing 100 requests per second; or if huge load can cause a
028 * particular systme to malfunction or to reduce its throughput you might want
029 * to introduce some throttling.
030 *
031 * @version $Revision: 630591 $
032 */
033 public class Throttler extends DelayProcessorSupport {
034 private long maximumRequestsPerPeriod;
035 private long timePeriodMillis;
036 private long startTimeMillis;
037 private long requestCount;
038
039 public Throttler(Processor processor, long maximumRequestsPerPeriod) {
040 this(processor, maximumRequestsPerPeriod, 1000);
041 }
042
043 public Throttler(Processor processor, long maximumRequestsPerPeriod, long timePeriodMillis) {
044 super(processor);
045 this.maximumRequestsPerPeriod = maximumRequestsPerPeriod;
046 this.timePeriodMillis = timePeriodMillis;
047 }
048
049 @Override
050 public String toString() {
051 return "Throttler[requests: " + maximumRequestsPerPeriod + " per: " + timePeriodMillis + " (ms) to: "
052 + getProcessor() + "]";
053 }
054
055 // Properties
056 // -----------------------------------------------------------------------
057 public long getMaximumRequestsPerPeriod() {
058 return maximumRequestsPerPeriod;
059 }
060
061 /**
062 * Sets the maximum number of requests per time period
063 */
064 public void setMaximumRequestsPerPeriod(long maximumRequestsPerPeriod) {
065 this.maximumRequestsPerPeriod = maximumRequestsPerPeriod;
066 }
067
068 public long getTimePeriodMillis() {
069 return timePeriodMillis;
070 }
071
072 /**
073 * Sets the time period during which the maximum number of requests apply
074 */
075 public void setTimePeriodMillis(long timePeriodMillis) {
076 this.timePeriodMillis = timePeriodMillis;
077 }
078
079 /**
080 * The number of requests which have taken place so far within this time
081 * period
082 */
083 public long getRequestCount() {
084 return requestCount;
085 }
086
087 /**
088 * The start time when this current period began
089 */
090 public long getStartTimeMillis() {
091 return startTimeMillis;
092 }
093
094 // Implementation methods
095 // -----------------------------------------------------------------------
096 protected void delay(Exchange exchange) throws Exception {
097 long now = currentSystemTime();
098 if (startTimeMillis == 0) {
099 startTimeMillis = now;
100 }
101 if (now - startTimeMillis > timePeriodMillis) {
102 // we're at the start of a new time period
103 // so lets reset things
104 requestCount = 1;
105 startTimeMillis = now;
106 } else {
107 if (++requestCount > maximumRequestsPerPeriod) {
108 // lets sleep until the start of the next time period
109 long time = startTimeMillis + timePeriodMillis;
110 waitUntil(time, exchange);
111 }
112 }
113 }
114 }