package org.apache.gobblin.writer;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.dataset.PartitionDescriptor;
import org.apache.gobblin.instrumented.writer.InstrumentedDataWriterDecorator;
import org.apache.gobblin.instrumented.writer.InstrumentedPartitionedDataWriterDecorator;
import org.apache.gobblin.records.ControlMessageHandler;
import org.apache.gobblin.source.extractor.CheckpointableWatermark;
import org.apache.gobblin.stream.ControlMessage;
import org.apache.gobblin.stream.MetadataUpdateControlMessage;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.stream.StreamEntity;
import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.util.FinalState;
import org.apache.gobblin.writer.partitioner.WriterPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/writer/PartitionedDataWriter.class */
public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements FinalState, SpeculativeAttemptAwareConstruct, WatermarkAwareWriter<D> {
    private static final Logger log = LoggerFactory.getLogger(PartitionedDataWriter.class);
    private static final GenericRecord NON_PARTITIONED_WRITER_KEY = new GenericData.Record((Schema) SchemaBuilder.record("Dummy").fields().endRecord());
    private final String baseWriterId;
    private final State state;
    private final int branchId;
    private final Optional<WriterPartitioner> partitioner;
    private final LoadingCache<GenericRecord, DataWriter<D>> partitionWriters;
    private final Optional<PartitionAwareDataWriterBuilder> builder;
    private final DataWriterBuilder writerBuilder;
    private final boolean shouldPartition;
    private boolean isSpeculativeAttemptSafe;
    private boolean isWatermarkCapable;
    private int writerIdSuffix = 0;
    private final Closer closer = Closer.create();
    private final ControlMessageHandler controlMessageHandler = new PartitionDataWriterMessageHandler();

    /* loaded from: input_file:org/apache/gobblin/writer/PartitionedDataWriter$PartitionDataWriterMessageHandler.class */
    private class PartitionDataWriterMessageHandler implements ControlMessageHandler {
        private PartitionDataWriterMessageHandler() {
        }

        public void handleMessage(ControlMessage controlMessage) {
            StreamEntity.ForkCloner forkCloner = controlMessage.forkCloner();
            if (controlMessage instanceof MetadataUpdateControlMessage) {
                PartitionedDataWriter.this.writerBuilder.withSchema(((MetadataUpdateControlMessage) controlMessage).getGlobalMetadata().getSchema());
            }
            for (DataWriter dataWriter : PartitionedDataWriter.this.partitionWriters.asMap().values()) {
                dataWriter.getMessageHandler().handleMessage(forkCloner.getClone());
            }
            forkCloner.close();
        }
    }

    public PartitionedDataWriter(final DataWriterBuilder<S, D> dataWriterBuilder, final State state) throws IOException {
        this.state = state;
        this.branchId = dataWriterBuilder.branch;
        this.isSpeculativeAttemptSafe = true;
        this.isWatermarkCapable = true;
        this.baseWriterId = dataWriterBuilder.getWriterId();
        this.writerBuilder = dataWriterBuilder;
        this.partitionWriters = CacheBuilder.newBuilder().build(new CacheLoader<GenericRecord, DataWriter<D>>() { // from class: org.apache.gobblin.writer.PartitionedDataWriter.1
            public DataWriter<D> load(final GenericRecord genericRecord) throws Exception {
                return PartitionedDataWriter.this.closer.register(new InstrumentedPartitionedDataWriterDecorator(new CloseOnFlushWriterWrapper(new Supplier<DataWriter<D>>() { // from class: org.apache.gobblin.writer.PartitionedDataWriter.1.1
                    @Override // java.util.function.Supplier
                    public DataWriter<D> get() {
                        try {
                            return PartitionedDataWriter.this.createPartitionWriter(genericRecord);
                        } catch (IOException e) {
                            throw new RuntimeException("Error creating writer", e);
                        }
                    }
                }, state), state, genericRecord));
            }
        });
        if (state.contains("writer.partitioner.class")) {
            Preconditions.checkArgument(dataWriterBuilder instanceof PartitionAwareDataWriterBuilder, String.format("%s was specified but the writer %s does not support partitioning.", "writer.partitioner.class", dataWriterBuilder.getClass().getCanonicalName()));
            try {
                this.shouldPartition = true;
                this.builder = Optional.of(PartitionAwareDataWriterBuilder.class.cast(dataWriterBuilder));
                this.partitioner = Optional.of(WriterPartitioner.class.cast(ConstructorUtils.invokeConstructor(Class.forName(state.getProp("writer.partitioner.class")), new Object[]{state, Integer.valueOf(dataWriterBuilder.getBranches()), Integer.valueOf(dataWriterBuilder.getBranch())})));
                Preconditions.checkArgument(((PartitionAwareDataWriterBuilder) this.builder.get()).validatePartitionSchema(((WriterPartitioner) this.partitioner.get()).partitionSchema()), String.format("Writer %s does not support schema from partitioner %s", dataWriterBuilder.getClass().getCanonicalName(), this.partitioner.getClass().getCanonicalName()));
                return;
            } catch (ReflectiveOperationException e) {
                throw new IOException(e);
            }
        }
        this.shouldPartition = false;
        CloseOnFlushWriterWrapper closeOnFlushWriterWrapper = new CloseOnFlushWriterWrapper(new Supplier<DataWriter<D>>() { // from class: org.apache.gobblin.writer.PartitionedDataWriter.2
            @Override // java.util.function.Supplier
            public DataWriter<D> get() {
                try {
                    return dataWriterBuilder.withWriterId(PartitionedDataWriter.this.baseWriterId + "_" + PartitionedDataWriter.access$408(PartitionedDataWriter.this)).build();
                } catch (IOException e2) {
                    throw new RuntimeException("Error creating writer", e2);
                }
            }
        }, state);
        DataWriter<D> dataWriter = (DataWriter) closeOnFlushWriterWrapper.getDecoratedObject();
        InstrumentedDataWriterDecorator register = this.closer.register(new InstrumentedDataWriterDecorator(closeOnFlushWriterWrapper, state));
        this.isSpeculativeAttemptSafe = isDataWriterForPartitionSafe(dataWriter);
        this.isWatermarkCapable = isDataWriterWatermarkCapable(dataWriter);
        this.partitionWriters.put(NON_PARTITIONED_WRITER_KEY, register);
        this.partitioner = Optional.absent();
        this.builder = Optional.absent();
    }

    private boolean isDataWriterWatermarkCapable(DataWriter<D> dataWriter) {
        return (dataWriter instanceof WatermarkAwareWriter) && ((WatermarkAwareWriter) dataWriter).isWatermarkCapable();
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void writeEnvelope(RecordEnvelope<D> recordEnvelope) throws IOException {
        try {
            getDataWriterForRecord(recordEnvelope.getRecord()).writeEnvelope(recordEnvelope);
        } catch (ExecutionException e) {
            throw new IOException(e);
        }
    }

    private DataWriter<D> getDataWriterForRecord(D d) throws ExecutionException {
        return (DataWriter) this.partitionWriters.get(this.shouldPartition ? ((WriterPartitioner) this.partitioner.get()).partitionForRecord(d) : NON_PARTITIONED_WRITER_KEY);
    }

    public void commit() throws IOException {
        int i = 0;
        for (Map.Entry entry : this.partitionWriters.asMap().entrySet()) {
            try {
                ((DataWriter) entry.getValue()).commit();
                i++;
            } catch (Throwable th) {
                log.error(String.format("Failed to commit writer for partition %s.", entry.getKey()), th);
            }
        }
        if (i < this.partitionWriters.asMap().size()) {
            throw new IOException("Failed to commit all writers.");
        }
    }

    public void cleanup() throws IOException {
        int i = 0;
        for (Map.Entry entry : this.partitionWriters.asMap().entrySet()) {
            try {
                ((DataWriter) entry.getValue()).cleanup();
                i++;
            } catch (Throwable th) {
                log.error(String.format("Failed to cleanup writer for partition %s.", entry.getKey()));
            }
        }
        if (i < this.partitionWriters.asMap().size()) {
            throw new IOException("Failed to clean up all writers.");
        }
    }

    public long recordsWritten() {
        long j = 0;
        Iterator it = this.partitionWriters.asMap().entrySet().iterator();
        while (it.hasNext()) {
            j += ((DataWriter) ((Map.Entry) it.next()).getValue()).recordsWritten();
        }
        return j;
    }

    public long bytesWritten() throws IOException {
        long j = 0;
        Iterator it = this.partitionWriters.asMap().entrySet().iterator();
        while (it.hasNext()) {
            j += ((DataWriter) ((Map.Entry) it.next()).getValue()).bytesWritten();
        }
        return j;
    }

    public void close() throws IOException {
        try {
            serializePartitionInfoToState();
        } finally {
            this.closer.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public DataWriter<D> createPartitionWriter(GenericRecord genericRecord) throws IOException {
        if (!this.builder.isPresent()) {
            throw new IOException("Writer builder not found. This is an error in the code.");
        }
        PartitionAwareDataWriterBuilder forPartition = ((PartitionAwareDataWriterBuilder) this.builder.get()).forPartition(genericRecord);
        StringBuilder append = new StringBuilder().append(this.baseWriterId).append("_");
        int i = this.writerIdSuffix;
        this.writerIdSuffix = i + 1;
        DataWriter<D> build = forPartition.withWriterId(append.append(i).toString()).build();
        this.isSpeculativeAttemptSafe = this.isSpeculativeAttemptSafe && isDataWriterForPartitionSafe(build);
        this.isWatermarkCapable = this.isWatermarkCapable && isDataWriterWatermarkCapable(build);
        return build;
    }

    public State getFinalState() {
        State state = new State();
        try {
            for (Map.Entry entry : this.partitionWriters.asMap().entrySet()) {
                if (entry.getValue() instanceof FinalState) {
                    State finalState = ((FinalState) entry.getValue()).getFinalState();
                    if (this.shouldPartition) {
                        for (String str : finalState.getPropertyNames()) {
                            finalState.setProp(str + "_" + AvroUtils.serializeAsPath((GenericRecord) entry.getKey(), false, true), finalState.getProp(str));
                        }
                    }
                    state.addAll(finalState);
                }
            }
            state.setProp("RecordsWritten", Long.valueOf(recordsWritten()));
            state.setProp("BytesWritten", Long.valueOf(bytesWritten()));
        } catch (Exception e) {
            log.warn("Failed to get final state." + e.getMessage());
        }
        return state;
    }

    @Override // org.apache.gobblin.commit.SpeculativeAttemptAwareConstruct
    public boolean isSpeculativeAttemptSafe() {
        return this.isSpeculativeAttemptSafe;
    }

    private boolean isDataWriterForPartitionSafe(DataWriter dataWriter) {
        return (dataWriter instanceof SpeculativeAttemptAwareConstruct) && ((SpeculativeAttemptAwareConstruct) dataWriter).isSpeculativeAttemptSafe();
    }

    public boolean isWatermarkCapable() {
        return this.isWatermarkCapable;
    }

    public Map<String, CheckpointableWatermark> getCommittableWatermark() {
        MultiWriterWatermarkTracker multiWriterWatermarkTracker = new MultiWriterWatermarkTracker();
        for (Map.Entry entry : this.partitionWriters.asMap().entrySet()) {
            if (entry.getValue() instanceof WatermarkAwareWriter) {
                Map committableWatermark = ((WatermarkAwareWriter) entry.getValue()).getCommittableWatermark();
                if (!committableWatermark.isEmpty()) {
                    multiWriterWatermarkTracker.committedWatermarks(committableWatermark);
                }
                Map unacknowledgedWatermark = ((WatermarkAwareWriter) entry.getValue()).getUnacknowledgedWatermark();
                if (!unacknowledgedWatermark.isEmpty()) {
                    multiWriterWatermarkTracker.unacknowledgedWatermarks(unacknowledgedWatermark);
                }
            }
        }
        return multiWriterWatermarkTracker.getAllCommitableWatermarks();
    }

    public Map<String, CheckpointableWatermark> getUnacknowledgedWatermark() {
        MultiWriterWatermarkTracker multiWriterWatermarkTracker = new MultiWriterWatermarkTracker();
        Iterator it = this.partitionWriters.asMap().entrySet().iterator();
        while (it.hasNext()) {
            Map unacknowledgedWatermark = ((WatermarkAwareWriter) ((Map.Entry) it.next()).getValue()).getUnacknowledgedWatermark();
            if (!unacknowledgedWatermark.isEmpty()) {
                multiWriterWatermarkTracker.unacknowledgedWatermarks(unacknowledgedWatermark);
            }
        }
        return multiWriterWatermarkTracker.getAllUnacknowledgedWatermarks();
    }

    public ControlMessageHandler getMessageHandler() {
        return this.controlMessageHandler;
    }

    private static String getPartitionsKey(int i) {
        return String.format("writer.%d.partitions", Integer.valueOf(i));
    }

    private void serializePartitionInfoToState() {
        ArrayList arrayList = new ArrayList();
        for (DataWriter dataWriter : this.partitionWriters.asMap().values()) {
            PartitionDescriptor dataDescriptor = dataWriter.getDataDescriptor();
            if (null == dataDescriptor) {
                log.warn("Drop partition info as writer {} returns a null PartitionDescriptor", dataWriter.toString());
            } else if (dataDescriptor instanceof PartitionDescriptor) {
                arrayList.add(dataDescriptor);
            } else {
                log.warn("Drop partition info as writer {} does not return a PartitionDescriptor", dataWriter.toString());
            }
        }
        if (arrayList.size() > 0) {
            this.state.setProp(getPartitionsKey(this.branchId), PartitionDescriptor.toPartitionJsonList(arrayList));
        } else {
            log.info("Partitions info not available. Will not serialize partitions");
        }
    }

    public static List<PartitionDescriptor> getPartitionInfoAndClean(State state, int i) {
        String partitionsKey = getPartitionsKey(i);
        String prop = state.getProp(partitionsKey);
        if (Strings.isNullOrEmpty(prop)) {
            return Lists.newArrayList();
        }
        state.removeProp(partitionsKey);
        return PartitionDescriptor.fromPartitionJsonList(prop);
    }

    static /* synthetic */ int access$408(PartitionedDataWriter partitionedDataWriter) {
        int i = partitionedDataWriter.writerIdSuffix;
        partitionedDataWriter.writerIdSuffix = i + 1;
        return i;
    }
}
