001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.camel.processor;
018
019 import org.apache.camel.Exchange;
020 import org.apache.camel.Processor;
021
022 /**
023 * A <a href="http://camel.apache.org/throttler.html">Throttler</a>
024 * will set a limit on the maximum number of message exchanges which can be sent
025 * to a processor within a specific time period. <p/> This pattern can be
026 * extremely useful if you have some external system which meters access; such
027 * as only allowing 100 requests per second; or if huge load can cause a
028 * particular systme to malfunction or to reduce its throughput you might want
029 * to introduce some throttling.
030 *
031 * @version $Revision: 788621 $
032 */
033 public class Throttler extends DelayProcessorSupport implements Traceable {
034 private long maximumRequestsPerPeriod;
035 private long timePeriodMillis;
036 private TimeSlot slot;
037
038 public Throttler(Processor processor, long maximumRequestsPerPeriod) {
039 this(processor, maximumRequestsPerPeriod, 1000);
040 }
041
042 public Throttler(Processor processor, long maximumRequestsPerPeriod, long timePeriodMillis) {
043 super(processor);
044 this.maximumRequestsPerPeriod = maximumRequestsPerPeriod;
045 this.timePeriodMillis = timePeriodMillis;
046 }
047
048 @Override
049 public String toString() {
050 return "Throttler[requests: " + maximumRequestsPerPeriod + " per: " + timePeriodMillis + " (ms) to: "
051 + getProcessor() + "]";
052 }
053
054 public String getTraceLabel() {
055 return "Throttle[" + maximumRequestsPerPeriod + " per: " + timePeriodMillis + "]";
056 }
057
058 // Properties
059 // -----------------------------------------------------------------------
060 public long getMaximumRequestsPerPeriod() {
061 return maximumRequestsPerPeriod;
062 }
063
064 /**
065 * Sets the maximum number of requests per time period
066 */
067 public void setMaximumRequestsPerPeriod(long maximumRequestsPerPeriod) {
068 this.maximumRequestsPerPeriod = maximumRequestsPerPeriod;
069 }
070
071 public long getTimePeriodMillis() {
072 return timePeriodMillis;
073 }
074
075 /**
076 * Sets the time period during which the maximum number of requests apply
077 */
078 public void setTimePeriodMillis(long timePeriodMillis) {
079 this.timePeriodMillis = timePeriodMillis;
080 }
081
082 // Implementation methods
083 // -----------------------------------------------------------------------
084 protected void delay(Exchange exchange) throws Exception {
085 TimeSlot slot = nextSlot();
086 if (!slot.isActive()) {
087 waitUntil(slot.startTime, exchange);
088 }
089 }
090
091 /*
092 * Determine what the next available time slot is for handling an Exchange
093 */
094 protected synchronized TimeSlot nextSlot() {
095 if (slot == null) {
096 slot = new TimeSlot();
097 }
098 if (slot.isFull()) {
099 slot = slot.next();
100 }
101 slot.assign();
102 return slot;
103 }
104
105 /*
106 * A time slot is capable of handling a number of exchanges within a certain period of time.
107 */
108 protected class TimeSlot {
109
110 private long capacity = Throttler.this.maximumRequestsPerPeriod;
111 private final long duration = Throttler.this.timePeriodMillis;
112 private final long startTime;
113
114 protected TimeSlot() {
115 this(System.currentTimeMillis());
116 }
117
118 protected TimeSlot(long startTime) {
119 this.startTime = startTime;
120 }
121
122 protected void assign() {
123 capacity--;
124 }
125
126 /*
127 * Start the next time slot either now or in the future
128 * (no time slots are being created in the past)
129 */
130 protected TimeSlot next() {
131 return new TimeSlot(Math.max(System.currentTimeMillis(), this.startTime + this.duration));
132 }
133
134 protected boolean isActive() {
135 return startTime <= System.currentTimeMillis();
136 }
137
138 protected boolean isFull() {
139 return capacity <= 0;
140 }
141 }
142 }