/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util;

import java.io.FileOutputStream;
import java.io.ObjectOutputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
import org.apache.flink.runtime.state.AsynchronousStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Test harness that drives a single {@link OneInputStreamOperator}.
 *
 * <p>The operator is attached to a Mockito-mocked {@link StreamTask}; everything the operator
 * emits is appended to a queue that tests can read through {@code getOutput()}.
 *
 * @param <IN> type of the elements fed into the operator
 * @param <OUT> type of the elements emitted by the operator
 */
public class OneInputStreamOperatorTestHarness<IN, OUT> {
    /** The operator under test. */
    final OneInputStreamOperator<IN, OUT> operator;
    /** Receives every record and watermark the operator emits, in emission order. */
    final ConcurrentLinkedQueue<Object> outputList;
    /** Stream configuration handed to the operator during setup. */
    final StreamConfig config;
    /** Execution configuration reported by the mocked task and used to create serializers. */
    final ExecutionConfig executionConfig;
    /** Lock object returned by the mocked task's {@code getCheckpointLock()}; timers fire while holding it. */
    final Object checkpointLock;
    /** Time service used for timer registration and for the current processing time. */
    final TimeServiceProvider timeServiceProvider;
    /** Mockito mock standing in for the task that owns the operator. */
    StreamTask<?, ?> mockTask;
    /** State backend returned by the mocked task; an in-memory backend unless replaced via the setter. */
    private AbstractStateBackend stateBackend = new MemoryStateBackend();
    /** Whether the operator's setup has been performed since construction or the last close. */
    private boolean setupCalled = false;

    /**
     * Creates a harness for {@code operator} using a freshly created {@link ExecutionConfig}.
     *
     * @param operator the operator under test
     */
    public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) {
        this(operator, new ExecutionConfig());
    }

    /**
     * Creates a harness whose time service is a {@link DefaultTimeServiceProvider} backed by a
     * new single-threaded scheduled executor.
     *
     * @param operator the operator under test
     * @param executionConfig execution configuration to expose to the operator
     */
    public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator, ExecutionConfig executionConfig) {
        this(
                operator,
                executionConfig,
                DefaultTimeServiceProvider.create(Executors.newSingleThreadScheduledExecutor()));
    }

    /**
     * Creates a harness around {@code operator} and prepares the mocked {@link StreamTask} it
     * will be attached to.
     *
     * <p>The mocked task is stubbed so that:
     * <ul>
     *   <li>name, checkpoint lock, configuration, environment and execution config return the
     *       values held by this harness;</li>
     *   <li>{@code createStateBackend} disposes and re-initializes the harness' current state
     *       backend and returns it;</li>
     *   <li>{@code registerTimer} and {@code getCurrentProcessingTime} are forwarded to
     *       {@code testTimeProvider}.</li>
     * </ul>
     *
     * @param operator the operator under test
     * @param executionConfig execution configuration to expose to the operator
     * @param testTimeProvider time service used for timers and processing time
     * @throws RuntimeException if stubbing {@code createStateBackend} fails
     */
    public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator, ExecutionConfig executionConfig, TimeServiceProvider testTimeProvider) {
        this.operator = operator;
        this.outputList = new ConcurrentLinkedQueue<Object>();
        this.config = new StreamConfig(new Configuration());
        this.executionConfig = executionConfig;
        this.checkpointLock = new Object();
        this.timeServiceProvider = testTimeProvider;

        // Must be final so the anonymous Answer below can capture it.
        // The task name string is kept exactly as before; 3 MiB == 0x300000.
        final MockEnvironment env = new MockEnvironment("MockTwoInputTask", 3L * 1024 * 1024, new MockInputSplitProvider(), 1024);

        this.mockTask = Mockito.mock(StreamTask.class);
        Mockito.when(this.mockTask.getName()).thenReturn("Mock Task");
        Mockito.when(this.mockTask.getCheckpointLock()).thenReturn(this.checkpointLock);
        Mockito.when(this.mockTask.getConfiguration()).thenReturn(this.config);
        Mockito.when(this.mockTask.getEnvironment()).thenReturn(env);
        Mockito.when(this.mockTask.getExecutionConfig()).thenReturn(executionConfig);

        try {
            Mockito.doAnswer(new Answer<AbstractStateBackend>() {
                @Override
                public AbstractStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable {
                    final String operatorIdentifier = (String) invocationOnMock.getArguments()[0];
                    final TypeSerializer<?> keySerializer = (TypeSerializer<?>) invocationOnMock.getArguments()[1];
                    // Read the field on every call so a backend installed later via
                    // setStateBackend(...) is the one that gets initialized and returned.
                    OneInputStreamOperatorTestHarness.this.stateBackend.disposeAllStateForCurrentJob();
                    OneInputStreamOperatorTestHarness.this.stateBackend.initializeForJob(env, operatorIdentifier, keySerializer);
                    return OneInputStreamOperatorTestHarness.this.stateBackend;
                }
            }).when(this.mockTask).createStateBackend(Matchers.any(String.class), Matchers.any(TypeSerializer.class));
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }

        Mockito.doAnswer(new Answer<Void>() {
            @Override
            public Void answer(InvocationOnMock invocation) throws Throwable {
                final long execTime = (Long) invocation.getArguments()[0];
                final Triggerable target = (Triggerable) invocation.getArguments()[1];
                // The timer callback runs under the checkpoint lock (see TriggerTask).
                OneInputStreamOperatorTestHarness.this.timeServiceProvider.registerTimer(
                        execTime,
                        new TriggerTask(OneInputStreamOperatorTestHarness.this.checkpointLock, target, execTime));
                return null;
            }
        }).when(this.mockTask).registerTimer(Matchers.anyLong(), Matchers.any(Triggerable.class));

        Mockito.doAnswer(new Answer<Long>() {
            @Override
            public Long answer(InvocationOnMock invocation) throws Throwable {
                return OneInputStreamOperatorTestHarness.this.timeServiceProvider.getCurrentProcessingTime();
            }
        }).when(this.mockTask).getCurrentProcessingTime();
    }

    /**
     * Stores the given time characteristic in the stream configuration of this harness.
     *
     * @param timeCharacteristic the time characteristic to record
     */
    public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
        config.setTimeCharacteristic(timeCharacteristic);
    }

    /**
     * Reads the time characteristic from the stream configuration of this harness.
     *
     * @return the time characteristic currently stored in the configuration
     */
    public TimeCharacteristic getTimeCharacteristic() {
        final TimeCharacteristic configured = config.getTimeCharacteristic();
        return configured;
    }

    /**
     * Replaces the state backend that the mocked task hands out from {@code createStateBackend}.
     *
     * @param stateBackend the backend to use from now on
     */
    public void setStateBackend(AbstractStateBackend stateBackend) {
        this.stateBackend = stateBackend;
    }

    /**
     * Returns the checkpoint lock as reported by the mocked task.
     *
     * @return the object the mocked task returns from {@code getCheckpointLock()}
     */
    public Object getCheckpointLock() {
        final Object lock = mockTask.getCheckpointLock();
        return lock;
    }

    /**
     * Returns the environment as reported by the mocked task.
     *
     * @return the object the mocked task returns from {@code getEnvironment()}
     */
    public Environment getEnvironment() {
        final Environment environment = mockTask.getEnvironment();
        return environment;
    }

    /**
     * Configures the harness for a keyed operator.
     *
     * <p>Runs the closure cleaner on the key selector, registers the selector as state
     * partitioner for input 0 and registers a key serializer created from {@code keyType}.
     *
     * @param keySelector extracts the key from an input element
     * @param keyType type information of the key, used to create the key serializer
     * @param <K> key type
     */
    public <K> void configureForKeyedStream(KeySelector<IN, K> keySelector, TypeInformation<K> keyType) {
        ClosureCleaner.clean(keySelector, false);
        config.setStatePartitioner(0, keySelector);

        final TypeSerializer<K> keySerializer = keyType.createSerializer(executionConfig);
        config.setStateKeySerializer(keySerializer);
    }

    /**
     * Gives access to everything the operator has emitted so far.
     *
     * @return the live queue that the harness appends emitted records and watermarks to
     */
    public ConcurrentLinkedQueue<Object> getOutput() {
        return outputList;
    }

    /**
     * Sets up the operator without an explicit output serializer; the serializer is then
     * derived from the first emitted element.
     *
     * @throws Exception declared for callers; see {@link #setup(TypeSerializer)}
     */
    public void setup() throws Exception {
        setup((TypeSerializer<OUT>) null);
    }

    /**
     * Attaches the operator to the mocked task, the harness configuration and a recording
     * output, then marks the harness as set up.
     *
     * @param outputTypeSerializer serializer used to copy emitted values; may be {@code null},
     *     in which case one is derived from the first emitted element
     */
    public void setup(TypeSerializer<OUT> outputTypeSerializer) {
        final MockOutput recordingOutput = new MockOutput(outputTypeSerializer);
        operator.setup(mockTask, config, recordingOutput);
        setupCalled = true;
    }

    /**
     * Opens the operator, performing the default setup first if no setup has happened yet.
     *
     * @throws Exception if setup or the operator's {@code open()} fails
     */
    public void open() throws Exception {
        final boolean needsSetup = !setupCalled;
        if (needsSetup) {
            setup();
        }
        operator.open();
    }

    /**
     * Takes a snapshot of the operator state and materializes every asynchronous part of it,
     * so the returned snapshot contains no pending asynchronous handles.
     *
     * <p>Function state, operator state and each key/value state entry are checked
     * independently; only entries that are asynchronous handles are replaced by their
     * materialized form.
     *
     * @param checkpointId id passed to the operator's snapshot call
     * @param timestamp timestamp passed to the operator's snapshot call
     * @return the (materialized) snapshot, or {@code null} if the operator returned none
     * @throws Exception if taking the snapshot or materializing a handle fails
     */
    // Raw types are used deliberately: the generic signatures of the state handle classes are
    // not visible from this file, and the values are only passed through.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public StreamTaskState snapshot(long checkpointId, long timestamp) throws Exception {
        StreamTaskState snapshot = this.operator.snapshotOperatorState(checkpointId, timestamp);
        if (snapshot == null) {
            return null;
        }

        if (snapshot.getFunctionState() instanceof AsynchronousStateHandle) {
            AsynchronousStateHandle asyncFunctionState = (AsynchronousStateHandle) snapshot.getFunctionState();
            snapshot.setFunctionState(asyncFunctionState.materialize());
        }

        if (snapshot.getOperatorState() instanceof AsynchronousStateHandle) {
            AsynchronousStateHandle asyncOperatorState = (AsynchronousStateHandle) snapshot.getOperatorState();
            snapshot.setOperatorState(asyncOperatorState.materialize());
        }

        HashMap kvStates = snapshot.getKvStates();
        if (kvStates != null) {
            // Typed key set: iterating a raw Set with a String loop variable does not compile.
            Set<String> keys = kvStates.keySet();
            for (String key : keys) {
                Object kvState = kvStates.get(key);
                if (kvState instanceof AsynchronousKvStateSnapshot) {
                    // Overwriting the value of an existing key is not a structural
                    // modification of a HashMap, so the iteration stays valid.
                    kvStates.put(key, ((AsynchronousKvStateSnapshot) kvState).materialize());
                }
            }
        }
        return snapshot;
    }

    /**
     * Writes {@code snapshot} to the given file using Java serialization.
     *
     * <p>Both streams are closed when the method returns, normally or exceptionally. Closing
     * the {@link ObjectOutputStream} (rather than only the underlying file stream) flushes its
     * internal buffer, so the complete object graph reaches the file.
     *
     * @param snapshot the snapshot to serialize
     * @param filename path of the file to create or overwrite
     * @throws Exception if the file cannot be opened or the snapshot cannot be written
     */
    public void snaphotToFile(StreamTaskState snapshot, String filename) throws Exception {
        // Two resources so the file stream is released even if creating the
        // ObjectOutputStream (which writes a stream header) fails.
        try (FileOutputStream out = new FileOutputStream(filename);
                ObjectOutputStream oos = new ObjectOutputStream(out)) {
            oos.writeObject(snapshot);
        }
    }

    /**
     * Hands a previously taken snapshot to the operator for restoring.
     *
     * @param snapshot the state to restore
     * @param recoveryTimestamp not used by this method; only the snapshot is passed on
     * @throws Exception if the operator fails to restore the state
     */
    public void restore(StreamTaskState snapshot, long recoveryTimestamp) throws Exception {
        operator.restoreState(snapshot);
    }

    /**
     * Closes and disposes the operator, shuts down the time service and resets the setup flag.
     *
     * <p>The time service is shut down and the flag is reset even when closing or disposing
     * the operator throws, so a failing operator does not leave the time service running.
     * The operator calls themselves are unchanged: {@code dispose()} is only reached when
     * {@code close()} completed normally.
     *
     * @throws Exception if closing or disposing the operator, or shutting down the time
     *     service, fails
     */
    public void close() throws Exception {
        try {
            this.operator.close();
            this.operator.dispose();
        } finally {
            if (this.timeServiceProvider != null) {
                this.timeServiceProvider.shutdownService();
            }
            this.setupCalled = false;
        }
    }

    /**
     * Feeds one record to the operator, setting it as key context first.
     *
     * @param element the record to process
     * @throws Exception if the operator fails while processing the record
     */
    public void processElement(StreamRecord<IN> element) throws Exception {
        operator.setKeyContextElement1(element);
        operator.processElement(element);
    }

    /**
     * Feeds the given records to the operator in iteration order; each record is set as key
     * context before it is processed.
     *
     * @param elements the records to process
     * @throws Exception if the operator fails while processing a record
     */
    public void processElements(Collection<StreamRecord<IN>> elements) throws Exception {
        for (StreamRecord<IN> record : elements) {
            operator.setKeyContextElement1(record);
            operator.processElement(record);
        }
    }

    /**
     * Passes a watermark to the operator.
     *
     * @param mark the watermark to process
     * @throws Exception if the operator fails while processing the watermark
     */
    public void processWatermark(Watermark mark) throws Exception {
        operator.processWatermark(mark);
    }

    /**
     * Runnable that fires a {@link Triggerable} with a fixed timestamp while holding a lock.
     *
     * <p>Exceptions thrown by the target are printed to standard error and not propagated;
     * errors are not caught.
     */
    private static final class TriggerTask implements Runnable {
        /** Monitor held for the duration of the trigger call. */
        private final Object lock;
        /** Callback to invoke. */
        private final Triggerable target;
        /** Timestamp passed to the callback. */
        private final long timestamp;

        TriggerTask(Object lock, Triggerable target, long timestamp) {
            this.lock = lock;
            this.target = target;
            this.timestamp = timestamp;
        }

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    target.trigger(timestamp);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private class MockOutput
    implements Output<StreamRecord<OUT>> {
        private TypeSerializer<OUT> outputSerializer;

        public MockOutput(TypeSerializer<OUT> outputSerializer) {
            this.outputSerializer = outputSerializer;
        }

        public void emitWatermark(Watermark mark) {
            OneInputStreamOperatorTestHarness.this.outputList.add(mark);
        }

        public void collect(StreamRecord<OUT> element) {
            if (this.outputSerializer == null) {
                this.outputSerializer = TypeExtractor.getForObject((Object)element.getValue()).createSerializer(OneInputStreamOperatorTestHarness.this.executionConfig);
            }
            OneInputStreamOperatorTestHarness.this.outputList.add(new StreamRecord(this.outputSerializer.copy(element.getValue()), element.getTimestamp()));
        }

        public void close() {
        }
    }
}

