package org.apache.druid.indexing.seekablestream;

import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.druid.client.indexing.SamplerResponse;
import org.apache.druid.client.indexing.SamplerSpec;
import org.apache.druid.data.input.ByteBufferInputRowParser;
import org.apache.druid.data.input.FiniteFirehoseFactory;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactoryToInputSourceAdaptor;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.segment.indexing.DataSchema;

/* loaded from: input_file:org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.class */
/**
 * Base {@link SamplerSpec} for seekable-stream supervisors. It pulls records from a {@link RecordSupplier}
 * provided by the concrete subclass and hands them to an {@link InputSourceSampler}.
 *
 * <p>Two paths are supported by {@link #sample()}:
 * <ul>
 *   <li>when the data schema carries a parser, records are read through a sampler-only {@link Firehose}
 *       wrapped in a {@link FirehoseFactoryToInputSourceAdaptor};</li>
 *   <li>otherwise records are read through a {@link RecordSupplierInputSource} using the
 *       {@link InputFormat} configured on the IO config.</li>
 * </ul>
 *
 * @param <PartitionIdType>    partition id type of the {@link RecordSupplier} created by the subclass
 * @param <SequenceOffsetType> sequence offset type of the {@link RecordSupplier} created by the subclass
 * @param <RecordType>         record entity type of the {@link RecordSupplier} created by the subclass
 */
public abstract class SeekableStreamSamplerSpec<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity> implements SamplerSpec {
    // Not referenced inside this class; package-private, so presumably consumed by a collaborator in this package.
    static final long POLL_TIMEOUT_MS = 100;

    /** Data schema taken from the supervisor spec; may be absent, in which case {@link #sample()} fails fast. */
    @Nullable
    private final DataSchema dataSchema;
    private final InputSourceSampler inputSourceSampler;
    protected final SeekableStreamSupervisorIOConfig ioConfig;

    @Nullable
    protected final SeekableStreamSupervisorTuningConfig tuningConfig;
    protected final SamplerConfig samplerConfig;

    /**
     * Sampler-only {@link Firehose} that iterates over entities produced by a {@link RecordSupplierInputSource}
     * and parses each one with the supplied {@link InputRowParser}. Only {@link #nextRowWithRaw()} is supported
     * for reading; {@link #nextRow()} always throws.
     */
    public class SeekableStreamSamplerFirehose implements Firehose {
        private final InputRowParser parser;
        private final CloseableIterator<InputEntity> entityIterator;

        /**
         * Creates the firehose and opens the entity iterator over the configured stream.
         *
         * @param inputRowParser parser applied to each record; a {@link StringInputRowParser} is reset via
         *                       {@code startFileFromBeginning()} before use
         */
        protected SeekableStreamSamplerFirehose(InputRowParser inputRowParser) {
            this.parser = inputRowParser;
            if (inputRowParser instanceof StringInputRowParser) {
                ((StringInputRowParser) inputRowParser).startFileFromBeginning();
            }
            final RecordSupplierInputSource inputSource = new RecordSupplierInputSource(
                SeekableStreamSamplerSpec.this.ioConfig.getStream(),
                SeekableStreamSamplerSpec.this.createRecordSupplier(),
                SeekableStreamSamplerSpec.this.ioConfig.isUseEarliestSequenceNumber()
            );
            this.entityIterator = inputSource.createEntityIterator();
        }

        /** @return whether the underlying entity iterator has another record */
        @Override
        public boolean hasMore() {
            return this.entityIterator.hasNext();
        }

        /**
         * Not supported by the sampler firehose; use {@link #nextRowWithRaw()}.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public InputRow nextRow() {
            throw new UnsupportedOperationException();
        }

        /**
         * Reads the next entity and parses it, returning the parsed rows together with the raw column map
         * (the latter only when the parser is a {@link StringInputRowParser}).
         *
         * <p>A {@link ParseException} is never propagated; it is wrapped in the returned value instead. When the
         * raw column extraction fails, no raw columns are reported; when only row parsing fails, the already
         * extracted raw columns are reported alongside the exception.
         *
         * @return parsed rows ({@code null} list when the parser yields no rows) plus raw values, or the
         *         parse failure
         */
        @Override
        public InputRowListPlusRawValues nextRowWithRaw() {
            final ByteBuffer buffer = ((ByteEntity) this.entityIterator.next()).getBuffer();

            // Stage 1: raw column extraction (string parsers only).
            final Map<String, Object> rawColumns;
            try {
                if (this.parser instanceof StringInputRowParser) {
                    // Work on a duplicate: decoding may advance the buffer's position, and the very same bytes
                    // must still be readable by parseBatch() below. duplicate() shares content but has its own
                    // position/limit, so the original buffer is left untouched.
                    rawColumns = ((StringInputRowParser) this.parser).buildStringKeyMap(buffer.duplicate());
                } else {
                    rawColumns = null;
                }
            } catch (ParseException e) {
                return InputRowListPlusRawValues.of((Map<String, Object>) null, e);
            }

            // Stage 2: actual row parsing.
            try {
                final List<InputRow> rows = this.parser.parseBatch(buffer);
                return InputRowListPlusRawValues.of(rows.isEmpty() ? null : rows, rawColumns);
            } catch (ParseException e) {
                return InputRowListPlusRawValues.of(rawColumns, e);
            }
        }

        /** Closes the underlying entity iterator. */
        @Override
        public void close() throws IOException {
            this.entityIterator.close();
        }
    }

    /**
     * Firehose factory used solely to plug {@link SeekableStreamSamplerFirehose} into
     * {@link FirehoseFactoryToInputSourceAdaptor}. Only {@link #connectForSampler} and {@link #isSplittable()}
     * are implemented; every other operation throws {@link UnsupportedOperationException}.
     */
    private class SeekableStreamSamplerFirehoseFactory implements FiniteFirehoseFactory<ByteBufferInputRowParser, Object> {
        private SeekableStreamSamplerFirehoseFactory() {
        }

        /** @throws UnsupportedOperationException always; this factory is sampler-only */
        @Override
        public Firehose connect(ByteBufferInputRowParser byteBufferInputRowParser, @Nullable File file) {
            throw new UnsupportedOperationException();
        }

        /**
         * Creates a new sampler firehose using the given parser.
         *
         * @param byteBufferInputRowParser parser handed to the firehose
         * @param file                     ignored
         */
        @Override
        public Firehose connectForSampler(ByteBufferInputRowParser byteBufferInputRowParser, @Nullable File file) {
            return new SeekableStreamSamplerFirehose(byteBufferInputRowParser);
        }

        /** @return {@code false}; splits are not supported */
        @Override
        public boolean isSplittable() {
            return false;
        }

        /** @throws UnsupportedOperationException always */
        @Override
        public Stream<InputSplit<Object>> getSplits(@Nullable SplitHintSpec splitHintSpec) {
            throw new UnsupportedOperationException();
        }

        /** @throws UnsupportedOperationException always */
        @Override
        public int getNumSplits(@Nullable SplitHintSpec splitHintSpec) {
            throw new UnsupportedOperationException();
        }

        /** @throws UnsupportedOperationException always */
        @Override
        public FiniteFirehoseFactory<ByteBufferInputRowParser, Object> withSplit(InputSplit<Object> inputSplit) {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * @param seekableStreamSupervisorSpec supervisor spec supplying the data schema, IO config and tuning config;
     *                                     must not be null and must have a non-null IO config
     * @param samplerConfig                sampler settings; {@link SamplerConfig#empty()} is used when null
     * @param inputSourceSampler           sampler that performs the actual sampling in {@link #sample()}
     * @throws NullPointerException if the spec or its IO config is null
     */
    public SeekableStreamSamplerSpec(SeekableStreamSupervisorSpec seekableStreamSupervisorSpec, @Nullable SamplerConfig samplerConfig, InputSourceSampler inputSourceSampler) {
        this.dataSchema = Preconditions.checkNotNull(seekableStreamSupervisorSpec, "[spec] is required").getDataSchema();
        this.ioConfig = Preconditions.checkNotNull(seekableStreamSupervisorSpec.getIoConfig(), "[spec.ioConfig] is required");
        this.tuningConfig = seekableStreamSupervisorSpec.getTuningConfig();
        this.samplerConfig = samplerConfig == null ? SamplerConfig.empty() : samplerConfig;
        this.inputSourceSampler = inputSourceSampler;
    }

    /**
     * Samples the stream described by this spec.
     *
     * @return the response produced by the {@link InputSourceSampler}
     * @throws NullPointerException if the data schema is missing, or if the schema has no parser and the IO
     *                              config has no input format
     */
    @Override
    public SamplerResponse sample() {
        // dataSchema is nullable; fail with a descriptive message rather than a bare NPE on dereference.
        final DataSchema schema = Preconditions.checkNotNull(this.dataSchema, "[spec.dataSchema] is required");

        final InputSource inputSource;
        final InputFormat inputFormat;
        if (schema.getParser() != null) {
            // Parser-based spec: route through the sampler firehose; no input format is passed to the sampler.
            inputSource = new FirehoseFactoryToInputSourceAdaptor(new SeekableStreamSamplerFirehoseFactory(), schema.getParser());
            inputFormat = null;
        } else {
            inputSource = new RecordSupplierInputSource(
                this.ioConfig.getStream(),
                createRecordSupplier(),
                this.ioConfig.isUseEarliestSequenceNumber()
            );
            inputFormat = Preconditions.checkNotNull(this.ioConfig.getInputFormat(), "[spec.ioConfig.inputFormat] is required");
        }
        return this.inputSourceSampler.sample(inputSource, inputFormat, schema, this.samplerConfig);
    }

    /**
     * Creates the record supplier the sampler reads from. Invoked once per {@link #sample()} call on the
     * input-format path, and once per sampler firehose created on the parser path.
     */
    protected abstract RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> createRecordSupplier();
}
