/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
import org.apache.flink.streaming.runtime.io.TwoInputSelectionHandler;
import org.apache.flink.util.ExceptionUtils;

@Internal
public final class StreamTwoInputProcessor<IN1, IN2>
implements StreamInputProcessor {
    private final TwoInputSelectionHandler inputSelectionHandler;
    private final StreamOneInputProcessor<IN1> processor1;
    private final StreamOneInputProcessor<IN2> processor2;
    private InputStatus firstInputStatus = InputStatus.MORE_AVAILABLE;
    private InputStatus secondInputStatus = InputStatus.MORE_AVAILABLE;
    private int lastReadInputIndex = 1;
    private boolean isPrepared;

    public StreamTwoInputProcessor(TwoInputSelectionHandler inputSelectionHandler, StreamOneInputProcessor<IN1> processor1, StreamOneInputProcessor<IN2> processor2) {
        this.inputSelectionHandler = inputSelectionHandler;
        this.processor1 = processor1;
        this.processor2 = processor2;
    }

    public CompletableFuture<?> getAvailableFuture() {
        if (this.inputSelectionHandler.areAllInputsSelected()) {
            return this.isAnyInputAvailable();
        }
        StreamOneInputProcessor<Object> input = this.inputSelectionHandler.isFirstInputSelected() ? this.processor1 : this.processor2;
        return input.getAvailableFuture();
    }

    @Override
    public InputStatus processInput() throws Exception {
        int readingInputIndex = this.isPrepared ? this.selectNextReadingInputIndex() : this.selectFirstReadingInputIndex();
        if (readingInputIndex == -1) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        this.lastReadInputIndex = readingInputIndex;
        if (readingInputIndex == 0) {
            this.firstInputStatus = this.processor1.processInput();
        } else {
            this.secondInputStatus = this.processor2.processInput();
        }
        this.inputSelectionHandler.nextSelection();
        return this.getInputStatus();
    }

    @Override
    public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws IOException {
        return CompletableFuture.allOf(this.processor1.prepareSnapshot(channelStateWriter, checkpointId), this.processor2.prepareSnapshot(channelStateWriter, checkpointId));
    }

    private int selectFirstReadingInputIndex() throws IOException {
        this.inputSelectionHandler.nextSelection();
        this.isPrepared = true;
        return this.selectNextReadingInputIndex();
    }

    private InputStatus getInputStatus() {
        if (this.firstInputStatus == InputStatus.END_OF_INPUT && this.secondInputStatus == InputStatus.END_OF_INPUT) {
            return InputStatus.END_OF_INPUT;
        }
        if (this.inputSelectionHandler.areAllInputsSelected()) {
            if (this.firstInputStatus == InputStatus.MORE_AVAILABLE || this.secondInputStatus == InputStatus.MORE_AVAILABLE) {
                return InputStatus.MORE_AVAILABLE;
            }
            return InputStatus.NOTHING_AVAILABLE;
        }
        InputStatus selectedStatus = this.inputSelectionHandler.isFirstInputSelected() ? this.firstInputStatus : this.secondInputStatus;
        InputStatus otherStatus = this.inputSelectionHandler.isFirstInputSelected() ? this.secondInputStatus : this.firstInputStatus;
        return selectedStatus == InputStatus.END_OF_INPUT ? otherStatus : selectedStatus;
    }

    @Override
    public void close() throws IOException {
        IOException ex = null;
        try {
            this.processor1.close();
        }
        catch (IOException e) {
            ex = (IOException)ExceptionUtils.firstOrSuppressed((Throwable)e, ex);
        }
        try {
            this.processor2.close();
        }
        catch (IOException e) {
            ex = (IOException)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)ex);
        }
        if (ex != null) {
            throw ex;
        }
    }

    private int selectNextReadingInputIndex() throws IOException {
        this.updateAvailability();
        this.checkInputSelectionAgainstIsFinished();
        int readingInputIndex = this.inputSelectionHandler.selectNextInputIndex(this.lastReadInputIndex);
        if (readingInputIndex == -1) {
            return -1;
        }
        if (this.inputSelectionHandler.shouldSetAvailableForAnotherInput()) {
            this.checkAndSetAvailable(1 - readingInputIndex);
        }
        return readingInputIndex;
    }

    private void checkInputSelectionAgainstIsFinished() throws IOException {
        if (this.inputSelectionHandler.areAllInputsSelected()) {
            return;
        }
        if (this.inputSelectionHandler.isFirstInputSelected() && this.firstInputStatus == InputStatus.END_OF_INPUT) {
            throw new IOException("Can not make a progress: only first input is selected but it is already finished");
        }
        if (this.inputSelectionHandler.isSecondInputSelected() && this.secondInputStatus == InputStatus.END_OF_INPUT) {
            throw new IOException("Can not make a progress: only second input is selected but it is already finished");
        }
    }

    private void updateAvailability() {
        this.updateAvailability(this.firstInputStatus, this.processor1, 0);
        this.updateAvailability(this.secondInputStatus, this.processor2, 1);
    }

    private void updateAvailability(InputStatus status, StreamOneInputProcessor<?> input, int inputIdx) {
        if (status == InputStatus.MORE_AVAILABLE || status != InputStatus.END_OF_INPUT && input.isApproximatelyAvailable()) {
            this.inputSelectionHandler.setAvailableInput(inputIdx);
        } else {
            this.inputSelectionHandler.setUnavailableInput(inputIdx);
        }
    }

    private void checkAndSetAvailable(int inputIndex) {
        InputStatus status;
        InputStatus inputStatus = status = inputIndex == 0 ? this.firstInputStatus : this.secondInputStatus;
        if (status == InputStatus.END_OF_INPUT) {
            return;
        }
        if (this.getInput(inputIndex).isAvailable()) {
            this.inputSelectionHandler.setAvailableInput(inputIndex);
        }
    }

    private CompletableFuture<?> isAnyInputAvailable() {
        if (this.firstInputStatus == InputStatus.END_OF_INPUT) {
            return this.processor2.getAvailableFuture();
        }
        if (this.secondInputStatus == InputStatus.END_OF_INPUT) {
            return this.processor1.getAvailableFuture();
        }
        return AvailabilityProvider.or(this.processor1.getAvailableFuture(), this.processor2.getAvailableFuture());
    }

    private StreamOneInputProcessor<?> getInput(int inputIndex) {
        return inputIndex == 0 ? this.processor1 : this.processor2;
    }
}

