package org.apache.flink.runtime.operators;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.types.Record;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/operators/CrossTaskTest.class */
public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, Record>> {
    /** Memory figure for the cross driver: 1048576 = 1024 * 1024 (presumably bytes; TODO confirm against DriverTestBase). */
    private static final long CROSS_MEM = 1048576;
    /** Value handed to {@code setRelativeMemoryDriver} by every test; computed in the constructor as 1048576 divided by the memory manager's size. */
    private final double cross_frac;
    /** Collector installed through {@code setOutput} in each test; the result-size assertions read its record count. */
    private final DriverTestBase.CountingOutputCollector output;

    /* loaded from: input_file:org/apache/flink/runtime/operators/CrossTaskTest$MockCrossStub.class */
    /**
     * Cross function used by the non-failing tests: every pair is answered with
     * the record that came from the first argument.
     */
    public static final class MockCrossStub extends org.apache.flink.api.java.record.functions.CrossFunction {
        private static final long serialVersionUID = 1L;

        /**
         * Hands back the first record untouched; the second record is ignored.
         *
         * @param record  record passed as the first argument, returned as-is
         * @param record2 record passed as the second argument, unused
         * @return {@code record}
         */
        @Override
        public Record cross(Record record, Record record2) throws Exception {
            return record;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/CrossTaskTest$MockFailingCrossStub.class */
    /**
     * Cross function used by the failure tests: it behaves like a pass-through
     * for the first nine invocations and throws from the tenth invocation on.
     */
    public static final class MockFailingCrossStub extends org.apache.flink.api.java.record.functions.CrossFunction {
        private static final long serialVersionUID = 1L;

        /** Number of times {@link #cross(Record, Record)} has been invoked on this instance. */
        private int cnt = 0;

        /**
         * Counts the call, then either returns the first record or fails.
         *
         * @param record  record passed as the first argument, returned while the count is below 10
         * @param record2 record passed as the second argument, unused
         * @return {@code record} for calls one through nine
         * @throws ExpectedTestException once the call count reaches 10
         */
        @Override
        public Record cross(Record record, Record record2) {
            this.cnt++;
            if (this.cnt < 10) {
                return record;
            }
            throw new ExpectedTestException();
        }
    }

    /**
     * Sets up the test for the given execution configuration.
     *
     * <p>The base class receives the configuration, the value 1048576 (the same number as
     * {@code CROSS_MEM}) and 0. Afterwards the relative driver memory is derived by dividing
     * that same number by the size reported by the memory manager.
     *
     * @param executionConfig configuration forwarded unchanged to the base class
     */
    public CrossTaskTest(ExecutionConfig executionConfig) {
        super(executionConfig, 1048576L, 0);
        // Floating-point division on purpose: the result is a fraction of the managed memory.
        this.cross_frac = 1048576.0d / getMemoryManager().getMemorySize();
        this.output = new DriverTestBase.CountingOutputCollector();
    }

    /**
     * Runs the cross driver with strategy {@code NESTEDLOOP_BLOCKED_OUTER_FIRST} over
     * {@code UniformRecordGenerator(10, 1)} and {@code UniformRecordGenerator(100, 4)} and
     * expects the collector to have counted 10 * 1 * 100 * 4 records.
     */
    @Test
    public void testBlock1CrossTask() {
        setOutput(this.output);
        addInput(new UniformRecordGenerator(10, 1, false));
        addInput(new UniformRecordGenerator(100, 4, false));
        getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
        getTaskConfig().setRelativeMemoryDriver(this.cross_frac);

        try {
            testDriver(new CrossDriver(), MockCrossStub.class);
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail("Test failed due to an exception.");
        }

        // One output record per pair of input records.
        final int expectedRecords = (10 * 1) * (100 * 4);
        Assert.assertEquals("Wrong result size.", expectedRecords, this.output.getNumberOfRecords());
    }

    /**
     * Runs the cross driver with strategy {@code NESTEDLOOP_BLOCKED_OUTER_SECOND} over
     * {@code UniformRecordGenerator(10, 1)} and {@code UniformRecordGenerator(100, 4)} and
     * expects the collector to have counted 10 * 1 * 100 * 4 records.
     */
    @Test
    public void testBlock2CrossTask() {
        setOutput(this.output);
        addInput(new UniformRecordGenerator(10, 1, false));
        addInput(new UniformRecordGenerator(100, 4, false));
        getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
        getTaskConfig().setRelativeMemoryDriver(this.cross_frac);

        try {
            testDriver(new CrossDriver(), MockCrossStub.class);
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail("Test failed due to an exception.");
        }

        // One output record per pair of input records.
        final int expectedRecords = (10 * 1) * (100 * 4);
        Assert.assertEquals("Wrong result size.", expectedRecords, this.output.getNumberOfRecords());
    }

    /**
     * Runs the cross driver with strategy {@code NESTEDLOOP_BLOCKED_OUTER_FIRST} and the
     * failing stub, and passes only if the stub's {@link ExpectedTestException} reaches
     * the caller of {@code testDriver}.
     */
    @Test
    public void testFailingBlockCrossTask() {
        setOutput(this.output);
        addInput(new UniformRecordGenerator(10, 1, false));
        addInput(new UniformRecordGenerator(100, 4, false));
        getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
        getTaskConfig().setRelativeMemoryDriver(this.cross_frac);

        boolean forwarded = false;
        try {
            testDriver(new CrossDriver(), MockFailingCrossStub.class);
        } catch (ExpectedTestException expected) {
            // The stub's exception surfaced: this is the passing path.
            forwarded = true;
        } catch (Exception unexpected) {
            unexpected.printStackTrace();
            Assert.fail("Test failed due to an exception.");
        }

        if (!forwarded) {
            Assert.fail("Exception not forwarded.");
        }
    }

    /**
     * Runs the cross driver with strategy {@code NESTEDLOOP_BLOCKED_OUTER_SECOND} and the
     * failing stub, and passes only if the stub's {@link ExpectedTestException} reaches
     * the caller of {@code testDriver}.
     */
    @Test
    public void testFailingBlockCrossTask2() {
        setOutput(this.output);
        addInput(new UniformRecordGenerator(10, 1, false));
        addInput(new UniformRecordGenerator(100, 4, false));
        getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
        getTaskConfig().setRelativeMemoryDriver(this.cross_frac);

        boolean forwarded = false;
        try {
            testDriver(new CrossDriver(), MockFailingCrossStub.class);
        } catch (ExpectedTestException expected) {
            // The stub's exception surfaced: this is the passing path.
            forwarded = true;
        } catch (Exception unexpected) {
            unexpected.printStackTrace();
            Assert.fail("Test failed due to an exception.");
        }

        if (!forwarded) {
            Assert.fail("Exception not forwarded.");
        }
    }

    /**
     * Runs the cross driver with strategy {@code NESTEDLOOP_STREAMED_OUTER_FIRST} over
     * {@code UniformRecordGenerator(10, 1)} and {@code UniformRecordGenerator(100, 4)} and
     * expects the collector to have counted 10 * 1 * 100 * 4 records.
     */
    @Test
    public void testStream1CrossTask() {
        setOutput(this.output);
        addInput(new UniformRecordGenerator(10, 1, false));
        addInput(new UniformRecordGenerator(100, 4, false));
        getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
        getTaskConfig().setRelativeMemoryDriver(this.cross_frac);

        try {
            testDriver(new CrossDriver(), MockCrossStub.class);
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail("Test failed due to an exception.");
        }

        // One output record per pair of input records.
        final int expectedRecords = (10 * 1) * (100 * 4);
        Assert.assertEquals("Wrong result size.", expectedRecords, this.output.getNumberOfRecords());
    }

    /**
     * Runs the cross driver with strategy {@code NESTEDLOOP_STREAMED_OUTER_SECOND} over
     * {@code UniformRecordGenerator(10, 1)} and {@code UniformRecordGenerator(100, 4)} and
     * expects the collector to have counted 10 * 1 * 100 * 4 records.
     */
    @Test
    public void testStream2CrossTask() {
        setOutput(this.output);
        addInput(new UniformRecordGenerator(10, 1, false));
        addInput(new UniformRecordGenerator(100, 4, false));
        getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
        getTaskConfig().setRelativeMemoryDriver(this.cross_frac);

        try {
            testDriver(new CrossDriver(), MockCrossStub.class);
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail("Test failed due to an exception.");
        }

        // One output record per pair of input records.
        final int expectedRecords = (10 * 1) * (100 * 4);
        Assert.assertEquals("Wrong result size.", expectedRecords, this.output.getNumberOfRecords());
    }

    /**
     * Runs the cross driver with strategy {@code NESTEDLOOP_STREAMED_OUTER_FIRST} and the
     * failing stub, and passes only if the stub's {@link ExpectedTestException} reaches
     * the caller of {@code testDriver}.
     */
    @Test
    public void testFailingStreamCrossTask() {
        setOutput(this.output);
        addInput(new UniformRecordGenerator(10, 1, false));
        addInput(new UniformRecordGenerator(100, 4, false));
        getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
        getTaskConfig().setRelativeMemoryDriver(this.cross_frac);

        boolean forwarded = false;
        try {
            testDriver(new CrossDriver(), MockFailingCrossStub.class);
        } catch (ExpectedTestException expected) {
            // The stub's exception surfaced: this is the passing path.
            forwarded = true;
        } catch (Exception unexpected) {
            unexpected.printStackTrace();
            Assert.fail("Test failed due to an exception.");
        }

        if (!forwarded) {
            Assert.fail("Exception not forwarded.");
        }
    }

    /**
     * Runs the cross driver with strategy {@code NESTEDLOOP_STREAMED_OUTER_SECOND} and the
     * failing stub, and passes only if the stub's {@link ExpectedTestException} reaches
     * the caller of {@code testDriver}.
     */
    @Test
    public void testFailingStreamCrossTask2() {
        setOutput(this.output);
        addInput(new UniformRecordGenerator(10, 1, false));
        addInput(new UniformRecordGenerator(100, 4, false));
        getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
        getTaskConfig().setRelativeMemoryDriver(this.cross_frac);

        boolean forwarded = false;
        try {
            testDriver(new CrossDriver(), MockFailingCrossStub.class);
        } catch (ExpectedTestException expected) {
            // The stub's exception surfaced: this is the passing path.
            forwarded = true;
        } catch (Exception unexpected) {
            unexpected.printStackTrace();
            Assert.fail("Test failed due to an exception.");
        }

        if (!forwarded) {
            Assert.fail("Exception not forwarded.");
        }
    }

    /**
     * Runs the cross driver with strategy {@code NESTEDLOOP_STREAMED_OUTER_FIRST} where the
     * second input is {@code UniformRecordGenerator(0, 0)}, and expects the collector to
     * have counted 10 * 1 * 0 * 0 = 0 records.
     */
    @Test
    public void testStreamEmptyInnerCrossTask() {
        setOutput(this.output);
        addInput(new UniformRecordGenerator(10, 1, false));
        addInput(new UniformRecordGenerator(0, 0, false));
        getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
        getTaskConfig().setRelativeMemoryDriver(this.cross_frac);

        try {
            testDriver(new CrossDriver(), MockCrossStub.class);
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail("Test failed due to an exception.");
        }

        // Same product formula as the non-empty tests; it evaluates to zero here.
        final int expectedRecords = (10 * 1) * (0 * 0);
        Assert.assertEquals("Wrong result size.", expectedRecords, this.output.getNumberOfRecords());
    }

    /**
     * Runs the cross driver with strategy {@code NESTEDLOOP_STREAMED_OUTER_SECOND} where the
     * second input is {@code UniformRecordGenerator(0, 0)}, and expects the collector to
     * have counted 10 * 1 * 0 * 0 = 0 records.
     */
    @Test
    public void testStreamEmptyOuterCrossTask() {
        setOutput(this.output);
        addInput(new UniformRecordGenerator(10, 1, false));
        addInput(new UniformRecordGenerator(0, 0, false));
        getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
        getTaskConfig().setRelativeMemoryDriver(this.cross_frac);

        try {
            testDriver(new CrossDriver(), MockCrossStub.class);
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail("Test failed due to an exception.");
        }

        // Same product formula as the non-empty tests; it evaluates to zero here.
        final int expectedRecords = (10 * 1) * (0 * 0);
        Assert.assertEquals("Wrong result size.", expectedRecords, this.output.getNumberOfRecords());
    }

    /**
     * Runs the cross driver with strategy {@code NESTEDLOOP_BLOCKED_OUTER_FIRST} where the
     * second input is {@code UniformRecordGenerator(0, 0)}, and expects the collector to
     * have counted 10 * 1 * 0 * 0 = 0 records.
     */
    @Test
    public void testBlockEmptyInnerCrossTask() {
        setOutput(this.output);
        addInput(new UniformRecordGenerator(10, 1, false));
        addInput(new UniformRecordGenerator(0, 0, false));
        getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
        getTaskConfig().setRelativeMemoryDriver(this.cross_frac);

        try {
            testDriver(new CrossDriver(), MockCrossStub.class);
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail("Test failed due to an exception.");
        }

        // Same product formula as the non-empty tests; it evaluates to zero here.
        final int expectedRecords = (10 * 1) * (0 * 0);
        Assert.assertEquals("Wrong result size.", expectedRecords, this.output.getNumberOfRecords());
    }

    /**
     * Runs the cross driver with strategy {@code NESTEDLOOP_BLOCKED_OUTER_SECOND} where the
     * second input is {@code UniformRecordGenerator(0, 0)}, and expects the collector to
     * have counted 10 * 1 * 0 * 0 = 0 records.
     */
    @Test
    public void testBlockEmptyOuterCrossTask() {
        setOutput(this.output);
        addInput(new UniformRecordGenerator(10, 1, false));
        addInput(new UniformRecordGenerator(0, 0, false));
        getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
        getTaskConfig().setRelativeMemoryDriver(this.cross_frac);

        try {
            testDriver(new CrossDriver(), MockCrossStub.class);
        } catch (Exception failure) {
            failure.printStackTrace();
            Assert.fail("Test failed due to an exception.");
        }

        // Same product formula as the non-empty tests; it evaluates to zero here.
        final int expectedRecords = (10 * 1) * (0 * 0);
        Assert.assertEquals("Wrong result size.", expectedRecords, this.output.getNumberOfRecords());
    }

    /**
     * Starts the cross driver (strategy {@code NESTEDLOOP_BLOCKED_OUTER_FIRST}) on its own
     * thread with a {@code DelayingInfinitiveInputIterator(100)} as second input, starts a
     * {@code TaskCancelThread} against it, and passes only if {@code testDriver} returned
     * without throwing.
     */
    @Test
    public void testCancelBlockCrossTaskInit() {
        setOutput(this.output);
        addInput(new UniformRecordGenerator(10, 1, false));
        addInput(new DelayingInfinitiveInputIterator(100));
        getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
        getTaskConfig().setRelativeMemoryDriver(this.cross_frac);

        final CrossDriver driver = new CrossDriver();
        // Flipped to true only when testDriver returns normally on the worker thread.
        final AtomicBoolean finishedCleanly = new AtomicBoolean(false);

        Thread taskRunner = new Thread() {
            @Override
            public void run() {
                try {
                    CrossTaskTest.this.testDriver(driver, MockCrossStub.class);
                    finishedCleanly.set(true);
                } catch (Exception failure) {
                    // Leaves the flag false, so the final assertion reports the problem.
                    failure.printStackTrace();
                }
            }
        };
        taskRunner.start();

        // NOTE(review): the leading 1 is presumably a delay before cancelling; unit not visible here.
        TaskCancelThread canceller = new TaskCancelThread(1, taskRunner, this);
        canceller.start();

        try {
            canceller.join();
            taskRunner.join();
        } catch (InterruptedException interrupted) {
            Assert.fail("Joining threads failed");
        }

        Assert.assertTrue("Exception was thrown despite proper canceling.", finishedCleanly.get());
    }

    /**
     * Starts the cross driver (strategy {@code NESTEDLOOP_BLOCKED_OUTER_SECOND}) on its own
     * thread with a {@code DelayingInfinitiveInputIterator(100)} as second input, starts a
     * {@code TaskCancelThread} against it, and passes only if {@code testDriver} returned
     * without throwing.
     */
    @Test
    public void testCancelBlockCrossTaskCrossing() {
        setOutput(this.output);
        addInput(new UniformRecordGenerator(10, 1, false));
        addInput(new DelayingInfinitiveInputIterator(100));
        getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
        getTaskConfig().setRelativeMemoryDriver(this.cross_frac);

        final CrossDriver driver = new CrossDriver();
        // Flipped to true only when testDriver returns normally on the worker thread.
        final AtomicBoolean finishedCleanly = new AtomicBoolean(false);

        Thread taskRunner = new Thread() {
            @Override
            public void run() {
                try {
                    CrossTaskTest.this.testDriver(driver, MockCrossStub.class);
                    finishedCleanly.set(true);
                } catch (Exception failure) {
                    // Leaves the flag false, so the final assertion reports the problem.
                    failure.printStackTrace();
                }
            }
        };
        taskRunner.start();

        // NOTE(review): the leading 1 is presumably a delay before cancelling; unit not visible here.
        TaskCancelThread canceller = new TaskCancelThread(1, taskRunner, this);
        canceller.start();

        try {
            canceller.join();
            taskRunner.join();
        } catch (InterruptedException interrupted) {
            Assert.fail("Joining threads failed");
        }

        Assert.assertTrue("Exception was thrown despite proper canceling.", finishedCleanly.get());
    }

    /**
     * Starts the cross driver (strategy {@code NESTEDLOOP_STREAMED_OUTER_FIRST}) on its own
     * thread with a {@code DelayingInfinitiveInputIterator(100)} as second input, starts a
     * {@code TaskCancelThread} against it, and passes only if {@code testDriver} returned
     * without throwing.
     */
    @Test
    public void testCancelStreamCrossTaskInit() {
        setOutput(this.output);
        addInput(new UniformRecordGenerator(10, 1, false));
        addInput(new DelayingInfinitiveInputIterator(100));
        getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
        getTaskConfig().setRelativeMemoryDriver(this.cross_frac);

        final CrossDriver driver = new CrossDriver();
        // Flipped to true only when testDriver returns normally on the worker thread.
        final AtomicBoolean finishedCleanly = new AtomicBoolean(false);

        Thread taskRunner = new Thread() {
            @Override
            public void run() {
                try {
                    CrossTaskTest.this.testDriver(driver, MockCrossStub.class);
                    finishedCleanly.set(true);
                } catch (Exception failure) {
                    // Leaves the flag false, so the final assertion reports the problem.
                    failure.printStackTrace();
                }
            }
        };
        taskRunner.start();

        // NOTE(review): the leading 1 is presumably a delay before cancelling; unit not visible here.
        TaskCancelThread canceller = new TaskCancelThread(1, taskRunner, this);
        canceller.start();

        try {
            canceller.join();
            taskRunner.join();
        } catch (InterruptedException interrupted) {
            Assert.fail("Joining threads failed");
        }

        Assert.assertTrue("Exception was thrown despite proper canceling.", finishedCleanly.get());
    }

    /**
     * Starts the cross driver (strategy {@code NESTEDLOOP_STREAMED_OUTER_SECOND}) on its own
     * thread with a {@code DelayingInfinitiveInputIterator(100)} as second input, starts a
     * {@code TaskCancelThread} against it, and passes only if {@code testDriver} returned
     * without throwing.
     */
    @Test
    public void testCancelStreamCrossTaskCrossing() {
        setOutput(this.output);
        addInput(new UniformRecordGenerator(10, 1, false));
        addInput(new DelayingInfinitiveInputIterator(100));
        getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
        getTaskConfig().setRelativeMemoryDriver(this.cross_frac);

        final CrossDriver driver = new CrossDriver();
        // Flipped to true only when testDriver returns normally on the worker thread.
        final AtomicBoolean finishedCleanly = new AtomicBoolean(false);

        Thread taskRunner = new Thread() {
            @Override
            public void run() {
                try {
                    CrossTaskTest.this.testDriver(driver, MockCrossStub.class);
                    finishedCleanly.set(true);
                } catch (Exception failure) {
                    // Leaves the flag false, so the final assertion reports the problem.
                    failure.printStackTrace();
                }
            }
        };
        taskRunner.start();

        // NOTE(review): the leading 1 is presumably a delay before cancelling; unit not visible here.
        TaskCancelThread canceller = new TaskCancelThread(1, taskRunner, this);
        canceller.start();

        try {
            canceller.join();
            taskRunner.join();
        } catch (InterruptedException interrupted) {
            Assert.fail("Joining threads failed");
        }

        Assert.assertTrue("Exception was thrown despite proper canceling.", finishedCleanly.get());
    }
}
