/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.util;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.operators.shipping.OutputEmitter;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory;
import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
import org.apache.flink.types.DeserializationException;
import org.apache.flink.types.DoubleValue;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.KeyFieldOutOfBoundsException;
import org.apache.flink.types.NullKeyFieldException;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
import org.junit.Assert;
import org.junit.Test;

public class OutputEmitterTest {
    @Test
    public void testPartitionHash() {
        RecordComparator intComp = new RecordComparatorFactory(new int[]{0}, new Class[]{IntValue.class}).createComparator();
        OutputEmitter oe1 = new OutputEmitter(ShipStrategyType.PARTITION_HASH, (TypeComparator)intComp);
        SerializationDelegate delegate = new SerializationDelegate(new RecordSerializerFactory().getSerializer());
        int numChans = 100;
        int numRecs = 50000;
        int[] hit = new int[numChans];
        for (int i = 0; i < numRecs; ++i) {
            int[] chans;
            IntValue intValue = new IntValue(i);
            Record rec = new Record((Value)intValue);
            delegate.setInstance((Object)rec);
            int[] nArray = chans = oe1.selectChannels((IOReadableWritable)delegate, hit.length);
            int n = nArray.length;
            for (int j = 0; j < n; ++j) {
                int chan;
                int n2 = chan = nArray[j];
                hit[n2] = hit[n2] + 1;
            }
        }
        int cnt = 0;
        for (int aHit : hit) {
            Assert.assertTrue((aHit > 0 ? 1 : 0) != 0);
            cnt += aHit;
        }
        Assert.assertTrue((cnt == numRecs ? 1 : 0) != 0);
        RecordComparator recordComparator = new RecordComparatorFactory(new int[]{0}, new Class[]{StringValue.class}).createComparator();
        OutputEmitter oe2 = new OutputEmitter(ShipStrategyType.PARTITION_HASH, (TypeComparator)recordComparator);
        numChans = 100;
        numRecs = 10000;
        hit = new int[numChans];
        for (int i = 0; i < numRecs; ++i) {
            int[] chans;
            StringValue k = new StringValue((CharSequence)(i + ""));
            Record rec = new Record((Value)k);
            delegate.setInstance((Object)rec);
            int[] nArray = chans = oe2.selectChannels((IOReadableWritable)delegate, hit.length);
            int n = nArray.length;
            for (int j = 0; j < n; ++j) {
                int chan;
                int n3 = chan = nArray[j];
                hit[n3] = hit[n3] + 1;
            }
        }
        cnt = 0;
        for (int aHit : hit) {
            Assert.assertTrue((aHit > 0 ? 1 : 0) != 0);
            cnt += aHit;
        }
        Assert.assertTrue((cnt == numRecs ? 1 : 0) != 0);
        TestIntComparator testIntComp = new TestIntComparator();
        OutputEmitter oe3 = new OutputEmitter(ShipStrategyType.PARTITION_HASH, (TypeComparator)testIntComp);
        SerializationDelegate intDel = new SerializationDelegate((TypeSerializer)new IntSerializer());
        numChans = 100;
        intDel.setInstance((Object)Integer.MIN_VALUE);
        int[] chans = oe3.selectChannels((IOReadableWritable)intDel, numChans);
        Assert.assertTrue((chans.length == 1 ? 1 : 0) != 0);
        Assert.assertTrue((chans[0] >= 0 && chans[0] <= numChans - 1 ? 1 : 0) != 0);
        intDel.setInstance((Object)-1);
        chans = oe3.selectChannels((IOReadableWritable)intDel, hit.length);
        Assert.assertTrue((chans.length == 1 ? 1 : 0) != 0);
        Assert.assertTrue((chans[0] >= 0 && chans[0] <= numChans - 1 ? 1 : 0) != 0);
        intDel.setInstance((Object)0);
        chans = oe3.selectChannels((IOReadableWritable)intDel, hit.length);
        Assert.assertTrue((chans.length == 1 ? 1 : 0) != 0);
        Assert.assertTrue((chans[0] >= 0 && chans[0] <= numChans - 1 ? 1 : 0) != 0);
        intDel.setInstance((Object)1);
        chans = oe3.selectChannels((IOReadableWritable)intDel, hit.length);
        Assert.assertTrue((chans.length == 1 ? 1 : 0) != 0);
        Assert.assertTrue((chans[0] >= 0 && chans[0] <= numChans - 1 ? 1 : 0) != 0);
        intDel.setInstance((Object)Integer.MAX_VALUE);
        chans = oe3.selectChannels((IOReadableWritable)intDel, hit.length);
        Assert.assertTrue((chans.length == 1 ? 1 : 0) != 0);
        Assert.assertTrue((chans[0] >= 0 && chans[0] <= numChans - 1 ? 1 : 0) != 0);
    }

    @Test
    public void testForward() {
        int i;
        int i2;
        RecordComparator intComp = new RecordComparatorFactory(new int[]{0}, new Class[]{IntValue.class}).createComparator();
        OutputEmitter oe1 = new OutputEmitter(ShipStrategyType.FORWARD, (TypeComparator)intComp);
        SerializationDelegate delegate = new SerializationDelegate(new RecordSerializerFactory().getSerializer());
        int numChannels = 100;
        int numRecords = 50000 + numChannels / 2;
        int[] hit = new int[numChannels];
        for (i2 = 0; i2 < numRecords; ++i2) {
            int[] chans;
            IntValue k = new IntValue(i2);
            Record rec = new Record((Value)k);
            delegate.setInstance((Object)rec);
            int[] nArray = chans = oe1.selectChannels((IOReadableWritable)delegate, hit.length);
            int n = nArray.length;
            for (int j = 0; j < n; ++j) {
                int chan;
                int n2 = chan = nArray[j];
                hit[n2] = hit[n2] + 1;
            }
        }
        Assert.assertTrue((hit[0] == numRecords ? 1 : 0) != 0);
        for (i2 = 1; i2 < hit.length; ++i2) {
            Assert.assertTrue((hit[i2] == 0 ? 1 : 0) != 0);
        }
        RecordComparator stringComp = new RecordComparatorFactory(new int[]{0}, new Class[]{StringValue.class}).createComparator();
        OutputEmitter oe2 = new OutputEmitter(ShipStrategyType.FORWARD, (TypeComparator)stringComp);
        numChannels = 100;
        numRecords = 10000 + numChannels / 2;
        hit = new int[numChannels];
        for (i = 0; i < numRecords; ++i) {
            int[] chans;
            StringValue k = new StringValue((CharSequence)(i + ""));
            Record rec = new Record((Value)k);
            delegate.setInstance((Object)rec);
            int[] nArray = chans = oe2.selectChannels((IOReadableWritable)delegate, hit.length);
            int n = nArray.length;
            for (int j = 0; j < n; ++j) {
                int chan;
                int n3 = chan = nArray[j];
                hit[n3] = hit[n3] + 1;
            }
        }
        Assert.assertTrue((hit[0] == numRecords ? 1 : 0) != 0);
        for (i = 1; i < hit.length; ++i) {
            Assert.assertTrue((hit[i] == 0 ? 1 : 0) != 0);
        }
    }

    @Test
    public void testForcedRebalance() {
        int i;
        int numChannels = 100;
        int toTaskIndex = numChannels * 6 / 7;
        int fromTaskIndex = toTaskIndex + numChannels;
        int extraRecords = numChannels / 3;
        int numRecords = 50000 + extraRecords;
        OutputEmitter oe1 = new OutputEmitter(ShipStrategyType.PARTITION_FORCED_REBALANCE, fromTaskIndex);
        SerializationDelegate delegate = new SerializationDelegate(new RecordSerializerFactory().getSerializer());
        int[] hit = new int[numChannels];
        for (int i2 = 0; i2 < numRecords; ++i2) {
            int[] chans;
            IntValue k = new IntValue(i2);
            Record rec = new Record((Value)k);
            delegate.setInstance((Object)rec);
            int[] nArray = chans = oe1.selectChannels((IOReadableWritable)delegate, hit.length);
            int n = nArray.length;
            for (int j = 0; j < n; ++j) {
                int chan;
                int n2 = chan = nArray[j];
                hit[n2] = hit[n2] + 1;
            }
        }
        int cnt = 0;
        for (int i3 = 0; i3 < hit.length; ++i3) {
            if (toTaskIndex <= i3 || i3 < toTaskIndex + extraRecords - numChannels) {
                Assert.assertTrue((hit[i3] == numRecords / numChannels + 1 ? 1 : 0) != 0);
            } else {
                Assert.assertTrue((hit[i3] == numRecords / numChannels ? 1 : 0) != 0);
            }
            cnt += hit[i3];
        }
        Assert.assertTrue((cnt == numRecords ? 1 : 0) != 0);
        numChannels = 100;
        toTaskIndex = numChannels / 5;
        fromTaskIndex = toTaskIndex + 2 * numChannels;
        extraRecords = numChannels * 2 / 9;
        numRecords = 10000 + extraRecords;
        OutputEmitter oe2 = new OutputEmitter(ShipStrategyType.PARTITION_FORCED_REBALANCE, fromTaskIndex);
        hit = new int[numChannels];
        for (i = 0; i < numRecords; ++i) {
            int[] chans;
            StringValue k = new StringValue((CharSequence)(i + ""));
            Record rec = new Record((Value)k);
            delegate.setInstance((Object)rec);
            int[] nArray = chans = oe2.selectChannels((IOReadableWritable)delegate, hit.length);
            int n = nArray.length;
            for (int j = 0; j < n; ++j) {
                int chan;
                int n3 = chan = nArray[j];
                hit[n3] = hit[n3] + 1;
            }
        }
        cnt = 0;
        for (i = 0; i < hit.length; ++i) {
            if (toTaskIndex <= i && i < toTaskIndex + extraRecords) {
                Assert.assertTrue((hit[i] == numRecords / numChannels + 1 ? 1 : 0) != 0);
            } else {
                Assert.assertTrue((hit[i] == numRecords / numChannels ? 1 : 0) != 0);
            }
            cnt += hit[i];
        }
        Assert.assertTrue((cnt == numRecords ? 1 : 0) != 0);
    }

    @Test
    public void testBroadcast() {
        RecordComparator intComp = new RecordComparatorFactory(new int[]{0}, new Class[]{IntValue.class}).createComparator();
        OutputEmitter oe1 = new OutputEmitter(ShipStrategyType.BROADCAST, (TypeComparator)intComp);
        SerializationDelegate delegate = new SerializationDelegate(new RecordSerializerFactory().getSerializer());
        int numChannels = 100;
        int numRecords = 50000;
        int[] hit = new int[numChannels];
        for (int i = 0; i < numRecords; ++i) {
            int[] chans;
            IntValue k = new IntValue(i);
            Record rec = new Record((Value)k);
            delegate.setInstance((Object)rec);
            int[] nArray = chans = oe1.selectChannels((IOReadableWritable)delegate, hit.length);
            int n = nArray.length;
            for (int j = 0; j < n; ++j) {
                int chan;
                int n2 = chan = nArray[j];
                hit[n2] = hit[n2] + 1;
            }
        }
        for (int aHit : hit) {
            Assert.assertTrue((String)(aHit + ""), (aHit == numRecords ? 1 : 0) != 0);
        }
        RecordComparator stringComp = new RecordComparatorFactory(new int[]{0}, new Class[]{StringValue.class}).createComparator();
        OutputEmitter oe2 = new OutputEmitter(ShipStrategyType.BROADCAST, (TypeComparator)stringComp);
        numChannels = 100;
        numRecords = 5000;
        hit = new int[numChannels];
        for (int i = 0; i < numRecords; ++i) {
            int[] chans;
            StringValue k = new StringValue((CharSequence)(i + ""));
            Record rec = new Record((Value)k);
            delegate.setInstance((Object)rec);
            int[] nArray = chans = oe2.selectChannels((IOReadableWritable)delegate, hit.length);
            int n = nArray.length;
            for (int j = 0; j < n; ++j) {
                int chan;
                int n3 = chan = nArray[j];
                hit[n3] = hit[n3] + 1;
            }
        }
        for (int aHit : hit) {
            Assert.assertTrue((String)(aHit + ""), (aHit == numRecords ? 1 : 0) != 0);
        }
    }

    @Test
    public void testMultiKeys() {
        RecordComparator multiComp = new RecordComparatorFactory(new int[]{0, 1, 3}, new Class[]{IntValue.class, StringValue.class, DoubleValue.class}).createComparator();
        OutputEmitter oe1 = new OutputEmitter(ShipStrategyType.PARTITION_HASH, (TypeComparator)multiComp);
        SerializationDelegate delegate = new SerializationDelegate(new RecordSerializerFactory().getSerializer());
        int numChannels = 100;
        int numRecords = 5000;
        int[] hit = new int[numChannels];
        for (int i = 0; i < numRecords; ++i) {
            int[] chans;
            Record rec = new Record(4);
            rec.setField(0, (Value)new IntValue(i));
            rec.setField(1, (Value)new StringValue((CharSequence)("AB" + i + "CD" + i)));
            rec.setField(3, (Value)new DoubleValue((double)i * 3.141));
            delegate.setInstance((Object)rec);
            int[] nArray = chans = oe1.selectChannels((IOReadableWritable)delegate, hit.length);
            int n = nArray.length;
            for (int j = 0; j < n; ++j) {
                int chan;
                int n2 = chan = nArray[j];
                hit[n2] = hit[n2] + 1;
            }
        }
        int cnt = 0;
        for (int aHit : hit) {
            Assert.assertTrue((aHit > 0 ? 1 : 0) != 0);
            cnt += aHit;
        }
        Assert.assertTrue((cnt == numRecords ? 1 : 0) != 0);
    }

    @Test
    public void testMissingKey() {
        RecordComparator intComp = new RecordComparatorFactory(new int[]{1}, new Class[]{IntValue.class}).createComparator();
        OutputEmitter oe1 = new OutputEmitter(ShipStrategyType.PARTITION_HASH, (TypeComparator)intComp);
        SerializationDelegate delegate = new SerializationDelegate(new RecordSerializerFactory().getSerializer());
        Record rec = new Record(0);
        rec.setField(0, (Value)new IntValue(1));
        delegate.setInstance((Object)rec);
        try {
            oe1.selectChannels((IOReadableWritable)delegate, 100);
        }
        catch (KeyFieldOutOfBoundsException re) {
            Assert.assertEquals((long)1L, (long)re.getFieldNumber());
            return;
        }
        Assert.fail((String)"Expected a KeyFieldOutOfBoundsException.");
    }

    @Test
    public void testNullKey() {
        RecordComparator intComp = new RecordComparatorFactory(new int[]{0}, new Class[]{IntValue.class}).createComparator();
        OutputEmitter oe1 = new OutputEmitter(ShipStrategyType.PARTITION_HASH, (TypeComparator)intComp);
        SerializationDelegate delegate = new SerializationDelegate(new RecordSerializerFactory().getSerializer());
        Record rec = new Record(2);
        rec.setField(1, (Value)new IntValue(1));
        delegate.setInstance((Object)rec);
        try {
            oe1.selectChannels((IOReadableWritable)delegate, 100);
        }
        catch (NullKeyFieldException re) {
            Assert.assertEquals((long)0L, (long)re.getFieldNumber());
            return;
        }
        Assert.fail((String)"Expected a NullKeyFieldException.");
    }

    @Test
    public void testWrongKeyClass() {
        RecordComparator doubleComp = new RecordComparatorFactory(new int[]{0}, new Class[]{DoubleValue.class}).createComparator();
        OutputEmitter oe1 = new OutputEmitter(ShipStrategyType.PARTITION_HASH, (TypeComparator)doubleComp);
        SerializationDelegate delegate = new SerializationDelegate(new RecordSerializerFactory().getSerializer());
        Record rec = null;
        try {
            PipedInputStream pipedInput = new PipedInputStream(0x100000);
            DataInputViewStreamWrapper in = new DataInputViewStreamWrapper((InputStream)pipedInput);
            DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper((OutputStream)new PipedOutputStream(pipedInput));
            rec = new Record(1);
            rec.setField(0, (Value)new IntValue());
            rec.write((DataOutputView)out);
            rec = new Record();
            rec.read((DataInputView)in);
        }
        catch (IOException e) {
            Assert.fail((String)"Test erroneous");
        }
        try {
            delegate.setInstance(rec);
            oe1.selectChannels((IOReadableWritable)delegate, 100);
        }
        catch (DeserializationException re) {
            return;
        }
        Assert.fail((String)"Expected a NullKeyFieldException.");
    }

    private static class TestIntComparator
    extends TypeComparator<Integer> {
        private TypeComparator[] comparators = new TypeComparator[]{new IntComparator(true)};

        private TestIntComparator() {
        }

        public int hash(Integer record) {
            return record;
        }

        public void setReference(Integer toCompare) {
            throw new UnsupportedOperationException();
        }

        public boolean equalToReference(Integer candidate) {
            throw new UnsupportedOperationException();
        }

        public int compareToReference(TypeComparator<Integer> referencedComparator) {
            throw new UnsupportedOperationException();
        }

        public int compare(Integer first, Integer second) {
            throw new UnsupportedOperationException();
        }

        public int compareSerialized(DataInputView firstSource, DataInputView secondSource) {
            throw new UnsupportedOperationException();
        }

        public boolean supportsNormalizedKey() {
            throw new UnsupportedOperationException();
        }

        public boolean supportsSerializationWithKeyNormalization() {
            throw new UnsupportedOperationException();
        }

        public int getNormalizeKeyLen() {
            throw new UnsupportedOperationException();
        }

        public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
            throw new UnsupportedOperationException();
        }

        public void putNormalizedKey(Integer record, MemorySegment target, int offset, int numBytes) {
            throw new UnsupportedOperationException();
        }

        public void writeWithKeyNormalization(Integer record, DataOutputView target) throws IOException {
            throw new UnsupportedOperationException();
        }

        public Integer readWithKeyDenormalization(Integer reuse, DataInputView source) throws IOException {
            throw new UnsupportedOperationException();
        }

        public boolean invertNormalizedKey() {
            throw new UnsupportedOperationException();
        }

        public TypeComparator<Integer> duplicate() {
            throw new UnsupportedOperationException();
        }

        public int extractKeys(Object record, Object[] target, int index) {
            target[index] = record;
            return 1;
        }

        public TypeComparator[] getFlatComparators() {
            return this.comparators;
        }
    }
}

