/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.io;

import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.test.util.JavaProgramTestBase;
import org.junit.Assert;

public class RichInputOutputITCase
extends JavaProgramTestBase {
    private String inputPath;
    private static ConcurrentLinkedQueue<Integer> readCalls;
    private static ConcurrentLinkedQueue<Integer> writeCalls;

    protected void preSubmit() throws Exception {
        this.inputPath = this.createTempFile("input", "ab\ncd\nef\n");
    }

    protected void testProgram() throws Exception {
        readCalls = new ConcurrentLinkedQueue();
        writeCalls = new ConcurrentLinkedQueue();
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.createInput((InputFormat)new TestInputFormat(new Path(this.inputPath))).output((OutputFormat)new TestOutputFormat());
        JobExecutionResult result = env.execute();
        Object a = result.getAllAccumulatorResults().get("DATA_SOURCE_ACCUMULATOR");
        Object b = result.getAllAccumulatorResults().get("DATA_SINK_ACCUMULATOR");
        long recordsRead = (Long)a;
        long recordsWritten = (Long)b;
        Assert.assertEquals((long)recordsRead, (long)readCalls.size());
        Assert.assertEquals((long)recordsWritten, (long)writeCalls.size());
    }

    private static final class TestOutputFormat
    extends RichOutputFormat<String> {
        private LongCounter counter = new LongCounter();

        private TestOutputFormat() {
        }

        public void configure(Configuration parameters) {
        }

        public void open(int a, int b) {
            try {
                this.getRuntimeContext().addAccumulator("DATA_SINK_ACCUMULATOR", (Accumulator)this.counter);
            }
            catch (UnsupportedOperationException unsupportedOperationException) {
                // empty catch block
            }
        }

        public void close() throws IOException {
        }

        public void writeRecord(String record) {
            writeCalls.add(1);
            this.counter.add(1L);
        }
    }

    private static final class TestInputFormat
    extends TextInputFormat {
        private static final long serialVersionUID = 1L;
        private LongCounter counter = new LongCounter();

        public TestInputFormat(Path filePath) {
            super(filePath);
        }

        public void open(FileInputSplit split) throws IOException {
            try {
                this.getRuntimeContext().addAccumulator("DATA_SOURCE_ACCUMULATOR", (Accumulator)this.counter);
            }
            catch (UnsupportedOperationException unsupportedOperationException) {
                // empty catch block
            }
            super.open(split);
        }

        public String nextRecord(String reuse) throws IOException {
            readCalls.add(1);
            this.counter.add(1L);
            return (String)super.nextRecord((Object)reuse);
        }
    }
}

