/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.File;
import java.io.IOException;
import java.util.function.Function;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;

/**
 * Test harness for tasks with a single logical input ({@link OneInputStreamTask}).
 *
 * <p>The harness builds {@code numInputGates} test input gates, each with
 * {@code numInputChannelsPerGate} channels, and configures the stream config with one input
 * whose serializer is derived from the supplied input {@link TypeInformation}.
 *
 * @param <IN> type of the records fed into the task
 * @param <OUT> type of the records emitted by the task
 */
public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarness<OUT> {
    private final TypeInformation<IN> inputType;
    private final TypeSerializer<IN> inputSerializer;

    /**
     * Creates a harness with the given gate/channel layout, using
     * {@code TestLocalRecoveryConfig.disabled()} as the local recovery configuration.
     *
     * @param taskFactory creates the task under test from the mock environment
     * @param numInputGates number of input gates to create
     * @param numInputChannelsPerGate number of channels per input gate
     * @param inputType type information used to create the input serializer
     * @param outputType type information of the task's output
     */
    public OneInputStreamTaskTestHarness(
            Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory,
            int numInputGates,
            int numInputChannelsPerGate,
            TypeInformation<IN> inputType,
            TypeInformation<OUT> outputType) {
        this(
                taskFactory,
                numInputGates,
                numInputChannelsPerGate,
                inputType,
                outputType,
                TestLocalRecoveryConfig.disabled());
    }

    /**
     * Creates a harness with the given gate/channel layout, passing {@code localRootDir} to the
     * base harness.
     *
     * <p>Unlike the {@link LocalRecoveryConfig} variant, this constructor also registers the input
     * serializer as the state key serializer on the stream config.
     * NOTE(review): it is not visible from this class whether that asymmetry is intentional;
     * it is preserved as-is.
     *
     * @param taskFactory creates the task under test from the mock environment
     * @param numInputGates number of input gates to create
     * @param numInputChannelsPerGate number of channels per input gate
     * @param inputType type information used to create the input serializer
     * @param outputType type information of the task's output
     * @param localRootDir directory handed to the base harness constructor
     */
    public OneInputStreamTaskTestHarness(
            Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory,
            int numInputGates,
            int numInputChannelsPerGate,
            TypeInformation<IN> inputType,
            TypeInformation<OUT> outputType,
            File localRootDir) {
        super(taskFactory, outputType, localRootDir);
        this.inputType = inputType;
        this.inputSerializer = inputType.createSerializer(this.executionConfig);
        this.numInputGates = numInputGates;
        this.numInputChannelsPerGate = numInputChannelsPerGate;
        // Only this constructor seeds the state key serializer with the input serializer.
        this.streamConfig.setStateKeySerializer(this.inputSerializer);
    }

    /**
     * Creates a harness with the given gate/channel layout and local recovery configuration.
     *
     * <p>This constructor does not set a state key serializer; use
     * {@link #configureForKeyedStream(KeySelector, TypeInformation)} to configure one.
     *
     * @param taskFactory creates the task under test from the mock environment
     * @param numInputGates number of input gates to create
     * @param numInputChannelsPerGate number of channels per input gate
     * @param inputType type information used to create the input serializer
     * @param outputType type information of the task's output
     * @param localRecoveryConfig local recovery configuration handed to the base harness
     */
    public OneInputStreamTaskTestHarness(
            Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory,
            int numInputGates,
            int numInputChannelsPerGate,
            TypeInformation<IN> inputType,
            TypeInformation<OUT> outputType,
            LocalRecoveryConfig localRecoveryConfig) {
        super(taskFactory, outputType, localRecoveryConfig);
        this.inputType = inputType;
        this.inputSerializer = inputType.createSerializer(this.executionConfig);
        this.numInputGates = numInputGates;
        this.numInputChannelsPerGate = numInputChannelsPerGate;
    }

    /**
     * Creates a harness with one input gate holding one channel, using
     * {@code TestLocalRecoveryConfig.disabled()} as the local recovery configuration.
     *
     * @param taskFactory creates the task under test from the mock environment
     * @param inputType type information used to create the input serializer
     * @param outputType type information of the task's output
     */
    public OneInputStreamTaskTestHarness(
            Function<Environment, ? extends StreamTask<OUT, ?>> taskFactory,
            TypeInformation<IN> inputType,
            TypeInformation<OUT> outputType) {
        this(taskFactory, 1, 1, inputType, outputType, TestLocalRecoveryConfig.disabled());
    }

    /**
     * Creates the test input gates, registers each with the mock environment, and records on the
     * stream config that the task has one input using the input serializer.
     */
    @Override
    protected void initializeInputs() throws IOException, InterruptedException {
        // Generic arrays cannot be instantiated in Java, hence the raw component type.
        this.inputGates = new StreamTestSingleInputGate[this.numInputGates];
        for (int i = 0; i < this.numInputGates; ++i) {
            // Keep a typed reference so the gate is not read back through the raw array element.
            StreamTestSingleInputGate<IN> gate =
                    new StreamTestSingleInputGate<IN>(
                            this.numInputChannelsPerGate, this.bufferSize, this.inputSerializer);
            this.inputGates[i] = gate;
            this.mockEnv.addInputGate(gate.getInputGate());
        }
        this.streamConfig.setNumberOfInputs(1);
        this.streamConfig.setTypeSerializerIn1(this.inputSerializer);
    }

    /**
     * Configures the stream config for keyed state: runs the closure cleaner over the key
     * selector, installs it as the state partitioner for input index 0, and sets the state key
     * serializer created from {@code keyType} (replacing any previously set key serializer).
     *
     * @param keySelector extracts the key from each input record
     * @param keyType type information used to create the key serializer
     * @param <K> key type
     */
    public <K> void configureForKeyedStream(
            KeySelector<IN, K> keySelector, TypeInformation<K> keyType) {
        // NOTE(review): the trailing `false` presumably disables the serializability check;
        // confirm against ClosureCleaner#clean.
        ClosureCleaner.clean(keySelector, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, false);
        this.streamConfig.setStatePartitioner(0, keySelector);
        this.streamConfig.setStateKeySerializer(keyType.createSerializer(this.executionConfig));
    }

    /**
     * Returns the task under test, narrowed to {@link OneInputStreamTask}.
     *
     * @return the task held by the base harness
     * @throws ClassCastException if the task held by the base harness is not a
     *     {@link OneInputStreamTask}
     */
    @Override
    public OneInputStreamTask<IN, OUT> getTask() {
        // The task factory is only typed as StreamTask<OUT, ?>, so the type arguments of this
        // narrowing cast cannot be verified by the compiler; the warning is suppressed here only.
        @SuppressWarnings("unchecked")
        OneInputStreamTask<IN, OUT> task = (OneInputStreamTask<IN, OUT>) super.getTask();
        return task;
    }
}

