package org.apache.flink.runtime.operators;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
import org.apache.flink.runtime.testutils.recordutils.RecordPairComparatorFactory;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/operators/JoinTaskTest.class */
public class JoinTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
    private static final long HASH_MEM = 6291456;
    private static final long SORT_MEM = 3145728;
    private static final int NUM_SORTER = 2;
    private static final long BNLJN_MEM = 327680;
    private final double bnljn_frac;
    private final double hash_frac;
    private final RecordComparator comparator1;
    private final RecordComparator comparator2;
    private final List<Record> outList;

    /* loaded from: input_file:org/apache/flink/runtime/operators/JoinTaskTest$MockDelayingMatchStub.class */
    public static final class MockDelayingMatchStub implements FlatJoinFunction<Record, Record, Record> {
        private static final long serialVersionUID = 1;

        public void join(Record record, Record record2, Collector<Record> collector) throws Exception {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
            }
        }

        public /* bridge */ /* synthetic */ void join(Object obj, Object obj2, Collector collector) throws Exception {
            join((Record) obj, (Record) obj2, (Collector<Record>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/JoinTaskTest$MockFailingMatchStub.class */
    public static final class MockFailingMatchStub implements FlatJoinFunction<Record, Record, Record> {
        private static final long serialVersionUID = 1;
        private int cnt = 0;

        public void join(Record record, Record record2, Collector<Record> collector) throws Exception {
            int i = this.cnt + 1;
            this.cnt = i;
            if (i >= 10) {
                throw new ExpectedTestException();
            }
            collector.collect(record);
        }

        public /* bridge */ /* synthetic */ void join(Object obj, Object obj2, Collector collector) throws Exception {
            join((Record) obj, (Record) obj2, (Collector<Record>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/JoinTaskTest$MockMatchStub.class */
    public static final class MockMatchStub implements FlatJoinFunction<Record, Record, Record> {
        private static final long serialVersionUID = 1;

        public void join(Record record, Record record2, Collector<Record> collector) throws Exception {
            collector.collect(record);
        }

        public /* bridge */ /* synthetic */ void join(Object obj, Object obj2, Collector collector) throws Exception {
            join((Record) obj, (Record) obj2, (Collector<Record>) collector);
        }
    }

    public JoinTaskTest(ExecutionConfig executionConfig) {
        super(executionConfig, HASH_MEM, NUM_SORTER, SORT_MEM);
        this.comparator1 = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});
        this.comparator2 = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});
        this.outList = new ArrayList();
        this.bnljn_frac = 327680.0d / getMemoryManager().getMemorySize();
        this.hash_frac = 6291456.0d / getMemoryManager().getMemorySize();
    }

    @Test
    public void testSortBoth1MatchTask() {
        setOutput(this.outList);
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
        getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
        setNumFileHandlesForSort(4);
        JoinDriver joinDriver = new JoinDriver();
        try {
            addInputSorted(new UniformRecordGenerator(20, 1, false), this.comparator1.m467duplicate());
            addInputSorted(new UniformRecordGenerator(10, NUM_SORTER, false), this.comparator2.m467duplicate());
            testDriver(joinDriver, MockMatchStub.class);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("The test caused an exception.");
        }
        int min = NUM_SORTER * Math.min(20, 10);
        Assert.assertTrue("Resultset size was " + this.outList.size() + ". Expected was " + min, this.outList.size() == min);
        this.outList.clear();
    }

    @Test
    public void testSortBoth2MatchTask() {
        setOutput(this.outList);
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
        getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
        setNumFileHandlesForSort(4);
        JoinDriver joinDriver = new JoinDriver();
        try {
            addInputSorted(new UniformRecordGenerator(20, 1, false), this.comparator1.m467duplicate());
            addInputSorted(new UniformRecordGenerator(20, 1, false), this.comparator2.m467duplicate());
            testDriver(joinDriver, MockMatchStub.class);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("The test caused an exception.");
        }
        int min = 1 * 1 * Math.min(20, 20);
        Assert.assertTrue("Resultset size was " + this.outList.size() + ". Expected was " + min, this.outList.size() == min);
        this.outList.clear();
    }

    @Test
    public void testSortBoth3MatchTask() {
        setOutput(this.outList);
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
        getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
        setNumFileHandlesForSort(4);
        JoinDriver joinDriver = new JoinDriver();
        try {
            addInputSorted(new UniformRecordGenerator(20, 1, false), this.comparator1.m467duplicate());
            addInputSorted(new UniformRecordGenerator(20, 20, false), this.comparator2.m467duplicate());
            testDriver(joinDriver, MockMatchStub.class);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("The test caused an exception.");
        }
        int min = 1 * 20 * Math.min(20, 20);
        Assert.assertTrue("Resultset size was " + this.outList.size() + ". Expected was " + min, this.outList.size() == min);
        this.outList.clear();
    }

    @Test
    public void testSortBoth4MatchTask() {
        setOutput(this.outList);
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
        getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
        setNumFileHandlesForSort(4);
        JoinDriver joinDriver = new JoinDriver();
        try {
            addInputSorted(new UniformRecordGenerator(20, 20, false), this.comparator1.m467duplicate());
            addInputSorted(new UniformRecordGenerator(20, 1, false), this.comparator2.m467duplicate());
            testDriver(joinDriver, MockMatchStub.class);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("The test caused an exception.");
        }
        int min = 20 * 1 * Math.min(20, 20);
        Assert.assertTrue("Resultset size was " + this.outList.size() + ". Expected was " + min, this.outList.size() == min);
        this.outList.clear();
    }

    @Test
    public void testSortBoth5MatchTask() {
        setOutput(this.outList);
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
        getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
        setNumFileHandlesForSort(4);
        JoinDriver joinDriver = new JoinDriver();
        try {
            addInputSorted(new UniformRecordGenerator(20, 20, false), this.comparator1.m467duplicate());
            addInputSorted(new UniformRecordGenerator(20, 20, false), this.comparator2.m467duplicate());
            testDriver(joinDriver, MockMatchStub.class);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("The test caused an exception.");
        }
        int min = 20 * 20 * Math.min(20, 20);
        Assert.assertTrue("Resultset size was " + this.outList.size() + ". Expected was " + min, this.outList.size() == min);
        this.outList.clear();
    }

    @Test
    public void testSortFirstMatchTask() {
        setOutput(this.outList);
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
        getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
        setNumFileHandlesForSort(4);
        JoinDriver joinDriver = new JoinDriver();
        try {
            addInputSorted(new UniformRecordGenerator(20, 20, false), this.comparator1.m467duplicate());
            addInput(new UniformRecordGenerator(20, 20, true));
            testDriver(joinDriver, MockMatchStub.class);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("The test caused an exception.");
        }
        int min = 20 * 20 * Math.min(20, 20);
        Assert.assertTrue("Resultset size was " + this.outList.size() + ". Expected was " + min, this.outList.size() == min);
        this.outList.clear();
    }

    @Test
    public void testSortSecondMatchTask() {
        setOutput(this.outList);
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
        getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
        setNumFileHandlesForSort(4);
        JoinDriver joinDriver = new JoinDriver();
        try {
            addInput(new UniformRecordGenerator(20, 20, true));
            addInputSorted(new UniformRecordGenerator(20, 20, false), this.comparator2.m467duplicate());
            testDriver(joinDriver, MockMatchStub.class);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("The test caused an exception.");
        }
        int min = 20 * 20 * Math.min(20, 20);
        Assert.assertTrue("Resultset size was " + this.outList.size() + ". Expected was " + min, this.outList.size() == min);
        this.outList.clear();
    }

    @Test
    public void testMergeMatchTask() {
        setOutput(this.outList);
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
        getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
        setNumFileHandlesForSort(4);
        JoinDriver joinDriver = new JoinDriver();
        addInput(new UniformRecordGenerator(20, 20, true));
        addInput(new UniformRecordGenerator(20, 20, true));
        try {
            testDriver(joinDriver, MockMatchStub.class);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("The test caused an exception.");
        }
        int min = 20 * 20 * Math.min(20, 20);
        Assert.assertTrue("Resultset size was " + this.outList.size() + ". Expected was " + min, this.outList.size() == min);
        this.outList.clear();
    }

    @Test
    public void testFailingMatchTask() {
        setOutput(new NirvanaOutputList());
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
        getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
        setNumFileHandlesForSort(4);
        JoinDriver joinDriver = new JoinDriver();
        addInput(new UniformRecordGenerator(20, 20, true));
        addInput(new UniformRecordGenerator(20, 20, true));
        try {
            testDriver(joinDriver, MockFailingMatchStub.class);
            Assert.fail("Driver did not forward Exception.");
        } catch (ExpectedTestException e) {
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail("The test caused an exception.");
        }
    }

    @Test
    public void testCancelMatchTaskWhileSort1() {
        try {
            setOutput(new NirvanaOutputList());
            addDriverComparator(this.comparator1);
            addDriverComparator(this.comparator2);
            getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
            getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
            getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
            setNumFileHandlesForSort(4);
            final JoinDriver joinDriver = new JoinDriver();
            try {
                addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.m467duplicate());
                addInput(new UniformRecordGenerator(20, 20, true));
            } catch (Exception e) {
                e.printStackTrace();
                Assert.fail("The test caused an exception.");
            }
            final AtomicReference atomicReference = new AtomicReference();
            Thread thread = new Thread("Task runner for testCancelMatchTaskWhileSort1()") { // from class: org.apache.flink.runtime.operators.JoinTaskTest.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        JoinTaskTest.this.testDriver(joinDriver, MockMatchStub.class);
                    } catch (Throwable th) {
                        atomicReference.set(th);
                    }
                }
            };
            thread.start();
            Thread.sleep(1000L);
            cancel();
            thread.interrupt();
            thread.join(60000L);
            Assert.assertFalse("Task thread did not finish within 60 seconds", thread.isAlive());
            Throwable th = (Throwable) atomicReference.get();
            if (th != null) {
                th.printStackTrace();
                Assert.fail("Error in task while canceling: " + th.getMessage());
            }
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void testCancelMatchTaskWhileSort2() {
        try {
            setOutput(new NirvanaOutputList());
            addDriverComparator(this.comparator1);
            addDriverComparator(this.comparator2);
            getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
            getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
            getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
            setNumFileHandlesForSort(4);
            final JoinDriver joinDriver = new JoinDriver();
            try {
                addInput(new UniformRecordGenerator(20, 20, true));
                addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.m467duplicate());
            } catch (Exception e) {
                e.printStackTrace();
                Assert.fail("The test caused an exception.");
            }
            final AtomicReference atomicReference = new AtomicReference();
            Thread thread = new Thread("Task runner for testCancelMatchTaskWhileSort2()") { // from class: org.apache.flink.runtime.operators.JoinTaskTest.2
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        JoinTaskTest.this.testDriver(joinDriver, MockMatchStub.class);
                    } catch (Throwable th) {
                        atomicReference.set(th);
                    }
                }
            };
            thread.start();
            Thread.sleep(1000L);
            cancel();
            thread.interrupt();
            thread.join(60000L);
            Assert.assertFalse("Task thread did not finish within 60 seconds", thread.isAlive());
            Throwable th = (Throwable) atomicReference.get();
            if (th != null) {
                th.printStackTrace();
                Assert.fail("Error in task while canceling: " + th.getMessage());
            }
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void testCancelMatchTaskWhileMatching() {
        try {
            setOutput(new NirvanaOutputList());
            addDriverComparator(this.comparator1);
            addDriverComparator(this.comparator2);
            getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
            getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
            getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
            setNumFileHandlesForSort(4);
            final JoinDriver joinDriver = new JoinDriver();
            addInput(new UniformRecordGenerator(20, 20, true));
            addInput(new UniformRecordGenerator(20, 20, true));
            final AtomicReference atomicReference = new AtomicReference();
            Thread thread = new Thread("Task runner for testCancelMatchTaskWhileMatching()") { // from class: org.apache.flink.runtime.operators.JoinTaskTest.3
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        JoinTaskTest.this.testDriver(joinDriver, MockDelayingMatchStub.class);
                    } catch (Throwable th) {
                        atomicReference.set(th);
                    }
                }
            };
            thread.start();
            Thread.sleep(1000L);
            cancel();
            thread.interrupt();
            thread.join(60000L);
            Assert.assertFalse("Task thread did not finish within 60 seconds", thread.isAlive());
            Throwable th = (Throwable) atomicReference.get();
            if (th != null) {
                th.printStackTrace();
                Assert.fail("Error in task while canceling: " + th.getMessage());
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testHash1MatchTask() {
        addInput(new UniformRecordGenerator(20, 1, false));
        addInput(new UniformRecordGenerator(10, NUM_SORTER, false));
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        setOutput(this.outList);
        getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
        getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
        try {
            testDriver(new JoinDriver(), MockMatchStub.class);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Test caused an exception.");
        }
        Assert.assertEquals("Wrong result set size.", 1 * NUM_SORTER * Math.min(20, 10), this.outList.size());
        this.outList.clear();
    }

    @Test
    public void testHash2MatchTask() {
        addInput(new UniformRecordGenerator(20, 1, false));
        addInput(new UniformRecordGenerator(20, 1, false));
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        setOutput(this.outList);
        getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
        try {
            testDriver(new JoinDriver(), MockMatchStub.class);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Test caused an exception.");
        }
        Assert.assertEquals("Wrong result set size.", 1 * 1 * Math.min(20, 20), this.outList.size());
        this.outList.clear();
    }

    @Test
    public void testHash3MatchTask() {
        addInput(new UniformRecordGenerator(20, 1, false));
        addInput(new UniformRecordGenerator(20, 20, false));
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        setOutput(this.outList);
        getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
        getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
        try {
            testDriver(new JoinDriver(), MockMatchStub.class);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Test caused an exception.");
        }
        Assert.assertEquals("Wrong result set size.", 1 * 20 * Math.min(20, 20), this.outList.size());
        this.outList.clear();
    }

    @Test
    public void testHash4MatchTask() {
        addInput(new UniformRecordGenerator(20, 20, false));
        addInput(new UniformRecordGenerator(20, 1, false));
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        setOutput(this.outList);
        getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
        try {
            testDriver(new JoinDriver(), MockMatchStub.class);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Test caused an exception.");
        }
        Assert.assertEquals("Wrong result set size.", 20 * 1 * Math.min(20, 20), this.outList.size());
        this.outList.clear();
    }

    @Test
    public void testHash5MatchTask() {
        addInput(new UniformRecordGenerator(20, 20, false));
        addInput(new UniformRecordGenerator(20, 20, false));
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        setOutput(this.outList);
        getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
        getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
        try {
            testDriver(new JoinDriver(), MockMatchStub.class);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Test caused an exception.");
        }
        Assert.assertEquals("Wrong result set size.", 20 * 20 * Math.min(20, 20), this.outList.size());
        this.outList.clear();
    }

    @Test
    public void testFailingHashFirstMatchTask() {
        addInput(new UniformRecordGenerator(20, 20, false));
        addInput(new UniformRecordGenerator(20, 20, false));
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        setOutput(new NirvanaOutputList());
        getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
        getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
        try {
            testDriver(new JoinDriver(), MockFailingMatchStub.class);
            Assert.fail("Function exception was not forwarded.");
        } catch (ExpectedTestException e) {
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail("Test caused an exception.");
        }
    }

    @Test
    public void testFailingHashSecondMatchTask() {
        addInput(new UniformRecordGenerator(20, 20, false));
        addInput(new UniformRecordGenerator(20, 20, false));
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        setOutput(new NirvanaOutputList());
        getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
        try {
            testDriver(new JoinDriver(), MockFailingMatchStub.class);
            Assert.fail("Function exception was not forwarded.");
        } catch (ExpectedTestException e) {
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail("Test caused an exception.");
        }
    }

    @Test
    public void testCancelHashMatchTaskWhileBuildFirst() {
        try {
            addInput(new DelayingInfinitiveInputIterator(100));
            addInput(new UniformRecordGenerator(20, 20, false));
            addDriverComparator(this.comparator1);
            addDriverComparator(this.comparator2);
            getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
            setOutput(new NirvanaOutputList());
            getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
            getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
            final JoinDriver joinDriver = new JoinDriver();
            final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            Thread thread = new Thread() { // from class: org.apache.flink.runtime.operators.JoinTaskTest.4
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        JoinTaskTest.this.testDriver(joinDriver, MockMatchStub.class);
                        atomicBoolean.set(true);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            thread.start();
            Thread.sleep(1000L);
            cancel();
            try {
                thread.join();
            } catch (InterruptedException e) {
                Assert.fail("Joining threads failed");
            }
            Assert.assertTrue("Test threw an exception even though it was properly canceled.", atomicBoolean.get());
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void testHashCancelMatchTaskWhileBuildSecond() {
        try {
            addInput(new UniformRecordGenerator(20, 20, false));
            addInput(new DelayingInfinitiveInputIterator(100));
            addDriverComparator(this.comparator1);
            addDriverComparator(this.comparator2);
            getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
            setOutput(new NirvanaOutputList());
            getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
            getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
            final JoinDriver joinDriver = new JoinDriver();
            final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            Thread thread = new Thread() { // from class: org.apache.flink.runtime.operators.JoinTaskTest.5
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        JoinTaskTest.this.testDriver(joinDriver, MockMatchStub.class);
                        atomicBoolean.set(true);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            thread.start();
            Thread.sleep(1000L);
            cancel();
            try {
                thread.join();
            } catch (InterruptedException e) {
                Assert.fail("Joining threads failed");
            }
            Assert.assertTrue("Test threw an exception even though it was properly canceled.", atomicBoolean.get());
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void testHashFirstCancelMatchTaskWhileMatching() {
        addInput(new UniformRecordGenerator(20, 20, false));
        addInput(new UniformRecordGenerator(20, 20, false));
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        setOutput(new NirvanaOutputList());
        getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
        getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
        final JoinDriver joinDriver = new JoinDriver();
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Thread thread = new Thread() { // from class: org.apache.flink.runtime.operators.JoinTaskTest.6
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    JoinTaskTest.this.testDriver(joinDriver, MockMatchStub.class);
                    atomicBoolean.set(true);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        thread.start();
        TaskCancelThread taskCancelThread = new TaskCancelThread(1, thread, this);
        taskCancelThread.start();
        try {
            taskCancelThread.join();
            thread.join();
        } catch (InterruptedException e) {
            Assert.fail("Joining threads failed");
        }
        Assert.assertTrue("Test threw an exception even though it was properly canceled.", atomicBoolean.get());
    }

    @Test
    public void testHashSecondCancelMatchTaskWhileMatching() {
        addInput(new UniformRecordGenerator(20, 20, false));
        addInput(new UniformRecordGenerator(20, 20, false));
        addDriverComparator(this.comparator1);
        addDriverComparator(this.comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        setOutput(new NirvanaOutputList());
        getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
        final JoinDriver joinDriver = new JoinDriver();
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Thread thread = new Thread() { // from class: org.apache.flink.runtime.operators.JoinTaskTest.7
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    JoinTaskTest.this.testDriver(joinDriver, MockMatchStub.class);
                    atomicBoolean.set(true);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        };
        thread.start();
        TaskCancelThread taskCancelThread = new TaskCancelThread(1, thread, this);
        taskCancelThread.start();
        try {
            taskCancelThread.join();
            thread.join();
        } catch (InterruptedException e) {
            Assert.fail("Joining threads failed");
        }
        Assert.assertTrue("Test threw an exception even though it was properly canceled.", atomicBoolean.get());
    }
}
