/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.java.record.io;

import java.io.IOException;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.record.io.ExternalProcessFixedLengthInputFormat;
import org.apache.flink.api.java.record.io.ExternalProcessInputSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.Value;
import org.apache.flink.util.OperatingSystem;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ExternalProcessFixedLengthInputFormatTest {
    private ExternalProcessFixedLengthInputFormat<ExternalProcessInputSplit> format;
    private final String neverEndingCommand = "cat /dev/urandom";
    private final String thousandRecordsCommand = "dd if=/dev/zero bs=8 count=1000";
    private final String incompleteRecordsCommand = "dd if=/dev/zero bs=7 count=2";
    private final String failingCommand = "ls /I/do/not/exist";

    @Before
    public void prepare() {
        this.format = new MyExternalProcessTestInputFormat();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testOpen() {
        if (OperatingSystem.isWindows()) {
            return;
        }
        Configuration config = new Configuration();
        config.setInteger("pact.input.recordLength", 8);
        ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.neverEndingCommand);
        boolean processDestroyed = false;
        try {
            this.format.configure(config);
            this.format.open((GenericInputSplit)split);
            String[] cmd = new String[]{"/bin/sh", "-c", "ps aux | grep -v grep | grep \"cat /dev/urandom\" | wc -l"};
            byte[] wcOut = new byte[128];
            Process p = Runtime.getRuntime().exec(cmd);
            p.getInputStream().read(wcOut);
            int pCnt = Integer.parseInt(new String(wcOut).trim());
            Assert.assertTrue((pCnt > 0 ? 1 : 0) != 0);
            this.format.close();
        }
        catch (IOException e) {
            Assert.fail();
        }
        catch (RuntimeException e) {
            if (e.getMessage().equals("External process was destroyed although stream was not fully read.")) {
                processDestroyed = true;
            }
        }
        finally {
            Assert.assertTrue((boolean)processDestroyed);
        }
    }

    @Test
    public void testCheckExitCode() {
        boolean invalidExitCode;
        block10: {
            ExternalProcessInputSplit split;
            Configuration config;
            block9: {
                if (OperatingSystem.isWindows()) {
                    return;
                }
                config = new Configuration();
                config.setInteger("pact.input.recordLength", 8);
                split = new ExternalProcessInputSplit(1, 1, "ls /I/do/not/exist");
                this.format.configure(config);
                invalidExitCode = false;
                try {
                    this.format.open((GenericInputSplit)split);
                    this.format.waitForProcessToFinish();
                    this.format.close();
                }
                catch (IOException e) {
                    Assert.fail();
                }
                catch (InterruptedException e) {
                    Assert.fail();
                }
                catch (RuntimeException e) {
                    if (!e.getMessage().startsWith("External process did not finish with an allowed exit code:")) break block9;
                    invalidExitCode = true;
                }
            }
            Assert.assertTrue((boolean)invalidExitCode);
            invalidExitCode = false;
            config.setString("pact.input.externalProcess.allowedExitCodes", "0,1,2");
            this.format.configure(config);
            try {
                this.format.open((GenericInputSplit)split);
                Thread.sleep(100L);
                this.format.close();
            }
            catch (IOException e) {
                Assert.fail();
            }
            catch (InterruptedException e) {
                Assert.fail();
            }
            catch (RuntimeException e) {
                if (!e.getMessage().startsWith("External process did not finish with an allowed exit code:")) break block10;
                invalidExitCode = true;
            }
        }
        Assert.assertTrue((!invalidExitCode ? 1 : 0) != 0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    @Test
    public void testUserCodeTermination() {
        if (OperatingSystem.isWindows()) {
            return;
        }
        Configuration config = new Configuration();
        config.setInteger("pact.input.recordLength", 8);
        config.setInteger("test.failingCount", 100);
        ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.neverEndingCommand);
        Record record = new Record();
        boolean userException = false;
        boolean processDestroyed = false;
        try {
            this.format.configure(config);
            this.format.open((GenericInputSplit)split);
            while (!this.format.reachedEnd()) {
                try {
                    this.format.nextRecord(record);
                }
                catch (RuntimeException re) {
                    userException = true;
                    break;
                }
            }
            this.format.close();
            Assert.assertTrue((userException && processDestroyed ? 1 : 0) != 0);
        }
        catch (IOException e) {
            Assert.fail();
            Assert.assertTrue((userException && processDestroyed ? 1 : 0) != 0);
        }
        catch (RuntimeException e2) {
            if (e2.getMessage().equals("External process was destroyed although stream was not fully read.")) {
                processDestroyed = true;
            }
            Assert.assertTrue((userException && processDestroyed ? 1 : 0) != 0);
            {
                catch (Throwable throwable) {
                    Assert.assertTrue((userException && processDestroyed ? 1 : 0) != 0);
                    throw throwable;
                }
            }
        }
    }

    @Test
    public void testReadStream() {
        if (OperatingSystem.isWindows()) {
            return;
        }
        Configuration config = new Configuration();
        config.setInteger("pact.input.recordLength", 8);
        ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.thousandRecordsCommand);
        Record record = new Record();
        int cnt = 0;
        try {
            this.format.configure(config);
            this.format.open((GenericInputSplit)split);
            while (!this.format.reachedEnd()) {
                if (this.format.nextRecord(record) == null) continue;
                ++cnt;
            }
            this.format.close();
        }
        catch (IOException e) {
            Assert.fail();
        }
        catch (RuntimeException e) {
            Assert.fail((String)e.getMessage());
        }
        Assert.assertTrue((cnt == 1000 ? 1 : 0) != 0);
    }

    @Test
    public void testReadInvalidStream() {
        if (OperatingSystem.isWindows()) {
            return;
        }
        Configuration config = new Configuration();
        config.setInteger("pact.input.recordLength", 8);
        ExternalProcessInputSplit split = new ExternalProcessInputSplit(1, 1, this.incompleteRecordsCommand);
        Record record = new Record();
        boolean incompleteRecordDetected = false;
        int cnt = 0;
        try {
            this.format.configure(config);
            this.format.open((GenericInputSplit)split);
            while (!this.format.reachedEnd()) {
                if (this.format.nextRecord(record) == null) continue;
                ++cnt;
            }
            this.format.close();
        }
        catch (IOException e) {
            Assert.fail();
        }
        catch (RuntimeException e) {
            if (e.getMessage().equals("External process produced incomplete record")) {
                incompleteRecordDetected = true;
            }
            Assert.fail((String)e.getMessage());
        }
        Assert.assertTrue((boolean)incompleteRecordDetected);
    }

    private final class MyExternalProcessTestInputFormat
    extends ExternalProcessFixedLengthInputFormat<ExternalProcessInputSplit> {
        private static final long serialVersionUID = 1L;
        public static final String FAILCOUNT_PARAMETER_KEY = "test.failingCount";
        private long cnt = 0L;
        private int failCnt;

        private MyExternalProcessTestInputFormat() {
        }

        public void configure(Configuration parameters) {
            super.configure(parameters);
            this.failCnt = parameters.getInteger(FAILCOUNT_PARAMETER_KEY, Integer.MAX_VALUE);
        }

        public boolean readBytes(Record record, byte[] bytes, int startPos) {
            if (this.cnt == (long)this.failCnt) {
                throw new RuntimeException("This is a test exception!");
            }
            int v1 = 0;
            v1 |= 0xFF & bytes[startPos + 0];
            v1 = v1 << 8 | 0xFF & bytes[startPos + 1];
            v1 = v1 << 8 | 0xFF & bytes[startPos + 2];
            v1 = v1 << 8 | 0xFF & bytes[startPos + 3];
            int v2 = 0;
            v2 |= 0xFF & bytes[startPos + 4];
            v2 = v2 << 8 | 0xFF & bytes[startPos + 5];
            v2 = v2 << 8 | 0xFF & bytes[startPos + 6];
            v2 = v2 << 8 | 0xFF & bytes[startPos + 7];
            record.setField(0, (Value)new IntValue(v1));
            record.setField(1, (Value)new IntValue(v2));
            ++this.cnt;
            return true;
        }

        public ExternalProcessInputSplit[] createInputSplits(int minNumSplits) throws IOException {
            return null;
        }

        public DefaultInputSplitAssigner getInputSplitAssigner(GenericInputSplit[] splits) {
            return new DefaultInputSplitAssigner((InputSplit[])splits);
        }

        public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
            return null;
        }
    }
}

