package org.apache.flink.runtime.operators.util;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.operators.shipping.OutputEmitter;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory;
import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
import org.apache.flink.types.DeserializationException;
import org.apache.flink.types.DoubleValue;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.KeyFieldOutOfBoundsException;
import org.apache.flink.types.NullKeyFieldException;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/operators/util/OutputEmitterTest.class */
public class OutputEmitterTest {

    /* loaded from: input_file:org/apache/flink/runtime/operators/util/OutputEmitterTest$TestIntComparator.class */
    /**
     * Bare-bones {@code TypeComparator<Integer>} used by the tests in this class.
     *
     * <p>Only {@code hash}, {@code extractKeys} and {@code getFlatComparators} do real work;
     * every other operation throws {@link UnsupportedOperationException}.
     */
    private static class TestIntComparator extends TypeComparator<Integer> {
        /** One-element array holding an {@code IntComparator} constructed with {@code true}. */
        private final TypeComparator[] comparators = {new IntComparator(true)};

        private TestIntComparator() {
        }

        // ------------------------------------------------------------------
        // Operations with real behavior
        // ------------------------------------------------------------------

        /** Uses the integer's own value as its hash code. */
        public int hash(Integer value) {
            return value;
        }

        /** Stores {@code record} itself at {@code target[index]} and reports one extracted key. */
        public int extractKeys(Object record, Object[] target, int index) {
            target[index] = record;
            return 1;
        }

        /** Returns the comparator array created when this instance was constructed. */
        public TypeComparator[] getFlatComparators() {
            return this.comparators;
        }

        // ------------------------------------------------------------------
        // Unsupported operations: each one throws UnsupportedOperationException
        // ------------------------------------------------------------------

        public void setReference(Integer toCompare) {
            throw new UnsupportedOperationException();
        }

        public boolean equalToReference(Integer candidate) {
            throw new UnsupportedOperationException();
        }

        public int compareToReference(TypeComparator<Integer> referencedComparator) {
            throw new UnsupportedOperationException();
        }

        public int compare(Integer first, Integer second) {
            throw new UnsupportedOperationException();
        }

        public int compareSerialized(DataInputView firstSource, DataInputView secondSource) {
            throw new UnsupportedOperationException();
        }

        public boolean supportsNormalizedKey() {
            throw new UnsupportedOperationException();
        }

        public boolean supportsSerializationWithKeyNormalization() {
            throw new UnsupportedOperationException();
        }

        public int getNormalizeKeyLen() {
            throw new UnsupportedOperationException();
        }

        public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
            throw new UnsupportedOperationException();
        }

        public void putNormalizedKey(Integer value, MemorySegment target, int offset, int numBytes) {
            throw new UnsupportedOperationException();
        }

        public void writeWithKeyNormalization(Integer value, DataOutputView target) throws IOException {
            throw new UnsupportedOperationException();
        }

        public Integer readWithKeyDenormalization(Integer reuse, DataInputView source) throws IOException {
            throw new UnsupportedOperationException();
        }

        public boolean invertNormalizedKey() {
            throw new UnsupportedOperationException();
        }

        public TypeComparator<Integer> duplicate() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Checks {@code ShipStrategyType.PARTITION_HASH} channel selection in three rounds:
     * <ol>
     *   <li>50000 records keyed on an {@code IntValue} in field 0,</li>
     *   <li>10000 records keyed on a {@code StringValue} in field 0,</li>
     *   <li>boundary {@code Integer} keys hashed through {@code TestIntComparator}, whose hash is
     *       the key's own int value.</li>
     * </ol>
     * Rounds 1 and 2 require every one of the 100 channels to be hit at least once and the hit
     * counts to add up to the number of records. Round 3 requires exactly one selected channel
     * per key, with an index inside {@code [0, 100)}.
     */
    @Test
    public void testPartitionHash() {
        final int numChannels = 100;

        // Round 1: IntValue keys.
        final int numIntRecords = 50000;
        OutputEmitter intKeyEmitter = new OutputEmitter(ShipStrategyType.PARTITION_HASH, new RecordComparatorFactory(new int[]{0}, new Class[]{IntValue.class}).m292createComparator());
        SerializationDelegate recordDelegate = new SerializationDelegate(new RecordSerializerFactory().getSerializer());
        int[] intKeyHits = new int[numChannels];
        for (int i = 0; i < numIntRecords; i++) {
            recordDelegate.setInstance(new Record(new IntValue(i)));
            for (int channel : intKeyEmitter.selectChannels(recordDelegate, intKeyHits.length)) {
                intKeyHits[channel]++;
            }
        }
        int intKeyTotal = 0;
        for (int c = 0; c < intKeyHits.length; c++) {
            Assert.assertTrue("IntValue keys: channel " + c + " received no records", intKeyHits[c] > 0);
            intKeyTotal += intKeyHits[c];
        }
        Assert.assertEquals(numIntRecords, intKeyTotal);

        // Round 2: StringValue keys, reusing the same Record delegate.
        final int numStringRecords = 10000;
        OutputEmitter stringKeyEmitter = new OutputEmitter(ShipStrategyType.PARTITION_HASH, new RecordComparatorFactory(new int[]{0}, new Class[]{StringValue.class}).m292createComparator());
        int[] stringKeyHits = new int[numChannels];
        for (int i = 0; i < numStringRecords; i++) {
            recordDelegate.setInstance(new Record(new StringValue(i + "")));
            for (int channel : stringKeyEmitter.selectChannels(recordDelegate, stringKeyHits.length)) {
                stringKeyHits[channel]++;
            }
        }
        int stringKeyTotal = 0;
        for (int c = 0; c < stringKeyHits.length; c++) {
            Assert.assertTrue("StringValue keys: channel " + c + " received no records", stringKeyHits[c] > 0);
            stringKeyTotal += stringKeyHits[c];
        }
        Assert.assertEquals(numStringRecords, stringKeyTotal);

        // Round 3: extreme and sign-crossing hash values must still map to a valid channel.
        OutputEmitter rawIntEmitter = new OutputEmitter(ShipStrategyType.PARTITION_HASH, new TestIntComparator());
        SerializationDelegate intDelegate = new SerializationDelegate(new IntSerializer());
        int[] boundaryKeys = {Integer.MIN_VALUE, -1, 0, 1, Integer.MAX_VALUE};
        for (int key : boundaryKeys) {
            intDelegate.setInstance(key);
            int[] selected = rawIntEmitter.selectChannels(intDelegate, numChannels);
            Assert.assertEquals("number of channels selected for key " + key, 1, selected.length);
            Assert.assertTrue("key " + key + " was mapped to out-of-range channel " + selected[0],
                    selected[0] >= 0 && selected[0] < numChannels);
        }
    }

    /**
     * Checks {@code ShipStrategyType.FORWARD}: for both an {@code IntValue}-keyed and a
     * {@code StringValue}-keyed emitter, every record must be counted on channel 0 and all
     * other channels must stay at zero.
     */
    @Test
    public void testForward() {
        final int numChannels = 100;
        SerializationDelegate recordDelegate = new SerializationDelegate(new RecordSerializerFactory().getSerializer());

        // Both rounds share one shape and differ only in key type and record count.
        // A fresh emitter is created for each round.
        final Class[] keyTypes = {IntValue.class, StringValue.class};
        final int[] recordCounts = {50000 + (numChannels / 2), 10000 + (numChannels / 2)};
        for (int round = 0; round < keyTypes.length; round++) {
            final boolean intKeys = round == 0;
            OutputEmitter emitter = new OutputEmitter(ShipStrategyType.FORWARD, new RecordComparatorFactory(new int[]{0}, new Class[]{keyTypes[round]}).m292createComparator());
            final int numRecords = recordCounts[round];
            int[] hits = new int[numChannels];
            for (int i = 0; i < numRecords; i++) {
                Record record = intKeys ? new Record(new IntValue(i)) : new Record(new StringValue(i + ""));
                recordDelegate.setInstance(record);
                for (int channel : emitter.selectChannels(recordDelegate, hits.length)) {
                    hits[channel]++;
                }
            }
            final String label = intKeys ? "IntValue keys" : "StringValue keys";
            Assert.assertEquals(label + ": records on channel 0", numRecords, hits[0]);
            for (int c = 1; c < hits.length; c++) {
                Assert.assertEquals(label + ": records on channel " + c, 0, hits[c]);
            }
        }
    }

    /**
     * Checks {@code ShipStrategyType.PARTITION_FORCED_REBALANCE} with two emitters that are
     * constructed with an integer offset larger than the channel count.
     *
     * <p>Each round emits a record count that is not a multiple of the 100 channels. The
     * assertions require the surplus records to land on a contiguous run of channels
     * (wrapping around past channel 99 in the first round), each of which gets exactly one
     * record more than the rest.
     *
     * <p>NOTE(review): the second constructor argument looks like the emitting task's index,
     * reduced modulo the channel count to pick the first channel - confirm against
     * {@code OutputEmitter}.
     */
    @Test
    public void testForcedRebalance() {
        final int numChannels = 100;
        SerializationDelegate recordDelegate = new SerializationDelegate(new RecordSerializerFactory().getSerializer());

        // Round 1: first expected channel is 85, offset handed to the emitter is 185.
        final int firstChannel1 = (numChannels * 6) / 7;
        final int emitterOffset1 = firstChannel1 + numChannels;
        final int surplus1 = numChannels / 3;
        final int numRecords1 = 50000 + surplus1;
        OutputEmitter emitter1 = new OutputEmitter(ShipStrategyType.PARTITION_FORCED_REBALANCE, emitterOffset1);
        int[] hits1 = new int[numChannels];
        for (int i = 0; i < numRecords1; i++) {
            recordDelegate.setInstance(new Record(new IntValue(i)));
            for (int channel : emitter1.selectChannels(recordDelegate, hits1.length)) {
                hits1[channel]++;
            }
        }
        final int base1 = numRecords1 / numChannels;
        int total1 = 0;
        for (int c = 0; c < hits1.length; c++) {
            // The surplus run starts at firstChannel1 and wraps around to the low channels.
            boolean getsSurplus = firstChannel1 <= c || c < (firstChannel1 + surplus1) - numChannels;
            Assert.assertEquals("round 1: records on channel " + c, getsSurplus ? base1 + 1 : base1, hits1[c]);
            total1 += hits1[c];
        }
        Assert.assertEquals(numRecords1, total1);

        // Round 2: first expected channel is 20, offset handed to the emitter is 220.
        final int firstChannel2 = numChannels / 5;
        final int emitterOffset2 = firstChannel2 + (2 * numChannels);
        final int surplus2 = (numChannels * 2) / 9;
        final int numRecords2 = 10000 + surplus2;
        OutputEmitter emitter2 = new OutputEmitter(ShipStrategyType.PARTITION_FORCED_REBALANCE, emitterOffset2);
        int[] hits2 = new int[numChannels];
        for (int i = 0; i < numRecords2; i++) {
            recordDelegate.setInstance(new Record(new StringValue(i + "")));
            for (int channel : emitter2.selectChannels(recordDelegate, hits2.length)) {
                hits2[channel]++;
            }
        }
        final int base2 = numRecords2 / numChannels;
        int total2 = 0;
        for (int c = 0; c < hits2.length; c++) {
            // No wrap-around here: the surplus run is [firstChannel2, firstChannel2 + surplus2).
            boolean getsSurplus = firstChannel2 <= c && c < firstChannel2 + surplus2;
            Assert.assertEquals("round 2: records on channel " + c, getsSurplus ? base2 + 1 : base2, hits2[c]);
            total2 += hits2[c];
        }
        Assert.assertEquals(numRecords2, total2);
    }

    /**
     * Checks {@code ShipStrategyType.BROADCAST}: after emitting 50000 {@code IntValue} records
     * and then 5000 {@code StringValue} records, each of the 100 channels must have been
     * selected once per record.
     */
    @Test
    public void testBroadcast() {
        final int channelCount = 100;
        final SerializationDelegate delegate = new SerializationDelegate(new RecordSerializerFactory().getSerializer());

        // IntValue-keyed emitter.
        final OutputEmitter intEmitter = new OutputEmitter(ShipStrategyType.BROADCAST, new RecordComparatorFactory(new int[]{0}, new Class[]{IntValue.class}).m292createComparator());
        final int[] intHits = new int[channelCount];
        for (int n = 0; n < 50000; n++) {
            delegate.setInstance(new Record(new IntValue(n)));
            int[] targets = intEmitter.selectChannels(delegate, intHits.length);
            for (int target : targets) {
                intHits[target] += 1;
            }
        }
        for (int count : intHits) {
            // The failure message is the observed count.
            Assert.assertTrue(String.valueOf(count), count == 50000);
        }

        // StringValue-keyed emitter.
        final OutputEmitter stringEmitter = new OutputEmitter(ShipStrategyType.BROADCAST, new RecordComparatorFactory(new int[]{0}, new Class[]{StringValue.class}).m292createComparator());
        final int[] stringHits = new int[channelCount];
        for (int n = 0; n < 5000; n++) {
            delegate.setInstance(new Record(new StringValue(n + "")));
            int[] targets = stringEmitter.selectChannels(delegate, stringHits.length);
            for (int target : targets) {
                stringHits[target] += 1;
            }
        }
        for (int count : stringHits) {
            Assert.assertTrue(String.valueOf(count), count == 5000);
        }
    }

    /**
     * Checks {@code ShipStrategyType.PARTITION_HASH} with a composite key made of record
     * fields 0 ({@code IntValue}), 1 ({@code StringValue}) and 3 ({@code DoubleValue}).
     * 5000 four-field records are emitted with field 2 left unset. Every one of the 100
     * channels must be hit at least once and the hit counts must add up to 5000.
     */
    @Test
    public void testMultiKeys() {
        final int numChannels = 100;
        final int numRecords = 5000;
        OutputEmitter emitter = new OutputEmitter(ShipStrategyType.PARTITION_HASH, new RecordComparatorFactory(new int[]{0, 1, 3}, new Class[]{IntValue.class, StringValue.class, DoubleValue.class}).m292createComparator());
        SerializationDelegate recordDelegate = new SerializationDelegate(new RecordSerializerFactory().getSerializer());
        int[] hits = new int[numChannels];
        for (int i = 0; i < numRecords; i++) {
            Record record = new Record(4);
            record.setField(0, new IntValue(i));
            record.setField(1, new StringValue("AB" + i + "CD" + i));
            record.setField(3, new DoubleValue(i * 3.141d));
            recordDelegate.setInstance(record);
            for (int channel : emitter.selectChannels(recordDelegate, hits.length)) {
                hits[channel]++;
            }
        }
        int total = 0;
        for (int c = 0; c < hits.length; c++) {
            Assert.assertTrue("channel " + c + " received no records", hits[c] > 0);
            total += hits[c];
        }
        Assert.assertEquals(numRecords, total);
    }

    /**
     * A hash-partitioning emitter keyed on field 1 is given a record that only has field 0 set.
     * Channel selection must throw a {@code KeyFieldOutOfBoundsException} reporting field 1.
     */
    @Test
    public void testMissingKey() {
        final OutputEmitter emitter = new OutputEmitter(ShipStrategyType.PARTITION_HASH, new RecordComparatorFactory(new int[]{1}, new Class[]{IntValue.class}).m292createComparator());
        final SerializationDelegate delegate = new SerializationDelegate(new RecordSerializerFactory().getSerializer());

        final Record tooShort = new Record(0);
        tooShort.setField(0, new IntValue(1));
        delegate.setInstance(tooShort);

        // Capture the exception first, then assert on it outside the try block.
        KeyFieldOutOfBoundsException thrown = null;
        try {
            emitter.selectChannels(delegate, 100);
        } catch (KeyFieldOutOfBoundsException e) {
            thrown = e;
        }
        if (thrown == null) {
            Assert.fail("Expected a KeyFieldOutOfBoundsException.");
        }
        Assert.assertEquals(1L, thrown.getFieldNumber());
    }

    /**
     * A hash-partitioning emitter keyed on field 0 is given a two-field record in which only
     * field 1 is set. Channel selection must throw a {@code NullKeyFieldException} reporting
     * field 0.
     */
    @Test
    public void testNullKey() {
        final OutputEmitter emitter = new OutputEmitter(ShipStrategyType.PARTITION_HASH, new RecordComparatorFactory(new int[]{0}, new Class[]{IntValue.class}).m292createComparator());
        final SerializationDelegate delegate = new SerializationDelegate(new RecordSerializerFactory().getSerializer());

        final Record keyUnset = new Record(2);
        keyUnset.setField(1, new IntValue(1));
        delegate.setInstance(keyUnset);

        // Capture the exception first, then assert on it outside the try block.
        NullKeyFieldException thrown = null;
        try {
            emitter.selectChannels(delegate, 100);
        } catch (NullKeyFieldException e) {
            thrown = e;
        }
        if (thrown == null) {
            Assert.fail("Expected a NullKeyFieldException.");
        }
        Assert.assertEquals(0L, thrown.getFieldNumber());
    }

    /**
     * A hash-partitioning emitter that expects a {@code DoubleValue} in field 0 is given a
     * record whose field 0 was written as an {@code IntValue}. Channel selection must throw a
     * {@code DeserializationException}.
     *
     * <p>The record is first written to and read back from an in-memory pipe.
     * NOTE(review): presumably this leaves the field in serialized form so that the emitter has
     * to deserialize it as the wrong type - confirm against {@code Record}.
     */
    @Test
    public void testWrongKeyClass() {
        OutputEmitter emitter = new OutputEmitter(ShipStrategyType.PARTITION_HASH, new RecordComparatorFactory(new int[]{0}, new Class[]{DoubleValue.class}).m292createComparator());
        SerializationDelegate delegate = new SerializationDelegate(new RecordSerializerFactory().getSerializer());

        Record roundTripped = null;
        try {
            // 1 MiB pipe buffer, so the write completes before anything is read back.
            PipedInputStream pipeIn = new PipedInputStream(1048576);
            DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(pipeIn);
            DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(new PipedOutputStream(pipeIn));

            Record written = new Record(1);
            written.setField(0, new IntValue());
            written.write(outView);

            roundTripped = new Record();
            roundTripped.read(inView);
        } catch (IOException e) {
            // A failure here is a broken fixture, not the behavior under test; report the cause.
            Assert.fail("Test erroneous: " + e);
        }

        try {
            delegate.setInstance(roundTripped);
            emitter.selectChannels(delegate, 100);
            Assert.fail("Expected a DeserializationException.");
        } catch (DeserializationException expected) {
            // This is the outcome the test is looking for.
        }
    }
}
