package gobblin.kafka.writer;

import com.codahale.metrics.Meter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import gobblin.instrumented.writer.InstrumentedDataWriter;
import gobblin.util.ConfigUtils;
import java.io.IOException;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/kafka/writer/AsyncBestEffortDataWriter.class */
public class AsyncBestEffortDataWriter<D> extends InstrumentedDataWriter<D> {
    private static final Logger log = LoggerFactory.getLogger(AsyncBestEffortDataWriter.class);
    private static final long MILLIS_TO_NANOS = 1000000;
    public static final String ATTEMPTED_RECORDS_METRIC_NAME_DEFAULT = "gobblin.writer.async.records.attempted";
    public static final String SUCCESS_RECORDS_METRIC_NAME_DEFAULT = "gobblin.writer.async.records.success";
    public static final String FAILED_RECORDS_METRIC_NAME_DEFAULT = "gobblin.writer.async.records.failed";
    public static final long COMMIT_TIMEOUT_IN_NANOS_DEFAULT = 60000000000L;
    public static final long COMMIT_STEP_WAITTIME_MILLIS_DEFAULT = 500;
    public static final double FAILURE_ALLOWANCE_DEFAULT = 0.0d;

    @VisibleForTesting
    final Meter recordsAttempted;

    @VisibleForTesting
    final Meter recordsSuccess;

    @VisibleForTesting
    final Meter recordsFailed;
    private final long commitTimeoutInNanos;
    private final long commitStepWaitTimeMillis;
    private final double failureAllowance;
    private final AsyncDataWriter asyncDataWriter;
    private final WriteCallback writeCallback;

    /* loaded from: input_file:gobblin/kafka/writer/AsyncBestEffortDataWriter$AsyncBestEffortDataWriterBuilder.class */
    public static class AsyncBestEffortDataWriterBuilder {
        private Config config = ConfigFactory.empty();
        private String recordsAttemptedMetricName = AsyncBestEffortDataWriter.ATTEMPTED_RECORDS_METRIC_NAME_DEFAULT;
        private String recordsSuccessMetricName = AsyncBestEffortDataWriter.SUCCESS_RECORDS_METRIC_NAME_DEFAULT;
        private String recordsFailedMetricName = AsyncBestEffortDataWriter.FAILED_RECORDS_METRIC_NAME_DEFAULT;
        private long commitTimeoutInNanos = AsyncBestEffortDataWriter.COMMIT_TIMEOUT_IN_NANOS_DEFAULT;
        private long commitStepWaitTimeMillis = 500;
        private double failureAllowance = AsyncBestEffortDataWriter.FAILURE_ALLOWANCE_DEFAULT;
        private AsyncDataWriter asyncDataWriter;

        /* JADX INFO: Access modifiers changed from: package-private */
        public AsyncBestEffortDataWriterBuilder config(Config config) {
            this.config = config;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public AsyncBestEffortDataWriterBuilder recordsAttemptedMetricName(String str) {
            this.recordsAttemptedMetricName = str;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public AsyncBestEffortDataWriterBuilder recordsSuccessMetricName(String str) {
            this.recordsSuccessMetricName = str;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public AsyncBestEffortDataWriterBuilder recordsFailedMetricName(String str) {
            this.recordsFailedMetricName = str;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public AsyncBestEffortDataWriterBuilder commitTimeoutInNanos(long j) {
            this.commitTimeoutInNanos = j;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public AsyncBestEffortDataWriterBuilder commitStepWaitTimeInMillis(long j) {
            this.commitStepWaitTimeMillis = j;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public AsyncBestEffortDataWriterBuilder failureAllowance(double d) {
            Preconditions.checkArgument(d <= 1.0d && d >= AsyncBestEffortDataWriter.FAILURE_ALLOWANCE_DEFAULT, "Failure Allowance must be a ratio between 0 and 1");
            this.failureAllowance = d;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public AsyncBestEffortDataWriterBuilder asyncDataWriter(AsyncDataWriter asyncDataWriter) {
            this.asyncDataWriter = asyncDataWriter;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public AsyncBestEffortDataWriter build() {
            return new AsyncBestEffortDataWriter(this.config, this.recordsAttemptedMetricName, this.recordsSuccessMetricName, this.recordsFailedMetricName, this.commitTimeoutInNanos, this.commitStepWaitTimeMillis, this.failureAllowance, this.asyncDataWriter);
        }
    }

    protected AsyncBestEffortDataWriter(Config config, String str, String str2, String str3, long j, long j2, double d, @NonNull AsyncDataWriter asyncDataWriter) {
        super(ConfigUtils.configToState(config));
        if (asyncDataWriter == null) {
            throw new NullPointerException("asyncDataWriter");
        }
        this.recordsAttempted = getMetricContext().meter(str);
        this.recordsSuccess = getMetricContext().meter(str2);
        this.recordsFailed = getMetricContext().meter(str3);
        this.commitTimeoutInNanos = j;
        this.commitStepWaitTimeMillis = j2;
        Preconditions.checkArgument(d <= 1.0d && d >= FAILURE_ALLOWANCE_DEFAULT, "Failure Allowance must be a ratio between 0 and 1");
        this.failureAllowance = d;
        this.asyncDataWriter = asyncDataWriter;
        this.writeCallback = new WriteCallback() { // from class: gobblin.kafka.writer.AsyncBestEffortDataWriter.1
            @Override // gobblin.kafka.writer.WriteCallback
            public void onSuccess() {
                AsyncBestEffortDataWriter.this.recordsSuccess.mark();
            }

            @Override // gobblin.kafka.writer.WriteCallback
            public void onFailure(Exception exc) {
                AsyncBestEffortDataWriter.this.recordsFailed.mark();
            }
        };
        this.asyncDataWriter.setDefaultCallback(this.writeCallback);
    }

    public void writeImpl(D d) throws IOException {
        this.asyncDataWriter.asyncWrite(d);
        this.recordsAttempted.mark();
    }

    public void cleanup() throws IOException {
        this.asyncDataWriter.cleanup();
    }

    public long recordsWritten() {
        return this.recordsSuccess.getCount();
    }

    public long bytesWritten() throws IOException {
        return this.asyncDataWriter.bytesWritten();
    }

    public void close() throws IOException {
        log.debug("Close called");
        try {
            this.asyncDataWriter.close();
        } finally {
            super.close();
        }
    }

    public void commit() throws IOException {
        log.debug("Commit called, will wait for commitTimeout : " + (this.commitTimeoutInNanos / MILLIS_TO_NANOS) + "ms");
        long nanoTime = System.nanoTime();
        while (System.nanoTime() - nanoTime < this.commitTimeoutInNanos && this.recordsAttempted.getCount() != this.recordsSuccess.getCount() + this.recordsFailed.getCount()) {
            log.debug("Commit waiting... records produced: " + this.recordsAttempted.getCount() + " written: " + this.recordsSuccess.getCount() + " failed: " + this.recordsFailed.getCount());
            try {
                Thread.sleep(this.commitStepWaitTimeMillis);
            } catch (InterruptedException e) {
                throw new IOException("Interrupted while waiting for commit to complete", e);
            }
        }
        log.debug("Commit done waiting");
        long count = this.recordsAttempted.getCount();
        long count2 = this.recordsSuccess.getCount();
        long count3 = this.recordsFailed.getCount();
        long j = (count - count2) - count3;
        long j2 = j + count3;
        if (j > 0) {
            log.warn("Timeout waiting for all writes to be acknowledged. Missing " + j + " responses out of " + count);
        }
        if (j2 > 0 && count > 0) {
            String str = "Commit failed to write " + j2 + " records (" + count3 + " failed, " + j + " unacknowledged) out of " + count + " produced.";
            double d = j2 / count;
            if (d > this.failureAllowance) {
                String str2 = str + "\nAborting because this is greater than the failureAllowance percentage: " + (this.failureAllowance * 100.0d);
                log.error(str2);
                throw new IOException(str2);
            }
            log.warn(str + "\nCommitting because failureRatio percentage: " + (d * 100.0d) + " is less than the failureAllowance percentage: " + (this.failureAllowance * 100.0d));
        }
        log.info("Successfully committed " + count2 + " records.");
    }

    public static AsyncBestEffortDataWriterBuilder builder() {
        return new AsyncBestEffortDataWriterBuilder();
    }
}
