package org.apache.flink.runtime.operators;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import org.apache.flink.api.common.io.DelimitedInputFormat;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
import org.apache.flink.runtime.operators.testutils.TaskTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/operators/DataSourceTaskTest.class */
public class DataSourceTaskTest extends TaskTestBase {

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();
    private static final int MEMORY_MANAGER_SIZE = 1048576;
    private static final int NETWORK_BUFFER_SIZE = 1024;
    private List<Record> outList;

    /* loaded from: input_file:org/apache/flink/runtime/operators/DataSourceTaskTest$InputFilePreparator.class */
    public static class InputFilePreparator {
        public static void prepareInputFile(MutableObjectIterator<Record> mutableObjectIterator, File file, boolean z) throws IOException {
            BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(file));
            Throwable th = null;
            if (z) {
                try {
                    try {
                        bufferedWriter.write("####_I_AM_INVALID_########\n");
                    } catch (Throwable th2) {
                        th = th2;
                        throw th2;
                    }
                } catch (Throwable th3) {
                    if (bufferedWriter != null) {
                        if (th != null) {
                            try {
                                bufferedWriter.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            bufferedWriter.close();
                        }
                    }
                    throw th3;
                }
            }
            Record record = new Record();
            while (true) {
                Record record2 = (Record) mutableObjectIterator.next(record);
                record = record2;
                if (record2 == null) {
                    break;
                }
                bufferedWriter.write(record.getField(0, IntValue.class).getValue() + "_" + record.getField(1, IntValue.class).getValue() + "\n");
            }
            if (z) {
                bufferedWriter.write("####_I_AM_INVALID_########\n");
            }
            bufferedWriter.flush();
            if (bufferedWriter != null) {
                if (0 == 0) {
                    bufferedWriter.close();
                    return;
                }
                try {
                    bufferedWriter.close();
                } catch (Throwable th5) {
                    th.addSuppressed(th5);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/DataSourceTaskTest$MockDelayingInputFormat.class */
    public static class MockDelayingInputFormat extends DelimitedInputFormat<Record> {
        private static final long serialVersionUID = 1;
        private final IntValue key = new IntValue();
        private final IntValue value = new IntValue();

        public Record readRecord(Record record, byte[] bArr, int i, int i2) {
            try {
                Thread.sleep(100L);
                String str = new String(bArr, i, i2, ConfigConstants.DEFAULT_CHARSET);
                try {
                    this.key.setValue(Integer.parseInt(str.substring(0, str.indexOf("_"))));
                    this.value.setValue(Integer.parseInt(str.substring(str.indexOf("_") + 1, str.length())));
                    record.setField(0, this.key);
                    record.setField(1, this.value);
                    return record;
                } catch (RuntimeException e) {
                    return null;
                }
            } catch (InterruptedException e2) {
                return null;
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/DataSourceTaskTest$MockFailingInputFormat.class */
    public static class MockFailingInputFormat extends DelimitedInputFormat<Record> {
        private static final long serialVersionUID = 1;
        private final IntValue key = new IntValue();
        private final IntValue value = new IntValue();
        private int cnt = 0;

        public Record readRecord(Record record, byte[] bArr, int i, int i2) {
            if (this.cnt == 10) {
                throw new RuntimeException("Excpected Test Exception.");
            }
            this.cnt++;
            String str = new String(bArr, i, i2, ConfigConstants.DEFAULT_CHARSET);
            try {
                this.key.setValue(Integer.parseInt(str.substring(0, str.indexOf("_"))));
                this.value.setValue(Integer.parseInt(str.substring(str.indexOf("_") + 1, str.length())));
                record.setField(0, this.key);
                record.setField(1, this.value);
                return record;
            } catch (RuntimeException e) {
                return null;
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/DataSourceTaskTest$MockInputFormat.class */
    public static class MockInputFormat extends DelimitedInputFormat<Record> {
        private static final long serialVersionUID = 1;
        private final IntValue key = new IntValue();
        private final IntValue value = new IntValue();
        private boolean opened = false;
        private boolean closed = false;

        public Record readRecord(Record record, byte[] bArr, int i, int i2) {
            String str = new String(bArr, i, i2, ConfigConstants.DEFAULT_CHARSET);
            try {
                this.key.setValue(Integer.parseInt(str.substring(0, str.indexOf("_"))));
                this.value.setValue(Integer.parseInt(str.substring(str.indexOf("_") + 1, str.length())));
                record.setField(0, this.key);
                record.setField(1, this.value);
                return record;
            } catch (RuntimeException e) {
                return null;
            }
        }

        public void openInputFormat() {
            Assert.assertFalse("Invalid status of the input format. Expected for opened: false, Actual: " + this.opened, this.opened);
            this.opened = true;
        }

        public void closeInputFormat() {
            Assert.assertFalse("Invalid status of the input format. Expected for closed: false, Actual: " + this.closed, this.closed);
            this.closed = true;
        }
    }

    @Test
    public void testDataSourceTask() throws IOException {
        this.outList = new ArrayList();
        File file = new File(this.tempFolder.getRoot(), UUID.randomUUID().toString());
        InputFilePreparator.prepareInputFile(new UniformRecordGenerator(100, 20, false), file, true);
        super.initEnvironment(1048576L, 1024);
        super.addOutput(this.outList);
        DataSourceTask dataSourceTask = new DataSourceTask(this.mockEnv);
        super.registerFileInputTask(dataSourceTask, MockInputFormat.class, file.toURI().toString(), "\n");
        try {
            dataSourceTask.invoke();
        } catch (Exception e) {
            System.err.println(e);
            Assert.fail("Invoke method caused exception.");
        }
        try {
            Field declaredField = DataSourceTask.class.getDeclaredField("format");
            declaredField.setAccessible(true);
            MockInputFormat mockInputFormat = (MockInputFormat) declaredField.get(dataSourceTask);
            Assert.assertTrue("Invalid status of the input format. Expected for opened: true, Actual: " + mockInputFormat.opened, mockInputFormat.opened);
            Assert.assertTrue("Invalid status of the input format. Expected for closed: true, Actual: " + mockInputFormat.closed, mockInputFormat.closed);
        } catch (Exception e2) {
            System.err.println(e2);
            Assert.fail("Reflection error while trying to validate inputFormat status.");
        }
        Assert.assertTrue("Invalid output size. Expected: " + (100 * 20) + " Actual: " + this.outList.size(), this.outList.size() == 100 * 20);
        HashMap hashMap = new HashMap(100);
        for (Record record : this.outList) {
            int value = record.getField(0, IntValue.class).getValue();
            int value2 = record.getField(1, IntValue.class).getValue();
            if (!hashMap.containsKey(Integer.valueOf(value))) {
                hashMap.put(Integer.valueOf(value), new HashSet());
            }
            ((HashSet) hashMap.get(Integer.valueOf(value))).add(Integer.valueOf(value2));
        }
        Assert.assertTrue("Invalid key count in out file. Expected: 100 Actual: " + hashMap.keySet().size(), hashMap.keySet().size() == 100);
        for (Integer num : hashMap.keySet()) {
            Assert.assertTrue("Invalid value count for key: " + num + ". Expected: 20 Actual: " + ((HashSet) hashMap.get(num)).size(), ((HashSet) hashMap.get(num)).size() == 20);
        }
    }

    @Test
    public void testFailingDataSourceTask() throws IOException {
        this.outList = new NirvanaOutputList();
        File file = new File(this.tempFolder.getRoot(), UUID.randomUUID().toString());
        InputFilePreparator.prepareInputFile(new UniformRecordGenerator(20, 10, false), file, false);
        super.initEnvironment(1048576L, 1024);
        super.addOutput(this.outList);
        DataSourceTask dataSourceTask = new DataSourceTask(this.mockEnv);
        super.registerFileInputTask(dataSourceTask, MockFailingInputFormat.class, file.toURI().toString(), "\n");
        boolean z = false;
        try {
            dataSourceTask.invoke();
        } catch (Exception e) {
            z = true;
        }
        Assert.assertTrue("Function exception was not forwarded.", z);
        Assert.assertTrue("Temp output file does not exist", file.exists());
    }

    @Test
    public void testCancelDataSourceTask() throws IOException {
        super.initEnvironment(1048576L, 1024);
        super.addOutput(new NirvanaOutputList());
        File file = new File(this.tempFolder.getRoot(), UUID.randomUUID().toString());
        InputFilePreparator.prepareInputFile(new UniformRecordGenerator(20, 4, false), file, false);
        final DataSourceTask dataSourceTask = new DataSourceTask(this.mockEnv);
        super.registerFileInputTask(dataSourceTask, MockDelayingInputFormat.class, file.toURI().toString(), "\n");
        Thread thread = new Thread() { // from class: org.apache.flink.runtime.operators.DataSourceTaskTest.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    dataSourceTask.invoke();
                } catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail("Task threw exception although it was properly canceled");
                }
            }
        };
        thread.start();
        TaskCancelThread taskCancelThread = new TaskCancelThread(1, thread, (AbstractInvokable) dataSourceTask);
        taskCancelThread.start();
        try {
            taskCancelThread.join();
            thread.join();
        } catch (InterruptedException e) {
            Assert.fail("Joining threads failed");
        }
        Assert.assertTrue("Temp output file does not exist", file.exists());
    }
}
