/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.source;

import java.io.IOException;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.junit.Assert;
import org.junit.Test;

public class InputFormatSourceFunctionTest {
    @Test
    public void testNormalOp() throws Exception {
        this.testFormatLifecycle(false);
    }

    @Test
    public void testCancelation() throws Exception {
        this.testFormatLifecycle(true);
    }

    private void testFormatLifecycle(boolean midCancel) throws Exception {
        int noOfSplits = 5;
        int cancelAt = 2;
        LifeCycleTestInputFormat format = new LifeCycleTestInputFormat();
        InputFormatSourceFunction reader = new InputFormatSourceFunction((InputFormat)format, TypeInformation.of(Integer.class));
        reader.setRuntimeContext((RuntimeContext)new MockRuntimeContext(format, 5));
        Assert.assertTrue((!format.isConfigured ? 1 : 0) != 0);
        Assert.assertTrue((!format.isInputFormatOpen ? 1 : 0) != 0);
        Assert.assertTrue((!format.isSplitOpen ? 1 : 0) != 0);
        reader.open(new Configuration());
        Assert.assertTrue((boolean)format.isConfigured);
        TestSourceContext ctx = new TestSourceContext(reader, format, midCancel, 2);
        reader.run((SourceFunction.SourceContext)ctx);
        int splitsSeen = ctx.getSplitsSeen();
        Assert.assertTrue((boolean)(midCancel ? splitsSeen == 2 : splitsSeen == 5));
        Assert.assertTrue((!format.isSplitOpen ? 1 : 0) != 0);
        Assert.assertTrue((!format.isInputFormatOpen ? 1 : 0) != 0);
    }

    private static class MockRuntimeContext
    extends StreamingRuntimeContext {
        private final int noOfSplits;
        private int nextSplit = 0;
        private final LifeCycleTestInputFormat format;
        private InputSplit[] inputSplits;

        private MockRuntimeContext(LifeCycleTestInputFormat format, int noOfSplits) {
            super((AbstractStreamOperator)new MockStreamOperator(), (Environment)new MockEnvironment("no", 131072L, null, 16), Collections.emptyMap());
            this.noOfSplits = noOfSplits;
            this.format = format;
        }

        public MetricGroup getMetricGroup() {
            return new UnregisteredMetricsGroup();
        }

        public InputSplitProvider getInputSplitProvider() {
            try {
                this.inputSplits = this.format.createInputSplits(this.noOfSplits);
                Assert.assertTrue((this.inputSplits.length == this.noOfSplits ? 1 : 0) != 0);
            }
            catch (IOException e) {
                e.printStackTrace();
            }
            return new InputSplitProvider(){

                public InputSplit getNextInputSplit() {
                    if (MockRuntimeContext.this.nextSplit < MockRuntimeContext.this.inputSplits.length) {
                        return MockRuntimeContext.this.inputSplits[MockRuntimeContext.this.nextSplit++];
                    }
                    return null;
                }
            };
        }

        private static class MockStreamOperator
        extends AbstractStreamOperator<Integer> {
            private static final long serialVersionUID = -1153976702711944427L;

            private MockStreamOperator() {
            }

            public ExecutionConfig getExecutionConfig() {
                return new ExecutionConfig();
            }
        }
    }

    private static class TestSourceContext
    implements SourceFunction.SourceContext<Integer> {
        private final InputFormatSourceFunction<Integer> reader;
        private final LifeCycleTestInputFormat format;
        private final boolean shouldCancel;
        private final int cancelAt;
        int splitIdx = 0;

        private TestSourceContext(InputFormatSourceFunction<Integer> reader, LifeCycleTestInputFormat format, boolean shouldCancel, int cancelAt) {
            this.reader = reader;
            this.format = format;
            this.shouldCancel = shouldCancel;
            this.cancelAt = cancelAt;
        }

        public void collect(Integer element) {
            Assert.assertTrue((boolean)this.format.isSplitOpen);
            Assert.assertTrue((this.splitIdx == element ? 1 : 0) != 0);
            if (this.shouldCancel && this.splitIdx == this.cancelAt) {
                this.reader.cancel();
            } else {
                ++this.splitIdx;
            }
        }

        public void collectWithTimestamp(Integer element, long timestamp) {
            throw new UnsupportedOperationException();
        }

        public void emitWatermark(Watermark mark) {
            throw new UnsupportedOperationException();
        }

        public Object getCheckpointLock() {
            return null;
        }

        public void close() {
            throw new UnsupportedOperationException();
        }

        public int getSplitsSeen() {
            return this.splitIdx;
        }
    }

    private static class LifeCycleTestInputFormat
    extends RichInputFormat<Integer, InputSplit> {
        private boolean isConfigured = false;
        private boolean isInputFormatOpen = false;
        private boolean isSplitOpen = false;
        private boolean eos = false;
        private int splitCounter = 0;
        private int reachedEndCalls = 0;
        private int nextRecordCalls = 0;

        private LifeCycleTestInputFormat() {
        }

        public void openInputFormat() {
            Assert.assertTrue((boolean)this.isConfigured);
            Assert.assertTrue((!this.isInputFormatOpen ? 1 : 0) != 0);
            Assert.assertTrue((!this.isSplitOpen ? 1 : 0) != 0);
            this.isInputFormatOpen = true;
        }

        public void closeInputFormat() {
            Assert.assertTrue((!this.isSplitOpen ? 1 : 0) != 0);
            this.isInputFormatOpen = false;
        }

        public void configure(Configuration parameters) {
            Assert.assertTrue((!this.isConfigured ? 1 : 0) != 0);
            this.isConfigured = true;
        }

        public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
            return null;
        }

        public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
            Assert.assertTrue((boolean)this.isConfigured);
            InputSplit[] splits = new InputSplit[minNumSplits];
            int i = 0;
            while (i < minNumSplits) {
                final int idx = i++;
                splits[idx] = new InputSplit(){

                    public int getSplitNumber() {
                        return idx;
                    }
                };
            }
            return splits;
        }

        public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
            return null;
        }

        public void open(InputSplit split) throws IOException {
            Assert.assertTrue((boolean)this.isInputFormatOpen);
            Assert.assertTrue((boolean)this.isConfigured);
            Assert.assertTrue((!this.isSplitOpen ? 1 : 0) != 0);
            this.isSplitOpen = true;
            this.eos = false;
        }

        public boolean reachedEnd() throws IOException {
            Assert.assertTrue((boolean)this.isInputFormatOpen);
            Assert.assertTrue((boolean)this.isConfigured);
            Assert.assertTrue((boolean)this.isSplitOpen);
            if (!this.eos) {
                ++this.reachedEndCalls;
            }
            return this.eos;
        }

        public Integer nextRecord(Integer reuse) throws IOException {
            Assert.assertTrue((boolean)this.isInputFormatOpen);
            Assert.assertTrue((boolean)this.isConfigured);
            Assert.assertTrue((boolean)this.isSplitOpen);
            Assert.assertTrue((this.reachedEndCalls == ++this.nextRecordCalls ? 1 : 0) != 0);
            this.eos = true;
            return this.splitCounter++;
        }

        public void close() throws IOException {
            this.isSplitOpen = false;
        }
    }
}

