package gobblin.instrumented.writer;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import gobblin.configuration.State;
import gobblin.instrumented.Instrumentable;
import gobblin.instrumented.Instrumented;
import gobblin.metrics.GobblinMetrics;
import gobblin.metrics.MetricContext;
import gobblin.metrics.MetricNames;
import gobblin.metrics.Tag;
import gobblin.util.ExecutorsUtils;
import gobblin.util.FinalState;
import gobblin.writer.DataWriter;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:WEB-INF/lib/gobblin-core-base-0.11.0.jar:gobblin/instrumented/writer/InstrumentedDataWriterBase.class */
/**
 * Base class for {@link DataWriter}s that report metrics about their writes.
 *
 * <p>When instrumentation is enabled for the supplied {@link State} (as decided by
 * {@link GobblinMetrics#isEnabled(State)}), every call to {@link #write(Object)} marks an
 * "records in" meter, times the delegated {@link #writeImpl(Object)} call, and marks either a
 * "successful writes" or a "failed writes" meter. In addition, a single-threaded scheduled executor
 * periodically brings the "records written" and "bytes written" meters up to the values reported by
 * {@code recordsWritten()} and {@code bytesWritten()}.
 *
 * <p>When instrumentation is disabled, {@link #write(Object)} delegates straight to
 * {@link #writeImpl(Object)} and all meters/timers are absent.
 *
 * <p>The written-records and written-bytes meters are assigned and updated under this object's
 * monitor ({@code synchronized} methods), because they are touched both by the background updater
 * and by {@link #commit()}.
 *
 * @param <D> type of the records accepted by this writer
 */
public abstract class InstrumentedDataWriterBase<D> implements DataWriter<D>, Instrumentable, Closeable, FinalState {
    private static final Logger log = LoggerFactory.getLogger(InstrumentedDataWriterBase.class);

    /** Configuration key for the period, in milliseconds, of the background metrics updater. */
    public static final String WRITER_METRICS_UPDATER_INTERVAL = "gobblin.writer.metrics.updater.interval";

    /** Updater period, in milliseconds, used when {@link #WRITER_METRICS_UPDATER_INTERVAL} is not set. */
    public static final long DEFAULT_WRITER_METRICS_UPDATER_INTERVAL = 30000;

    /** Executor running {@link WriterMetricsUpdater}; present only when instrumentation is enabled. */
    private final Optional<ScheduledThreadPoolExecutor> writerMetricsUpdater;
    private final boolean instrumentationEnabled;
    private MetricContext metricContext;
    private Optional<Meter> recordsInMeter;
    private Optional<Meter> successfulWritesMeter;
    private Optional<Meter> failedWritesMeter;
    private Optional<Timer> dataWriterTimer;
    private Optional<Meter> recordsWrittenMeter;
    private Optional<Meter> bytesWrittenMeter;

    /** Collects resources (including metric contexts created by this class) released in {@link #close()}. */
    protected final Closer closer;

    /**
     * Periodic task that refreshes the records-written and bytes-written meters of the enclosing
     * writer.
     */
    public class WriterMetricsUpdater implements Runnable {
        private WriterMetricsUpdater() {
        }

        @Override // java.lang.Runnable
        public void run() {
            InstrumentedDataWriterBase.this.updateRecordsWrittenMeter();
            InstrumentedDataWriterBase.this.updateBytesWrittenMeter();
        }
    }

    /**
     * Creates a writer whose metric context is derived from {@code state} and this object's
     * runtime class.
     *
     * @param state configuration used to decide whether instrumentation is enabled and to build
     *     the metric context
     */
    public InstrumentedDataWriterBase(State state) {
        this(state, Optional.<Class<?>>absent());
    }

    /**
     * Creates a writer whose metric context is derived from {@code state} and the given class,
     * falling back to this object's runtime class when {@code optional} is absent.
     *
     * @param state configuration used to decide whether instrumentation is enabled, to build the
     *     metric context, and to read {@link #WRITER_METRICS_UPDATER_INTERVAL}
     * @param optional class passed to {@code Instrumented.getMetricContext}; when absent,
     *     {@code getClass()} is used instead
     */
    public InstrumentedDataWriterBase(State state, Optional<Class<?>> optional) {
        this.closer = Closer.create();
        this.instrumentationEnabled = GobblinMetrics.isEnabled(state);
        // Optional.or(T) supplies the fallback value; the runtime class is the default.
        this.metricContext = this.closer.register(Instrumented.getMetricContext(state, optional.or(getClass())));
        if (this.instrumentationEnabled) {
            this.writerMetricsUpdater = Optional.of(buildWriterMetricsUpdater());
        } else {
            this.writerMetricsUpdater = Optional.absent();
        }

        regenerateMetrics();

        // Start the periodic updater only after the meters exist: the task dereferences
        // recordsWrittenMeter/bytesWrittenMeter, and an exception thrown by a scheduled task
        // silently cancels all of its future runs.
        if (this.writerMetricsUpdater.isPresent()) {
            scheduleWriterMetricsUpdater(this.writerMetricsUpdater.get(), getWriterMetricsUpdaterInterval(state));
        }
    }

    /**
     * Replaces the current metric context with a new one built from the current context and the
     * given tags, registers it with {@link #closer}, and recreates all meters and timers.
     *
     * @param list tags passed to {@code Instrumented.newContextFromReferenceContext}
     */
    public void switchMetricContext(List<Tag<?>> list) {
        this.metricContext = this.closer.register(
                Instrumented.newContextFromReferenceContext(this.metricContext, list, Optional.<String>absent()));
        regenerateMetrics();
    }

    /**
     * Replaces the current metric context with {@code metricContext} and recreates all meters and
     * timers. The supplied context is not registered with {@link #closer}.
     *
     * @param metricContext the context to use from now on
     */
    public void switchMetricContext(MetricContext metricContext) {
        this.metricContext = metricContext;
        regenerateMetrics();
    }

    /**
     * (Re)creates every meter and timer from the current metric context, or sets them all to
     * absent when instrumentation is disabled.
     */
    protected void regenerateMetrics() {
        boolean enabled = isInstrumentationEnabled();
        if (enabled) {
            this.recordsInMeter = Optional.of(this.metricContext.meter(MetricNames.DataWriterMetrics.RECORDS_IN_METER));
            this.successfulWritesMeter =
                    Optional.of(this.metricContext.meter(MetricNames.DataWriterMetrics.SUCCESSFUL_WRITES_METER));
            this.failedWritesMeter =
                    Optional.of(this.metricContext.meter(MetricNames.DataWriterMetrics.FAILED_WRITES_METER));
            setRecordsWrittenMeter(enabled);
            setBytesWrittenMeter(enabled);
            this.dataWriterTimer = Optional.of(this.metricContext.timer(MetricNames.DataWriterMetrics.WRITE_TIMER));
        } else {
            this.recordsInMeter = Optional.absent();
            this.successfulWritesMeter = Optional.absent();
            this.failedWritesMeter = Optional.absent();
            setRecordsWrittenMeter(enabled);
            setBytesWrittenMeter(enabled);
            this.dataWriterTimer = Optional.absent();
        }
    }

    /** Assigns the records-written meter (from the current context) or marks it absent. */
    private synchronized void setRecordsWrittenMeter(boolean z) {
        if (z) {
            this.recordsWrittenMeter =
                    Optional.of(this.metricContext.meter(MetricNames.DataWriterMetrics.RECORDS_WRITTEN_METER));
        } else {
            this.recordsWrittenMeter = Optional.absent();
        }
    }

    /** Assigns the bytes-written meter (from the current context) or marks it absent. */
    private synchronized void setBytesWrittenMeter(boolean z) {
        if (z) {
            this.bytesWrittenMeter =
                    Optional.of(this.metricContext.meter(MetricNames.DataWriterMetrics.BYTES_WRITTEN_METER));
        } else {
            this.bytesWrittenMeter = Optional.absent();
        }
    }

    /**
     * Returns the tags for this writer; this base implementation returns a new empty list.
     *
     * @param state unused by this implementation
     * @return a new, empty, mutable list
     */
    public List<Tag<?>> generateTags(State state) {
        return Lists.newArrayList();
    }

    /**
     * @return whether instrumentation was enabled for the {@link State} given at construction
     */
    public boolean isInstrumentationEnabled() {
        return this.instrumentationEnabled;
    }

    /**
     * Writes a record through {@link #writeImpl(Object)}, recording metrics around the call when
     * instrumentation is enabled.
     *
     * @param d the record to write
     * @throws IOException if {@link #writeImpl(Object)} fails; the failure is first reported to
     *     {@link #onException(Exception)} when instrumentation is enabled
     */
    @Override // gobblin.writer.DataWriter
    public void write(D d) throws IOException {
        if (!isInstrumentationEnabled()) {
            writeImpl(d);
            return;
        }
        try {
            long nanoTime = System.nanoTime();
            beforeWrite(d);
            writeImpl(d);
            onSuccessfulWrite(nanoTime);
        } catch (IOException e) {
            // Only IOException is counted as a failed write; other throwables propagate unrecorded.
            onException(e);
            throw e;
        }
    }

    /**
     * Hook invoked before each instrumented write; marks the records-in meter.
     *
     * @param d the record about to be written (unused by this implementation)
     */
    public void beforeWrite(D d) {
        Instrumented.markMeter(this.recordsInMeter);
    }

    /**
     * Hook invoked after a successful instrumented write; records the elapsed time and marks the
     * successful-writes meter.
     *
     * @param j the {@link System#nanoTime()} value captured before the write started
     */
    public void onSuccessfulWrite(long j) {
        Instrumented.updateTimer(this.dataWriterTimer, System.nanoTime() - j, TimeUnit.NANOSECONDS);
        Instrumented.markMeter(this.successfulWritesMeter);
    }

    /**
     * Hook invoked when an instrumented write throws an {@link IOException}; marks the
     * failed-writes meter.
     *
     * @param exc the exception that was thrown (unused by this implementation)
     */
    public void onException(Exception exc) {
        Instrumented.markMeter(this.failedWritesMeter);
    }

    /**
     * Performs the actual write of a single record.
     *
     * @param d the record to write
     * @throws IOException if the record cannot be written
     */
    public abstract void writeImpl(D d) throws IOException;

    /**
     * @return a new, empty {@link State}
     */
    public State getFinalState() {
        return new State();
    }

    /**
     * Closes everything registered with {@link #closer} and then, even if that fails, shuts down
     * the metrics updater executor when one exists.
     *
     * @throws IOException if closing the registered resources fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            this.closer.close();
        } finally {
            if (this.writerMetricsUpdater.isPresent()) {
                ExecutorsUtils.shutdownExecutorService(this.writerMetricsUpdater.get(), Optional.of(log));
            }
        }
    }

    /**
     * @return the metric context currently in use
     */
    public MetricContext getMetricContext() {
        return this.metricContext;
    }

    /**
     * Refreshes the records-written and bytes-written meters. This base implementation does
     * nothing else.
     *
     * @throws IOException declared by the {@link DataWriter} contract; not thrown by this
     *     implementation itself
     */
    @Override // gobblin.writer.DataWriter
    public void commit() throws IOException {
        updateRecordsWrittenMeter();
        updateBytesWrittenMeter();
    }

    /**
     * Marks the records-written meter with the difference between {@code recordsWritten()} and the
     * meter's current count, so that the count catches up with the reported total. No-op when the
     * meter is absent.
     */
    public synchronized void updateRecordsWrittenMeter() {
        if (this.recordsWrittenMeter.isPresent()) {
            Meter meter = this.recordsWrittenMeter.get();
            meter.mark(recordsWritten() - meter.getCount());
        }
    }

    /**
     * Marks the bytes-written meter with the difference between {@code bytesWritten()} and the
     * meter's current count. No-op when the meter is absent; an {@link IOException} from
     * {@code bytesWritten()} is logged and the meter is left unchanged.
     */
    public synchronized void updateBytesWrittenMeter() {
        if (this.bytesWrittenMeter.isPresent()) {
            Meter meter = this.bytesWrittenMeter.get();
            try {
                meter.mark(bytesWritten() - meter.getCount());
            } catch (IOException e) {
                log.error("Cannot get bytesWritten for DataWriter, will not update " + meter, e);
            }
        }
    }

    /** Builds the single-threaded scheduled executor that runs the metrics updater. */
    private static ScheduledThreadPoolExecutor buildWriterMetricsUpdater() {
        return new ScheduledThreadPoolExecutor(1,
                ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("WriterMetricsUpdater-%d")));
    }

    /** Reads the updater period (milliseconds) from {@code state}, falling back to the default. */
    private static long getWriterMetricsUpdaterInterval(State state) {
        return state.getPropAsLong(WRITER_METRICS_UPDATER_INTERVAL, DEFAULT_WRITER_METRICS_UPDATER_INTERVAL);
    }

    /**
     * Schedules a {@link WriterMetricsUpdater} at a fixed rate, using {@code j} milliseconds as
     * both the initial delay and the period.
     */
    private ScheduledFuture<?> scheduleWriterMetricsUpdater(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, long j) {
        return scheduledThreadPoolExecutor.scheduleAtFixedRate(new WriterMetricsUpdater(), j, j, TimeUnit.MILLISECONDS);
    }
}
