package org.apache.flink.iteration.operator;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.iteration.IterationRecord;
import org.apache.flink.iteration.datacache.nonkeyed.DataCacheReader;
import org.apache.flink.iteration.datacache.nonkeyed.DataCacheSnapshot;
import org.apache.flink.iteration.datacache.nonkeyed.DataCacheWriter;
import org.apache.flink.iteration.progresstrack.OperatorEpochWatermarkTracker;
import org.apache.flink.iteration.progresstrack.OperatorEpochWatermarkTrackerFactory;
import org.apache.flink.iteration.progresstrack.OperatorEpochWatermarkTrackerListener;
import org.apache.flink.iteration.typeinfo.IterationRecordSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

/* loaded from: input_file:org/apache/flink/iteration/operator/ReplayOperator.class */
/**
 * Two-input operator that caches the records arriving on its first input and re-emits the cached
 * records each time the tracked epoch watermark advances.
 *
 * <ul>
 *   <li>Input 1 ({@link #processElement1}) carries data records and epoch watermarks. Records are
 *       appended to a {@link DataCacheWriter} and forwarded downstream unchanged.
 *   <li>Input 2 ({@link #processElement2}) carries epoch watermarks only.
 *   <li>Watermarks from both inputs are handed to an {@link OperatorEpochWatermarkTracker}, which
 *       calls back {@link #onEpochWatermarkIncrement(int)} on this operator.
 * </ul>
 *
 * <p>The cached segments, the position of an in-flight replay, the current epoch and the
 * parallelism are written on snapshot and read back in {@link #initializeState}. Restoring with a
 * different parallelism is rejected.
 *
 * @param <T> type of the value wrapped in the {@link IterationRecord}s that are cached and replayed
 */
public class ReplayOperator<T> extends AbstractStreamOperator<IterationRecord<T>>
        implements TwoInputStreamOperator<IterationRecord<T>, IterationRecord<Void>, IterationRecord<T>>,
                OperatorEpochWatermarkTrackerListener,
                BoundedMultiInput {

    /** Receives the epoch watermarks of both inputs and notifies this operator on increments. */
    private OperatorEpochWatermarkTracker progressTracker;

    /** Directory under which the data cache files are generated. */
    private Path basePath;

    /** File system that {@link #basePath} resolves to. */
    private FileSystem fileSystem;

    /** Serializer of the values wrapped inside the iteration records. */
    private TypeSerializer<T> typeSerializer;

    /** Used to drain pending mails while a replay loop is running. */
    private MailboxExecutor mailboxExecutor;

    /** Collects every record received on the first input. */
    private DataCacheWriter<T> dataCacheWriter;

    /** Reader of the replay currently in progress, or {@code null} when nothing is replayed. */
    @Nullable
    private DataCacheReader<T> currentDataCacheReader;

    /** Epoch of the replay that was started most recently. */
    private int currentEpoch;

    /** Union state holding the parallelism the operator was snapshotted with. */
    private ListState<Integer> parallelismState;

    /** Operator state holding {@link #currentEpoch}. */
    private ListState<Integer> currentEpochState;

    /**
     * Creates the progress tracker and resolves the cache location, file system, value serializer
     * and mailbox executor from the task and operator configuration.
     *
     * <p>Any exception raised while resolving these is rethrown through
     * {@link ExceptionUtils#rethrow(Throwable)}.
     */
    @Override
    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<IterationRecord<T>>> output) {
        super.setup(streamTask, streamConfig, output);
        this.progressTracker = OperatorEpochWatermarkTrackerFactory.create(streamConfig, streamTask, this);
        try {
            this.basePath =
                    OperatorUtils.getDataCachePath(
                            streamTask.getEnvironment().getTaskManagerInfo().getConfiguration(),
                            streamTask.getEnvironment().getIOManager().getSpillingDirectoriesPaths());
            this.fileSystem = this.basePath.getFileSystem();

            // The configured output serializer is looked up untyped, so the cast to the iteration
            // record serializer cannot be checked; the warning is confined to this declaration.
            @SuppressWarnings({"unchecked", "rawtypes"})
            TypeSerializer<T> innerSerializer =
                    ((IterationRecordSerializer) streamConfig.getTypeSerializerOut(getClass().getClassLoader()))
                            .getInnerSerializer();
            this.typeSerializer = innerSerializer;

            // NOTE(review): -1 is presumably the mailbox's minimum priority - confirm against
            // the mailbox priority constants of the Flink version in use.
            this.mailboxExecutor = streamTask.getMailboxExecutorFactory().createExecutor(-1);
        } catch (Exception e) {
            ExceptionUtils.rethrow(e);
        }
    }

    /**
     * Restores the operator from the given context.
     *
     * <p>Checks that a previously stored parallelism equals the current one, restores the stored
     * epoch, and rebuilds the data cache writer from the raw operator state. A reader is rebuilt
     * as well when the snapshot recorded a reader position.
     *
     * @throws IllegalStateException if the stored parallelism differs from the current one
     * @throws FlinkRuntimeException if the data cache cannot be recovered, including the case of
     *     more than one raw operator state partition
     */
    @Override
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);

        this.parallelismState =
                stateInitializationContext
                        .getOperatorStateStore()
                        .getUnionListState(new ListStateDescriptor<>("parallelism", IntSerializer.INSTANCE));
        OperatorStateUtils.getUniqueElement(this.parallelismState, "parallelism")
                .ifPresent(
                        oldParallelism ->
                                Preconditions.checkState(
                                        oldParallelism == getRuntimeContext().getNumberOfParallelSubtasks(),
                                        "The Replay operator is recovered with parallelism changed from "
                                                + oldParallelism
                                                + " to "
                                                + getRuntimeContext().getNumberOfParallelSubtasks()));

        this.currentEpochState =
                stateInitializationContext
                        .getOperatorStateStore()
                        .getListState(new ListStateDescriptor<>("epoch", IntSerializer.INSTANCE));
        OperatorStateUtils.getUniqueElement(this.currentEpochState, "epoch")
                .ifPresent(restoredEpoch -> this.currentEpoch = restoredEpoch);

        try {
            SupplierWithException<Path, IOException> pathGenerator =
                    OperatorUtils.createDataCacheFileGenerator(this.basePath, "replay", this.config.getOperatorID());

            DataCacheSnapshot dataCacheSnapshot = null;
            // IteratorUtils.toList is untyped; the elements come from an iterator of
            // StatePartitionStreamProvider, so the conversion is safe.
            @SuppressWarnings("unchecked")
            List<StatePartitionStreamProvider> rawStateInputs =
                    IteratorUtils.toList(stateInitializationContext.getRawOperatorStateInputs().iterator());
            if (!rawStateInputs.isEmpty()) {
                Preconditions.checkState(
                        rawStateInputs.size() == 1, "Currently the replay operator does not support rescaling");
                dataCacheSnapshot =
                        DataCacheSnapshot.recover(rawStateInputs.get(0).getStream(), this.fileSystem, pathGenerator);
            }

            // Without a snapshot the writer starts with no segments.
            this.dataCacheWriter =
                    new DataCacheWriter<>(
                            this.typeSerializer,
                            this.fileSystem,
                            pathGenerator,
                            dataCacheSnapshot == null ? Collections.emptyList() : dataCacheSnapshot.getSegments());

            // A recorded reader position means a replay was in progress at snapshot time.
            if (dataCacheSnapshot != null && dataCacheSnapshot.getReaderPosition() != null) {
                this.currentDataCacheReader =
                        new DataCacheReader<>(
                                this.typeSerializer,
                                dataCacheSnapshot.getSegments(),
                                dataCacheSnapshot.getReaderPosition());
            }
        } catch (Exception e) {
            throw new FlinkRuntimeException("Failed to replay the records", e);
        }
    }

    /**
     * Writes the parallelism (from subtask 0 only), the current epoch and a
     * {@link DataCacheSnapshot} of the cached segments plus the current reader position (if any)
     * into the snapshot.
     */
    @Override
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);

        // The parallelism lives in union list state; only subtask 0 contributes an element.
        this.parallelismState.clear();
        if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
            this.parallelismState.update(
                    Collections.singletonList(getRuntimeContext().getNumberOfParallelSubtasks()));
        }

        this.currentEpochState.update(Collections.singletonList(this.currentEpoch));

        // Segments are written to files before the snapshot that references them is built.
        this.dataCacheWriter.writeSegmentsToFiles();
        DataCacheSnapshot dataCacheSnapshot =
                new DataCacheSnapshot(
                        this.fileSystem,
                        this.currentDataCacheReader == null ? null : this.currentDataCacheReader.getPosition(),
                        this.dataCacheWriter.getSegments());
        stateSnapshotContext.getRawOperatorStateOutput().startNewPartition();
        dataCacheSnapshot.writeTo(stateSnapshotContext.getRawOperatorStateOutput());
    }

    /**
     * Handles an element of the first input: data records are cached and forwarded, epoch
     * watermarks are reported to the progress tracker as input index 0.
     *
     * @throws UnsupportedOperationException for any other record type
     */
    @Override
    public void processElement1(StreamRecord<IterationRecord<T>> streamRecord) throws Exception {
        IterationRecord<T> iterationRecord = streamRecord.getValue();
        switch (iterationRecord.getType()) {
            case RECORD:
                this.dataCacheWriter.addRecord(iterationRecord.getValue());
                this.output.collect(streamRecord);
                break;
            case EPOCH_WATERMARK:
                this.progressTracker.onEpochWatermark(0, iterationRecord.getSender(), iterationRecord.getEpoch());
                break;
            default:
                throw new UnsupportedOperationException("Not supported element type: " + streamRecord.getValue());
        }
    }

    /**
     * Handles an element of the second input, which may only be an epoch watermark; it is reported
     * to the progress tracker as input index 1.
     *
     * @throws UnsupportedOperationException if the element is not an epoch watermark
     */
    @Override
    public void processElement2(StreamRecord<IterationRecord<Void>> streamRecord) throws Exception {
        IterationRecord<Void> iterationRecord = streamRecord.getValue();
        if (iterationRecord.getType() != IterationRecord.Type.EPOCH_WATERMARK) {
            throw new UnsupportedOperationException("Not supported element type: " + streamRecord.getValue());
        }
        this.progressTracker.onEpochWatermark(1, iterationRecord.getSender(), iterationRecord.getEpoch());
    }

    /**
     * Marks the given input as finished in the progress tracker and, when the first input ends
     * while a reader is still set (for example one rebuilt in {@link #initializeState}), finishes
     * that replay for {@link #currentEpoch}.
     *
     * @param i 1-based index of the input that ended; the tracker is notified with {@code i - 1}
     */
    @Override
    public void endInput(int i) throws Exception {
        this.progressTracker.finish(i - 1);
        if (i == 1 && this.currentDataCacheReader != null) {
            replayRecords(this.currentDataCacheReader, this.currentEpoch);
        }
    }

    /**
     * Reacts to an increment of the tracked epoch watermark.
     *
     * <ul>
     *   <li>{@code 0}: the writer is finished and the watermark is forwarded; nothing is replayed.
     *   <li>{@link Integer#MAX_VALUE}: the watermark is forwarded; nothing is replayed.
     *   <li>otherwise: all cached records are replayed tagged with epoch {@code i}, followed by
     *       the watermark.
     * </ul>
     *
     * @param i the new epoch watermark
     * @throws IllegalStateException if a replay is already in progress
     */
    @Override // org.apache.flink.iteration.progresstrack.OperatorEpochWatermarkTrackerListener
    public void onEpochWatermarkIncrement(int i) throws IOException {
        if (i == 0) {
            this.dataCacheWriter.finish();
            emitEpochWatermark(i);
            return;
        }
        if (i == Integer.MAX_VALUE) {
            emitEpochWatermark(i);
            return;
        }

        Preconditions.checkState(this.currentDataCacheReader == null, "Concurrent replay is not supported");
        this.currentEpoch = i;
        this.currentDataCacheReader = new DataCacheReader<>(this.typeSerializer, this.dataCacheWriter.getSegments());
        replayRecords(this.currentDataCacheReader, i);
    }

    /**
     * Emits every remaining record of {@code reader} tagged with {@code epoch}, then clears
     * {@link #currentDataCacheReader} and emits the epoch watermark.
     *
     * <p>Before each record all pending mails are drained through the mailbox executor.
     */
    private void replayRecords(DataCacheReader<T> reader, int epoch) {
        // A single record instance is reused for all emitted values.
        StreamRecord<IterationRecord<T>> reusable = new StreamRecord<>(IterationRecord.newRecord(null, epoch));
        while (reader.hasNext()) {
            while (this.mailboxExecutor.tryYield()) {
                // Keep yielding until no pending mail is left.
            }
            reusable.getValue().setValue(reader.next());
            this.output.collect(reusable);
        }
        this.currentDataCacheReader = null;
        emitEpochWatermark(epoch);
    }

    /** Emits an epoch watermark for {@code epoch}, using this subtask's unique sender id. */
    private void emitEpochWatermark(int epoch) {
        this.output.collect(
                new StreamRecord<>(
                        IterationRecord.newEpochWatermark(
                                epoch,
                                OperatorUtils.getUniqueSenderId(
                                        this.config.getOperatorID(),
                                        getContainingTask().getIndexInSubtaskGroup()))));
    }
}
