package org.apache.druid.indexing.seekablestream;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.validation.constraints.NotNull;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.discovery.DiscoveryDruidNode;
import org.apache.druid.discovery.LookupNodeService;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.indexer.IngestionState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import org.apache.druid.indexing.common.LockGranularity;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import org.apache.druid.indexing.common.TaskReport;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.CheckPointDataSourceMetadataAction;
import org.apache.druid.indexing.common.actions.ResetDataSourceMetadataAction;
import org.apache.druid.indexing.common.actions.SegmentLockAcquireAction;
import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.RealtimeIndexTask;
import org.apache.druid.indexing.input.InputRowSchemas;
import org.apache.druid.indexing.overlord.LockResult;
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.RealtimeIOConfig;
import org.apache.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.druid.segment.realtime.FireDepartment;
import org.apache.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.plumber.PlumberSchool;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.DateTime;

/* loaded from: input_file:org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.class */
public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity> implements ChatHandler {
    private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTaskRunner.class);

    // Key under which the committer metadata stores the next offsets to read; the same key is
    // looked up in the metadata returned by the driver's startJob() when restoring.
    static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
    static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions";

    // Copied in the constructor from the IO config's end sequence numbers into a ConcurrentHashMap.
    private final Map<PartitionIdType, SequenceOffsetType> endOffsets;
    private final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> task;
    private final SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig;
    private final SeekableStreamIndexTaskTuningConfig tuningConfig;
    private final InputRowSchema inputRowSchema;

    @Nullable
    private final InputFormat inputFormat;

    @Nullable
    private final InputRowParser<ByteBuffer> parser;

    // Stream name taken from the IO config's start sequence numbers.
    private final String stream;
    private final LockGranularity lockGranularityToUse;

    // The next three are created inside runInternal(), so they are unset until the task runs.
    private RowIngestionMeters rowIngestionMeters;
    private ParseExceptionHandler parseExceptionHandler;
    private FireDepartmentMetrics fireDepartmentMetrics;

    // Set by the constructor and reassigned from the toolbox at the start of runInternal().
    private AuthorizerMapper authorizerMapper;
    private volatile DateTime startTime;
    private volatile TaskToolbox toolbox;

    // The thread that entered runInternal().
    private volatile Thread runThread;
    private volatile Appenderator appenderator;
    private volatile StreamAppenderatorDriver driver;
    private volatile IngestionState ingestionState;
    private volatile long nextCheckpointTime;
    private volatile CopyOnWriteArrayList<SequenceMetadata<PartitionIdType, SequenceOffsetType>> sequences;

    // When non-null, the read loop rethrows it wrapped in a RuntimeException.
    private volatile Throwable backgroundThreadException;

    // Diamond operator instead of raw types: keeps these initializers free of unchecked conversions.
    private final Map<PartitionIdType, SequenceOffsetType> lastReadOffsets = new HashMap<>();
    private final ConcurrentMap<PartitionIdType, SequenceOffsetType> currOffsets = new ConcurrentHashMap<>();

    // Replaced with a snapshot of currOffsets each time the committer supplier is invoked.
    private final ConcurrentMap<PartitionIdType, SequenceOffsetType> lastPersistedOffsets = new ConcurrentHashMap<>();

    // Both conditions below are bound to pauseLock.
    private final Lock pauseLock = new ReentrantLock();
    private final Condition hasPaused = this.pauseLock.newCondition();
    private final Condition shouldResume = this.pauseLock.newCondition();
    protected final AtomicBoolean stopRequested = new AtomicBoolean(false);
    private final AtomicBoolean publishOnStop = new AtomicBoolean(false);
    private final Object statusLock = new Object();

    // isAwaitingRetry is bound to pollRetryLock.
    protected final Lock pollRetryLock = new ReentrantLock();
    protected final Condition isAwaitingRetry = this.pollRetryLock.newCondition();
    private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
    private final List<ListenableFuture<SegmentsAndCommitMetadata>> publishWaitList = new ArrayList<>();
    private final List<ListenableFuture<SegmentsAndCommitMetadata>> handOffWaitList = new ArrayList<>();
    private volatile Status status = Status.NOT_STARTED;
    protected volatile boolean pauseRequested = false;

    /* loaded from: input_file:org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner$Status.class */
    /**
     * States reported through the runner's {@code status} field.
     *
     * <p>The declaration order is part of the contract for anything relying on {@code ordinal()}
     * or {@code values()}, so it must not be rearranged.
     */
    public enum Status {
        /** Initial value of the runner's status field. */
        NOT_STARTED,

        /** Assigned at the very beginning of {@code runInternal}. */
        STARTING,

        /** Assigned right before the read loop is entered. */
        READING,

        /** Presumably reported while ingestion is paused; the transition is not visible in this part of the file. */
        PAUSED,

        /** Assigned in the read loop once a stop is requested, no sequences exist, or the last sequence is checkpointed. */
        PUBLISHING;
    }

    /**
     * Builds a runner from the task's IO and tuning configuration.
     *
     * <p>The end offsets are copied from the IO config into a {@link ConcurrentHashMap}, the list
     * of sequences starts empty, and the ingestion state starts as
     * {@code IngestionState.NOT_STARTED}.
     *
     * @param seekableStreamIndexTask the task this runner executes; must not be null
     * @param inputRowParser          optional row parser; may be null
     * @param authorizerMapper        authorizer mapper stored on the runner (reassigned from the
     *                                toolbox when the task actually runs)
     * @param lockGranularity         lock granularity used when acquiring segment locks
     * @throws NullPointerException if {@code seekableStreamIndexTask} is null
     */
    public SeekableStreamIndexTaskRunner(SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType, RecordType> seekableStreamIndexTask, @Nullable InputRowParser<ByteBuffer> inputRowParser, AuthorizerMapper authorizerMapper, LockGranularity lockGranularity) {
        // Fail fast with a descriptive message instead of a bare NullPointerException.
        this.task = Preconditions.checkNotNull(seekableStreamIndexTask, "task");
        this.ioConfig = seekableStreamIndexTask.getIOConfig();
        this.tuningConfig = seekableStreamIndexTask.getTuningConfig();
        this.inputRowSchema = InputRowSchemas.fromDataSchema(seekableStreamIndexTask.getDataSchema());
        this.inputFormat = this.ioConfig.getInputFormat();
        this.parser = inputRowParser;
        this.authorizerMapper = authorizerMapper;
        this.stream = this.ioConfig.getStartSequenceNumbers().getStream();
        // Diamond operator rather than a raw type, so the copy is type-checked.
        this.endOffsets = new ConcurrentHashMap<>(this.ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap());
        this.sequences = new CopyOnWriteArrayList<>();
        this.ingestionState = IngestionState.NOT_STARTED;
        this.lockGranularityToUse = lockGranularity;
        resetNextCheckpointTime();
    }

    /**
     * Runs the task and converts any exception into a failed {@link TaskStatus}.
     *
     * <p>On failure the exception is logged, a completion report carrying the stack trace is
     * written through the toolbox's report writer, and a failure status with the same stack
     * trace is returned.
     *
     * @param taskToolbox toolbox handed to {@code runInternal} and used to write the failure report
     * @return the status produced by {@code runInternal}, or a failure status if it threw
     */
    public TaskStatus run(TaskToolbox taskToolbox) {
        TaskStatus outcome;
        try {
            outcome = runInternal(taskToolbox);
        } catch (Exception failure) {
            log.error(failure, "Encountered exception while running task.");
            final String errorTrace = Throwables.getStackTraceAsString(failure);
            taskToolbox.getTaskReportFileWriter().write(
                this.task.getId(),
                getTaskCompletionReports(errorTrace, 0L)
            );
            outcome = TaskStatus.failure(this.task.getId(), errorTrace);
        }
        return outcome;
    }

    /**
     * Determines which partitions of a sequence have an exclusive start offset.
     *
     * @param map start offsets of the sequence, keyed by partition
     * @return the IO config's exclusive partitions when {@code map} equals the task's configured
     *         start offsets; otherwise an empty set if end offsets are exclusive, or every
     *         partition in {@code map} if they are not
     */
    private Set<PartitionIdType> computeExclusiveStartPartitionsForSequence(Map<PartitionIdType, SequenceOffsetType> map) {
        // First sequence of the task: the configuration says which starts are exclusive.
        if (map.equals(this.ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap())) {
            return this.ioConfig.getStartSequenceNumbers().getExclusivePartitions();
        }
        if (isEndOffsetExclusive()) {
            return Collections.emptySet();
        }
        return map.keySet();
    }

    /**
     * Stores the toolbox on this runner.
     *
     * <p>{@code runInternal} calls this before {@code initializeSequences}, which reads the
     * stored toolbox; the method is public so tests can set it directly.
     *
     * @param taskToolbox toolbox to store
     */
    @VisibleForTesting
    public void setToolbox(TaskToolbox taskToolbox) {
        this.toolbox = taskToolbox;
    }

    /**
     * Populates the runner's sequences unless {@code restoreSequences()} already did.
     *
     * <p>When the task context carries checkpoints, one sequence is added per checkpoint. Each
     * sequence except the last is marked checkpointed and ends where the next checkpoint starts;
     * the last one ends at the task's end offsets. Without checkpoints, a single sequence 0 is
     * added that runs from the configured start offsets to the end offsets.
     *
     * <p>NOTE(review): a non-null but empty checkpoint map makes the first {@code next()} call
     * throw {@code NoSuchElementException}.
     *
     * @throws IOException declared for the restore and checkpoint-parsing helpers it calls
     */
    @VisibleForTesting
    public void initializeSequences() throws IOException {
        if (!restoreSequences()) {
            final TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>> checkpoints = getCheckPointsFromContext(this.toolbox, (String) this.task.getContextValue(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY));
            if (checkpoints == null) {
                addSequence(new SequenceMetadata<>(0, StringUtils.format("%s_%s", this.ioConfig.getBaseSequenceName(), 0), this.ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap(), this.endOffsets, false, this.ioConfig.getStartSequenceNumbers().getExclusivePartitions()));
            } else {
                final Iterator<Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>>> remaining = checkpoints.entrySet().iterator();
                Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> previous = remaining.next();
                // Every checkpoint that has a successor becomes a closed (checkpointed) sequence.
                while (remaining.hasNext()) {
                    final Map.Entry<Integer, Map<PartitionIdType, SequenceOffsetType>> current = remaining.next();
                    addSequence(new SequenceMetadata<>(previous.getKey().intValue(), StringUtils.format("%s_%s", this.ioConfig.getBaseSequenceName(), previous.getKey()), previous.getValue(), current.getValue(), true, computeExclusiveStartPartitionsForSequence(previous.getValue())));
                    previous = current;
                }
                // The final checkpoint stays open and runs to the task's end offsets.
                addSequence(new SequenceMetadata<>(previous.getKey().intValue(), StringUtils.format("%s_%s", this.ioConfig.getBaseSequenceName(), previous.getKey()), previous.getValue(), this.endOffsets, false, computeExclusiveStartPartitionsForSequence(previous.getValue())));
            }
        }
        log.info("Starting with sequences: %s", this.sequences);
    }

    /* JADX WARN: Failed to calculate best type for var: r22v3 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r22v3 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryInsertCasts(FixTypesVisitor.java:363)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r22v3 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r23v2 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r23v2 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryInsertCasts(FixTypesVisitor.java:363)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r23v2 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 22, insn: 0x09f0: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r22 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:266:0x09f0 */
    /* JADX WARN: Not initialized variable reg: 23, insn: 0x09f5: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r23 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:268:0x09f5 */
    /* JADX WARN: Type inference failed for: r12v0, types: [org.apache.druid.segment.realtime.firehose.ChatHandler, org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, RecordType extends org.apache.druid.data.input.impl.ByteEntity>, org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner] */
    /* JADX WARN: Type inference failed for: r22v3, types: [org.apache.druid.indexing.seekablestream.common.RecordSupplier] */
    /* JADX WARN: Type inference failed for: r23v2, types: [java.lang.Throwable] */
    private TaskStatus runInternal(TaskToolbox taskToolbox) throws Exception {
        ?? r22;
        ?? r23;
        RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> newTaskRecordSupplier;
        Throwable th;
        this.startTime = DateTimes.nowUtc();
        this.status = Status.STARTING;
        setToolbox(taskToolbox);
        this.authorizerMapper = taskToolbox.getAuthorizerMapper();
        this.rowIngestionMeters = taskToolbox.getRowIngestionMetersFactory().createRowIngestionMeters();
        this.parseExceptionHandler = new ParseExceptionHandler(this.rowIngestionMeters, this.tuningConfig.isLogParseExceptions(), this.tuningConfig.getMaxParseExceptions(), this.tuningConfig.getMaxSavedParseExceptions());
        StreamChunkParser streamChunkParser = new StreamChunkParser(this.parser, this.inputFormat, this.inputRowSchema, this.task.getDataSchema().getTransformSpec(), taskToolbox.getIndexingTmpDir(), inputRow -> {
            return inputRow != null && this.task.withinMinMaxRecordTime(inputRow);
        }, this.rowIngestionMeters, this.parseExceptionHandler);
        initializeSequences();
        log.debug("Found chat handler of class[%s]", new Object[]{taskToolbox.getChatHandlerProvider().getClass().getName()});
        taskToolbox.getChatHandlerProvider().register(this.task.getId(), (ChatHandler) this, false);
        this.runThread = Thread.currentThread();
        FireDepartment fireDepartment = new FireDepartment(this.task.getDataSchema(), new RealtimeIOConfig((FirehoseFactory) null, (PlumberSchool) null), (RealtimeTuningConfig) null);
        this.fireDepartmentMetrics = fireDepartment.getMetrics();
        taskToolbox.addMonitor(TaskRealtimeMetricsMonitorBuilder.build(this.task, fireDepartment, this.rowIngestionMeters));
        String str = (String) this.task.getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER);
        LookupNodeService lookupNodeService = str == null ? taskToolbox.getLookupNodeService() : new LookupNodeService(str);
        DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(taskToolbox.getDruidNode(), NodeRole.PEON, ImmutableMap.of(taskToolbox.getDataNodeService().getName(), taskToolbox.getDataNodeService(), lookupNodeService.getName(), lookupNodeService));
        Exception exc = null;
        long j = 0;
        try {
            try {
                try {
                    newTaskRecordSupplier = this.task.newTaskRecordSupplier();
                    th = null;
                    if (taskToolbox.getAppenderatorsManager().shouldTaskMakeNodeAnnouncements()) {
                        taskToolbox.getDataSegmentServerAnnouncer().announce();
                        taskToolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
                    }
                    this.appenderator = this.task.newAppenderator(taskToolbox, this.fireDepartmentMetrics, this.rowIngestionMeters, this.parseExceptionHandler);
                    this.driver = this.task.newDriver(this.appenderator, taskToolbox, this.fireDepartmentMetrics);
                    Object startJob = this.driver.startJob(segmentIdWithShardSpec -> {
                        try {
                            if (this.lockGranularityToUse == LockGranularity.SEGMENT) {
                                return ((LockResult) taskToolbox.getTaskActionClient().submit(new SegmentLockAcquireAction(TaskLockType.EXCLUSIVE, segmentIdWithShardSpec.getInterval(), segmentIdWithShardSpec.getVersion(), segmentIdWithShardSpec.getShardSpec().getPartitionNum(), 1000L))).isOk();
                            }
                            TaskLock taskLock = (TaskLock) taskToolbox.getTaskActionClient().submit(new TimeChunkLockAcquireAction(TaskLockType.EXCLUSIVE, segmentIdWithShardSpec.getInterval(), 1000L));
                            if (taskLock == null) {
                                return false;
                            }
                            if (taskLock.isRevoked()) {
                                throw new ISE(StringUtils.format("Lock for interval [%s] was revoked.", new Object[]{segmentIdWithShardSpec.getInterval()}), new Object[0]);
                            }
                            return true;
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    });
                    if (startJob == null) {
                        Preconditions.checkState(this.sequences.get(0).startOffsets.entrySet().stream().allMatch(entry -> {
                            return createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(this.ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(entry.getKey()))) >= 0;
                        }), "Sequence sequences are not compatible with start sequences of task");
                        this.currOffsets.putAll(this.sequences.get(0).startOffsets);
                    } else {
                        SeekableStreamEndSequenceNumbers deserializePartitionsFromMetadata = deserializePartitionsFromMetadata(taskToolbox.getJsonMapper(), ((Map) startJob).get(METADATA_NEXT_PARTITIONS));
                        this.currOffsets.putAll(deserializePartitionsFromMetadata.getPartitionSequenceNumberMap());
                        if (!deserializePartitionsFromMetadata.getStream().equals(this.ioConfig.getStartSequenceNumbers().getStream())) {
                            throw new ISE("Restored stream[%s] but expected stream[%s]", new Object[]{deserializePartitionsFromMetadata.getStream(), this.ioConfig.getStartSequenceNumbers().getStream()});
                        }
                        if (!this.currOffsets.keySet().equals(this.ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().keySet())) {
                            throw new ISE("Restored partitions[%s] but expected partitions[%s]", new Object[]{this.currOffsets.keySet(), this.ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().keySet()});
                        }
                        if (this.sequences.size() == 0 || getLastSequenceMetadata().isCheckpointed()) {
                            this.endOffsets.putAll(this.sequences.size() == 0 ? this.currOffsets : getLastSequenceMetadata().getEndOffsets());
                        }
                    }
                    log.info("Initialized sequences: %s", new Object[]{this.sequences.stream().map((v0) -> {
                        return v0.toString();
                    }).collect(Collectors.joining(", "))});
                    int size = this.currOffsets.size();
                    if (this.currOffsets.entrySet().removeIf(entry2 -> {
                        return isEndOfShard(entry2.getValue());
                    })) {
                        log.info("Removed [%d] partitions from assignment which have already been closed.", new Object[]{Integer.valueOf(size - this.currOffsets.size())});
                    }
                    if (!isEndOffsetExclusive()) {
                        for (Map.Entry<PartitionIdType, SequenceOffsetType> entry3 : this.currOffsets.entrySet()) {
                            if (!entry3.getValue().equals(this.ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(entry3.getKey())) || this.ioConfig.getStartSequenceNumbers().getExclusivePartitions().contains(entry3.getKey())) {
                                this.lastReadOffsets.put(entry3.getKey(), entry3.getValue());
                            }
                        }
                    }
                    Supplier supplier = () -> {
                        final Map<? extends PartitionIdType, ? extends SequenceOffsetType> copyOf = ImmutableMap.copyOf(this.currOffsets);
                        this.lastPersistedOffsets.clear();
                        this.lastPersistedOffsets.putAll(copyOf);
                        return new Committer() { // from class: org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.1
                            public Object getMetadata() {
                                return ImmutableMap.of(SeekableStreamIndexTaskRunner.METADATA_NEXT_PARTITIONS, new SeekableStreamEndSequenceNumbers(SeekableStreamIndexTaskRunner.this.stream, copyOf));
                            }

                            public void run() {
                            }
                        };
                    };
                    maybePersistAndPublishSequences(supplier);
                    Set<StreamPartition<PartitionIdType>> assignPartitions = assignPartitions(newTaskRecordSupplier);
                    possiblyResetDataSourceMetadata(taskToolbox, newTaskRecordSupplier, assignPartitions);
                    seekToStartingSequence(newTaskRecordSupplier, assignPartitions);
                    this.ingestionState = IngestionState.BUILD_SEGMENTS;
                    boolean z = !assignPartitions.isEmpty();
                    this.status = Status.READING;
                    Throwable th2 = null;
                    while (z) {
                        try {
                            try {
                                if (possiblyPause()) {
                                    assignPartitions = assignPartitions(newTaskRecordSupplier);
                                    possiblyResetDataSourceMetadata(taskToolbox, newTaskRecordSupplier, assignPartitions);
                                    if (assignPartitions.isEmpty()) {
                                        log.debug("All partitions have been fully read.", new Object[0]);
                                        this.publishOnStop.set(true);
                                        this.stopRequested.set(true);
                                    }
                                }
                                if (this.stopRequested.get() || this.sequences.size() == 0 || getLastSequenceMetadata().isCheckpointed()) {
                                    this.status = Status.PUBLISHING;
                                }
                                if (this.stopRequested.get()) {
                                    break;
                                }
                                if (this.backgroundThreadException != null) {
                                    throw new RuntimeException(this.backgroundThreadException);
                                }
                                checkPublishAndHandoffFailure();
                                maybePersistAndPublishSequences(supplier);
                                List<OrderedPartitionableRecord> records = getRecords(newTaskRecordSupplier, taskToolbox);
                                z = !assignPartitions.isEmpty();
                                SequenceMetadata sequenceMetadata = null;
                                for (OrderedPartitionableRecord orderedPartitionableRecord : records) {
                                    boolean verifyRecordInRange = verifyRecordInRange(orderedPartitionableRecord.getPartitionId(), orderedPartitionableRecord.getSequenceNumber());
                                    log.trace("Got stream[%s] partition[%s] sequenceNumber[%s], shouldProcess[%s].", new Object[]{orderedPartitionableRecord.getStream(), orderedPartitionableRecord.getPartitionId(), orderedPartitionableRecord.getSequenceNumber(), Boolean.valueOf(verifyRecordInRange)});
                                    if (verifyRecordInRange) {
                                        List<InputRow> parse = streamChunkParser.parse(orderedPartitionableRecord.getData(), isEndOfShard(orderedPartitionableRecord.getSequenceNumber()));
                                        boolean z2 = false;
                                        SequenceMetadata sequenceMetadata2 = (SequenceMetadata) this.sequences.stream().filter(sequenceMetadata3 -> {
                                            return sequenceMetadata3.canHandle(this, orderedPartitionableRecord);
                                        }).findFirst().orElse(null);
                                        if (sequenceMetadata2 == null) {
                                            throw new ISE("Cannot find any valid sequence for record with partition [%s] and sequenceNumber [%s]. Current sequences: %s", new Object[]{orderedPartitionableRecord.getPartitionId(), orderedPartitionableRecord.getSequenceNumber(), this.sequences});
                                        }
                                        for (InputRow inputRow2 : parse) {
                                            AppenderatorDriverAddResult add = this.driver.add(inputRow2, sequenceMetadata2.getSequenceName(), supplier, true, false);
                                            if (!add.isOk()) {
                                                throw new ISE("Could not allocate segment for row with timestamp[%s]", new Object[]{inputRow2.getTimestamp()});
                                            }
                                            if (add.isPushRequired(this.tuningConfig.m121getPartitionsSpec().getMaxRowsPerSegment(), Long.valueOf(this.tuningConfig.m121getPartitionsSpec().getMaxTotalRowsOr(20000000L))) && !sequenceMetadata2.isCheckpointed()) {
                                                sequenceMetadata = sequenceMetadata2;
                                            }
                                            z2 |= add.isPersistRequired();
                                        }
                                        if (z2) {
                                            Futures.addCallback(this.driver.persistAsync((Committer) supplier.get()), new FutureCallback<Object>() { // from class: org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.2
                                                public void onSuccess(@Nullable Object obj) {
                                                    SeekableStreamIndexTaskRunner.log.debug("Persist completed with metadata: %s", new Object[]{obj});
                                                }

                                                public void onFailure(Throwable th3) {
                                                    SeekableStreamIndexTaskRunner.log.error("Persist failed, dying", new Object[0]);
                                                    SeekableStreamIndexTaskRunner.this.backgroundThreadException = th3;
                                                }
                                            });
                                        }
                                        this.lastReadOffsets.put(orderedPartitionableRecord.getPartitionId(), orderedPartitionableRecord.getSequenceNumber());
                                        this.currOffsets.put(orderedPartitionableRecord.getPartitionId(), getNextStartOffset(orderedPartitionableRecord.getSequenceNumber()));
                                    }
                                    if (!isMoreToReadAfterReadingRecord(orderedPartitionableRecord.getSequenceNumber(), this.endOffsets.get(orderedPartitionableRecord.getPartitionId())) && assignPartitions.remove(orderedPartitionableRecord.getStreamPartition())) {
                                        log.info("Finished reading stream[%s], partition[%s].", new Object[]{orderedPartitionableRecord.getStream(), orderedPartitionableRecord.getPartitionId()});
                                        newTaskRecordSupplier.assign(assignPartitions);
                                        z = !assignPartitions.isEmpty();
                                    }
                                }
                                if (!z) {
                                    this.fireDepartmentMetrics.markProcessingDone();
                                }
                                if (System.currentTimeMillis() > this.nextCheckpointTime) {
                                    sequenceMetadata = getLastSequenceMetadata();
                                }
                                if (sequenceMetadata != null && z) {
                                    Preconditions.checkArgument(getLastSequenceMetadata().getSequenceName().equals(sequenceMetadata.getSequenceName()), "Cannot checkpoint a sequence [%s] which is not the latest one, sequences %s", new Object[]{sequenceMetadata, this.sequences});
                                    requestPause();
                                    if (!((Boolean) taskToolbox.getTaskActionClient().submit(new CheckPointDataSourceMetadataAction(this.task.getDataSource(), this.ioConfig.getTaskGroupId(), null, createDataSourceMetadata(new SeekableStreamStartSequenceNumbers(this.stream, sequenceMetadata.getStartOffsets(), sequenceMetadata.getExclusiveStartPartitions()))))).booleanValue()) {
                                        throw new ISE("Checkpoint request with sequences [%s] failed, dying", new Object[]{this.currOffsets});
                                    }
                                }
                            } catch (Throwable th3) {
                                try {
                                    this.driver.persist((Committer) supplier.get());
                                } catch (Exception e) {
                                    if (0 == 0) {
                                        throw e;
                                    }
                                    th2.addSuppressed(e);
                                }
                                throw th3;
                            }
                        } catch (Exception e2) {
                            log.error(e2, "Encountered exception in run() before persisting.", new Object[0]);
                            throw e2;
                        }
                    }
                    this.ingestionState = IngestionState.COMPLETED;
                    try {
                        this.driver.persist((Committer) supplier.get());
                    } catch (Exception e3) {
                        if (0 == 0) {
                            throw e3;
                        }
                        th2.addSuppressed(e3);
                    }
                    synchronized (this.statusLock) {
                        if (this.stopRequested.get() && !this.publishOnStop.get()) {
                            throw new InterruptedException("Stopping without publishing");
                        }
                        this.status = Status.PUBLISHING;
                    }
                    ArrayList arrayList = new ArrayList(this.sequences);
                    int i = 0;
                    while (i < arrayList.size()) {
                        SequenceMetadata sequenceMetadata4 = (SequenceMetadata) arrayList.get(i);
                        if (!this.publishingSequences.contains(sequenceMetadata4.getSequenceName())) {
                            if (i == arrayList.size() - 1) {
                                sequenceMetadata4.setEndOffsets(this.currOffsets);
                            }
                            sequenceMetadata4.updateAssignments(this.currOffsets, this::isMoreToReadAfterReadingRecord);
                            this.publishingSequences.add(sequenceMetadata4.getSequenceName());
                            publishAndRegisterHandoff(sequenceMetadata4);
                        }
                        i++;
                    }
                } catch (Throwable th4) {
                    if (r22 != 0) {
                        if (r23 != 0) {
                            try {
                                r22.close();
                            } catch (Throwable th5) {
                                r23.addSuppressed(th5);
                            }
                        } else {
                            r22.close();
                        }
                    }
                    throw th4;
                }
            } catch (InterruptedException | RejectedExecutionException e4) {
                exc = e4;
                try {
                    Futures.allAsList(this.publishWaitList).cancel(true);
                    Futures.allAsList(this.handOffWaitList).cancel(true);
                    if (this.appenderator != null) {
                        this.appenderator.closeNow();
                    }
                } catch (Exception e5) {
                    e4.addSuppressed(e5);
                }
                if ((e4 instanceof RejectedExecutionException) && (e4.getCause() == null || !(e4.getCause() instanceof InterruptedException))) {
                    throw e4;
                }
                if (!this.stopRequested.get()) {
                    Thread.currentThread().interrupt();
                    throw e4;
                }
                try {
                    if (this.driver != null) {
                        this.driver.close();
                    }
                    taskToolbox.getChatHandlerProvider().unregister(this.task.getId());
                    if (taskToolbox.getAppenderatorsManager().shouldTaskMakeNodeAnnouncements()) {
                        taskToolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
                        taskToolbox.getDataSegmentServerAnnouncer().unannounce();
                    }
                } catch (Throwable th6) {
                    if (exc == null) {
                        throw th6;
                    }
                    exc.addSuppressed(th6);
                }
            } catch (Exception e6) {
                exc = e6;
                try {
                    Futures.allAsList(this.publishWaitList).cancel(true);
                    Futures.allAsList(this.handOffWaitList).cancel(true);
                    if (this.appenderator != null) {
                        this.appenderator.closeNow();
                    }
                } catch (Exception e7) {
                    e6.addSuppressed(e7);
                }
                throw e6;
            }
            if (this.backgroundThreadException != null) {
                throw new RuntimeException(this.backgroundThreadException);
            }
            Futures.allAsList(this.publishWaitList).get();
            List emptyList = Collections.emptyList();
            if (this.tuningConfig.getHandoffConditionTimeout() == 0) {
                emptyList = (List) Futures.allAsList(this.handOffWaitList).get();
            } else {
                long nanoTime = System.nanoTime();
                try {
                    try {
                        emptyList = (List) Futures.allAsList(this.handOffWaitList).get(this.tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
                        j = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime);
                    } catch (TimeoutException e8) {
                        log.makeAlert("Timeout waiting for handoff", new Object[0]).addData("taskId", this.task.getId()).addData("handoffConditionTimeout", Long.valueOf(this.tuningConfig.getHandoffConditionTimeout())).emit();
                        j = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime);
                    }
                } catch (Throwable th7) {
                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nanoTime);
                    throw th7;
                }
            }
            Iterator it = emptyList.iterator();
            while (it.hasNext()) {
                log.info("Handoff complete for segments: %s", new Object[]{String.join(", ", Lists.transform(((SegmentsAndCommitMetadata) it.next()).getSegments(), (v0) -> {
                    return v0.toString();
                }))});
            }
            this.appenderator.close();
            if (newTaskRecordSupplier != null) {
                if (0 != 0) {
                    try {
                        newTaskRecordSupplier.close();
                    } catch (Throwable th8) {
                        th.addSuppressed(th8);
                    }
                } else {
                    newTaskRecordSupplier.close();
                }
            }
            try {
                if (this.driver != null) {
                    this.driver.close();
                }
                taskToolbox.getChatHandlerProvider().unregister(this.task.getId());
                if (taskToolbox.getAppenderatorsManager().shouldTaskMakeNodeAnnouncements()) {
                    taskToolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
                    taskToolbox.getDataSegmentServerAnnouncer().unannounce();
                }
            } catch (Throwable th9) {
                if (0 == 0) {
                    throw th9;
                }
                exc.addSuppressed(th9);
            }
            taskToolbox.getTaskReportFileWriter().write(this.task.getId(), getTaskCompletionReports(null, j));
            return TaskStatus.success(this.task.getId());
        } catch (Throwable th10) {
            try {
                if (this.driver != null) {
                    this.driver.close();
                }
                taskToolbox.getChatHandlerProvider().unregister(this.task.getId());
                if (taskToolbox.getAppenderatorsManager().shouldTaskMakeNodeAnnouncements()) {
                    taskToolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
                    taskToolbox.getDataSegmentServerAnnouncer().unannounce();
                }
            } catch (Throwable th11) {
                if (exc == null) {
                    throw th11;
                }
                exc.addSuppressed(th11);
            }
            throw th10;
        }
    }

    /**
     * Surfaces failures from publish and handoff futures that have already completed.
     *
     * <p>Every finished future in {@code publishWaitList} is resolved with {@code get()} (which rethrows
     * its failure, if any) and then dropped from the list; afterwards the same is done for
     * {@code handOffWaitList}. Futures that are still running are left untouched.
     *
     * @throws ExecutionException   if a completed future finished exceptionally
     * @throws InterruptedException if the calling thread is interrupted while resolving a future
     */
    private void checkPublishAndHandoffFailure() throws ExecutionException, InterruptedException {
        final List<? extends ListenableFuture<?>> finishedPublishes = this.publishWaitList.stream()
                .filter(ListenableFuture::isDone)
                .collect(Collectors.toList());
        for (ListenableFuture<?> publishFuture : finishedPublishes) {
            // get() on a done future returns immediately or rethrows the recorded failure.
            publishFuture.get();
        }
        this.publishWaitList.removeAll(finishedPublishes);

        final List<? extends ListenableFuture<?>> finishedHandoffs = this.handOffWaitList.stream()
                .filter(ListenableFuture::isDone)
                .collect(Collectors.toList());
        for (ListenableFuture<?> handoffFuture : finishedHandoffs) {
            handoffFuture.get();
        }
        this.handOffWaitList.removeAll(finishedHandoffs);
    }

    /**
     * Starts an asynchronous publish of the given sequence's segments and registers a handoff future for it.
     *
     * <p>The publish future is appended to {@code publishWaitList} and a new settable handoff future is
     * appended to {@code handOffWaitList}. When publishing succeeds, the sequence is removed from
     * {@code sequences} and {@code publishingSequences}, the remaining sequences are persisted to disk, and
     * the driver is asked to register the handoff; the handoff future is then completed with the value the
     * registration yields. A publish failure, or an {@link IOException} while persisting the sequences,
     * fails the handoff future instead.
     *
     * @param sequenceMetadata the sequence whose segments should be published
     */
    private void publishAndRegisterHandoff(final SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata) {
        log.debug("Publishing segments for sequence [%s].", sequenceMetadata);
        // The explicit Function cast binds the lambda to the synchronous transform overload, so the call
        // cannot become ambiguous against an AsyncFunction overload.
        final ListenableFuture<SegmentsAndCommitMetadata> publishFuture = Futures.transform(
                this.driver.publish(
                        sequenceMetadata.createPublisher(this, this.toolbox, this.ioConfig.isUseTransaction()),
                        (Committer) sequenceMetadata.getCommitterSupplier(this, this.stream, this.lastPersistedOffsets).get(),
                        Collections.singletonList(sequenceMetadata.getSequenceName())),
                (Function<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) published -> {
                    // A null publish result is turned into a failed future.
                    if (published == null) {
                        throw new ISE("Transaction failure publishing segments for sequence [%s]", sequenceMetadata);
                    }
                    return published;
                });
        this.publishWaitList.add(publishFuture);

        // Declared as SettableFuture (not ListenableFuture) because the callbacks below complete it
        // through set(...) / setException(...).
        final SettableFuture<SegmentsAndCommitMetadata> handoffFuture = SettableFuture.create();
        this.handOffWaitList.add(handoffFuture);

        Futures.addCallback(publishFuture, new FutureCallback<SegmentsAndCommitMetadata>() {
            @Override
            public void onSuccess(final SegmentsAndCommitMetadata publishedSegments) {
                log.info(
                        "Published %s segments for sequence [%s] with metadata [%s].",
                        publishedSegments.getSegments().size(),
                        sequenceMetadata.getSequenceName(),
                        Preconditions.checkNotNull(publishedSegments.getCommitMetadata(), "commitMetadata"));
                log.infoSegments(publishedSegments.getSegments(), "Published segments");
                // The sequence is done: forget it before persisting so the on-disk state no longer lists it.
                sequences.remove(sequenceMetadata);
                publishingSequences.remove(sequenceMetadata.getSequenceName());
                try {
                    persistSequences();
                    // NOTE(review): a failure of the registerHandoff future itself is not forwarded to
                    // handoffFuture here — confirm whether that case needs handling.
                    Futures.transform(driver.registerHandoff(publishedSegments), new Function<SegmentsAndCommitMetadata, Void>() {
                        @Override
                        @Nullable
                        public Void apply(@Nullable SegmentsAndCommitMetadata handedOff) {
                            if (handedOff == null) {
                                log.warn("Failed to hand off %s segments", publishedSegments.getSegments().size());
                                log.warnSegments(publishedSegments.getSegments(), "Failed to hand off segments");
                            }
                            // Completes the handoff future even when the result is null.
                            handoffFuture.set(handedOff);
                            return null;
                        }
                    });
                } catch (IOException e) {
                    log.error(e, "Unable to persist state, dying");
                    handoffFuture.setException(e);
                    throw new RuntimeException(e);
                }
            }

            @Override
            public void onFailure(Throwable th) {
                log.error(th, "Error while publishing segments for sequenceNumber[%s]", sequenceMetadata);
                handoffFuture.setException(th);
            }
        });
    }

    /**
     * Locates the file used to persist sequence metadata.
     *
     * @param taskToolbox toolbox supplying the persist directory
     * @return {@code sequences.json} inside the toolbox's persist directory
     */
    private static File getSequencesPersistFile(TaskToolbox taskToolbox) {
        final File persistDir = taskToolbox.getPersistDir();
        return new File(persistDir, "sequences.json");
    }

    /**
     * Reloads sequence metadata from the persist file, if one exists.
     *
     * <p>On success {@code sequences} is replaced by a new {@link CopyOnWriteArrayList} holding the
     * deserialized entries.
     *
     * @return {@code true} if the file existed and was read, {@code false} if there was no file
     * @throws IOException if reading or deserializing the file fails
     */
    private boolean restoreSequences() throws IOException {
        final File persisted = getSequencesPersistFile(this.toolbox);
        if (persisted.exists()) {
            this.sequences = new CopyOnWriteArrayList<>(
                    (Collection) this.toolbox.getJsonMapper().readValue(persisted, getSequenceMetadataTypeReference()));
            return true;
        }
        return false;
    }

    /**
     * Serializes the current {@code sequences} list as JSON into the sequences persist file and logs the
     * saved content. The method is synchronized on this runner.
     *
     * <p>Decompiler note: the access modifier was reported as widened from {@code private}.
     *
     * @throws IOException if the file cannot be written
     */
    public synchronized void persistSequences() throws IOException {
        final File target = getSequencesPersistFile(this.toolbox);
        this.toolbox.getJsonMapper()
                .writerFor(getSequenceMetadataTypeReference())
                .writeValue(target, this.sequences);
        log.info("Saved sequence metadata to disk: %s", this.sequences);
    }

    /**
     * Builds the task report map containing a single ingestion-stats-and-errors report.
     *
     * @param str passed through to the report data; a {@code null} value makes the accompanying boolean
     *            argument {@code true} (presumably an error message — TODO confirm)
     * @param j   passed through to the report data; the visible caller supplies the milliseconds spent
     *            waiting for handoff
     * @return the result of {@code TaskReport.buildTaskReports} for that single report
     */
    private Map<String, TaskReport> getTaskCompletionReports(@Nullable String str, long j) {
        final TaskReport report = new IngestionStatsAndErrorsTaskReport(
                this.task.getId(),
                new IngestionStatsAndErrorsTaskReportData(
                        this.ingestionState,
                        getTaskCompletionUnparseableEvents(),
                        getTaskCompletionRowStats(),
                        str,
                        str == null,
                        j));
        return TaskReport.buildTaskReports(new TaskReport[]{report});
    }

    /**
     * Collects the saved parse-exception reports for the completion report.
     *
     * @return a map with the report list under {@code "buildSegments"}, or an empty map when the
     *         report list is {@code null}
     */
    private Map<String, Object> getTaskCompletionUnparseableEvents() {
        final Map<String, Object> unparseableEvents = new HashMap<>();
        final List<ParseExceptionReport> savedReports = IndexTaskUtils.getReportListFromSavedParseExceptions(
                this.parseExceptionHandler.getSavedParseExceptionReports());
        if (savedReports == null) {
            return unparseableEvents;
        }
        unparseableEvents.put("buildSegments", savedReports);
        return unparseableEvents;
    }

    /**
     * Collects the row-ingestion totals for the completion report.
     *
     * @return a map holding the meters' totals under {@code "buildSegments"}
     */
    private Map<String, Object> getTaskCompletionRowStats() {
        final Map<String, Object> rowStats = new HashMap<>();
        rowStats.put("buildSegments", this.rowIngestionMeters.getTotals());
        return rowStats;
    }

    /**
     * Refreshes each sequence's assignments against the current offsets and, for every sequence that is
     * no longer open and not already being published, persists the driver state and starts publishing it.
     *
     * @param supplier provides the committer handed to the driver's persist call
     * @throws InterruptedException if the persist call is interrupted
     */
    private void maybePersistAndPublishSequences(Supplier<Committer> supplier) throws InterruptedException {
        for (SequenceMetadata<PartitionIdType, SequenceOffsetType> sequence : this.sequences) {
            sequence.updateAssignments(this.currOffsets, this::isMoreToReadBeforeReadingRecord);
            if (sequence.isOpen() || this.publishingSequences.contains(sequence.getSequenceName())) {
                // Still accepting data, or publishing is already under way.
                continue;
            }
            // Mark first so a later pass does not queue the same sequence twice.
            this.publishingSequences.add(sequence.getSequenceName());
            try {
                final Object persistedMetadata = this.driver.persist(supplier.get());
                log.debug(
                        "Persist completed with metadata [%s], adding sequence [%s] to publish queue.",
                        persistedMetadata,
                        sequence.getSequenceName());
                publishAndRegisterHandoff(sequence);
            } catch (InterruptedException e) {
                log.warn("Interrupted while persisting metadata for sequence [%s].", sequence.getSequenceName());
                throw e;
            }
        }
    }

    /**
     * Works out which partitions still have data to read and assigns exactly those to the record supplier.
     *
     * <p>A partition is kept when its end offset has not already been read and there is more to read
     * between its current offset and its end offset.
     *
     * @param recordSupplier supplier that receives the resulting assignment
     * @return the set of partitions that was assigned (possibly empty)
     */
    private Set<StreamPartition<PartitionIdType>> assignPartitions(RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier) {
        final Set<StreamPartition<PartitionIdType>> assignment = new HashSet<>();
        for (Map.Entry<PartitionIdType, SequenceOffsetType> current : this.currOffsets.entrySet()) {
            final PartitionIdType partition = current.getKey();
            final SequenceOffsetType currentOffset = current.getValue();
            final SequenceOffsetType endOffset = this.endOffsets.get(partition);
            final boolean stillReadable =
                    !isRecordAlreadyRead(partition, endOffset) && isMoreToReadBeforeReadingRecord(currentOffset, endOffset);
            if (stillReadable) {
                log.info("Adding partition[%s], start[%s] -> end[%s] to assignment.", partition, currentOffset, endOffset);
                assignment.add(StreamPartition.of(this.stream, partition));
            } else {
                log.info("Finished reading partition[%s], up to[%s].", partition, currentOffset);
            }
        }
        recordSupplier.assign(assignment);
        return assignment;
    }

    /**
     * Appends a new sequence after checking that it lines up with the current last sequence.
     *
     * <p>When sequences already exist, every start offset of the new sequence must equal the last
     * sequence's end offset for that partition. In addition, when end offsets are not exclusive, the
     * new sequence's exclusive start partitions must equal the key set of the last sequence's end offsets.
     *
     * @param sequenceMetadata the sequence to append
     * @throws ISE if either consistency check fails
     */
    private void addSequence(SequenceMetadata<PartitionIdType, SequenceOffsetType> sequenceMetadata) {
        for (Map.Entry<PartitionIdType, SequenceOffsetType> start : sequenceMetadata.getStartOffsets().entrySet()) {
            if (this.sequences.isEmpty()) {
                // Nothing to compare against yet.
                continue;
            }
            final SequenceOffsetType startOffset = start.getValue();
            final SequenceOffsetType priorEndOffset = getLastSequenceMetadata().endOffsets.get(start.getKey());
            if (!startOffset.equals(priorEndOffset)) {
                throw new ISE("New sequence startOffset[%s] does not equal expected prior offset[%s]", startOffset, priorEndOffset);
            }
        }

        final boolean mustCheckExclusiveStarts = !isEndOffsetExclusive() && !this.sequences.isEmpty();
        if (mustCheckExclusiveStarts) {
            final SequenceMetadata<PartitionIdType, SequenceOffsetType> previous = getLastSequenceMetadata();
            final boolean partitionsMatch = previous.endOffsets.keySet().equals(sequenceMetadata.getExclusiveStartPartitions());
            if (!partitionsMatch) {
                throw new ISE(
                        "Exclusive start partitions[%s] for new sequence don't match to the prior offset[%s]",
                        sequenceMetadata.getExclusiveStartPartitions(),
                        previous);
            }
        }

        this.sequences.add(sequenceMetadata);
    }

    /**
     * Returns the most recently added sequence.
     *
     * @return the last element of {@code sequences}
     * @throws IllegalStateException if {@code sequences} is empty
     */
    private SequenceMetadata<PartitionIdType, SequenceOffsetType> getLastSequenceMetadata() {
        final boolean hasSequences = !this.sequences.isEmpty();
        Preconditions.checkState(hasSequences, "Empty sequences");
        final int lastIndex = this.sequences.size() - 1;
        return this.sequences.get(lastIndex);
    }

    /**
     * Tells whether the given offset is at or before the last offset read for the partition.
     *
     * @param partitionidtype    partition to look up in {@code lastReadOffsets}
     * @param sequenceoffsettype offset to compare against the partition's last read offset
     * @return {@code false} when nothing has been read for the partition; otherwise {@code true} iff the
     *         given offset compares less than or equal to the last read offset
     */
    private boolean isRecordAlreadyRead(PartitionIdType partitionidtype, SequenceOffsetType sequenceoffsettype) {
        final SequenceOffsetType lastRead = this.lastReadOffsets.get(partitionidtype);
        if (lastRead == null) {
            return false;
        }
        return createSequenceNumber(sequenceoffsettype).compareTo(createSequenceNumber(lastRead)) <= 0;
    }

    /**
     * Decides, before a record is read, whether the first offset has not yet passed the second one.
     *
     * @param sequenceoffsettype  offset being tested (callers pass the current offset)
     * @param sequenceoffsettype2 offset it is compared with (callers pass the end offset)
     * @return with exclusive end offsets, {@code true} iff the first offset is strictly below the second;
     *         otherwise {@code true} iff it is below or equal
     */
    private boolean isMoreToReadBeforeReadingRecord(SequenceOffsetType sequenceoffsettype, SequenceOffsetType sequenceoffsettype2) {
        final int comparison = createSequenceNumber(sequenceoffsettype).compareTo(createSequenceNumber(sequenceoffsettype2));
        if (isEndOffsetExclusive()) {
            return comparison < 0;
        }
        return comparison <= 0;
    }

    /**
     * Decides, after a record has been read, whether the next start offset is still below the second offset.
     *
     * @param sequenceoffsettype  offset of the record that was just read
     * @param sequenceoffsettype2 offset to compare with (callers pass the end offset)
     * @return {@code true} iff the next start offset following the record compares strictly below the
     *         second offset
     */
    private boolean isMoreToReadAfterReadingRecord(SequenceOffsetType sequenceoffsettype, SequenceOffsetType sequenceoffsettype2) {
        final SequenceOffsetType nextStart = getNextStartOffset(sequenceoffsettype);
        final int comparison = createSequenceNumber(nextStart).compareTo(createSequenceNumber(sequenceoffsettype2));
        return comparison < 0;
    }

    /**
     * Positions the record supplier at the current offset of every given partition.
     *
     * @param recordSupplier supplier to seek
     * @param set            partitions to seek; each one's target is looked up in {@code currOffsets}
     * @throws InterruptedException if a seek is interrupted
     */
    private void seekToStartingSequence(RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier, Set<StreamPartition<PartitionIdType>> set) throws InterruptedException {
        for (StreamPartition<PartitionIdType> partition : set) {
            final PartitionIdType partitionId = partition.getPartitionId();
            final SequenceOffsetType startingOffset = this.currOffsets.get(partitionId);
            log.info("Seeking partition[%s] to[%s].", partition.getPartitionId(), startingOffset);
            recordSupplier.seek(partition, startingOffset);
        }
    }

    /**
     * Blocks the calling thread for as long as a pause is requested.
     *
     * <p>Under {@code pauseLock}: if a pause is requested, the status becomes {@code PAUSED},
     * {@code hasPaused} is signalled, and the thread waits on {@code shouldResume} until the request is
     * cleared; the status then returns to {@code READING} and {@code shouldResume} is signalled again.
     *
     * @return {@code true} if the thread actually paused and resumed, {@code false} if no pause was requested
     * @throws InterruptedException if interrupted while acquiring the lock or while waiting
     */
    private boolean possiblyPause() throws InterruptedException {
        this.pauseLock.lockInterruptibly();
        try {
            if (this.pauseRequested) {
                this.status = Status.PAUSED;
                this.hasPaused.signalAll();
                log.debug("Received pause command, pausing ingestion until resumed.");
                // Loop guards against wake-ups that arrive while the pause is still requested.
                while (this.pauseRequested) {
                    this.shouldResume.await();
                }
                this.status = Status.READING;
                this.shouldResume.signalAll();
                log.debug("Received resume command, resuming ingestion.");
                return true;
            }
            return false;
        } finally {
            this.pauseLock.unlock();
        }
    }

    /**
     * @return {@code true} iff the current status is {@code Status.PAUSED}
     */
    private boolean isPaused() {
        return Status.PAUSED == status;
    }

    /**
     * Raises the pause-request flag; {@code possiblyPause()} reads this flag.
     */
    private void requestPause() {
        pauseRequested = true;
    }

    /**
     * Submits a request to reset the datasource metadata for the given partitions and emits an alert
     * describing the outcome.
     *
     * <p>If the request is accepted, an alert about the automatic reset is emitted and a pause is
     * requested; otherwise an alert about the failed request is emitted.
     *
     * @param map         offsets keyed by stream partition; the keys are reduced to partition ids
     * @param taskToolbox toolbox whose task action client submits the reset action
     * @throws IOException declared by the signature; presumably raised by the action submission — TODO confirm
     */
    protected void sendResetRequestAndWait(Map<StreamPartition<PartitionIdType>, SequenceOffsetType> map, TaskToolbox taskToolbox) throws IOException {
        final Map<PartitionIdType, SequenceOffsetType> offsetsByPartition =
                CollectionUtils.mapKeys(map, StreamPartition::getPartitionId);

        final boolean resetAccepted = (Boolean) taskToolbox.getTaskActionClient().submit(
                new ResetDataSourceMetadataAction(
                        this.task.getDataSource(),
                        createDataSourceMetadata(
                                new SeekableStreamEndSequenceNumbers<>(
                                        this.ioConfig.getStartSequenceNumbers().getStream(),
                                        offsetsByPartition))));

        if (resetAccepted) {
            log.makeAlert("Offsets were reset automatically, potential data duplication or loss")
                    .addData("task", this.task.getId())
                    .addData("dataSource", this.task.getDataSource())
                    .addData("partitions", offsetsByPartition.keySet())
                    .emit();
            requestPause();
        } else {
            log.makeAlert("Failed to send offset reset request")
                    .addData("task", this.task.getId())
                    .addData("dataSource", this.task.getDataSource())
                    .addData("partitions", ImmutableSet.copyOf(offsetsByPartition.keySet()))
                    .emit();
        }
    }

    /**
     * Runs the datasource authorization check for this task's datasource.
     *
     * @param httpServletRequest the incoming request
     * @param action             the action being authorized
     * @return the result of {@code IndexTaskUtils.datasourceAuthorizationCheck}
     */
    private Access authorizationCheck(HttpServletRequest httpServletRequest, Action action) {
        final String dataSource = this.task.getDataSource();
        return IndexTaskUtils.datasourceAuthorizationCheck(httpServletRequest, action, dataSource, this.authorizerMapper);
    }

    /**
     * @return the appenderator held by this runner (may be {@code null}; other code in this class
     *         null-checks the field before use)
     */
    public Appenderator getAppenderator() {
        return appenderator;
    }

    /**
     * @return the row ingestion meters held by this runner
     */
    @VisibleForTesting
    public RowIngestionMeters getRowIngestionMeters() {
        return rowIngestionMeters;
    }

    /**
     * @return the fire department metrics held by this runner
     */
    @VisibleForTesting
    public FireDepartmentMetrics getFireDepartmentMetrics() {
        return fireDepartmentMetrics;
    }

    /**
     * Requests a stop and interrupts the run thread immediately, logging the status at the time of the call.
     */
    public void stopForcefully() {
        log.info("Stopping forcefully (status: [%s])", status);
        stopRequested.set(true);
        runThread.interrupt();
    }

    /**
     * Requests a stop and wakes the run thread without interrupting it where possible.
     *
     * <p>After setting {@code stopRequested}, and while holding {@code statusLock}:
     * <ul>
     *   <li>if the status is {@code PUBLISHING}, the run thread is interrupted and the method returns;</li>
     *   <li>otherwise {@code pauseLock} is acquired (15 second timeout) to clear a pending pause request
     *       and signal {@code shouldResume};</li>
     *   <li>then {@code pollRetryLock} is acquired (15 second timeout) to signal {@code isAwaitingRetry}.</li>
     * </ul>
     * If either lock cannot be acquired in time, the run thread is interrupted instead.
     *
     * <p>Each lock is released exactly once through {@code finally}. Unlocking a lock the thread no longer
     * holds would raise {@link IllegalMonitorStateException} and hide the original failure.
     *
     * @throws RuntimeException wrapping any exception raised while acquiring or using the locks
     */
    public void stopGracefully() {
        log.info("Stopping gracefully (status: [%s])", this.status);
        this.stopRequested.set(true);
        // NOTE(review): statusLock stays held across both timed lock attempts, as in the existing code —
        // confirm whether the monitor is meant to cover only the status check.
        synchronized (this.statusLock) {
            if (this.status == Status.PUBLISHING) {
                this.runThread.interrupt();
                return;
            }
            try {
                if (!this.pauseLock.tryLock(15L, TimeUnit.SECONDS)) {
                    log.warn("While stopping: failed to acquire pauseLock before timeout, interrupting run thread");
                    this.runThread.interrupt();
                    return;
                }
                try {
                    // Release a paused run thread so it can observe stopRequested.
                    if (this.pauseRequested) {
                        this.pauseRequested = false;
                        this.shouldResume.signalAll();
                    }
                } finally {
                    this.pauseLock.unlock();
                }

                if (this.pollRetryLock.tryLock(15L, TimeUnit.SECONDS)) {
                    try {
                        this.isAwaitingRetry.signalAll();
                    } finally {
                        this.pollRetryLock.unlock();
                    }
                } else {
                    log.warn("While stopping: failed to acquire pollRetryLock before timeout, interrupting run thread");
                    this.runThread.interrupt();
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * HTTP endpoint ({@code POST /stop}): runs the write authorization check, then triggers
     * {@link #stopGracefully()}.
     *
     * @param httpServletRequest the incoming request, used for authorization
     * @return an empty 200 response
     */
    @POST
    @Path("/stop")
    public Response stop(@Context HttpServletRequest httpServletRequest) {
        authorizationCheck(httpServletRequest, Action.WRITE);
        stopGracefully();
        final Response.ResponseBuilder okBuilder = Response.status(Response.Status.OK);
        return okBuilder.build();
    }

    /**
     * HTTP endpoint ({@code GET /status}): runs the read authorization check and returns the
     * runner's current status.
     *
     * @param httpServletRequest the incoming request, used for authorization
     * @return the current status value
     */
    @GET
    @Produces({"application/json"})
    @Path("/status")
    public Status getStatusHTTP(@Context HttpServletRequest httpServletRequest) {
        authorizationCheck(httpServletRequest, Action.READ);
        return status;
    }

    /**
     * @return the runner's current status, without any authorization check; exposed for tests
     */
    @VisibleForTesting
    public Status getStatus() {
        return status;
    }

    /**
     * HTTP endpoint ({@code GET /offsets/current}): runs the read authorization check and returns
     * the current offsets per partition.
     *
     * @param httpServletRequest the incoming request, used for authorization
     * @return the same map instance that {@link #getCurrentOffsets()} returns
     */
    @GET
    @Produces({"application/json"})
    @Path("/offsets/current")
    public Map<PartitionIdType, SequenceOffsetType> getCurrentOffsets(@Context HttpServletRequest httpServletRequest) {
        authorizationCheck(httpServletRequest, Action.READ);
        final Map<PartitionIdType, SequenceOffsetType> offsets = getCurrentOffsets();
        return offsets;
    }

    /**
     * @return the live current-offsets map held by this runner (not a copy)
     */
    public ConcurrentMap<PartitionIdType, SequenceOffsetType> getCurrentOffsets() {
        return currOffsets;
    }

    /**
     * HTTP endpoint ({@code GET /offsets/end}): runs the read authorization check and returns the
     * end offsets per partition.
     *
     * @param httpServletRequest the incoming request, used for authorization
     * @return the same map instance that {@link #getEndOffsets()} returns
     */
    @GET
    @Produces({"application/json"})
    @Path("/offsets/end")
    public Map<PartitionIdType, SequenceOffsetType> getEndOffsetsHTTP(@Context HttpServletRequest httpServletRequest) {
        authorizationCheck(httpServletRequest, Action.READ);
        final Map<PartitionIdType, SequenceOffsetType> offsets = getEndOffsets();
        return offsets;
    }

    /**
     * @return the live end-offsets map held by this runner (not a copy)
     */
    public Map<PartitionIdType, SequenceOffsetType> getEndOffsets() {
        return endOffsets;
    }

    /**
     * HTTP endpoint ({@code POST /offsets/end}): runs the write authorization check and forwards
     * the request body to {@link #setEndOffsets(Map, boolean)}.
     *
     * @param map                end offsets per partition, taken from the JSON request body
     * @param z                  value of the {@code finish} query parameter (defaults to {@code true})
     * @param httpServletRequest the incoming request, used for authorization
     * @return the response produced by {@link #setEndOffsets(Map, boolean)}
     * @throws InterruptedException if the delegate is interrupted
     */
    @Path("/offsets/end")
    @Consumes({"application/json"})
    @POST
    @Produces({"application/json"})
    public Response setEndOffsetsHTTP(Map<PartitionIdType, SequenceOffsetType> map, @QueryParam("finish") @DefaultValue("true") boolean z, @Context HttpServletRequest httpServletRequest) throws InterruptedException {
        authorizationCheck(httpServletRequest, Action.WRITE);
        final Response response = setEndOffsets(map, z);
        return response;
    }

    /**
     * Builds the row statistics report from the row ingestion meters.
     *
     * @return a map with two entries, {@code "totals"} and {@code "movingAverages"}, each of which
     *         maps {@code "buildSegments"} to the corresponding value read from the meters
     */
    public Map<String, Object> doGetRowStats() {
        final Map<String, Object> totals = new HashMap<>();
        totals.put("buildSegments", rowIngestionMeters.getTotals());

        final Map<String, Object> movingAverages = new HashMap<>();
        movingAverages.put("buildSegments", rowIngestionMeters.getMovingAverages());

        final Map<String, Object> rowStats = new HashMap<>();
        rowStats.put("movingAverages", movingAverages);
        rowStats.put("totals", totals);
        return rowStats;
    }

    /**
     * Builds the live ingestion report for this task.
     *
     * @return a single-entry map keyed by {@code IngestionStatsAndErrorsTaskReport.REPORT_KEY}; the
     *         value holds {@code "taskId"}, {@code "type"} and a {@code "payload"} containing the
     *         ingestion state, the unparseable events and the row stats from {@link #doGetRowStats()}
     */
    public Map<String, Object> doGetLiveReports() {
        final Map<String, Object> unparseableEvents = getTaskCompletionUnparseableEvents();

        final Map<String, Object> payload = new HashMap<>();
        payload.put("ingestionState", ingestionState);
        payload.put("unparseableEvents", unparseableEvents);
        payload.put("rowStats", doGetRowStats());

        final Map<String, Object> report = new HashMap<>();
        report.put("taskId", task.getId());
        report.put("payload", payload);
        report.put("type", IngestionStatsAndErrorsTaskReport.REPORT_KEY);

        final Map<String, Object> reportsByKey = new HashMap<>();
        reportsByKey.put(IngestionStatsAndErrorsTaskReport.REPORT_KEY, report);
        return reportsByKey;
    }

    /**
     * HTTP endpoint ({@code GET /rowStats}): runs the read authorization check and returns the row
     * statistics built by {@link #doGetRowStats()}.
     *
     * @param httpServletRequest the incoming request, used for authorization
     * @return a 200 response whose entity is the row statistics map
     */
    @GET
    @Produces({"application/json"})
    @Path("/rowStats")
    public Response getRowStats(@Context HttpServletRequest httpServletRequest) {
        authorizationCheck(httpServletRequest, Action.READ);
        final Map<String, Object> rowStats = doGetRowStats();
        return Response.ok(rowStats).build();
    }

    /**
     * HTTP endpoint ({@code GET /liveReports}): runs the read authorization check and returns the
     * report built by {@link #doGetLiveReports()}.
     *
     * @param httpServletRequest the incoming request, used for authorization
     * @return a 200 response whose entity is the live report map
     */
    @GET
    @Produces({"application/json"})
    @Path("/liveReports")
    public Response getLiveReport(@Context HttpServletRequest httpServletRequest) {
        authorizationCheck(httpServletRequest, Action.READ);
        final Map<String, Object> liveReports = doGetLiveReports();
        return Response.ok(liveReports).build();
    }

    /**
     * HTTP endpoint ({@code GET /unparseableEvents}): runs the read authorization check and returns
     * the report list derived from the parse exceptions saved by the parse exception handler.
     *
     * @param httpServletRequest the incoming request, used for authorization
     * @return a 200 response whose entity is the report list
     */
    @GET
    @Produces({"application/json"})
    @Path("/unparseableEvents")
    public Response getUnparseableEvents(@Context HttpServletRequest httpServletRequest) {
        authorizationCheck(httpServletRequest, Action.READ);
        final Object reportList = IndexTaskUtils.getReportListFromSavedParseExceptions(
            parseExceptionHandler.getSavedParseExceptionReports()
        );
        return Response.ok(reportList).build();
    }

    /**
     * Sets the end offsets of the latest sequence and resumes the task.
     *
     * <p>Checks are applied in this order; the first one that fails determines the response:
     * <ol>
     *   <li>{@code map} is null, or names partitions absent from this task's end offsets: 400;</li>
     *   <li>(holding {@code pauseLock}) the request duplicates what the latest sequence already has:
     *       the task is resumed and 200 is returned;</li>
     *   <li>the latest sequence is already checkpointed: 400;</li>
     *   <li>the task is not paused: 400;</li>
     *   <li>a requested end offset is behind the current offset of its partition: 400.</li>
     * </ol>
     * Otherwise the next checkpoint time is reset and the latest sequence receives {@code map} as its end
     * offsets. When {@code z} is true the task's own end offsets are updated with {@code map}; when false a new
     * sequence is appended that starts at {@code map} and ends at the task's end offsets. The sequences are
     * then persisted, the lock is released and the task is resumed.
     *
     * <p>{@code pauseLock} is acquired before the {@code try} and released only in its {@code finally}, so it
     * is never unlocked when this call does not hold it (which would raise
     * {@link IllegalMonitorStateException} and mask the real failure).
     *
     * @param map end offsets per partition
     * @param z   the "finish" flag described above
     * @return 200 with {@code map} as entity on success or duplicate request, 400 on a failed check, or 500
     *         with the stack trace as entity when an exception occurs while applying the change (the
     *         exception is also stored in {@code backgroundThreadException} and the task is resumed)
     * @throws InterruptedException if interrupted while acquiring {@code pauseLock} or while resuming
     */
    @VisibleForTesting
    public Response setEndOffsets(Map<PartitionIdType, SequenceOffsetType> map, boolean z) throws InterruptedException {
        if (map == null) {
            return Response.status(Response.Status.BAD_REQUEST).entity("Request body must contain a map of { partition:endOffset }").build();
        }
        if (!this.endOffsets.keySet().containsAll(map.keySet())) {
            return Response.status(Response.Status.BAD_REQUEST).entity(StringUtils.format("Request contains partitions not being handled by this task, my partitions: %s", this.endOffsets.keySet())).build();
        }

        // Acquire outside the try: if acquisition is interrupted there is nothing to release.
        this.pauseLock.lockInterruptibly();
        try {
            // Sequence-related checks run under pauseLock and before the isPaused() check.
            Preconditions.checkState(map.size() > 0, "No sequences found to set end sequences");

            final SequenceMetadata<PartitionIdType, SequenceOffsetType> latestSequence = getLastSequenceMetadata();
            // With exclusive end offsets no partition is marked exclusive-start; otherwise all of them are.
            final Set<PartitionIdType> exclusiveStartPartitions = isEndOffsetExclusive() ? Collections.emptySet() : map.keySet();

            final boolean duplicateOfNewSequence = latestSequence.getStartOffsets().equals(map)
                                                   && latestSequence.getExclusiveStartPartitions().equals(exclusiveStartPartitions)
                                                   && !z;
            if (duplicateOfNewSequence || (latestSequence.getEndOffsets().equals(map) && z)) {
                log.warn("Ignoring duplicate request, end offsets already set for sequences [%s]", map);
                resume();
                return Response.ok(map).build();
            }
            if (latestSequence.isCheckpointed()) {
                return Response.status(Response.Status.BAD_REQUEST).type("text/plain").entity(StringUtils.format("Sequence [%s] has already endOffsets set, cannot set to [%s]", latestSequence, map)).build();
            }
            if (!isPaused()) {
                return Response.status(Response.Status.BAD_REQUEST).entity("Task must be paused before changing the end offsets").build();
            }

            for (Map.Entry<PartitionIdType, SequenceOffsetType> entry : map.entrySet()) {
                if (createSequenceNumber(entry.getValue()).compareTo(createSequenceNumber(this.currOffsets.get(entry.getKey()))) < 0) {
                    return Response.status(Response.Status.BAD_REQUEST).entity(StringUtils.format("End sequence must be >= current sequence for partition [%s] (current: %s)", entry.getKey(), this.currOffsets.get(entry.getKey()))).build();
                }
            }

            resetNextCheckpointTime();
            latestSequence.setEndOffsets(map);

            if (z) {
                // Logged before putAll so the message shows the previous end offsets.
                log.info("Sequence[%s] end offsets updated from [%s] to [%s].", latestSequence.getSequenceName(), this.endOffsets, map);
                this.endOffsets.putAll(map);
            } else {
                final int nextSequenceId = latestSequence.getSequenceId() + 1;
                final SequenceMetadata<PartitionIdType, SequenceOffsetType> newSequence = new SequenceMetadata<>(
                    nextSequenceId,
                    StringUtils.format("%s_%d", this.ioConfig.getBaseSequenceName(), nextSequenceId),
                    map,
                    this.endOffsets,
                    false,
                    exclusiveStartPartitions
                );
                log.info("Sequence[%s] created with start offsets [%s] and end offsets [%s].", newSequence.getSequenceName(), map, this.endOffsets);
                addSequence(newSequence);
            }
            persistSequences();
        } catch (Exception e) {
            log.error(e, "Failed to set end offsets.");
            this.backgroundThreadException = e;
            // Resume so the run loop can proceed and observe the recorded exception.
            resume();
            return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(Throwables.getStackTraceAsString(e)).build();
        } finally {
            this.pauseLock.unlock();
        }

        // Success path: resume only after pauseLock has been released.
        resume();
        return Response.ok(map).build();
    }

    /**
     * Moves the next checkpoint time to "now (UTC) plus the tuning config's intermediate handoff
     * period", stored as epoch milliseconds.
     */
    private void resetNextCheckpointTime() {
        final DateTime nextCheckpoint = DateTimes.nowUtc().plus(tuningConfig.getIntermediateHandoffPeriod());
        nextCheckpointTime = nextCheckpoint.getMillis();
    }

    /**
     * @return the live list of sequence metadata held by this runner (not a copy); exposed for tests
     */
    @VisibleForTesting
    public CopyOnWriteArrayList<SequenceMetadata<PartitionIdType, SequenceOffsetType>> getSequences() {
        return sequences;
    }

    /**
     * HTTP endpoint ({@code GET /checkpoints}): runs the read authorization check and returns the
     * start offsets of every sequence, keyed by sequence id.
     *
     * @param httpServletRequest the incoming request, used for authorization
     * @return the map built by {@code getCheckpoints()}
     */
    @GET
    @Produces({"application/json"})
    @Path("/checkpoints")
    public Map<Integer, Map<PartitionIdType, SequenceOffsetType>> getCheckpointsHTTP(@Context HttpServletRequest httpServletRequest) {
        authorizationCheck(httpServletRequest, Action.READ);
        final Map<Integer, Map<PartitionIdType, SequenceOffsetType>> checkpoints = getCheckpoints();
        return checkpoints;
    }

    /**
     * Collects the start offsets of every known sequence.
     *
     * @return a new {@link TreeMap} from sequence id to that sequence's start offsets, ordered by id
     */
    private Map<Integer, Map<PartitionIdType, SequenceOffsetType>> getCheckpoints() {
        final Map<Integer, Map<PartitionIdType, SequenceOffsetType>> startOffsetsBySequenceId = sequences
            .stream()
            .collect(Collectors.toMap(SequenceMetadata::getSequenceId, SequenceMetadata::getStartOffsets));
        return new TreeMap<>(startOffsetsBySequenceId);
    }

    /**
     * HTTP endpoint ({@code POST /pause}): runs the write authorization check and delegates to
     * {@link #pause()}.
     *
     * @param httpServletRequest the incoming request, used for authorization
     * @return the response produced by {@link #pause()}
     * @throws InterruptedException if the delegate is interrupted
     */
    @POST
    @Produces({"application/json"})
    @Path("/pause")
    public Response pauseHTTP(@Context HttpServletRequest httpServletRequest) throws InterruptedException {
        authorizationCheck(httpServletRequest, Action.WRITE);
        final Response response = pause();
        return response;
    }

    /**
     * Requests that the task pause and waits up to two seconds for it to do so.
     *
     * <p>Each lock is released in exactly one {@code finally} block. Unlocking {@code pauseLock} before
     * an early {@code return} and again in {@code finally}, or unlocking {@code pollRetryLock} both inline
     * and in an exception handler, raises {@link IllegalMonitorStateException} instead of delivering the
     * intended response or the original exception.
     *
     * @return 400 if the status is neither {@code PAUSED} nor {@code READING}; 202 if the task has not
     *         paused within the wait budget; otherwise 200 with the current offsets serialized as JSON
     * @throws InterruptedException if interrupted while acquiring a lock or while waiting for the pause
     * @throws RuntimeException     if the current offsets cannot be serialized
     */
    @VisibleForTesting
    public Response pause() throws InterruptedException {
        if (this.status != Status.PAUSED && this.status != Status.READING) {
            return Response.status(Response.Status.BAD_REQUEST).type("text/plain").entity(StringUtils.format("Can't pause, task is not in a pausable state (state: [%s])", this.status)).build();
        }

        this.pauseLock.lockInterruptibly();
        try {
            this.pauseRequested = true;

            // Wake anything waiting on isAwaitingRetry so it can notice the pause request.
            this.pollRetryLock.lockInterruptibly();
            try {
                this.isAwaitingRetry.signalAll();
            } finally {
                this.pollRetryLock.unlock();
            }

            if (isPaused()) {
                this.shouldResume.signalAll();
            }

            long nanos = TimeUnit.SECONDS.toNanos(2L);
            while (!isPaused()) {
                if (nanos <= 0) {
                    return Response.status(Response.Status.ACCEPTED).entity("Request accepted but task has not yet paused").build();
                }
                // awaitNanos returns the remaining wait budget, so spurious wake-ups do not extend the wait.
                nanos = this.hasPaused.awaitNanos(nanos);
            }

            try {
                return Response.ok().entity(this.toolbox.getJsonMapper().writeValueAsString(getCurrentOffsets())).build();
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        } finally {
            this.pauseLock.unlock();
        }
    }

    /**
     * HTTP endpoint ({@code POST /resume}): runs the write authorization check and delegates to
     * {@link #resume()}.
     *
     * @param httpServletRequest the incoming request, used for authorization
     * @return an empty 200 response
     * @throws InterruptedException if the delegate is interrupted
     */
    @POST
    @Path("/resume")
    public Response resumeHTTP(@Context HttpServletRequest httpServletRequest) throws InterruptedException {
        authorizationCheck(httpServletRequest, Action.WRITE);
        resume();
        final Response.ResponseBuilder okBuilder = Response.status(Response.Status.OK);
        return okBuilder.build();
    }

    /**
     * Clears the pause request, signals waiters on {@code shouldResume} and waits up to five seconds
     * for the task to leave the paused state.
     *
     * @throws InterruptedException if interrupted while acquiring {@code pauseLock} or while waiting
     * @throws RuntimeException     if the task is still paused once the wait budget is used up
     */
    @VisibleForTesting
    public void resume() throws InterruptedException {
        pauseLock.lockInterruptibly();
        try {
            pauseRequested = false;
            shouldResume.signalAll();

            // Each awaitNanos call returns the time still left, which becomes the next budget.
            for (long remainingNanos = TimeUnit.SECONDS.toNanos(5L); isPaused(); remainingNanos = shouldResume.awaitNanos(remainingNanos)) {
                if (remainingNanos <= 0) {
                    throw new RuntimeException("Resume command was not accepted within 5 seconds");
                }
            }
        } finally {
            pauseLock.unlock();
        }
    }

    /**
     * HTTP endpoint ({@code GET /time/start}): runs the authorization check and returns the stored
     * start time.
     *
     * <p>NOTE(review): this read-only endpoint checks {@code Action.WRITE}, unlike the other GET
     * endpoints in this class, which check {@code Action.READ} — confirm this is intentional.
     *
     * @param httpServletRequest the incoming request, used for authorization
     * @return the value of the {@code startTime} field
     */
    @GET
    @Produces({"application/json"})
    @Path("/time/start")
    public DateTime getStartTime(@Context HttpServletRequest httpServletRequest) {
        authorizationCheck(httpServletRequest, Action.WRITE);
        return startTime;
    }

    /**
     * Decides whether a record at the given offset should be processed.
     *
     * @param partitionidtype    partition the record belongs to
     * @param sequenceoffsettype sequence offset of the record
     * @return {@code false} if the record has already been read; otherwise the result of
     *         {@code isMoreToReadBeforeReadingRecord} for this offset and the partition's end offset
     * @throws NullPointerException if no current offset is tracked for the partition
     * @throws ISE                  if the record's sequence number is smaller than the partition's
     *                              current sequence number
     */
    private boolean verifyRecordInRange(PartitionIdType partitionidtype, SequenceOffsetType sequenceoffsettype) {
        // The template names both arguments; with a single placeholder the record offset was
        // reported as if it were the partition.
        final SequenceOffsetType currentOffset = Preconditions.checkNotNull(
            this.currOffsets.get(partitionidtype),
            "Current offset is null for sequenceNumber[%s] and partition[%s]",
            sequenceoffsettype,
            partitionidtype
        );

        final OrderedSequenceNumber<SequenceOffsetType> recordSequenceNumber = createSequenceNumber(sequenceoffsettype);
        if (recordSequenceNumber.compareTo(createSequenceNumber(currentOffset)) < 0) {
            throw new ISE("Record sequenceNumber[%s] is smaller than current sequenceNumber[%s] for partition[%s]", sequenceoffsettype, currentOffset, partitionidtype);
        }

        if (isRecordAlreadyRead(partitionidtype, sequenceoffsettype)) {
            return false;
        }

        return isMoreToReadBeforeReadingRecord(recordSequenceNumber.get(), this.endOffsets.get(partitionidtype));
    }

    /**
     * Stream-specific hook. NOTE(review): judging by the name, reports whether the given sequence
     * offset marks the end of a shard; no caller is visible in this part of the file — confirm
     * against the implementations.
     */
    protected abstract boolean isEndOfShard(SequenceOffsetType sequenceoffsettype);

    /**
     * Stream-specific hook; may return {@code null}. NOTE(review): presumably reads previously
     * recorded checkpoints (sequence id to offsets per partition) from the task context — confirm
     * against the implementations.
     */
    @Nullable
    protected abstract TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>> getCheckPointsFromContext(TaskToolbox taskToolbox, String str) throws IOException;

    /**
     * Stream-specific hook. NOTE(review): presumably maps an offset to the offset reading should
     * continue from — confirm against the implementations.
     */
    protected abstract SequenceOffsetType getNextStartOffset(SequenceOffsetType sequenceoffsettype);

    /**
     * Stream-specific hook that turns a metadata object into end sequence numbers using the given
     * mapper. NOTE(review): the expected shape of {@code obj} is not visible here — confirm against
     * the implementations.
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public abstract SeekableStreamEndSequenceNumbers<PartitionIdType, SequenceOffsetType> deserializePartitionsFromMetadata(ObjectMapper objectMapper, Object obj);

    /**
     * Stream-specific hook that obtains records from the record supplier; annotated as never
     * returning {@code null}.
     */
    @NotNull
    protected abstract List<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType, RecordType>> getRecords(RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier, TaskToolbox taskToolbox) throws Exception;

    /**
     * Wraps sequence numbers in the stream-specific datasource metadata type. Within this class it
     * is used by {@code sendResetRequestAndWait} to build the payload of the reset action.
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public abstract SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetadata(SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> seekableStreamSequenceNumbers);

    /**
     * Wraps a raw offset in a comparable sequence number. Within this class the results are compared
     * with {@code compareTo} in {@code setEndOffsets} and {@code verifyRecordInRange}.
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public abstract OrderedSequenceNumber<SequenceOffsetType> createSequenceNumber(SequenceOffsetType sequenceoffsettype);

    /**
     * Stream-specific hook. NOTE(review): judging by the name, decides whether the datasource
     * metadata must be reset for the given partitions and triggers that reset — confirm against the
     * implementations.
     */
    protected abstract void possiblyResetDataSourceMetadata(TaskToolbox taskToolbox, RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier, Set<StreamPartition<PartitionIdType>> set);

    /**
     * Tells whether end offsets are exclusive for this stream type. {@code setEndOffsets} uses an
     * empty exclusive-start partition set when this returns {@code true}, and the key set of the
     * requested offsets otherwise.
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public abstract boolean isEndOffsetExclusive();

    /**
     * Supplies the Jackson type reference for a list of sequence metadata. NOTE(review): presumably
     * used when reading persisted sequences back — the caller is not visible here.
     */
    protected abstract TypeReference<List<SequenceMetadata<PartitionIdType, SequenceOffsetType>>> getSequenceMetadataTypeReference();
}
