package org.apache.flink.streaming.util;

import java.util.Collection;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
import org.apache.flink.runtime.state.AsynchronousStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.class */
public class OneInputStreamOperatorTestHarness<IN, OUT> {
    final OneInputStreamOperator<IN, OUT> operator;
    final ConcurrentLinkedQueue<Object> outputList;
    final StreamConfig config;
    final ExecutionConfig executionConfig;
    final Object checkpointLock;
    final TimeServiceProvider timeServiceProvider;
    StreamTask<?, ?> mockTask;
    private AbstractStateBackend stateBackend;
    private boolean setupCalled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness$MockOutput.class */
    public class MockOutput implements Output<StreamRecord<OUT>> {
        private TypeSerializer<OUT> outputSerializer;

        private MockOutput() {
        }

        public void emitWatermark(Watermark watermark) {
            OneInputStreamOperatorTestHarness.this.outputList.add(watermark);
        }

        public void collect(StreamRecord<OUT> streamRecord) {
            if (this.outputSerializer == null) {
                this.outputSerializer = TypeExtractor.getForObject(streamRecord.getValue()).createSerializer(OneInputStreamOperatorTestHarness.this.executionConfig);
            }
            OneInputStreamOperatorTestHarness.this.outputList.add(new StreamRecord(this.outputSerializer.copy(streamRecord.getValue()), streamRecord.getTimestamp()));
        }

        public void close() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness$TriggerTask.class */
    public static final class TriggerTask implements Runnable {
        private final Object lock;
        private final Triggerable target;
        private final long timestamp;

        TriggerTask(Object obj, Triggerable triggerable, long j) {
            this.lock = obj;
            this.target = triggerable;
            this.timestamp = j;
        }

        @Override // java.lang.Runnable
        public void run() {
            synchronized (this.lock) {
                try {
                    this.target.trigger(this.timestamp);
                } catch (Throwable th) {
                    try {
                        throw th;
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> oneInputStreamOperator) {
        this(oneInputStreamOperator, new ExecutionConfig());
    }

    public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> oneInputStreamOperator, ExecutionConfig executionConfig) {
        this(oneInputStreamOperator, executionConfig, DefaultTimeServiceProvider.create(Executors.newSingleThreadScheduledExecutor()));
    }

    public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> oneInputStreamOperator, ExecutionConfig executionConfig, TimeServiceProvider timeServiceProvider) {
        this.stateBackend = new MemoryStateBackend();
        this.setupCalled = false;
        this.operator = oneInputStreamOperator;
        this.outputList = new ConcurrentLinkedQueue<>();
        this.config = new StreamConfig(new Configuration());
        this.executionConfig = executionConfig;
        this.checkpointLock = new Object();
        final MockEnvironment mockEnvironment = new MockEnvironment("MockTwoInputTask", 3145728L, new MockInputSplitProvider(), StreamTaskTestHarness.DEFAULT_NETWORK_BUFFER_SIZE);
        this.mockTask = (StreamTask) Mockito.mock(StreamTask.class);
        this.timeServiceProvider = timeServiceProvider;
        Mockito.when(this.mockTask.getName()).thenReturn("Mock Task");
        Mockito.when(this.mockTask.getCheckpointLock()).thenReturn(this.checkpointLock);
        Mockito.when(this.mockTask.getConfiguration()).thenReturn(this.config);
        Mockito.when(this.mockTask.getEnvironment()).thenReturn(mockEnvironment);
        Mockito.when(this.mockTask.getExecutionConfig()).thenReturn(executionConfig);
        try {
            ((StreamTask) Mockito.doAnswer(new Answer<AbstractStateBackend>() { // from class: org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.1
                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public AbstractStateBackend m45answer(InvocationOnMock invocationOnMock) throws Throwable {
                    String str = (String) invocationOnMock.getArguments()[0];
                    TypeSerializer typeSerializer = (TypeSerializer) invocationOnMock.getArguments()[1];
                    OneInputStreamOperatorTestHarness.this.stateBackend.disposeAllStateForCurrentJob();
                    OneInputStreamOperatorTestHarness.this.stateBackend.initializeForJob(mockEnvironment, str, typeSerializer);
                    return OneInputStreamOperatorTestHarness.this.stateBackend;
                }
            }).when(this.mockTask)).createStateBackend((String) Matchers.any(String.class), (TypeSerializer) Matchers.any(TypeSerializer.class));
            ((StreamTask) Mockito.doAnswer(new Answer<Void>() { // from class: org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.2
                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public Void m46answer(InvocationOnMock invocationOnMock) throws Throwable {
                    long longValue = ((Long) invocationOnMock.getArguments()[0]).longValue();
                    OneInputStreamOperatorTestHarness.this.timeServiceProvider.registerTimer(longValue, new TriggerTask(OneInputStreamOperatorTestHarness.this.checkpointLock, (Triggerable) invocationOnMock.getArguments()[1], longValue));
                    return null;
                }
            }).when(this.mockTask)).registerTimer(Matchers.anyLong(), (Triggerable) Matchers.any(Triggerable.class));
            ((StreamTask) Mockito.doAnswer(new Answer<Long>() { // from class: org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.3
                /* renamed from: answer, reason: merged with bridge method [inline-methods] */
                public Long m47answer(InvocationOnMock invocationOnMock) throws Throwable {
                    return Long.valueOf(OneInputStreamOperatorTestHarness.this.timeServiceProvider.getCurrentProcessingTime());
                }
            }).when(this.mockTask)).getCurrentProcessingTime();
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    public void setStateBackend(AbstractStateBackend abstractStateBackend) {
        this.stateBackend = abstractStateBackend;
    }

    public Object getCheckpointLock() {
        return this.mockTask.getCheckpointLock();
    }

    public Environment getEnvironment() {
        return this.mockTask.getEnvironment();
    }

    public <K> void configureForKeyedStream(KeySelector<IN, K> keySelector, TypeInformation<K> typeInformation) {
        ClosureCleaner.clean(keySelector, false);
        this.config.setStatePartitioner(0, keySelector);
        this.config.setStateKeySerializer(typeInformation.createSerializer(this.executionConfig));
    }

    public ConcurrentLinkedQueue<Object> getOutput() {
        return this.outputList;
    }

    public void setup() throws Exception {
        this.operator.setup(this.mockTask, this.config, new MockOutput());
        this.setupCalled = true;
    }

    public void open() throws Exception {
        if (!this.setupCalled) {
            setup();
        }
        this.operator.open();
    }

    public StreamTaskState snapshot(long j, long j2) throws Exception {
        StreamTaskState snapshotOperatorState = this.operator.snapshotOperatorState(j, j2);
        if (snapshotOperatorState != null) {
            if (snapshotOperatorState.getFunctionState() instanceof AsynchronousStateHandle) {
                snapshotOperatorState.setFunctionState(snapshotOperatorState.getFunctionState().materialize());
            }
            if (snapshotOperatorState.getOperatorState() instanceof AsynchronousStateHandle) {
                snapshotOperatorState.setOperatorState(snapshotOperatorState.getOperatorState().materialize());
            }
            if (snapshotOperatorState.getKvStates() != null) {
                Set<String> keySet = snapshotOperatorState.getKvStates().keySet();
                HashMap kvStates = snapshotOperatorState.getKvStates();
                for (String str : keySet) {
                    if (kvStates.get(str) instanceof AsynchronousKvStateSnapshot) {
                        kvStates.put(str, ((AsynchronousKvStateSnapshot) kvStates.get(str)).materialize());
                    }
                }
            }
        }
        return snapshotOperatorState;
    }

    public void restore(StreamTaskState streamTaskState, long j) throws Exception {
        this.operator.restoreState(streamTaskState);
    }

    public void close() throws Exception {
        this.operator.close();
        this.operator.dispose();
        if (this.timeServiceProvider != null) {
            this.timeServiceProvider.shutdownService();
        }
        this.setupCalled = false;
    }

    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        this.operator.setKeyContextElement1(streamRecord);
        this.operator.processElement(streamRecord);
    }

    public void processElements(Collection<StreamRecord<IN>> collection) throws Exception {
        for (StreamRecord<IN> streamRecord : collection) {
            this.operator.setKeyContextElement1(streamRecord);
            this.operator.processElement(streamRecord);
        }
    }

    public void processWatermark(Watermark watermark) throws Exception {
        this.operator.processWatermark(watermark);
    }
}
