package gobblin.writer;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.io.Closer;
import gobblin.commit.SpeculativeAttemptAwareConstruct;
import gobblin.configuration.ConfigurationKeys;
import gobblin.configuration.State;
import gobblin.instrumented.writer.InstrumentedDataWriterDecorator;
import gobblin.instrumented.writer.InstrumentedPartitionedDataWriterDecorator;
import gobblin.records.ControlMessageHandler;
import gobblin.source.extractor.CheckpointableWatermark;
import gobblin.stream.ControlMessage;
import gobblin.stream.RecordEnvelope;
import gobblin.util.AvroUtils;
import gobblin.util.FinalState;
import gobblin.util.Id;
import gobblin.writer.partitioner.WriterPartitioner;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/gobblin-core-0.11.0.jar:gobblin/writer/PartitionedDataWriter.class */
public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements FinalState, SpeculativeAttemptAwareConstruct, WatermarkAwareWriter<D> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PartitionedDataWriter.class);
    private static final GenericRecord NON_PARTITIONED_WRITER_KEY = new GenericData.Record(SchemaBuilder.record("Dummy").fields().endRecord());
    private final String baseWriterId;
    private final Optional<WriterPartitioner> partitioner;
    private final LoadingCache<GenericRecord, DataWriter<D>> partitionWriters;
    private final Optional<PartitionAwareDataWriterBuilder> builder;
    private final boolean shouldPartition;
    private boolean isSpeculativeAttemptSafe;
    private boolean isWatermarkCapable;
    private int writerIdSuffix = 0;
    private final Closer closer = Closer.create();

    /* loaded from: input_file:WEB-INF/lib/gobblin-core-0.11.0.jar:gobblin/writer/PartitionedDataWriter$PartitionDataWriterMessageHandler.class */
    private class PartitionDataWriterMessageHandler implements ControlMessageHandler {
        private PartitionDataWriterMessageHandler() {
        }

        @Override // gobblin.records.ControlMessageHandler
        public void handleMessage(ControlMessage controlMessage) {
            Iterator it = PartitionedDataWriter.this.partitionWriters.asMap().values().iterator();
            while (it.hasNext()) {
                ((DataWriter) it.next()).getMessageHandler().handleMessage((ControlMessage) controlMessage.getClone());
            }
        }
    }

    public PartitionedDataWriter(DataWriterBuilder<S, D> dataWriterBuilder, final State state) throws IOException {
        this.isSpeculativeAttemptSafe = true;
        this.isWatermarkCapable = true;
        this.baseWriterId = dataWriterBuilder.getWriterId();
        this.partitionWriters = (LoadingCache<GenericRecord, DataWriter<D>>) CacheBuilder.newBuilder().build(new CacheLoader<GenericRecord, DataWriter<D>>() { // from class: gobblin.writer.PartitionedDataWriter.1
            @Override // com.google.common.cache.CacheLoader
            public DataWriter<D> load(GenericRecord genericRecord) throws Exception {
                return (DataWriter) PartitionedDataWriter.this.closer.register(new InstrumentedPartitionedDataWriterDecorator(PartitionedDataWriter.this.createPartitionWriter(genericRecord), state, genericRecord));
            }
        });
        if (state.contains(ConfigurationKeys.WRITER_PARTITIONER_CLASS)) {
            Preconditions.checkArgument(dataWriterBuilder instanceof PartitionAwareDataWriterBuilder, String.format("%s was specified but the writer %s does not support partitioning.", ConfigurationKeys.WRITER_PARTITIONER_CLASS, dataWriterBuilder.getClass().getCanonicalName()));
            try {
                this.shouldPartition = true;
                this.builder = Optional.of(PartitionAwareDataWriterBuilder.class.cast(dataWriterBuilder));
                this.partitioner = Optional.of(WriterPartitioner.class.cast(ConstructorUtils.invokeConstructor(Class.forName(state.getProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS)), state, Integer.valueOf(dataWriterBuilder.getBranches()), Integer.valueOf(dataWriterBuilder.getBranch()))));
                Preconditions.checkArgument(this.builder.get().validatePartitionSchema(this.partitioner.get().partitionSchema()), String.format("Writer %s does not support schema from partitioner %s", dataWriterBuilder.getClass().getCanonicalName(), this.partitioner.getClass().getCanonicalName()));
                return;
            } catch (ReflectiveOperationException e) {
                throw new IOException(e);
            }
        }
        this.shouldPartition = false;
        DataWriter<D> build2 = dataWriterBuilder.build2();
        InstrumentedDataWriterDecorator instrumentedDataWriterDecorator = (InstrumentedDataWriterDecorator) this.closer.register(new InstrumentedDataWriterDecorator(build2, state));
        this.isSpeculativeAttemptSafe = isDataWriterForPartitionSafe(build2);
        this.isWatermarkCapable = isDataWriterWatermarkCapable(build2);
        this.partitionWriters.put(NON_PARTITIONED_WRITER_KEY, instrumentedDataWriterDecorator);
        this.partitioner = Optional.absent();
        this.builder = Optional.absent();
    }

    private boolean isDataWriterWatermarkCapable(DataWriter<D> dataWriter) {
        return (dataWriter instanceof WatermarkAwareWriter) && ((WatermarkAwareWriter) dataWriter).isWatermarkCapable();
    }

    @Override // gobblin.writer.DataWriter, gobblin.writer.WatermarkAwareWriter
    public void writeEnvelope(RecordEnvelope<D> recordEnvelope) throws IOException {
        try {
            getDataWriterForRecord(recordEnvelope.getRecord()).writeEnvelope(recordEnvelope);
        } catch (ExecutionException e) {
            throw new IOException(e);
        }
    }

    private DataWriter<D> getDataWriterForRecord(D d) throws ExecutionException {
        return this.partitionWriters.get(this.shouldPartition ? this.partitioner.get().partitionForRecord(d) : NON_PARTITIONED_WRITER_KEY);
    }

    @Override // gobblin.writer.DataWriter
    public void commit() throws IOException {
        int i = 0;
        for (Map.Entry<GenericRecord, DataWriter<D>> entry : this.partitionWriters.asMap().entrySet()) {
            try {
                entry.getValue().commit();
                i++;
            } catch (Throwable th) {
                log.error(String.format("Failed to commit writer for partition %s.", entry.getKey()), th);
            }
        }
        if (i < this.partitionWriters.asMap().size()) {
            throw new IOException("Failed to commit all writers.");
        }
    }

    @Override // gobblin.writer.DataWriter
    public void cleanup() throws IOException {
        int i = 0;
        for (Map.Entry<GenericRecord, DataWriter<D>> entry : this.partitionWriters.asMap().entrySet()) {
            try {
                entry.getValue().cleanup();
                i++;
            } catch (Throwable th) {
                log.error(String.format("Failed to cleanup writer for partition %s.", entry.getKey()));
            }
        }
        if (i < this.partitionWriters.asMap().size()) {
            throw new IOException("Failed to clean up all writers.");
        }
    }

    @Override // gobblin.writer.DataWriter
    public long recordsWritten() {
        long j = 0;
        Iterator<Map.Entry<GenericRecord, DataWriter<D>>> it = this.partitionWriters.asMap().entrySet().iterator();
        while (it.hasNext()) {
            j += it.next().getValue().recordsWritten();
        }
        return j;
    }

    @Override // gobblin.writer.DataWriter
    public long bytesWritten() throws IOException {
        long j = 0;
        Iterator<Map.Entry<GenericRecord, DataWriter<D>>> it = this.partitionWriters.asMap().entrySet().iterator();
        while (it.hasNext()) {
            j += it.next().getValue().bytesWritten();
        }
        return j;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.closer.close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public DataWriter<D> createPartitionWriter(GenericRecord genericRecord) throws IOException {
        if (!this.builder.isPresent()) {
            throw new IOException("Writer builder not found. This is an error in the code.");
        }
        PartitionAwareDataWriterBuilder<S, D> forPartition = this.builder.get().forPartition(genericRecord);
        StringBuilder append = new StringBuilder().append(this.baseWriterId).append(Id.SEPARATOR);
        int i = this.writerIdSuffix;
        this.writerIdSuffix = i + 1;
        DataWriter<D> build2 = forPartition.withWriterId(append.append(i).toString()).build2();
        this.isSpeculativeAttemptSafe = this.isSpeculativeAttemptSafe && isDataWriterForPartitionSafe(build2);
        this.isWatermarkCapable = this.isWatermarkCapable && isDataWriterWatermarkCapable(build2);
        return build2;
    }

    @Override // gobblin.util.FinalState
    public State getFinalState() {
        State state = new State();
        try {
            for (Map.Entry<GenericRecord, DataWriter<D>> entry : this.partitionWriters.asMap().entrySet()) {
                if (entry.getValue() instanceof FinalState) {
                    State finalState = ((FinalState) entry.getValue()).getFinalState();
                    if (this.shouldPartition) {
                        for (String str : finalState.getPropertyNames()) {
                            finalState.setProp(str + Id.SEPARATOR + AvroUtils.serializeAsPath(entry.getKey(), false, true), finalState.getProp(str));
                        }
                    }
                    state.addAll(finalState);
                }
            }
            state.setProp("RecordsWritten", Long.valueOf(recordsWritten()));
            state.setProp("BytesWritten", Long.valueOf(bytesWritten()));
        } catch (Exception e) {
            log.warn("Failed to get final state." + e.getMessage());
        }
        return state;
    }

    @Override // gobblin.commit.SpeculativeAttemptAwareConstruct
    public boolean isSpeculativeAttemptSafe() {
        return this.isSpeculativeAttemptSafe;
    }

    private boolean isDataWriterForPartitionSafe(DataWriter dataWriter) {
        return (dataWriter instanceof SpeculativeAttemptAwareConstruct) && ((SpeculativeAttemptAwareConstruct) dataWriter).isSpeculativeAttemptSafe();
    }

    @Override // gobblin.writer.WatermarkAwareWriter
    public boolean isWatermarkCapable() {
        return this.isWatermarkCapable;
    }

    @Override // gobblin.writer.WatermarkAwareWriter
    public Map<String, CheckpointableWatermark> getCommittableWatermark() {
        MultiWriterWatermarkTracker multiWriterWatermarkTracker = new MultiWriterWatermarkTracker();
        for (Map.Entry<GenericRecord, DataWriter<D>> entry : this.partitionWriters.asMap().entrySet()) {
            if (entry.getValue() instanceof WatermarkAwareWriter) {
                Map<String, CheckpointableWatermark> committableWatermark = ((WatermarkAwareWriter) entry.getValue()).getCommittableWatermark();
                if (!committableWatermark.isEmpty()) {
                    multiWriterWatermarkTracker.committedWatermarks(committableWatermark);
                }
                Map<String, CheckpointableWatermark> unacknowledgedWatermark = ((WatermarkAwareWriter) entry.getValue()).getUnacknowledgedWatermark();
                if (!unacknowledgedWatermark.isEmpty()) {
                    multiWriterWatermarkTracker.unacknowledgedWatermarks(unacknowledgedWatermark);
                }
            }
        }
        return multiWriterWatermarkTracker.getAllCommitableWatermarks();
    }

    @Override // gobblin.writer.WatermarkAwareWriter
    public Map<String, CheckpointableWatermark> getUnacknowledgedWatermark() {
        MultiWriterWatermarkTracker multiWriterWatermarkTracker = new MultiWriterWatermarkTracker();
        Iterator<Map.Entry<GenericRecord, DataWriter<D>>> it = this.partitionWriters.asMap().entrySet().iterator();
        while (it.hasNext()) {
            Map<String, CheckpointableWatermark> unacknowledgedWatermark = ((WatermarkAwareWriter) it.next().getValue()).getUnacknowledgedWatermark();
            if (!unacknowledgedWatermark.isEmpty()) {
                multiWriterWatermarkTracker.unacknowledgedWatermarks(unacknowledgedWatermark);
            }
        }
        return multiWriterWatermarkTracker.getAllUnacknowledgedWatermarks();
    }

    @Override // gobblin.writer.DataWriter
    public ControlMessageHandler getMessageHandler() {
        return new PartitionDataWriterMessageHandler();
    }
}
