package org.apache.gobblin.writer;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.gobblin.annotation.Alpha;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Alpha
/* loaded from: input_file:org/apache/gobblin/writer/BufferedAsyncDataWriter.class */
public abstract class BufferedAsyncDataWriter<D> implements AsyncDataWriter<D> {
    private BufferedAsyncDataWriter<D>.RecordProcessor<D> processor;
    private BatchAccumulator<D> accumulator;
    private ExecutorService service = Executors.newFixedThreadPool(1);
    private volatile boolean running = true;
    private final long startTime = System.currentTimeMillis();
    private static final Logger LOG = LoggerFactory.getLogger(BufferedAsyncDataWriter.class);
    private static final WriteResponseMapper<RecordMetadata> WRITE_RESPONSE_WRAPPER = new WriteResponseMapper<RecordMetadata>() { // from class: org.apache.gobblin.writer.BufferedAsyncDataWriter.1
        @Override // org.apache.gobblin.writer.WriteResponseMapper
        public WriteResponse wrap(final RecordMetadata recordMetadata) {
            return new WriteResponse<RecordMetadata>() { // from class: org.apache.gobblin.writer.BufferedAsyncDataWriter.1.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.apache.gobblin.writer.WriteResponse
                public RecordMetadata getRawResponse() {
                    return recordMetadata;
                }

                @Override // org.apache.gobblin.writer.WriteResponse
                public String getStringResponse() {
                    return recordMetadata.toString();
                }

                @Override // org.apache.gobblin.writer.WriteResponse
                public long bytesWritten() {
                    return -1L;
                }
            };
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/gobblin/writer/BufferedAsyncDataWriter$RecordProcessor.class */
    public class RecordProcessor<D> implements Runnable, Closeable {
        BatchAccumulator<D> accumulator;
        BatchAsyncDataWriter<D> writer;

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            this.writer.close();
        }

        public RecordProcessor(BatchAccumulator<D> batchAccumulator, BatchAsyncDataWriter<D> batchAsyncDataWriter) {
            this.accumulator = batchAccumulator;
            this.writer = batchAsyncDataWriter;
        }

        @Override // java.lang.Runnable
        public void run() {
            BufferedAsyncDataWriter.LOG.info("Start iterating accumulator");
            while (BufferedAsyncDataWriter.this.running) {
                Batch<D> nextAvailableBatch = this.accumulator.getNextAvailableBatch();
                if (nextAvailableBatch != null) {
                    this.writer.write(nextAvailableBatch, createBatchCallback(nextAvailableBatch));
                }
            }
            this.accumulator.waitClose();
            BufferedAsyncDataWriter.LOG.info("Start to process remaining batches");
            while (true) {
                Batch<D> nextAvailableBatch2 = this.accumulator.getNextAvailableBatch();
                if (nextAvailableBatch2 == null) {
                    this.accumulator.flush();
                    return;
                }
                this.writer.write(nextAvailableBatch2, createBatchCallback(nextAvailableBatch2));
            }
        }

        private WriteCallback createBatchCallback(final Batch<D> batch) {
            return new WriteCallback<Object>() { // from class: org.apache.gobblin.writer.BufferedAsyncDataWriter.RecordProcessor.1
                @Override // org.apache.gobblin.async.Callback
                public void onSuccess(WriteResponse writeResponse) {
                    BufferedAsyncDataWriter.LOG.info("Batch " + batch.getId() + " is on success with size " + batch.getCurrentSizeInByte() + " num of record " + batch.getRecords().size());
                    batch.onSuccess(writeResponse);
                    batch.done();
                    RecordProcessor.this.accumulator.deallocate(batch);
                }

                @Override // org.apache.gobblin.async.Callback
                public void onFailure(Throwable th) {
                    BufferedAsyncDataWriter.LOG.info("Batch " + batch.getId() + " is on failure");
                    batch.onFailure(th);
                    batch.done();
                    RecordProcessor.this.accumulator.deallocate(batch);
                }
            };
        }
    }

    public BufferedAsyncDataWriter(BatchAccumulator<D> batchAccumulator, BatchAsyncDataWriter<D> batchAsyncDataWriter) {
        this.processor = new RecordProcessor<>(batchAccumulator, batchAsyncDataWriter);
        this.accumulator = batchAccumulator;
        try {
            this.service.execute(this.processor);
            this.service.shutdown();
        } catch (Exception e) {
            LOG.error("Cannot start internal thread to consume the data");
        }
    }

    @Override // org.apache.gobblin.writer.AsyncDataWriter
    public Future<WriteResponse> write(D d, @Nullable WriteCallback writeCallback) {
        try {
            return new WriteResponseFuture(this.accumulator.append(d, writeCallback), WRITE_RESPONSE_WRAPPER);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.gobblin.writer.AsyncDataWriter
    public void flush() throws IOException {
        this.accumulator.flush();
    }

    public void forceClose() {
        LOG.info("Force to close the buffer data writer (not supported)");
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            this.running = false;
            this.accumulator.close();
            if (this.service.awaitTermination(60L, TimeUnit.SECONDS)) {
                LOG.info("Closed properly: elapsed " + (System.currentTimeMillis() - this.startTime) + " milliseconds");
            } else {
                forceClose();
            }
        } catch (InterruptedException e) {
            LOG.error("Interruption happened during close " + e.toString());
        } finally {
            this.processor.close();
        }
    }
}
