package gobblin.writer;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.io.Closer;
import gobblin.configuration.State;
import gobblin.instrumented.writer.InstrumentedDataWriterDecorator;
import gobblin.instrumented.writer.InstrumentedPartitionedDataWriterDecorator;
import gobblin.writer.partitioner.WriterPartitioner;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/writer/PartitionedDataWriter.class */
/**
 * A {@link DataWriter} that routes every record to a per-partition delegate writer.
 *
 * <p>When the supplied {@link State} contains the {@code writer.partitioner.class} property, a
 * {@link WriterPartitioner} is instantiated reflectively and one delegate writer is created lazily
 * (through a Guava {@link LoadingCache}) for each distinct partition key the partitioner returns.
 * Otherwise a single delegate writer is built eagerly and stored under a fixed dummy key.
 *
 * <p>Every delegate writer is registered with an internal {@link Closer}, so {@link #close()}
 * closes all of them.
 *
 * @param <S> schema type of the wrapped {@link DataWriterBuilder}
 * @param <D> record type accepted by this writer
 */
public class PartitionedDataWriter<S, D> implements DataWriter<D> {
    private static final Logger log = LoggerFactory.getLogger(PartitionedDataWriter.class);

    /** Name of the {@link State} property holding the partitioner class; its presence enables partitioning. */
    private static final String WRITER_PARTITIONER_CLASS_KEY = "writer.partitioner.class";

    /** Cache key under which the single delegate writer is stored when partitioning is disabled. */
    private static final GenericRecord NON_PARTITIONED_WRITER_KEY = new GenericData.Record((Schema) SchemaBuilder.record("Dummy").fields().endRecord());

    /** Writer id of the wrapped builder; used as the prefix of every per-partition writer id. */
    private final String baseWriterId;
    private final Optional<WriterPartitioner> partitioner;
    /** Delegate writers keyed by partition; entries are created on first access by the cache loader. */
    private final LoadingCache<GenericRecord, DataWriter<D>> partitionWriters;
    private final Optional<PartitionAwareDataWriterBuilder> builder;
    private final boolean shouldPartition;
    /** Counter appended to {@link #baseWriterId} so that each per-partition writer gets a distinct id. */
    private int writerIdSuffix = 0;
    /** Collects every delegate writer so that {@link #close()} closes them all. */
    private final Closer closer = Closer.create();

    /**
     * Creates the writer, choosing partitioned or non-partitioned mode from {@code state}.
     *
     * @param dataWriterBuilder builder for the delegate writer(s); must be a
     *     {@link PartitionAwareDataWriterBuilder} when partitioning is enabled
     * @param state configuration; partitioning is enabled iff it contains {@code writer.partitioner.class}
     * @throws IOException if the partitioner class cannot be loaded or instantiated, or if building
     *     the non-partitioned delegate writer fails
     * @throws IllegalArgumentException if partitioning is requested but the builder is not partition
     *     aware, or the builder rejects the partitioner's partition schema
     */
    public PartitionedDataWriter(DataWriterBuilder<S, D> dataWriterBuilder, final State state) throws IOException {
        this.baseWriterId = dataWriterBuilder.getWriterId();
        this.partitionWriters = CacheBuilder.newBuilder().build(new CacheLoader<GenericRecord, DataWriter<D>>() {
            public DataWriter<D> load(GenericRecord genericRecord) throws Exception {
                // Build the writer for a partition seen for the first time, wrap it in the
                // instrumented decorator and register it for closing.
                return PartitionedDataWriter.this.closer.register(new InstrumentedPartitionedDataWriterDecorator(PartitionedDataWriter.this.createPartitionWriter(genericRecord), state, genericRecord));
            }
        });

        if (!state.contains(WRITER_PARTITIONER_CLASS_KEY)) {
            // Non-partitioned mode: a single writer, pre-loaded into the cache under the dummy key.
            this.shouldPartition = false;
            this.partitionWriters.put(NON_PARTITIONED_WRITER_KEY, (InstrumentedDataWriterDecorator) this.closer.register(new InstrumentedDataWriterDecorator(dataWriterBuilder.build(), state)));
            this.partitioner = Optional.absent();
            this.builder = Optional.absent();
            return;
        }

        Preconditions.checkArgument(dataWriterBuilder instanceof PartitionAwareDataWriterBuilder,
            "%s was specified but the writer %s does not support partitioning.",
            WRITER_PARTITIONER_CLASS_KEY, dataWriterBuilder.getClass().getCanonicalName());
        try {
            this.shouldPartition = true;
            PartitionAwareDataWriterBuilder partitionAwareBuilder = PartitionAwareDataWriterBuilder.class.cast(dataWriterBuilder);
            this.builder = Optional.of(partitionAwareBuilder);

            // The partitioner is constructed reflectively with (state, branch count, branch id).
            WriterPartitioner writerPartitioner = WriterPartitioner.class.cast(ConstructorUtils.invokeConstructor(
                Class.forName(state.getProp(WRITER_PARTITIONER_CLASS_KEY)),
                new Object[]{state, Integer.valueOf(dataWriterBuilder.getBranches()), Integer.valueOf(dataWriterBuilder.getBranch())}));
            this.partitioner = Optional.of(writerPartitioner);

            // Report the partitioner's own class in the message, not the Optional wrapper's class.
            Preconditions.checkArgument(partitionAwareBuilder.validatePartitionSchema(writerPartitioner.partitionSchema()),
                "Writer %s does not support schema from partitioner %s",
                dataWriterBuilder.getClass().getCanonicalName(), writerPartitioner.getClass().getCanonicalName());
        } catch (ReflectiveOperationException e) {
            throw new IOException(e);
        }
    }

    /**
     * Writes a record to the delegate writer of the partition it belongs to, creating that writer
     * on first use.
     *
     * @param record the record to write
     * @throws IOException if the delegate write fails, or if loading the partition writer fails
     *     with an {@link ExecutionException} (which is wrapped)
     */
    public void write(D record) throws IOException {
        try {
            GenericRecord partition = this.shouldPartition
                ? ((WriterPartitioner) this.partitioner.get()).partitionForRecord(record)
                : NON_PARTITIONED_WRITER_KEY;
            this.partitionWriters.get(partition).write(record);
        } catch (ExecutionException e) {
            throw new IOException(e);
        }
    }

    /**
     * Commits every delegate writer. A failure of one writer is logged and does not prevent the
     * remaining writers from being committed.
     *
     * @throws IOException if at least one delegate writer failed to commit
     */
    public void commit() throws IOException {
        int failures = 0;
        for (Map.Entry<GenericRecord, DataWriter<D>> entry : this.partitionWriters.asMap().entrySet()) {
            try {
                entry.getValue().commit();
            } catch (Throwable th) {
                failures++;
                log.error(String.format("Failed to commit writer for partition %s.", entry.getKey()), th);
            }
        }
        if (failures > 0) {
            throw new IOException("Failed to commit all writers.");
        }
    }

    /**
     * Cleans up every delegate writer. A failure of one writer is logged and does not prevent the
     * remaining writers from being cleaned up.
     *
     * @throws IOException if at least one delegate writer failed to clean up
     */
    public void cleanup() throws IOException {
        int failures = 0;
        for (Map.Entry<GenericRecord, DataWriter<D>> entry : this.partitionWriters.asMap().entrySet()) {
            try {
                entry.getValue().cleanup();
            } catch (Throwable th) {
                failures++;
                // Pass the throwable along so the stack trace is not lost.
                log.error(String.format("Failed to cleanup writer for partition %s.", entry.getKey()), th);
            }
        }
        if (failures > 0) {
            throw new IOException("Failed to clean up all writers.");
        }
    }

    /**
     * @return the sum of {@code recordsWritten()} over all delegate writers currently in the cache
     */
    public long recordsWritten() {
        long total = 0;
        for (DataWriter<D> writer : this.partitionWriters.asMap().values()) {
            total += writer.recordsWritten();
        }
        return total;
    }

    /**
     * @return the sum of {@code bytesWritten()} over all delegate writers currently in the cache
     * @throws IOException if a delegate writer fails to report its byte count
     */
    public long bytesWritten() throws IOException {
        long total = 0;
        for (DataWriter<D> writer : this.partitionWriters.asMap().values()) {
            total += writer.bytesWritten();
        }
        return total;
    }

    /**
     * Closes every delegate writer that was registered with the internal {@link Closer}.
     *
     * @throws IOException if closing fails
     */
    public void close() throws IOException {
        this.closer.close();
    }

    /**
     * Builds a new delegate writer for the given partition. Each call consumes the next value of
     * the writer-id suffix, producing ids of the form {@code <baseWriterId>_<n>}.
     *
     * <p>Within this class it is invoked only by the cache loader.
     *
     * @param genericRecord the partition key
     * @return a freshly built writer for that partition
     * @throws IOException if no partition-aware builder is available or building the writer fails
     */
    public DataWriter<D> createPartitionWriter(GenericRecord genericRecord) throws IOException {
        if (!this.builder.isPresent()) {
            throw new IOException("Writer builder not found. This is an error in the code.");
        }
        PartitionAwareDataWriterBuilder<S, D> forPartition = ((PartitionAwareDataWriterBuilder) this.builder.get()).forPartition(genericRecord);
        String writerId = this.baseWriterId + "_" + this.writerIdSuffix++;
        return forPartition.withWriterId(writerId).build();
    }
}
