/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusProvider;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class OperatorChainTest {
    @Test
    public void testPrepareCheckpointPreBarrier() throws Exception {
        AtomicInteger intRef = new AtomicInteger();
        ValidatingOperator one = new ValidatingOperator(intRef, 0);
        ValidatingOperator two = new ValidatingOperator(intRef, 1);
        ValidatingOperator three = new ValidatingOperator(intRef, 2);
        OperatorChain chain = OperatorChainTest.setupOperatorChain(one, two, three);
        chain.prepareSnapshotPreBarrier(5765167L);
        Assert.assertEquals((long)3L, (long)intRef.get());
    }

    @SafeVarargs
    private static <T, OP extends StreamOperator<T>> OperatorChain<T, OP> setupOperatorChain(OneInputStreamOperator<T, T> ... operators) {
        Preconditions.checkNotNull(operators);
        Preconditions.checkArgument((operators.length > 0 ? 1 : 0) != 0);
        try (MockEnvironment env = MockEnvironment.builder().build();){
            OneInputStreamTask containingTask = new OneInputStreamTask((Environment)env);
            StreamStatusProvider statusProvider = (StreamStatusProvider)Mockito.mock(StreamStatusProvider.class);
            StreamConfig cfg = new StreamConfig(new Configuration());
            StreamOperator[] ops = new StreamOperator[operators.length];
            OperatorChain.BroadcastingOutputCollector lastWriter = new OperatorChain.BroadcastingOutputCollector(new Output[0], statusProvider);
            for (int i = 0; i < ops.length; ++i) {
                OneInputStreamOperator<T, T> op = operators[ops.length - i - 1];
                op.setup((StreamTask)containingTask, cfg, (Output)lastWriter);
                lastWriter = new OperatorChain.ChainingOutput(op, statusProvider, null);
                ops[i] = op;
            }
            OneInputStreamOperator<T, T> head = operators[0];
            OperatorChain operatorChain = new OperatorChain(ops, new RecordWriterOutput[0], (OperatorChain.WatermarkGaugeExposingOutput)lastWriter, head);
            return operatorChain;
        }
    }

    private static class ValidatingOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        private static final long serialVersionUID = 1L;
        static final long CHECKPOINT_ID = 5765167L;
        final AtomicInteger toUpdate;
        final int expected;

        public ValidatingOperator(AtomicInteger toUpdate, int expected) {
            this.toUpdate = toUpdate;
            this.expected = expected;
        }

        public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
            Assert.assertEquals((String)"wrong checkpointId", (long)5765167L, (long)checkpointId);
            Assert.assertEquals((String)"wrong order", (long)this.expected, (long)this.toUpdate.getAndIncrement());
        }

        public void processElement(StreamRecord<String> element) throws Exception {
            throw new UnsupportedOperationException();
        }

        public OperatorID getOperatorID() {
            return new OperatorID();
        }
    }
}

