package gobblin.writer;

import com.codahale.metrics.Timer;
import com.github.rholder.retry.RetryerBuilder;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import gobblin.configuration.ConfigurationKeys;
import gobblin.configuration.State;
import gobblin.instrumented.Instrumented;
import gobblin.metrics.GobblinMetrics;
import gobblin.stream.RecordEnvelope;
import gobblin.util.Decorator;
import gobblin.util.FinalState;
import gobblin.util.limiter.Limiter;
import gobblin.util.limiter.RateBasedLimiter;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/gobblin-core-0.11.0.jar:gobblin/writer/ThrottleWriter.class */
/**
 * A {@link DataWriter} decorator that rate-limits writes to the wrapped writer.
 *
 * <p>Throttling is driven by two properties read from the supplied {@link State}:
 * {@link #WRITER_THROTTLE_TYPE_KEY} selects what is being limited (see {@link ThrottleType}) and
 * {@link #WRITER_LIMIT_RATE_LIMIT_KEY} gives the overall rate, which is divided by the estimated
 * number of parallel writers to obtain the per-writer rate.
 *
 * <p>The accumulated time spent waiting for permits is exposed through {@link #getFinalState()}
 * under {@link #THROTTLED_TIME_KEY} and, when metrics are enabled, reported to the
 * {@link #WRITES_THROTTLED_TIMER} timer.
 *
 * <p>NOTE(review): {@code throttledTime} is updated without synchronization; this presumably
 * relies on each writer instance being driven by a single thread — confirm against callers.
 *
 * @param <D> type of the records being written
 */
public class ThrottleWriter<D> extends WriterWrapper<D> implements Decorator, FinalState, Retriable {
    private static final Logger LOG = LoggerFactory.getLogger(ThrottleWriter.class);

    /** Property naming the {@link ThrottleType} to apply. */
    public static final String WRITER_THROTTLE_TYPE_KEY = "gobblin.writer.throttle_type";
    /** Property holding the overall rate limit that is split across the parallel writers. */
    public static final String WRITER_LIMIT_RATE_LIMIT_KEY = "gobblin.writer.throttle_rate";
    /** Name of the metrics timer that records time spent waiting for permits. */
    public static final String WRITES_THROTTLED_TIMER = "gobblin.writer.throttled_time";
    /** Key under which the total throttled time (milliseconds) is published in the final state. */
    public static final String THROTTLED_TIME_KEY = "ThrottledTime";

    private static final String LOCAL_JOB_LAUNCHER_TYPE = "LOCAL";
    /** Fallback used when the task executor thread pool size property is absent. */
    private static final int DEFAULT_LOCAL_THREADPOOL_SIZE = 2;
    /** Fallback used when the MR max mappers property is absent. */
    private static final int DEFAULT_MR_MAX_MAPPERS = 100;
    /** Fallback used when the source max partitions property is absent. */
    private static final int DEFAULT_MAX_PARTITIONS = 20;

    private final State state;
    private final DataWriter<D> writer;
    private final Limiter limiter;
    private final ThrottleType type;
    private final Optional<Timer> throttledTimer;
    /** Total time, in milliseconds, spent blocked in {@link #acquirePermits(long)}. */
    private long throttledTime;

    /** The quantity the limiter's permits represent. */
    public enum ThrottleType {
        /** One permit is acquired per record, before the record is written. */
        QPS,
        /** One permit is acquired per byte written, after the record is written. */
        Bytes
    }

    /**
     * Wraps {@code dataWriter} with a rate limiter configured from {@code state}.
     *
     * @param dataWriter the writer to decorate; must not be null
     * @param state job/task state holding the throttle configuration; must not be null
     * @throws NullPointerException if either argument is null or the throttle type property is unset
     * @throws IllegalArgumentException if the throttle type property does not name a {@link ThrottleType}
     */
    public ThrottleWriter(DataWriter<D> dataWriter, State state) {
        Preconditions.checkNotNull(dataWriter, "DataWriter is required.");
        Preconditions.checkNotNull(state, "State is required.");
        this.state = state;
        this.writer = dataWriter;

        // Enum.valueOf(null) would also throw an NPE, but without saying which property is missing.
        String typeName = Preconditions.checkNotNull(state.getProp(WRITER_THROTTLE_TYPE_KEY),
                "Throttle type is required; set %s.", WRITER_THROTTLE_TYPE_KEY);
        this.type = ThrottleType.valueOf(typeName);

        // Compute once so the logged value is exactly the value handed to the limiter.
        int rateLimit = computeRateLimit(state);
        LOG.info("Rate limit for each writer: {} {}", rateLimit, this.type);
        this.limiter = new RateBasedLimiter(rateLimit);

        if (GobblinMetrics.isEnabled(state)) {
            this.throttledTimer = Optional.of(
                    Instrumented.getMetricContext(state, getClass()).timer(WRITES_THROTTLED_TIMER));
        } else {
            this.throttledTimer = Optional.absent();
        }
    }

    /**
     * Returns the wrapped writer.
     */
    @Override
    public Object getDecoratedObject() {
        return this.writer;
    }

    /**
     * Derives the per-writer rate from the overall configured rate.
     *
     * <p>The number of parallel writers is estimated as the smaller of the launcher's parallelism
     * (task executor thread pool size for the {@code LOCAL} launcher, max mappers otherwise) and the
     * maximum number of source partitions, clamped to at least 1. The result is also clamped to at
     * least 1.
     *
     * @param state state holding the rate and parallelism properties
     * @return the rate limit to apply to a single writer, never less than 1
     */
    private static int computeRateLimit(State state) {
        int totalRate = state.getPropAsInt(WRITER_LIMIT_RATE_LIMIT_KEY);

        String launcherType = state.getProp(ConfigurationKeys.JOB_LAUNCHER_TYPE_KEY, LOCAL_JOB_LAUNCHER_TYPE);
        int launcherParallelism = LOCAL_JOB_LAUNCHER_TYPE.equals(launcherType)
                ? state.getPropAsInt(ConfigurationKeys.TASK_EXECUTOR_THREADPOOL_SIZE_KEY, DEFAULT_LOCAL_THREADPOOL_SIZE)
                : state.getPropAsInt(ConfigurationKeys.MR_JOB_MAX_MAPPERS_KEY, DEFAULT_MR_MAX_MAPPERS);
        int maxPartitions =
                state.getPropAsInt(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS, DEFAULT_MAX_PARTITIONS);

        // Guard against a zero/negative configuration so the division below is always defined.
        int parallelWriters = Math.max(Math.min(launcherParallelism, maxPartitions), 1);
        return Math.max(totalRate / parallelWriters, 1);
    }

    /**
     * Writes the envelope through the wrapped writer, blocking as needed to honor the rate limit.
     *
     * <p>For {@link ThrottleType#QPS} a single permit is acquired before the write. For
     * {@link ThrottleType#Bytes} the growth of the wrapped writer's {@code bytesWritten()} across
     * the write is acquired as permits after the write.
     *
     * @param recordEnvelope the record to write
     * @throws IOException if the wrapped writer fails, or if the thread is interrupted while waiting
     *     for permits (the interrupt flag is restored before throwing)
     * @throws UnsupportedOperationException if throttling on bytes and the wrapped writer's byte
     *     count decreased across the write
     */
    @Override
    public void writeEnvelope(RecordEnvelope<D> recordEnvelope) throws IOException {
        try {
            if (ThrottleType.QPS.equals(this.type)) {
                acquirePermits(1L);
            }
            long bytesBefore = this.writer.bytesWritten();
            this.writer.writeEnvelope(recordEnvelope);
            if (ThrottleType.Bytes.equals(this.type)) {
                long delta = this.writer.bytesWritten() - bytesBefore;
                if (delta < 0) {
                    throw new UnsupportedOperationException("Cannot throttle on bytes because "
                            + this.writer.getClass().getSimpleName() + " does not supports bytesWritten");
                }
                if (delta > 0) {
                    acquirePermits(delta);
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new IOException("Failed while acquiring permits.", e);
        }
    }

    /**
     * Acquires {@code j} permits from the limiter and records how long the call took.
     *
     * @param j number of permits to acquire
     * @throws InterruptedException if the limiter's acquire call is interrupted
     */
    private void acquirePermits(long j) throws InterruptedException {
        long startMillis = System.currentTimeMillis();
        this.limiter.acquirePermits(j);
        long waitedMillis = System.currentTimeMillis() - startMillis;
        if (this.throttledTimer.isPresent()) {
            Instrumented.updateTimer(this.throttledTimer, waitedMillis, TimeUnit.MILLISECONDS);
        }
        this.throttledTime += waitedMillis;
    }

    /**
     * Closes the wrapped writer.
     *
     * @throws IOException if the wrapped writer fails to close
     */
    @Override
    public void close() throws IOException {
        this.writer.close();
    }

    /**
     * Delegates to the wrapped writer's {@code commit()}.
     *
     * @throws IOException if the wrapped writer fails to commit
     */
    @Override
    public void commit() throws IOException {
        this.writer.commit();
    }

    /**
     * Delegates to the wrapped writer's {@code cleanup()}.
     *
     * @throws IOException if the wrapped writer fails to clean up
     */
    @Override
    public void cleanup() throws IOException {
        this.writer.cleanup();
    }

    /**
     * Returns the wrapped writer's record count.
     */
    @Override
    public long recordsWritten() {
        return this.writer.recordsWritten();
    }

    /**
     * Returns the wrapped writer's byte count.
     *
     * @throws IOException if the wrapped writer cannot report its byte count
     */
    @Override
    public long bytesWritten() throws IOException {
        return this.writer.bytesWritten();
    }

    /**
     * Returns the wrapped writer's retryer builder when it is {@link Retriable}; otherwise builds
     * one from this writer's state via {@link RetryWriter#createRetryBuilder}.
     */
    @Override
    public RetryerBuilder<Void> getRetryerBuilder() {
        if (this.writer instanceof Retriable) {
            return ((Retriable) this.writer).getRetryerBuilder();
        }
        return RetryWriter.createRetryBuilder(this.state);
    }

    /**
     * Returns the wrapped writer's final state (when it implements {@link FinalState}) augmented
     * with the total throttled time under {@link #THROTTLED_TIME_KEY}.
     */
    @Override
    public State getFinalState() {
        State finalState = new State();
        if (this.writer instanceof FinalState) {
            finalState.addAll(((FinalState) this.writer).getFinalState());
        } else {
            LOG.warn("Wrapped writer does not implement FinalState: {}", this.writer.getClass());
        }
        finalState.setProp(THROTTLED_TIME_KEY, Long.valueOf(this.throttledTime));
        return finalState;
    }
}
