package org.apache.druid.indexing.seekablestream;

import com.google.common.base.Throwables;
import java.io.File;
import java.util.Iterator;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.InputEntityIteratingReader;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.systemfield.SystemFieldDecoratorFactory;
import org.apache.druid.indexing.overlord.sampler.SamplerException;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;

/* loaded from: input_file:org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.class */
public class RecordSupplierInputSource<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity> extends AbstractInputSource {
    private static final Logger LOG = new Logger(RecordSupplierInputSource.class);
    private final String topic;
    private final RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier;
    private final boolean useEarliestOffset;
    private final Integer iteratorTimeoutMs;

    public RecordSupplierInputSource(String str, RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier, boolean z, Integer num) {
        this.topic = str;
        this.recordSupplier = recordSupplier;
        this.useEarliestOffset = z;
        this.iteratorTimeoutMs = num;
        assignAndSeek(recordSupplier);
    }

    private void assignAndSeek(RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier) {
        try {
            Set<StreamPartition<PartitionIdType>> set = (Set) recordSupplier.getPartitionIds(this.topic).stream().map(obj -> {
                return StreamPartition.of(this.topic, obj);
            }).collect(Collectors.toSet());
            recordSupplier.assign(set);
            if (this.useEarliestOffset) {
                recordSupplier.seekToEarliest(set);
            } else {
                recordSupplier.seekToLatest(set);
            }
        } catch (Exception e) {
            Object[] objArr = new Object[3];
            objArr[0] = this.useEarliestOffset ? "earliest" : "latest";
            objArr[1] = this.topic;
            objArr[2] = Throwables.getRootCause(e).getMessage();
            throw new SamplerException(e, "Exception while seeking to the [%s] offset of partitions in topic [%s]: %s", objArr);
        }
    }

    public boolean isSplittable() {
        return false;
    }

    public boolean needsFormat() {
        return true;
    }

    protected InputSourceReader formattableReader(InputRowSchema inputRowSchema, InputFormat inputFormat, @Nullable File file) {
        return new InputEntityIteratingReader(inputRowSchema, JsonInputFormat.withLineSplittable(inputFormat, false), createEntityIterator(), SystemFieldDecoratorFactory.NONE, file);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CloseableIterator<InputEntity> createEntityIterator() {
        return new CloseableIterator<InputEntity>() { // from class: org.apache.druid.indexing.seekablestream.RecordSupplierInputSource.1
            private Iterator<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, RecordType>> recordIterator;
            private Iterator<? extends ByteEntity> bytesIterator;
            private volatile boolean closed;
            private final long createTime = System.currentTimeMillis();
            private final Long terminationTime;

            {
                this.terminationTime = RecordSupplierInputSource.this.iteratorTimeoutMs != null ? Long.valueOf(this.createTime + RecordSupplierInputSource.this.iteratorTimeoutMs.intValue()) : null;
            }

            private void waitNextIteratorIfNecessary() {
                while (!this.closed) {
                    if (this.bytesIterator != null && this.bytesIterator.hasNext()) {
                        return;
                    }
                    while (!this.closed && (this.recordIterator == null || !this.recordIterator.hasNext())) {
                        if (this.terminationTime != null && System.currentTimeMillis() > this.terminationTime.longValue()) {
                            RecordSupplierInputSource.LOG.info("Configured sampler timeout [%s] has been exceeded, returning without a bytesIterator.", new Object[]{RecordSupplierInputSource.this.iteratorTimeoutMs});
                            this.bytesIterator = null;
                            return;
                        }
                        this.recordIterator = RecordSupplierInputSource.this.recordSupplier.poll(100L).iterator();
                    }
                    if (!this.closed) {
                        this.bytesIterator = this.recordIterator.next().getData().iterator();
                    }
                }
            }

            public boolean hasNext() {
                waitNextIteratorIfNecessary();
                return this.bytesIterator != null && this.bytesIterator.hasNext();
            }

            /* renamed from: next, reason: merged with bridge method [inline-methods] */
            public InputEntity m127next() {
                return this.bytesIterator.next();
            }

            public void close() {
                RecordSupplierInputSource.LOG.info("Closing entity iterator.", new Object[0]);
                this.closed = true;
                RecordSupplierInputSource.this.recordSupplier.close();
            }
        };
    }
}
