package org.apache.flink.runtime.operators;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
import org.apache.flink.runtime.operators.testutils.TaskTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PrepareForTest({Task.class, ResultPartitionWriter.class})
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
/* loaded from: input_file:org/apache/flink/runtime/operators/DataSinkTaskTest.class */
public class DataSinkTaskTest extends TaskTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(DataSinkTaskTest.class);
    private static final int MEMORY_MANAGER_SIZE = 3145728;
    private static final int NETWORK_BUFFER_SIZE = 1024;
    private final String tempTestPath = constructTestPath(DataSinkTaskTest.class, "dst_test");

    /* loaded from: input_file:org/apache/flink/runtime/operators/DataSinkTaskTest$MockFailingOutputFormat.class */
    /**
     * Output format that delegates to {@link MockOutputFormat} for the first nine records and
     * then fails: the tenth call to {@link #writeRecord(Record)}, and every call after it,
     * throws a {@link RuntimeException} instead of writing.
     */
    public static class MockFailingOutputFormat extends MockOutputFormat {
        private static final long serialVersionUID = 1;

        /** Number of {@code writeRecord} calls seen so far, including failed ones. */
        int cnt = 0;

        @Override
        public void configure(Configuration configuration) {
            super.configure(configuration);
        }

        @Override
        public void writeRecord(Record record) throws IOException {
            // Count the call first, so the failure triggers on call number ten.
            this.cnt++;
            if (this.cnt > 9) {
                throw new RuntimeException("Expected Test Exception");
            }
            super.writeRecord(record);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/DataSinkTaskTest$MockOutputFormat.class */
    /**
     * File output format that writes every record as one text line of the form
     * {@code <field 0>_<field 1>}, reading both fields as {@link IntValue}s.
     */
    public static class MockOutputFormat extends FileOutputFormat<Record> {
        private static final long serialVersionUID = 1;

        /** Scratch buffer that is reset and reused for every record. */
        final StringBuilder bld = new StringBuilder();

        public void configure(Configuration configuration) {
            super.configure(configuration);
        }

        @Override
        public void writeRecord(Record record) throws IOException {
            IntValue first = record.getField(0, IntValue.class);
            IntValue second = record.getField(1, IntValue.class);

            bld.setLength(0);
            bld.append(first.getValue()).append('_').append(second.getValue()).append('\n');

            // getBytes() without an argument encodes with the platform default charset.
            byte[] line = bld.toString().getBytes();
            stream.write(line);
        }
    }

    /** Deletes the temp output file after each test if a test left one behind. */
    @After
    public void cleanUp() {
        File leftover = new File(this.tempTestPath);
        if (!leftover.exists()) {
            return;
        }
        leftover.delete();
    }

    /**
     * Runs a {@code DataSinkTask} with {@link MockOutputFormat} over a
     * {@code UniformRecordGenerator(100, 20, false)} input and verifies the written file:
     * it must contain exactly 100 distinct keys, each paired with exactly 20 distinct values.
     *
     * <p>Any exception from setup, task invocation or reading the file fails the test; the
     * exception is attached as the cause of the resulting {@link AssertionError}.
     */
    @Test
    public void testDataSinkTask() {
        try {
            super.initEnvironment(3145728L, NETWORK_BUFFER_SIZE);
            super.addInput(new UniformRecordGenerator(100, 20, false), 0);
            DataSinkTask sinkTask = new DataSinkTask();
            super.registerFileOutputTask((AbstractInvokable) sinkTask, MockOutputFormat.class, new File(this.tempTestPath).toURI().toString());
            sinkTask.invoke();

            File outFile = new File(this.tempTestPath);
            Assert.assertTrue("Temp output file does not exist", outFile.exists());

            // key -> set of values seen for that key; each line has the form "<key>_<value>".
            HashMap<Integer, HashSet<Integer>> valuesByKey = new HashMap<>(100);
            try (FileReader fileReader = new FileReader(outFile);
                    BufferedReader reader = new BufferedReader(fileReader)) {
                String line;
                while ((line = reader.readLine()) != null) {
                    int separator = line.indexOf('_');
                    Integer key = Integer.valueOf(line.substring(0, separator));
                    Integer value = Integer.valueOf(line.substring(separator + 1));

                    HashSet<Integer> values = valuesByKey.get(key);
                    if (values == null) {
                        values = new HashSet<>();
                        valuesByKey.put(key, values);
                    }
                    values.add(value);
                }
            }

            Assert.assertTrue("Invalid key count in out file. Expected: 100 Actual: " + valuesByKey.keySet().size(), valuesByKey.keySet().size() == 100);
            for (Integer key : valuesByKey.keySet()) {
                int valueCount = valuesByKey.get(key).size();
                Assert.assertTrue("Invalid value count for key: " + key + ". Expected: 20 Actual: " + valueCount, valueCount == 20);
            }
        } catch (Exception e) {
            // Keep the original failure message but attach the cause so the stack trace
            // shows up in the test report. Assertion failures are Errors and pass through.
            throw new AssertionError(e.getMessage(), e);
        }
    }

    /**
     * Feeds four inputs into one {@code DataSinkTask} and verifies the written file. Each
     * input is a {@code UniformRecordGenerator} over 10 keys and 20 values, with key offsets
     * 0, 10, 20 and 30. The file must contain 40 distinct keys, each with exactly 20 distinct
     * values.
     */
    @Test
    public void testUnionDataSinkTask() {
        final int keyCnt = 10;
        final int valCnt = 20;
        final int inputCnt = 4;

        super.initEnvironment(3145728L, NETWORK_BUFFER_SIZE);

        // Input i produces keys starting at keyCnt * i, so the inputs do not share keys.
        IteratorWrappingTestSingleInputGate[] inputGates = new IteratorWrappingTestSingleInputGate[inputCnt];
        for (int i = 0; i < inputCnt; i++) {
            inputGates[i] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * i, 0, false), 0, false);
        }

        DataSinkTask sinkTask = new DataSinkTask();
        super.registerFileOutputTask((AbstractInvokable) sinkTask, MockOutputFormat.class, new File(this.tempTestPath).toURI().toString());
        try {
            for (IteratorWrappingTestSingleInputGate inputGate : inputGates) {
                inputGate.notifyNonEmpty();
            }
            sinkTask.invoke();
        } catch (Exception e) {
            LOG.debug("Exception while invoking the test task.", e);
            Assert.fail("Invoke method caused exception.");
        }

        File outFile = new File(this.tempTestPath);
        Assert.assertTrue("Temp output file does not exist", outFile.exists());

        // key -> set of values seen for that key; each line has the form "<key>_<value>".
        HashMap<Integer, HashSet<Integer>> valuesByKey = new HashMap<>(keyCnt * inputCnt);
        try (FileReader fileReader = new FileReader(outFile);
                BufferedReader reader = new BufferedReader(fileReader)) {
            String line;
            while ((line = reader.readLine()) != null) {
                int separator = line.indexOf('_');
                Integer key = Integer.valueOf(line.substring(0, separator));
                Integer value = Integer.valueOf(line.substring(separator + 1));

                HashSet<Integer> values = valuesByKey.get(key);
                if (values == null) {
                    values = new HashSet<>();
                    valuesByKey.put(key, values);
                }
                values.add(value);
            }
        } catch (FileNotFoundException e) {
            Assert.fail("Out file got lost...");
        } catch (IOException e) {
            Assert.fail("Caught IOE while reading out file");
        }

        // The expected count in the message now matches the value that is actually checked.
        int expectedKeys = keyCnt * inputCnt;
        Assert.assertTrue("Invalid key count in out file. Expected: " + expectedKeys + " Actual: " + valuesByKey.keySet().size(), valuesByKey.keySet().size() == expectedKeys);
        for (Integer key : valuesByKey.keySet()) {
            int valueCount = valuesByKey.get(key).size();
            Assert.assertTrue("Invalid value count for key: " + key + ". Expected: " + valCnt + " Actual: " + valueCount, valueCount == valCnt);
        }
    }

    /**
     * Runs a {@code DataSinkTask} whose input uses {@code LocalStrategy.SORT} with a
     * comparator on field 1, then verifies the written file: values (the part after
     * {@code '_'}) must be in ascending order, and every run of equal values must contain
     * exactly 100 distinct keys, including the last run in the file.
     */
    @Test
    public void testSortingDataSinkTask() {
        final int keyCnt = 100;
        final int valCnt = 20;

        super.initEnvironment(3145728L, NETWORK_BUFFER_SIZE);
        super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
        DataSinkTask sinkTask = new DataSinkTask();
        super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
        super.getTaskConfig().setInputComparator(new RecordComparatorFactory(new int[]{1}, new Class[]{IntValue.class}), 0);
        super.getTaskConfig().setRelativeMemoryInput(0, 1.0d);
        super.getTaskConfig().setFilehandlesInput(0, 8);
        super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
        super.registerFileOutputTask((AbstractInvokable) sinkTask, MockOutputFormat.class, new File(this.tempTestPath).toURI().toString());
        try {
            sinkTask.invoke();
        } catch (Exception e) {
            LOG.debug("Exception while invoking the test task.", e);
            Assert.fail("Invoke method caused exception.");
        }

        File outFile = new File(this.tempTestPath);
        Assert.assertTrue("Temp output file does not exist", outFile.exists());

        try (FileReader fileReader = new FileReader(outFile);
                BufferedReader reader = new BufferedReader(fileReader)) {
            // Keys seen for the run of lines that share the current value.
            HashSet<Integer> keysOfCurrentValue = new HashSet<>();
            int currentValue = -1;

            String line;
            while ((line = reader.readLine()) != null) {
                int separator = line.indexOf('_');
                Integer key = Integer.valueOf(line.substring(0, separator));
                int value = Integer.parseInt(line.substring(separator + 1));

                Assert.assertTrue("Values not in ascending order", value >= currentValue);
                if (value > currentValue) {
                    // A new run starts: the run that just ended must be complete.
                    if (currentValue != -1) {
                        Assert.assertTrue("Keys missing for value", keysOfCurrentValue.size() == keyCnt);
                    }
                    keysOfCurrentValue.clear();
                    currentValue = value;
                }
                Assert.assertTrue("Duplicate key for value", keysOfCurrentValue.add(key));
            }

            // The loop only checks a run when the next one begins, so the final run (or an
            // entirely empty file) has to be checked here.
            Assert.assertTrue("Keys missing for value", keysOfCurrentValue.size() == keyCnt);
        } catch (FileNotFoundException e) {
            Assert.fail("Out file got lost...");
        } catch (IOException e) {
            Assert.fail("Caught IOE while reading out file");
        }
    }

    /**
     * Runs a {@code DataSinkTask} with {@link MockFailingOutputFormat} and expects
     * {@code invoke()} to throw, and the temp output file to be gone afterwards.
     */
    @Test
    public void testFailingDataSinkTask() {
        super.initEnvironment(3145728L, NETWORK_BUFFER_SIZE);
        super.addInput(new UniformRecordGenerator(100, 20, false), 0);

        DataSinkTask sinkTask = new DataSinkTask();
        Configuration stubParameters = new Configuration();
        super.getTaskConfig().setStubParameters(stubParameters);

        String outputUri = new File(this.tempTestPath).toURI().toString();
        super.registerFileOutputTask((AbstractInvokable) sinkTask, MockFailingOutputFormat.class, outputUri);

        boolean stubFailed;
        try {
            sinkTask.invoke();
            stubFailed = false;
        } catch (Exception e) {
            // Expected path: the failing format's exception must surface from invoke().
            stubFailed = true;
        }
        Assert.assertTrue("Function exception was not forwarded.", stubFailed);

        File outFile = new File(this.tempTestPath);
        Assert.assertFalse("Temp output file has not been removed", outFile.exists());
    }

    /**
     * Same as the plain failing-sink test, but with the input configured for
     * {@code LocalStrategy.SORT}: {@code invoke()} must throw and the temp output file must
     * be gone afterwards.
     */
    @Test
    public void testFailingSortingDataSinkTask() {
        super.initEnvironment(3145728L, NETWORK_BUFFER_SIZE);
        super.addInput(new UniformRecordGenerator(100, 20, true), 0);

        DataSinkTask sinkTask = new DataSinkTask();
        Configuration stubParameters = new Configuration();
        super.getTaskConfig().setStubParameters(stubParameters);

        // Sort input 0 on field 1.
        super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
        super.getTaskConfig().setInputComparator(new RecordComparatorFactory(new int[]{1}, new Class[]{IntValue.class}), 0);
        super.getTaskConfig().setRelativeMemoryInput(0, 1.0d);
        super.getTaskConfig().setFilehandlesInput(0, 8);
        super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);

        String outputUri = new File(this.tempTestPath).toURI().toString();
        super.registerFileOutputTask((AbstractInvokable) sinkTask, MockFailingOutputFormat.class, outputUri);

        boolean stubFailed;
        try {
            sinkTask.invoke();
            stubFailed = false;
        } catch (Exception e) {
            // Expected path: the failing format's exception must surface from invoke().
            stubFailed = true;
        }
        Assert.assertTrue("Function exception was not forwarded.", stubFailed);

        File outFile = new File(this.tempTestPath);
        Assert.assertFalse("Temp output file has not been removed", outFile.exists());
    }

    /**
     * Starts a {@code DataSinkTask} on an infinite input in a separate thread, waits until
     * the output file appears, cancels the task and verifies that the task did not throw and
     * that the temp output file was removed.
     *
     * @throws Exception if waiting, cancelling or joining fails
     */
    @Test
    public void testCancelDataSinkTask() throws Exception {
        super.initEnvironment(3145728L, NETWORK_BUFFER_SIZE);
        super.addInput(new InfiniteInputIterator(), 0);
        final DataSinkTask dataSinkTask = new DataSinkTask();
        super.getTaskConfig().setStubParameters(new Configuration());
        super.registerFileOutputTask((AbstractInvokable) dataSinkTask, MockOutputFormat.class, new File(this.tempTestPath).toURI().toString());

        // An assertion failure thrown inside the task thread would never reach JUnit, so the
        // thread only records the exception; it is checked on the test thread after join().
        final Throwable[] taskFailure = new Throwable[1];
        Thread taskThread = new Thread() {
            @Override
            public void run() {
                try {
                    dataSinkTask.invoke();
                } catch (Exception e) {
                    taskFailure[0] = e;
                }
            }
        };
        taskThread.start();

        // Poll for up to 60 seconds until the task has created its output file.
        File outFile = new File(this.tempTestPath);
        long deadline = System.currentTimeMillis() + 60000;
        while (!outFile.exists() && System.currentTimeMillis() < deadline) {
            Thread.sleep(10L);
        }
        Assert.assertTrue("Task did not create file within 60 seconds", outFile.exists());

        // Let the task write for a moment before cancelling it.
        Thread.sleep(500L);
        dataSinkTask.cancel();
        taskThread.interrupt();
        taskThread.join();

        // join() makes the write to taskFailure[0] visible to this thread.
        if (taskFailure[0] != null) {
            throw new AssertionError("Task threw exception although it was properly canceled", taskFailure[0]);
        }
        Assert.assertFalse("Temp output file has not been removed", outFile.exists());
    }

    /**
     * Starts a {@code DataSinkTask} with a sorting input on an infinite input in a separate
     * thread, cancels it through a {@code TaskCancelThread} and verifies that the task did
     * not throw.
     */
    @Test
    public void testCancelSortingDataSinkTask() {
        super.initEnvironment(3145728L, NETWORK_BUFFER_SIZE);
        super.addInput(new InfiniteInputIterator(), 0);
        final DataSinkTask dataSinkTask = new DataSinkTask();
        super.getTaskConfig().setStubParameters(new Configuration());
        super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
        super.getTaskConfig().setInputComparator(new RecordComparatorFactory(new int[]{1}, new Class[]{IntValue.class}), 0);
        super.getTaskConfig().setRelativeMemoryInput(0, 1.0d);
        super.getTaskConfig().setFilehandlesInput(0, 8);
        super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
        super.registerFileOutputTask((AbstractInvokable) dataSinkTask, MockOutputFormat.class, new File(this.tempTestPath).toURI().toString());

        // An assertion failure thrown inside the task thread would never reach JUnit, so the
        // thread only records the exception; it is checked on the test thread after join().
        final Throwable[] taskFailure = new Throwable[1];
        Thread taskThread = new Thread() {
            @Override
            public void run() {
                try {
                    dataSinkTask.invoke();
                } catch (Exception e) {
                    taskFailure[0] = e;
                }
            }
        };
        taskThread.start();

        TaskCancelThread cancelThread = new TaskCancelThread(2, taskThread, (AbstractInvokable) dataSinkTask);
        cancelThread.start();
        try {
            cancelThread.join();
            taskThread.join();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before failing so the test runner can still see it.
            Thread.currentThread().interrupt();
            Assert.fail("Joining threads failed");
        }

        // join() makes the write to taskFailure[0] visible to this thread.
        if (taskFailure[0] != null) {
            throw new AssertionError("Task threw exception although it was properly canceled", taskFailure[0]);
        }
    }

    /**
     * Builds a file path inside the directory named by the {@code java.io.tmpdir} system
     * property. The file name is {@code <fully qualified class name>-<str>}.
     *
     * @param cls class whose name becomes the first part of the file name
     * @param str suffix appended after a {@code '-'}
     * @return the temp directory, a separator if the directory did not already end in
     *     {@code '/'} or {@code '\'}, and then the file name
     */
    public static String constructTestPath(Class<?> cls, String str) {
        String tmpDir = System.getProperty("java.io.tmpdir");
        boolean endsWithSeparator = tmpDir.endsWith("/") || tmpDir.endsWith("\\");

        StringBuilder path = new StringBuilder(tmpDir);
        if (!endsWithSeparator) {
            path.append(System.getProperty("file.separator"));
        }
        path.append(cls.getName()).append("-").append(str);
        return path.toString();
    }

    /**
     * Returns the path produced by {@link #constructTestPath(Class, String)} as a URI string.
     *
     * @param cls class whose name becomes the first part of the file name
     * @param str suffix appended after a {@code '-'}
     * @return the result of {@code File.toURI().toString()} for that path
     */
    public static String constructTestURI(Class<?> cls, String str) {
        File target = new File(constructTestPath(cls, str));
        return target.toURI().toString();
    }
}
