/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.sort.MultiInputSortingDataInput;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.AbstractDataOutput;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput;
import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
import org.apache.flink.streaming.runtime.io.TwoInputSelectionHandler;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;

public class StreamTwoInputProcessorFactory {
    /**
     * Builds a {@link StreamTwoInputProcessor} that reads from two input gates and forwards
     * everything it reads to the given two-input operator.
     *
     * <p>One network input is created per gate ({@code checkpointedInputGates[0]} for the first
     * input, {@code checkpointedInputGates[1]} for the second). When
     * {@link StreamConfig#shouldSortInputs()} returns {@code true}, both inputs are wrapped via
     * {@link MultiInputSortingDataInput#wrapInputs} and the input selection is taken from the
     * wrapper; otherwise the operator itself is used for input selection if it implements
     * {@link InputSelectable}.
     *
     * @param ownerTask task handed to the sorting wrapper when inputs are sorted
     * @param checkpointedInputGates gates to read from; index 0 and index 1 are used
     * @param ioManager I/O manager passed to the network inputs and the sorting wrapper
     * @param memoryManager memory manager passed to the sorting wrapper
     * @param taskIOMetricGroup metric group on which {@code numRecordsIn} is registered for reuse
     * @param streamStatusMaintainer maintainer toggled by the per-input outputs
     * @param streamOperator operator receiving records, watermarks and latency markers
     * @param input1WatermarkGauge gauge updated with watermarks of the first input
     * @param input2WatermarkGauge gauge updated with watermarks of the second input
     * @param endOfInputAware passed to both single-input processors; must not be {@code null}
     * @param streamConfig source of serializers, state partitioners and the sorting flag
     * @param taskManagerConfig configuration used to look up the managed-memory fraction
     * @param jobConfig job configuration passed to the sorting wrapper
     * @param executionConfig consulted for the object-reuse flag when inputs are sorted
     * @param userClassloader class loader used to read values from {@code streamConfig}
     * @param numRecordsIn counter incremented once for every record emitted on either input
     * @return processor combining both inputs
     * @throws IllegalStateException if sorted inputs are requested and the operator implements
     *     {@link InputSelectable}
     */
    public static <IN1, IN2> StreamTwoInputProcessor<IN1, IN2> create(
            AbstractInvokable ownerTask,
            CheckpointedInputGate[] checkpointedInputGates,
            IOManager ioManager,
            MemoryManager memoryManager,
            TaskIOMetricGroup taskIOMetricGroup,
            StreamStatusMaintainer streamStatusMaintainer,
            TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
            WatermarkGauge input1WatermarkGauge,
            WatermarkGauge input2WatermarkGauge,
            BoundedMultiInput endOfInputAware,
            StreamConfig streamConfig,
            Configuration taskManagerConfig,
            Configuration jobConfig,
            ExecutionConfig executionConfig,
            ClassLoader userClassloader,
            Counter numRecordsIn) {
        Preconditions.checkNotNull(endOfInputAware);

        // One tracker instance is shared by both outputs so each side can see the other's status.
        StreamStatusTracker statusTracker = new StreamStatusTracker();
        taskIOMetricGroup.reuseRecordsInputCounter(numRecordsIn);

        TypeSerializer<IN1> typeSerializer1 = streamConfig.getTypeSerializerIn(0, userClassloader);
        StreamTaskInput<IN1> input1 = new StreamTaskNetworkInput<>(
                checkpointedInputGates[0],
                typeSerializer1,
                ioManager,
                new StatusWatermarkValve(checkpointedInputGates[0].getNumberOfInputChannels()),
                0);

        TypeSerializer<IN2> typeSerializer2 = streamConfig.getTypeSerializerIn(1, userClassloader);
        StreamTaskInput<IN2> input2 = new StreamTaskNetworkInput<>(
                checkpointedInputGates[1],
                typeSerializer2,
                ioManager,
                new StatusWatermarkValve(checkpointedInputGates[1].getNumberOfInputChannels()),
                1);

        InputSelectable inputSelectable =
                streamOperator instanceof InputSelectable ? (InputSelectable) streamOperator : null;

        if (streamConfig.shouldSortInputs()) {
            if (inputSelectable != null) {
                throw new IllegalStateException("The InputSelectable interface is not supported with sorting inputs");
            }

            // The arrays are created with raw component types because generic arrays cannot be
            // instantiated; the suppression is limited to this single declaration.
            @SuppressWarnings("unchecked")
            MultiInputSortingDataInput.SelectableSortingInputs selectableSortingInputs =
                    MultiInputSortingDataInput.wrapInputs(
                            ownerTask,
                            new StreamTaskInput[]{input1, input2},
                            new KeySelector[]{
                                streamConfig.getStatePartitioner(0, userClassloader),
                                streamConfig.getStatePartitioner(1, userClassloader)
                            },
                            new TypeSerializer[]{typeSerializer1, typeSerializer2},
                            streamConfig.getStateKeySerializer(userClassloader),
                            memoryManager,
                            ioManager,
                            executionConfig.isObjectReuseEnabled(),
                            streamConfig.getManagedMemoryFractionOperatorUseCaseOfSlot(
                                    ManagedMemoryUseCase.BATCH_OP, taskManagerConfig, userClassloader),
                            jobConfig);
            inputSelectable = selectableSortingInputs.getInputSelectable();
            input1 = StreamTwoInputProcessorFactory.getSortedInput(selectableSortingInputs.getSortingInputs()[0]);
            input2 = StreamTwoInputProcessorFactory.getSortedInput(selectableSortingInputs.getSortingInputs()[1]);
        }

        StreamTaskNetworkOutput<IN1> output1 = new StreamTaskNetworkOutput<>(
                streamOperator,
                element -> StreamTwoInputProcessorFactory.processRecord1(element, streamOperator),
                streamStatusMaintainer,
                input1WatermarkGauge,
                statusTracker,
                0,
                numRecordsIn);
        StreamOneInputProcessor<IN1> processor1 = new StreamOneInputProcessor<>(input1, output1, endOfInputAware);

        StreamTaskNetworkOutput<IN2> output2 = new StreamTaskNetworkOutput<>(
                streamOperator,
                element -> StreamTwoInputProcessorFactory.processRecord2(element, streamOperator),
                streamStatusMaintainer,
                input2WatermarkGauge,
                statusTracker,
                1,
                numRecordsIn);
        StreamOneInputProcessor<IN2> processor2 = new StreamOneInputProcessor<>(input2, output2, endOfInputAware);

        return new StreamTwoInputProcessor<>(new TwoInputSelectionHandler(inputSelectable), processor1, processor2);
    }

    /**
     * Re-types one of the wildcard-typed sorting inputs to the element type expected by the
     * caller.
     *
     * <p>The cast is unchecked: nothing here verifies that {@code multiInput} actually produces
     * {@code IN1} elements, so the caller is responsible for pairing each input with the matching
     * type parameter.
     *
     * @param multiInput sorting input with an unknown element type
     * @return the same instance, viewed as a {@code StreamTaskInput<IN1>}
     */
    @SuppressWarnings("unchecked")
    private static <IN1> StreamTaskInput<IN1> getSortedInput(StreamTaskInput<?> multiInput) {
        // An explicit cast is required: StreamTaskInput<?> is not assignable to StreamTaskInput<IN1>.
        return (StreamTaskInput<IN1>) multiInput;
    }

    /**
     * Hands a record read from the first input to the operator.
     *
     * <p>The key context is set from the record before the record itself is processed.
     *
     * @param element record read from the first input
     * @param operator operator that receives the record on its first input
     * @throws Exception if the operator fails while setting the key context or processing
     */
    private static <T> void processRecord1(
            StreamRecord<T> element,
            TwoInputStreamOperator<T, ?, ?> operator) throws Exception {
        operator.setKeyContextElement1(element);
        operator.processElement1(element);
    }

    /**
     * Hands a record read from the second input to the operator.
     *
     * <p>The key context is set from the record before the record itself is processed.
     *
     * @param element record read from the second input
     * @param operator operator that receives the record on its second input
     * @throws Exception if the operator fails while setting the key context or processing
     */
    private static <T> void processRecord2(
            StreamRecord<T> element,
            TwoInputStreamOperator<?, T, ?> operator) throws Exception {
        operator.setKeyContextElement2(element);
        operator.processElement2(element);
    }

    /**
     * Output for one of the two inputs: routes records, watermarks, stream-status changes and
     * latency markers to the matching "1" or "2" method of a {@link TwoInputStreamOperator}.
     *
     * <p>An {@code inputIndex} of {@code 0} selects the first-input methods; any other value
     * selects the second-input methods.
     */
    private static class StreamTaskNetworkOutput<T>
    extends AbstractDataOutput<T> {
        private final TwoInputStreamOperator<?, ?, ?> operator;
        private final ThrowingConsumer<StreamRecord<T>, Exception> recordConsumer;
        private final WatermarkGauge inputWatermarkGauge;
        private final int inputIndex;
        private final Counter numRecordsIn;
        private final StreamStatusTracker statusTracker;

        /**
         * @param operator operator receiving watermarks and latency markers; must not be null
         * @param recordConsumer callback invoked for every emitted record; must not be null
         * @param streamStatusMaintainer maintainer passed to the superclass and toggled on
         *     status changes
         * @param inputWatermarkGauge gauge updated on every watermark; must not be null
         * @param statusTracker holder of the latest status of both inputs
         * @param inputIndex {@code 0} for the first input, anything else for the second
         * @param numRecordsIn counter incremented once per emitted record
         */
        private StreamTaskNetworkOutput(TwoInputStreamOperator<?, ?, ?> operator, ThrowingConsumer<StreamRecord<T>, Exception> recordConsumer, StreamStatusMaintainer streamStatusMaintainer, WatermarkGauge inputWatermarkGauge, StreamStatusTracker statusTracker, int inputIndex, Counter numRecordsIn) {
            super(streamStatusMaintainer);
            this.operator = Preconditions.checkNotNull(operator);
            this.recordConsumer = Preconditions.checkNotNull(recordConsumer);
            this.inputWatermarkGauge = Preconditions.checkNotNull(inputWatermarkGauge);
            this.statusTracker = statusTracker;
            this.inputIndex = inputIndex;
            this.numRecordsIn = numRecordsIn;
        }

        /** Counts the record, then passes it to the record consumer. */
        @Override
        public void emitRecord(StreamRecord<T> record) throws Exception {
            numRecordsIn.inc();
            recordConsumer.accept(record);
        }

        /** Records the watermark timestamp in the gauge, then forwards the watermark. */
        @Override
        public void emitWatermark(Watermark watermark) throws Exception {
            inputWatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
            if (isFirstInput()) {
                operator.processWatermark1(watermark);
                return;
            }
            operator.processWatermark2(watermark);
        }

        /**
         * Stores the new status of this input and toggles the maintainer when required: it
         * becomes ACTIVE when this input is active, and IDLE only when the other input is idle
         * as well.
         */
        @Override
        public void emitStreamStatus(StreamStatus streamStatus) {
            // Store this side's status first, then read the opposite side's latest status.
            final StreamStatus otherInputStatus;
            if (isFirstInput()) {
                statusTracker.setFirstStatus(streamStatus);
                otherInputStatus = statusTracker.getSecondStatus();
            } else {
                statusTracker.setSecondStatus(streamStatus);
                otherInputStatus = statusTracker.getFirstStatus();
            }

            // Nothing to toggle when the maintainer already reports this status.
            if (streamStatus.equals(this.streamStatusMaintainer.getStreamStatus())) {
                return;
            }

            if (streamStatus.isActive()) {
                this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
            } else if (otherInputStatus.isIdle()) {
                this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
            }
        }

        /** Forwards the latency marker to the operator method matching this input. */
        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
            if (isFirstInput()) {
                operator.processLatencyMarker1(latencyMarker);
                return;
            }
            operator.processLatencyMarker2(latencyMarker);
        }

        /** Returns whether this output serves the first input ({@code inputIndex == 0}). */
        private boolean isFirstInput() {
            return inputIndex == 0;
        }
    }

    /**
     * Mutable holder for the most recently recorded {@link StreamStatus} of the first and the
     * second input. Both start out as {@link StreamStatus#ACTIVE}.
     */
    private static class StreamStatusTracker {
        /** Latest status stored for the first input. */
        private StreamStatus firstStatus;
        /** Latest status stored for the second input. */
        private StreamStatus secondStatus;

        private StreamStatusTracker() {
            firstStatus = StreamStatus.ACTIVE;
            secondStatus = StreamStatus.ACTIVE;
        }

        /** Returns the status last stored for the first input. */
        public StreamStatus getFirstStatus() {
            return firstStatus;
        }

        /** Replaces the status stored for the first input. */
        public void setFirstStatus(StreamStatus status) {
            firstStatus = status;
        }

        /** Returns the status last stored for the second input. */
        public StreamStatus getSecondStatus() {
            return secondStatus;
        }

        /** Replaces the status stored for the second input. */
        public void setSecondStatus(StreamStatus status) {
            secondStatus = status;
        }
    }
}

