package org.apache.druid.indexing.seekablestream;

import com.google.common.base.Preconditions;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowPlusRaw;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.indexing.overlord.sampler.FirehoseSampler;
import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
import org.apache.druid.indexing.overlord.sampler.SamplerException;
import org.apache.druid.indexing.overlord.sampler.SamplerResponse;
import org.apache.druid.indexing.overlord.sampler.SamplerSpec;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.utils.Runnables;

/* loaded from: input_file:org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.class */
/**
 * Base {@link SamplerSpec} for supervisor specs that read from a seekable stream.
 *
 * <p>{@link #sample()} hands the configured {@link FirehoseSampler} a {@link FirehoseFactory} whose
 * firehose is produced by {@link #getFirehose(InputRowParser)}. Subclasses normally implement that
 * method by returning a {@link SeekableStreamSamplerFirehose} bound to a stream-specific
 * {@link RecordSupplier}.
 *
 * @param <PartitionIdType>    type used by the stream to identify a partition
 * @param <SequenceOffsetType> type used by the stream to identify a position within a partition
 */
public abstract class SeekableStreamSamplerSpec<PartitionIdType, SequenceOffsetType> implements SamplerSpec {
    /** Timeout passed to every {@code RecordSupplier.poll} call (milliseconds, per the constant's name). */
    private static final int POLL_TIMEOUT_MS = 100;

    private final DataSchema dataSchema;
    private final FirehoseSampler firehoseSampler;
    protected final SeekableStreamSupervisorIOConfig ioConfig;
    protected final SeekableStreamSupervisorTuningConfig tuningConfig;
    protected final SamplerConfig samplerConfig;

    /**
     * Firehose that pulls records from a {@link RecordSupplier} and parses each payload with the
     * supplied {@link InputRowParser}.
     *
     * <p>The firehose owns the record supplier returned by {@link #getRecordSupplier()}: it is closed
     * by {@link #close()}, and also by the constructor if assigning/seeking partitions fails.
     */
    protected abstract class SeekableStreamSamplerFirehose implements Firehose {
        private final InputRowParser parser;
        private final RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier;

        /** Records returned by the most recent poll that have not been consumed yet. */
        private Iterator<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType>> recordIterator;

        /** Payloads of the record currently being consumed. */
        private Iterator<byte[]> recordDataIterator;

        private volatile boolean closed = false;

        /**
         * Obtains the record supplier, assigns every partition of the configured stream to it and
         * seeks to the earliest or latest position, as selected by the IO config.
         *
         * <p>NOTE: this constructor invokes the overridable {@link #getRecordSupplier()}, so subclass
         * implementations must not rely on subclass fields assigned in their own constructor body.
         *
         * @param inputRowParser parser applied to every raw payload read from the stream
         * @throws SamplerException if the thread is interrupted while seeking; the interrupt flag is
         *                          restored before the exception is thrown
         */
        protected SeekableStreamSamplerFirehose(InputRowParser inputRowParser) {
            this.parser = inputRowParser;
            if (inputRowParser instanceof StringInputRowParser) {
                ((StringInputRowParser) inputRowParser).startFileFromBeginning();
            }
            this.recordSupplier = getRecordSupplier();
            try {
                assignAndSeek();
            } catch (InterruptedException e) {
                // Nobody will ever hold a reference to this half-built firehose, so release the
                // supplier here, and restore the interrupt flag so callers can observe it.
                closeSupplierAfterFailure(e);
                Thread.currentThread().interrupt();
                throw new SamplerException(e, "Exception while seeking to partitions", new Object[0]);
            } catch (RuntimeException | Error e) {
                closeSupplierAfterFailure(e);
                throw e;
            }
        }

        /**
         * Reports whether the firehose may still produce rows.
         *
         * @return {@code true} until {@link #close()} has been called
         */
        public boolean hasMore() {
            return !this.closed;
        }

        /**
         * Returns the next parsed row, rethrowing any parse failure recorded for it.
         *
         * @return the parsed row, or {@code null} when {@link #nextRowWithRaw()} carried no row
         * @throws ParseException if the next payload could not be parsed
         */
        @Nullable
        public InputRow nextRow() {
            InputRowPlusRaw rowWithRaw = nextRowWithRaw();
            ParseException parseFailure = rowWithRaw.getParseException();
            if (parseFailure != null) {
                throw parseFailure;
            }
            return rowWithRaw.getInputRow();
        }

        /**
         * Reads the next payload from the stream and parses it.
         *
         * <p>When the buffered payloads and records are exhausted, the record supplier is polled once.
         * If that poll yields no records, or the next record carries no payloads, a result with a
         * {@code null} row and {@code null} raw bytes is returned instead of blocking.
         *
         * @return the first row parsed from the payload together with its raw bytes (the row is
         *         {@code null} if the parser produced no rows), or the raw bytes together with the
         *         {@link ParseException} if parsing failed
         */
        public InputRowPlusRaw nextRowWithRaw() {
            if (this.recordDataIterator == null || !this.recordDataIterator.hasNext()) {
                if (this.recordIterator == null || !this.recordIterator.hasNext()) {
                    this.recordIterator = this.recordSupplier.poll(POLL_TIMEOUT_MS).iterator();
                    if (!this.recordIterator.hasNext()) {
                        return InputRowPlusRaw.of((InputRow) null, (byte[]) null);
                    }
                }
                this.recordDataIterator = this.recordIterator.next().getData().iterator();
                if (!this.recordDataIterator.hasNext()) {
                    return InputRowPlusRaw.of((InputRow) null, (byte[]) null);
                }
            }
            byte[] raw = this.recordDataIterator.next();
            try {
                // parser is held as a raw InputRowParser, so parseBatch yields a raw List.
                @SuppressWarnings("unchecked")
                List<InputRow> rows = this.parser.parseBatch(ByteBuffer.wrap(raw));
                // Only the first row of a batch is surfaced.
                return InputRowPlusRaw.of(rows.isEmpty() ? null : rows.get(0), raw);
            } catch (ParseException e) {
                return InputRowPlusRaw.of(raw, e);
            }
        }

        /**
         * Returns a no-op runnable.
         *
         * @return a runnable that does nothing
         */
        public Runnable commit() {
            return Runnables.getNoopRunnable();
        }

        /** Marks the firehose as closed and closes the record supplier; later calls do nothing. */
        public void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            this.recordSupplier.close();
        }

        /**
         * Assigns every partition of the configured stream to the record supplier and seeks all of
         * them to the earliest or latest position, depending on
         * {@code ioConfig.isUseEarliestSequenceNumber()}.
         *
         * @throws InterruptedException if the thread is interrupted while seeking
         */
        private void assignAndSeek() throws InterruptedException {
            final String stream = SeekableStreamSamplerSpec.this.ioConfig.getStream();
            Set<StreamPartition<PartitionIdType>> partitions = this.recordSupplier
                    .getPartitionIds(stream)
                    .stream()
                    .map(partitionId -> StreamPartition.of(stream, partitionId))
                    .collect(Collectors.toSet());
            this.recordSupplier.assign(partitions);
            if (SeekableStreamSamplerSpec.this.ioConfig.isUseEarliestSequenceNumber()) {
                this.recordSupplier.seekToEarliest(partitions);
            } else {
                this.recordSupplier.seekToLatest(partitions);
            }
        }

        /**
         * Closes the record supplier after a construction failure, attaching any secondary failure
         * from the close itself to {@code failure} so that the original problem is the one reported.
         */
        private void closeSupplierAfterFailure(Throwable failure) {
            try {
                this.recordSupplier.close();
            } catch (RuntimeException closeFailure) {
                failure.addSuppressed(closeFailure);
            }
        }

        /**
         * Supplies the stream-specific record supplier used by this firehose. Called once, from the
         * constructor.
         *
         * @return the record supplier to read from; the firehose closes it
         */
        protected abstract RecordSupplier<PartitionIdType, SequenceOffsetType> getRecordSupplier();
    }

    /**
     * Captures the data schema, IO config and tuning config of the given supervisor spec.
     *
     * @param seekableStreamSupervisorSpec supervisor spec to sample; it and its IO config must be non-null
     * @param samplerConfig                sampler configuration forwarded to the firehose sampler
     * @param firehoseSampler              sampler that performs the actual sampling
     * @throws NullPointerException if the spec or its IO config is {@code null}
     */
    public SeekableStreamSamplerSpec(SeekableStreamSupervisorSpec seekableStreamSupervisorSpec, SamplerConfig samplerConfig, FirehoseSampler firehoseSampler) {
        this.dataSchema = ((SeekableStreamSupervisorSpec) Preconditions.checkNotNull(seekableStreamSupervisorSpec, "[spec] is required")).getDataSchema();
        this.ioConfig = (SeekableStreamSupervisorIOConfig) Preconditions.checkNotNull(seekableStreamSupervisorSpec.getIoConfig(), "[spec.ioConfig] is required");
        this.tuningConfig = seekableStreamSupervisorSpec.getTuningConfig();
        this.samplerConfig = samplerConfig;
        this.firehoseSampler = firehoseSampler;
    }

    /**
     * Samples the stream by delegating to the firehose sampler with a factory that connects through
     * {@link #getFirehose(InputRowParser)}.
     *
     * @return the response produced by the firehose sampler
     */
    @Override // org.apache.druid.indexing.overlord.sampler.SamplerSpec
    public SamplerResponse sample() {
        return this.firehoseSampler.sample(new FirehoseFactory() {
            // The temporary directory argument is not used.
            public Firehose connect(InputRowParser inputRowParser, @Nullable File file) {
                return SeekableStreamSamplerSpec.this.getFirehose(inputRowParser);
            }
        }, this.dataSchema, this.samplerConfig);
    }

    /**
     * Creates the firehose used for sampling.
     *
     * @param inputRowParser parser the firehose should apply to the data it reads
     * @return a firehose reading from the stream described by this spec
     */
    protected abstract Firehose getFirehose(InputRowParser inputRowParser);
}
