package org.apache.druid.indexing.seekablestream;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import org.apache.druid.data.input.Committer;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import org.apache.druid.indexing.overlord.SegmentPublishResult;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.timeline.DataSegment;

/* loaded from: input_file:org/apache/druid/indexing/seekablestream/SequenceMetadata.class */
/**
 * Holds the bookkeeping for one sequence of a seekable-stream indexing task: its id and name, the
 * per-partition start and end offsets, the partitions still assigned to it, and whether it has
 * been checkpointed.
 *
 * <p>The methods that touch {@link #endOffsets}, {@code assignments} or {@code checkpointed}
 * acquire an internal {@link ReentrantLock}, with the exception of {@link #isOpen()}, which reads
 * the assignment set without locking.
 *
 * @param <PartitionIdType>    type used to identify a partition of the stream
 * @param <SequenceOffsetType> type used to represent an offset within a partition
 */
public class SequenceMetadata<PartitionIdType, SequenceOffsetType> {
    private static final EmittingLogger log = new EmittingLogger(SequenceMetadata.class);
    private final int sequenceId;
    private final String sequenceName;
    private final Set<PartitionIdType> exclusiveStartPartitions;
    private final Set<PartitionIdType> assignments;
    private final boolean sentinel;
    private final ReentrantLock lock = new ReentrantLock();
    /** Immutable copy of the start offsets supplied at construction time. */
    final Map<PartitionIdType, SequenceOffsetType> startOffsets;
    /** Mutable end offsets; updated through {@link #setEndOffsets(Map)}. */
    final Map<PartitionIdType, SequenceOffsetType> endOffsets;
    private boolean checkpointed;

    /**
     * Publisher bound to the enclosing sequence. It validates that the metadata being committed
     * matches the sequence's end offsets and then submits the corresponding
     * {@link SegmentTransactionalInsertAction} through the task action client.
     */
    private class SequenceMetadataTransactionalSegmentPublisher implements TransactionalSegmentPublisher {
        private final SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, ?> runner;
        private final TaskToolbox toolbox;
        private final boolean useTransaction;

        public SequenceMetadataTransactionalSegmentPublisher(
                SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, ?> runner,
                TaskToolbox toolbox,
                boolean useTransaction) {
            this.runner = runner;
            this.toolbox = toolbox;
            this.useTransaction = useTransaction;
        }

        /**
         * Publishes the given segments together with the sequence's offset metadata.
         *
         * @param segmentsToBeOverwritten must be null or empty
         * @param segmentsToDrop          must be null or empty
         * @param segmentsToPublish       segments to append; may be empty, in which case only
         *                                metadata is committed (or nothing, if it did not change)
         * @param commitMetadata          a map containing a {@code "publishPartitions"} entry; must
         *                                not be null
         * @return the result of the submitted action, or an OK result when there was nothing to do
         * @throws ISE         if segments were passed for overwrite or drop, or if the metadata's
         *                     partitions differ from this sequence's end offsets
         * @throws IOException declared by the publisher contract and the task action client
         */
        @Override
        public SegmentPublishResult publishAnnotatedSegments(
                @Nullable Set<DataSegment> segmentsToBeOverwritten,
                @Nullable Set<DataSegment> segmentsToDrop,
                Set<DataSegment> segmentsToPublish,
                @Nullable Object commitMetadata) throws IOException {
            if (segmentsToBeOverwritten != null && !segmentsToBeOverwritten.isEmpty()) {
                throw new ISE(
                        "Stream ingestion task unexpectedly attempted to overwrite segments: %s",
                        SegmentUtils.commaSeparatedIdentifiers(segmentsToBeOverwritten));
            }
            if (segmentsToDrop != null && !segmentsToDrop.isEmpty()) {
                throw new ISE(
                        "Stream ingestion task unexpectedly attempted to drop segments: %s",
                        SegmentUtils.commaSeparatedIdentifiers(segmentsToDrop));
            }

            final Map<?, ?> commitMetadataMap =
                    (Map<?, ?>) Preconditions.checkNotNull(commitMetadata, "commitMetadata");
            final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> finalPartitions =
                    runner.deserializePartitionsFromMetadata(
                            toolbox.getJsonMapper(),
                            commitMetadataMap.get("publishPartitions"));

            // The driver must publish exactly the end offsets this sequence was closed with.
            if (!getEndOffsets().equals(finalPartitions.getPartitionSequenceNumberMap())) {
                throw new ISE(
                        "Driver for sequence[%s] attempted to publish invalid metadata[%s].",
                        SequenceMetadata.this.toString(),
                        commitMetadata);
            }

            final SegmentTransactionalInsertAction action;
            if (segmentsToPublish.isEmpty()) {
                final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> startPartitions =
                        new SeekableStreamStartSequenceNumbers<>(
                                finalPartitions.getStream(),
                                getStartOffsets(),
                                exclusiveStartPartitions);
                if (isMetadataUnchanged(startPartitions, finalPartitions)) {
                    log.info(
                            "With empty segment set, start offsets [%s] and end offsets [%s] are the same, skipping metadata commit.",
                            startPartitions,
                            finalPartitions);
                    return SegmentPublishResult.ok(segmentsToPublish);
                }
                log.info(
                        "With empty segment set, start offsets [%s] and end offsets [%s] changed, committing new metadata.",
                        startPartitions,
                        finalPartitions);
                action = SegmentTransactionalInsertAction.commitMetadataOnlyAction(
                        runner.getAppenderator().getDataSource(),
                        runner.createDataSourceMetadata(startPartitions),
                        runner.createDataSourceMetadata(finalPartitions));
            } else if (useTransaction) {
                final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> startPartitions =
                        new SeekableStreamStartSequenceNumbers<>(
                                finalPartitions.getStream(),
                                getStartOffsets(),
                                exclusiveStartPartitions);
                action = SegmentTransactionalInsertAction.appendAction(
                        segmentsToPublish,
                        runner.createDataSourceMetadata(startPartitions),
                        runner.createDataSourceMetadata(finalPartitions));
            } else {
                // Non-transactional publish: segments are appended without start/end metadata.
                action = SegmentTransactionalInsertAction.appendAction(segmentsToPublish, null, null);
            }
            return toolbox.getTaskActionClient().submit(action);
        }

        /** This publisher accepts an empty segment set (see the metadata-only path above). */
        @Override
        public boolean supportsEmptyPublish() {
            return true;
        }
    }

    /**
     * Creates the metadata for one sequence.
     *
     * @param sequenceId               numeric id of the sequence
     * @param sequenceName             name of the sequence; must not be null
     * @param startOffsets             per-partition start offsets; must not be null. Copied into an
     *                                 immutable map; its key set seeds the initial assignments
     * @param endOffsets               per-partition end offsets; must not be null. Copied into a
     *                                 mutable map
     * @param checkpointed             initial checkpointed flag
     * @param exclusiveStartPartitions partitions whose start offset is exclusive; null is treated
     *                                 as the empty set
     */
    @JsonCreator
    public SequenceMetadata(
            @JsonProperty("sequenceId") int sequenceId,
            @JsonProperty("sequenceName") String sequenceName,
            @JsonProperty("startOffsets") Map<PartitionIdType, SequenceOffsetType> startOffsets,
            @JsonProperty("endOffsets") Map<PartitionIdType, SequenceOffsetType> endOffsets,
            @JsonProperty("checkpointed") boolean checkpointed,
            @JsonProperty("exclusiveStartPartitions") Set<PartitionIdType> exclusiveStartPartitions) {
        Preconditions.checkNotNull(sequenceName, "sequenceName");
        Preconditions.checkNotNull(startOffsets, "startOffsets");
        Preconditions.checkNotNull(endOffsets, "endOffsets");
        this.sequenceId = sequenceId;
        this.sequenceName = sequenceName;
        this.startOffsets = ImmutableMap.copyOf(startOffsets);
        this.endOffsets = new HashMap<>(endOffsets);
        this.assignments = new HashSet<>(startOffsets.keySet());
        this.checkpointed = checkpointed;
        this.sentinel = false;
        this.exclusiveStartPartitions =
                exclusiveStartPartitions == null ? Collections.emptySet() : exclusiveStartPartitions;
    }

    /** Returns the partitions whose start offset is treated as exclusive (never null). */
    @JsonProperty
    public Set<PartitionIdType> getExclusiveStartPartitions() {
        return this.exclusiveStartPartitions;
    }

    @JsonProperty
    public int getSequenceId() {
        return this.sequenceId;
    }

    /** Returns the checkpointed flag, read under the lock. */
    @JsonProperty
    public boolean isCheckpointed() {
        this.lock.lock();
        try {
            return this.checkpointed;
        } finally {
            this.lock.unlock();
        }
    }

    @JsonProperty
    public String getSequenceName() {
        return this.sequenceName;
    }

    /** Returns the immutable start offsets. */
    @JsonProperty
    public Map<PartitionIdType, SequenceOffsetType> getStartOffsets() {
        return this.startOffsets;
    }

    /**
     * Returns the end offsets map. The reference is read under the lock, but the returned object
     * is the live internal map, not a copy.
     */
    @JsonProperty
    public Map<PartitionIdType, SequenceOffsetType> getEndOffsets() {
        this.lock.lock();
        try {
            return this.endOffsets;
        } finally {
            this.lock.unlock();
        }
    }

    @JsonProperty
    public boolean isSentinel() {
        return this.sentinel;
    }

    /**
     * Merges the given offsets into the end offsets and marks the sequence as checkpointed.
     * (Decompiler note: originally package-private.)
     *
     * @param newEndOffsets offsets to put into the end offsets map
     */
    public void setEndOffsets(Map<PartitionIdType, SequenceOffsetType> newEndOffsets) {
        this.lock.lock();
        try {
            this.endOffsets.putAll(newEndOffsets);
            this.checkpointed = true;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Recomputes the assignment set: a partition from {@code currentOffsets} stays assigned when
     * {@code moreToReadFn} returns true for its current offset and this sequence's end offset.
     * (Decompiler note: originally package-private.)
     *
     * @param currentOffsets per-partition offsets to test
     * @param moreToReadFn   called as {@code apply(currentOffset, endOffset)}; the end offset is
     *                       null when this sequence has no entry for the partition
     */
    public void updateAssignments(
            Map<PartitionIdType, SequenceOffsetType> currentOffsets,
            BiFunction<SequenceOffsetType, SequenceOffsetType, Boolean> moreToReadFn) {
        this.lock.lock();
        try {
            this.assignments.clear();
            for (Map.Entry<PartitionIdType, SequenceOffsetType> current : currentOffsets.entrySet()) {
                final PartitionIdType partition = current.getKey();
                if (moreToReadFn.apply(current.getValue(), this.endOffsets.get(partition))) {
                    this.assignments.add(partition);
                }
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Returns true while at least one partition is still assigned. Does not acquire the lock.
     * (Decompiler note: originally package-private.)
     */
    public boolean isOpen() {
        return !this.assignments.isEmpty();
    }

    /**
     * Decides whether the record's sequence number lies inside this sequence's offset range for
     * the record's partition. (Decompiler note: originally package-private.)
     *
     * <p>Returns false when the sequence is not open or when any of the record, start or end
     * sequence numbers produced by the runner is null. Otherwise:
     * <ul>
     *   <li>if {@code runner.isEndOffsetExclusive()}: {@code start <= record < end};</li>
     *   <li>otherwise: {@code record <= end}, and {@code record >= start}, or strictly
     *       {@code record > start} when the partition is in the exclusive-start set.</li>
     * </ul>
     *
     * @param runner used to build comparable sequence numbers and to query end-offset exclusivity
     * @param record the record to test
     * @return whether this sequence can handle the record
     */
    public boolean canHandle(
            SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, ?> runner,
            OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, ?> record) {
        this.lock.lock();
        try {
            final PartitionIdType partitionId = record.getPartitionId();
            final OrderedSequenceNumber<SequenceOffsetType> partitionEndOffset =
                    runner.createSequenceNumber(this.endOffsets.get(partitionId));
            final OrderedSequenceNumber<SequenceOffsetType> partitionStartOffset =
                    runner.createSequenceNumber(this.startOffsets.get(partitionId));
            final OrderedSequenceNumber<SequenceOffsetType> recordOffset =
                    runner.createSequenceNumber(record.getSequenceNumber());

            if (!isOpen() || recordOffset == null || partitionEndOffset == null || partitionStartOffset == null) {
                return false;
            }

            final boolean endExclusive = runner.isEndOffsetExclusive();

            // With an exclusive start the comparison result must be at least 1 (strictly after start).
            final int minStartComparison =
                    !endExclusive && getExclusiveStartPartitions().contains(partitionId) ? 1 : 0;
            final boolean atOrAfterStart = recordOffset.compareTo(partitionStartOffset) >= minStartComparison;

            final int endComparison = recordOffset.compareTo(partitionEndOffset);
            final boolean beforeEnd = endExclusive ? endComparison < 0 : endComparison <= 0;

            return atOrAfterStart && beforeEnd;
        } finally {
            // Single release point: unlocking on the return path as well would release the lock twice.
            this.lock.unlock();
        }
    }

    @Override
    public String toString() {
        this.lock.lock();
        try {
            return "SequenceMetadata{sequenceId=" + this.sequenceId
                    + ", sequenceName='" + this.sequenceName
                    + "', assignments=" + this.assignments
                    + ", startOffsets=" + this.startOffsets
                    + ", exclusiveStartPartitions=" + this.exclusiveStartPartitions
                    + ", endOffsets=" + this.endOffsets
                    + ", sentinel=" + this.sentinel
                    + ", checkpointed=" + this.checkpointed + '}';
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Returns a supplier of committers for this sequence. (Decompiler note: originally
     * package-private.)
     *
     * <p>The committer's {@code getMetadata()} requires the assignment set to be empty. It then
     * writes, for every partition in the end offsets, the larger of the end offset and the value
     * already present in {@code lastPersistedOffsets} back into {@code lastPersistedOffsets}
     * (the argument is mutated), and returns a map with {@code "nextPartitions"} and
     * {@code "publishPartitions"} entries. Its {@code run()} is a no-op.
     *
     * @param runner               used to build comparable sequence numbers
     * @param stream               stream name recorded in the returned metadata
     * @param lastPersistedOffsets offsets persisted so far; updated in place by the committer
     * @return supplier producing a new committer on each call
     */
    public Supplier<Committer> getCommitterSupplier(
            SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, ?> runner,
            String stream,
            Map<PartitionIdType, SequenceOffsetType> lastPersistedOffsets) {
        return () -> new Committer() {
            @Override
            public Object getMetadata() {
                lock.lock();
                try {
                    Preconditions.checkState(
                            assignments.isEmpty(),
                            "This committer can be used only once all the records till sequences [%s] have been consumed, also make sure to call updateAssignments before using this committer",
                            endOffsets);

                    for (Map.Entry<PartitionIdType, SequenceOffsetType> end : endOffsets.entrySet()) {
                        final PartitionIdType partition = end.getKey();
                        SequenceOffsetType offsetToPersist = end.getValue();
                        if (lastPersistedOffsets.containsKey(partition)) {
                            final SequenceOffsetType persisted = lastPersistedOffsets.get(partition);
                            // Never move a persisted offset backwards.
                            if (runner.createSequenceNumber(persisted)
                                      .compareTo(runner.createSequenceNumber(offsetToPersist)) > 0) {
                                offsetToPersist = persisted;
                            }
                        }
                        lastPersistedOffsets.put(partition, offsetToPersist);
                    }

                    final SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetType> nextPartitions =
                            new SeekableStreamStartSequenceNumbers<>(
                                    stream,
                                    lastPersistedOffsets,
                                    exclusiveStartPartitions);
                    final SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> publishPartitions =
                            new SeekableStreamEndSequenceNumbers<>(stream, endOffsets);
                    return ImmutableMap.of(
                            "nextPartitions", nextPartitions,
                            "publishPartitions", publishPartitions);
                } finally {
                    lock.unlock();
                }
            }

            @Override
            public void run() {
                // Intentionally empty: this committer only supplies metadata.
            }
        };
    }

    /**
     * Creates a publisher bound to this sequence. (Decompiler note: originally package-private.)
     *
     * @param runner         task runner used for metadata (de)serialization
     * @param toolbox        toolbox providing the JSON mapper and task action client
     * @param useTransaction whether segment appends carry start/end metadata
     */
    public TransactionalSegmentPublisher createPublisher(
            SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, ?> runner,
            TaskToolbox toolbox,
            boolean useTransaction) {
        return new SequenceMetadataTransactionalSegmentPublisher(runner, toolbox, useTransaction);
    }

    /**
     * Returns true when both arguments carry equal partition-to-sequence-number maps.
     * (Decompiler note: originally private.)
     */
    public boolean isMetadataUnchanged(
            SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> startSequenceNumbers,
            SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> endSequenceNumbers) {
        return startSequenceNumbers.getPartitionSequenceNumberMap()
                                   .equals(endSequenceNumbers.getPartitionSequenceNumberMap());
    }
}
