package org.apache.flink.test.manual;

import java.io.Serializable;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Random;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.types.IntValue;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/test/manual/OverwriteObjects.class */
public class OverwriteObjects {
    /** Number of elements produced for each full-size test DataSet. */
    private static final int NUMBER_OF_ELEMENTS = 3_000_000;

    /** Exclusive upper bound for the randomly generated keys of a full-size test DataSet. */
    private static final int KEY_RANGE = 1_000_000;

    /** Highest parallelism exercised; {@code run()} counts down from this value to 1. */
    private static final int MAX_PARALLELISM = 4;

    public static final Logger LOG = LoggerFactory.getLogger(OverwriteObjects.class);

    /** Seed drawn once when the class is initialized; {@code run()} logs it. */
    private static final long RANDOM_SEED = new Random().nextLong();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/manual/OverwriteObjects$OverwriteObjectsCross.class */
    public class OverwriteObjectsCross implements CrossFunction<Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>> {
        private Scrambler scrambler;

        private OverwriteObjectsCross() {
            this.scrambler = new Scrambler(true);
        }

        public Tuple2<IntValue, IntValue> cross(Tuple2<IntValue, IntValue> tuple2, Tuple2<IntValue, IntValue> tuple22) throws Exception {
            return this.scrambler.scramble(tuple2, tuple22);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/manual/OverwriteObjects$OverwriteObjectsJoin.class */
    public class OverwriteObjectsJoin implements JoinFunction<Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>> {
        private Scrambler scrambler;

        private OverwriteObjectsJoin() {
            this.scrambler = new Scrambler(true);
        }

        public Tuple2<IntValue, IntValue> join(Tuple2<IntValue, IntValue> tuple2, Tuple2<IntValue, IntValue> tuple22) throws Exception {
            return this.scrambler.scramble(tuple2, tuple22);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/manual/OverwriteObjects$OverwriteObjectsReduce.class */
    public class OverwriteObjectsReduce implements ReduceFunction<Tuple2<IntValue, IntValue>> {
        private Scrambler scrambler;

        public OverwriteObjectsReduce(boolean z) {
            this.scrambler = new Scrambler(z);
        }

        public Tuple2<IntValue, IntValue> reduce(Tuple2<IntValue, IntValue> tuple2, Tuple2<IntValue, IntValue> tuple22) throws Exception {
            return this.scrambler.scramble(tuple2, tuple22);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/manual/OverwriteObjects$Scrambler.class */
    /**
     * Combines two tuples into one and then overwrites every candidate object that was not
     * returned with random values.
     *
     * <p>The returned object is picked pseudo-randomly among the first input, the second input,
     * an object owned by this scrambler and a freshly allocated tuple. The pick is seeded only
     * by the keys ({@code f0}) of the inputs, so equal inputs always lead to the same pick.
     */
    public static final class Scrambler implements Serializable {
        /** Number of candidate result objects, i.e. the number of {@code case} labels in scramble(). */
        private static final int RESULT_CHOICES = 4;

        /** Whether the result keeps the first input's key instead of the sum of both keys. */
        private final boolean keyed;

        /** Scrambler-owned tuple that may be handed out as the result. */
        private final Tuple2<IntValue, IntValue> d = new Tuple2<>(new IntValue(), new IntValue());

        /** Source of the garbage values written into the objects that were not returned. */
        private final Random random = new Random(~OverwriteObjects.RANDOM_SEED);

        /**
         * @param z {@code true} to keep the first input's key in the result, {@code false} to
         *     store the sum of both keys
         */
        public Scrambler(boolean z) {
            this.keyed = z;
        }

        /**
         * Combines the two inputs and scrambles the objects that are not returned.
         *
         * <p>If both inputs are present, the result's {@code f1} is the sum of both {@code f1}
         * values and its {@code f0} is either the first key (keyed) or the sum of both keys.
         * If exactly one input is present, the result is a copy of that input.
         *
         * @param tuple2 first input ("a"), may be {@code null} if {@code tuple22} is not
         * @param tuple22 second input ("b"), may be {@code null} if {@code tuple2} is not
         * @return the combined tuple; may be the same object as either input
         * @throws RuntimeException if both inputs are {@code null}
         */
        public Tuple2<IntValue, IntValue> scramble(Tuple2<IntValue, IntValue> tuple2, Tuple2<IntValue, IntValue> tuple22) {
            if (tuple2 == null && tuple22 == null) {
                throw new RuntimeException("One of a or b should be not null");
            }

            final long seed;
            if (tuple2 != null && tuple22 != null) {
                // Widen to long BEFORE shifting: an int shifted by 32 is shifted by 0, which
                // would collapse the seed to the plain sum of the two keys.
                seed = ((long) tuple2.f0.getValue() << 32) + tuple22.f0.getValue();
            } else if (tuple2 != null) {
                seed = tuple2.f0.getValue();
            } else {
                seed = tuple22.f0.getValue();
            }

            Tuple2<IntValue, IntValue> result;
            switch (new Random(seed).nextInt(RESULT_CHOICES)) {
                case 0:
                    result = tuple2;
                    break;
                case 1:
                    result = tuple22;
                    break;
                case 2:
                    result = this.d;
                    break;
                case 3:
                    result = new Tuple2<>(new IntValue(), new IntValue());
                    break;
                default:
                    throw new RuntimeException("Unexpected value in switch statement");
            }

            if (tuple2 == null || tuple22 == null) {
                // The pick may have landed on the missing side; fall back to the owned tuple.
                if (result == null) {
                    result = this.d;
                }
                Tuple2<IntValue, IntValue> present = (tuple2 == null) ? tuple22 : tuple2;
                present.f0.copyTo(result.f0);
                present.f1.copyTo(result.f1);
            } else {
                // result may alias tuple2 or tuple22: each right-hand side is fully evaluated
                // before the corresponding field is overwritten.
                if (this.keyed) {
                    result.f0.setValue(tuple2.f0.getValue());
                } else {
                    result.f0.setValue(tuple2.f0.getValue() + tuple22.f0.getValue());
                }
                result.f1.setValue(tuple2.f1.getValue() + tuple22.f1.getValue());
            }

            scrambleIfNot(tuple2, result);
            scrambleIfNot(tuple22, result);
            scrambleIfNot(this.d, result);
            return result;
        }

        /**
         * Overwrites both fields of {@code tuple2} with random values unless it is {@code null}
         * or is the very object {@code obj} (identity comparison).
         */
        private void scrambleIfNot(Tuple2<IntValue, IntValue> tuple2, Object obj) {
            if (tuple2 == null || tuple2 == obj) {
                return;
            }
            tuple2.f0.setValue(this.random.nextInt());
            tuple2.f1.setValue(this.random.nextInt());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/manual/OverwriteObjects$TupleIntValueIntValueIterator.class */
    /**
     * Serializable iterator that yields a fixed number of {@code (key, 1)} tuples whose keys
     * are drawn from a {@link Random} with a fixed seed.
     *
     * <p>Every call to {@link #next()} returns the SAME tuple instance with updated field
     * values; callers that need to keep an element must copy it.
     */
    public static final class TupleIntValueIntValueIterator implements Iterator<Tuple2<IntValue, IntValue>>, Serializable {
        /** Number of elements still to be produced. */
        private int numElements;

        /** Exclusive upper bound of the generated keys. */
        private final int keyRange;

        /** The single tuple instance handed out by every call to next(). */
        private Tuple2<IntValue, IntValue> ret = new Tuple2<>(new IntValue(), new IntValue());

        /** Fixed seed: every iterator instance produces the same key sequence. */
        private final Random rnd = new Random(123);

        /**
         * @param i number of elements to produce
         * @param i2 exclusive upper bound of the generated keys
         */
        public TupleIntValueIntValueIterator(int i, int i2) {
            this.numElements = i;
            this.keyRange = i2;
        }

        @Override
        public boolean hasNext() {
            return this.numElements > 0;
        }

        /**
         * Returns the shared tuple, refilled with a new random key and the value 1.
         *
         * @throws NoSuchElementException if all elements have already been produced
         */
        @Override
        public Tuple2<IntValue, IntValue> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            this.numElements--;
            this.ret.f0.setValue(this.rnd.nextInt(this.keyRange));
            this.ret.f1.setValue(1);
            return this.ret;
        }

        /** Removal is not supported. */
        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    /** Command-line entry point: creates the test driver and runs every test once. */
    public static void main(String[] strArr) throws Exception {
        OverwriteObjects driver = new OverwriteObjects();
        driver.run();
    }

    /**
     * Logs the random seed, then runs the reduce, grouped-reduce, join and cross tests once
     * for every parallelism from {@code MAX_PARALLELISM} down to 1.
     */
    public void run() throws Exception {
        LOG.info("Random seed = {}", RANDOM_SEED);

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();

        int parallelism = MAX_PARALLELISM;
        while (parallelism > 0) {
            LOG.info("Parallelism = {}", parallelism);
            env.setParallelism(parallelism);

            testReduce(env);
            testGroupedReduce(env);
            testJoin(env);
            testCross(env);

            parallelism--;
        }
    }

    /**
     * Runs an un-keyed reduce over the full-size DataSet once with object reuse enabled and
     * once with it disabled, and checks that both runs count every element and agree.
     *
     * <p>Object reuse is left disabled on the environment's config when the method returns.
     *
     * @param executionEnvironment environment on which the jobs are executed
     */
    public void testReduce(ExecutionEnvironment executionEnvironment) throws Exception {
        LOG.info("Testing reduce");

        executionEnvironment.getConfig().enableObjectReuse();
        Tuple2<IntValue, IntValue> enabledResult =
                getDataSet(executionEnvironment).reduce(new OverwriteObjectsReduce(false)).collect().get(0);

        executionEnvironment.getConfig().disableObjectReuse();
        Tuple2<IntValue, IntValue> disabledResult =
                getDataSet(executionEnvironment).reduce(new OverwriteObjectsReduce(false)).collect().get(0);

        // Every generated element carries f1 == 1, so the reduced f1 must equal the element
        // count. Use the constant rather than a copy of its value so the two cannot drift.
        Assert.assertEquals(NUMBER_OF_ELEMENTS, enabledResult.f1.getValue());
        Assert.assertEquals(NUMBER_OF_ELEMENTS, disabledResult.f1.getValue());
        Assert.assertEquals(disabledResult, enabledResult);
    }

    /**
     * Runs a reduce grouped on field 0 with object reuse enabled and then disabled, and
     * asserts that the checksums of both results are equal.
     *
     * @param executionEnvironment environment on which the jobs are executed
     */
    public void testGroupedReduce(ExecutionEnvironment executionEnvironment) throws Exception {
        LOG.info("Testing grouped reduce");

        executionEnvironment.getConfig().enableObjectReuse();
        Utils.ChecksumHashCode withReuse = DataSetUtils.checksumHashCode(
                getDataSet(executionEnvironment).groupBy(0).reduce(new OverwriteObjectsReduce(true)));

        executionEnvironment.getConfig().disableObjectReuse();
        Utils.ChecksumHashCode withoutReuse = DataSetUtils.checksumHashCode(
                getDataSet(executionEnvironment).groupBy(0).reduce(new OverwriteObjectsReduce(true)));

        Assert.assertEquals(withoutReuse, withReuse);
    }

    /**
     * For every join hint except {@code OPTIMIZER_CHOOSES}, runs inner, left outer, right outer
     * and full outer joins on field 0, each once with object reuse enabled and once disabled,
     * and asserts that the two checksums match.
     *
     * <p>Left outer joins are skipped for {@code BROADCAST_HASH_FIRST}, right outer joins for
     * {@code BROADCAST_HASH_SECOND}, and full outer joins for both of those hints.
     *
     * @param executionEnvironment environment on which the jobs are executed
     */
    public void testJoin(ExecutionEnvironment executionEnvironment) throws Exception {
        for (JoinOperatorBase.JoinHint joinHint : JoinOperatorBase.JoinHint.values()) {
            if (joinHint == JoinOperatorBase.JoinHint.OPTIMIZER_CHOOSES) {
                continue;
            }

            final String failureMessage = "JoinHint=" + joinHint;
            final boolean broadcastsFirst = joinHint == JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST;
            final boolean broadcastsSecond = joinHint == JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND;

            // Inner join: supported by every remaining hint.
            LOG.info("Testing inner join with JoinHint = {}", joinHint);
            executionEnvironment.getConfig().enableObjectReuse();
            Utils.ChecksumHashCode innerWithReuse = DataSetUtils.checksumHashCode(
                    getDataSet(executionEnvironment)
                            .join(getDataSet(executionEnvironment), joinHint)
                            .where(0)
                            .equalTo(0)
                            .with(new OverwriteObjectsJoin()));
            executionEnvironment.getConfig().disableObjectReuse();
            Utils.ChecksumHashCode innerWithoutReuse = DataSetUtils.checksumHashCode(
                    getDataSet(executionEnvironment)
                            .join(getDataSet(executionEnvironment), joinHint)
                            .where(0)
                            .equalTo(0)
                            .with(new OverwriteObjectsJoin()));
            Assert.assertEquals(failureMessage, innerWithoutReuse, innerWithReuse);

            if (!broadcastsFirst) {
                LOG.info("Testing left outer join with JoinHint = {}", joinHint);
                executionEnvironment.getConfig().enableObjectReuse();
                Utils.ChecksumHashCode leftWithReuse = DataSetUtils.checksumHashCode(
                        getDataSet(executionEnvironment)
                                .leftOuterJoin(getFilteredDataSet(executionEnvironment), joinHint)
                                .where(0)
                                .equalTo(0)
                                .with(new OverwriteObjectsJoin()));
                executionEnvironment.getConfig().disableObjectReuse();
                Utils.ChecksumHashCode leftWithoutReuse = DataSetUtils.checksumHashCode(
                        getDataSet(executionEnvironment)
                                .leftOuterJoin(getFilteredDataSet(executionEnvironment), joinHint)
                                .where(0)
                                .equalTo(0)
                                .with(new OverwriteObjectsJoin()));
                Assert.assertEquals(failureMessage, leftWithoutReuse, leftWithReuse);
            }

            if (!broadcastsSecond) {
                LOG.info("Testing right outer join with JoinHint = {}", joinHint);
                executionEnvironment.getConfig().enableObjectReuse();
                Utils.ChecksumHashCode rightWithReuse = DataSetUtils.checksumHashCode(
                        getDataSet(executionEnvironment)
                                .rightOuterJoin(getFilteredDataSet(executionEnvironment), joinHint)
                                .where(0)
                                .equalTo(0)
                                .with(new OverwriteObjectsJoin()));
                executionEnvironment.getConfig().disableObjectReuse();
                Utils.ChecksumHashCode rightWithoutReuse = DataSetUtils.checksumHashCode(
                        getDataSet(executionEnvironment)
                                .rightOuterJoin(getFilteredDataSet(executionEnvironment), joinHint)
                                .where(0)
                                .equalTo(0)
                                .with(new OverwriteObjectsJoin()));
                Assert.assertEquals(failureMessage, rightWithoutReuse, rightWithReuse);
            }

            if (!broadcastsFirst && !broadcastsSecond) {
                LOG.info("Testing full outer join with JoinHint = {}", joinHint);
                executionEnvironment.getConfig().enableObjectReuse();
                Utils.ChecksumHashCode fullWithReuse = DataSetUtils.checksumHashCode(
                        getDataSet(executionEnvironment)
                                .fullOuterJoin(getFilteredDataSet(executionEnvironment), joinHint)
                                .where(0)
                                .equalTo(0)
                                .with(new OverwriteObjectsJoin()));
                executionEnvironment.getConfig().disableObjectReuse();
                Utils.ChecksumHashCode fullWithoutReuse = DataSetUtils.checksumHashCode(
                        getDataSet(executionEnvironment)
                                .fullOuterJoin(getFilteredDataSet(executionEnvironment), joinHint)
                                .where(0)
                                .equalTo(0)
                                .with(new OverwriteObjectsJoin()));
                Assert.assertEquals(failureMessage, fullWithoutReuse, fullWithReuse);
            }
        }
    }

    /**
     * Crosses a 100-element DataSet with a 10000-element DataSet using both
     * {@code crossWithHuge} and {@code crossWithTiny}, with object reuse enabled and then
     * disabled, and asserts that all four checksums are equal.
     *
     * @param executionEnvironment environment on which the jobs are executed
     */
    public void testCross(ExecutionEnvironment executionEnvironment) throws Exception {
        LOG.info("Testing cross");

        final DataSet<Tuple2<IntValue, IntValue>> small = getDataSet(executionEnvironment, 100, 20);
        final DataSet<Tuple2<IntValue, IntValue>> large = getDataSet(executionEnvironment, 10000, 2000);

        // Both cross variants with object reuse switched on.
        executionEnvironment.getConfig().enableObjectReuse();
        Utils.ChecksumHashCode hugeWithReuse =
                DataSetUtils.checksumHashCode(small.crossWithHuge(large).with(new OverwriteObjectsCross()));
        Utils.ChecksumHashCode tinyWithReuse =
                DataSetUtils.checksumHashCode(small.crossWithTiny(large).with(new OverwriteObjectsCross()));
        Assert.assertEquals(hugeWithReuse, tinyWithReuse);

        // The same two variants with object reuse switched off.
        executionEnvironment.getConfig().disableObjectReuse();
        Utils.ChecksumHashCode hugeWithoutReuse =
                DataSetUtils.checksumHashCode(small.crossWithHuge(large).with(new OverwriteObjectsCross()));
        Utils.ChecksumHashCode tinyWithoutReuse =
                DataSetUtils.checksumHashCode(small.crossWithTiny(large).with(new OverwriteObjectsCross()));
        Assert.assertEquals(hugeWithoutReuse, tinyWithoutReuse);

        Assert.assertEquals(hugeWithReuse, hugeWithoutReuse);
    }

    /**
     * Creates a DataSet backed by a new {@link TupleIntValueIntValueIterator}.
     *
     * @param executionEnvironment environment that owns the DataSet
     * @param i number of elements to generate
     * @param i2 exclusive upper bound of the generated keys
     */
    private DataSet<Tuple2<IntValue, IntValue>> getDataSet(ExecutionEnvironment executionEnvironment, int i, int i2) {
        TupleIntValueIntValueIterator source = new TupleIntValueIntValueIterator(i, i2);
        return executionEnvironment.fromCollection(
                source,
                TupleTypeInfo.<Tuple2<IntValue, IntValue>>getBasicAndBasicValueTupleTypeInfo(IntValue.class, IntValue.class));
    }

    /**
     * Creates the full-size DataSet: {@code NUMBER_OF_ELEMENTS} elements with keys bounded by
     * {@code KEY_RANGE}.
     *
     * @param executionEnvironment environment that owns the DataSet
     */
    private DataSet<Tuple2<IntValue, IntValue>> getDataSet(ExecutionEnvironment executionEnvironment) {
        DataSet<Tuple2<IntValue, IntValue>> fullSize =
                getDataSet(executionEnvironment, NUMBER_OF_ELEMENTS, KEY_RANGE);
        return fullSize;
    }

    /**
     * Creates the full-size DataSet restricted to the elements whose key ({@code f0}) is even.
     *
     * @param executionEnvironment environment that owns the DataSet
     */
    private DataSet<Tuple2<IntValue, IntValue>> getFilteredDataSet(ExecutionEnvironment executionEnvironment) {
        FilterFunction<Tuple2<IntValue, IntValue>> evenKeysOnly = new FilterFunction<Tuple2<IntValue, IntValue>>() {
            @Override
            public boolean filter(Tuple2<IntValue, IntValue> tuple2) throws Exception {
                int key = tuple2.f0.getValue();
                // The lowest bit is clear exactly for even values, negative ones included.
                return (key & 1) == 0;
            }
        };
        return getDataSet(executionEnvironment).filter(evenKeysOnly);
    }
}
