package org.apache.flink.streaming.api.operators;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.util.LongArrayList;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/** Tests for {@link StateInitializationContextImpl}, exercising its raw keyed and raw operator state input streams. */
public class StateInitializationContextImplTest {
    /** Number of keyed state handles, and likewise of operator state handles, that {@link #setUp()} builds. */
    static final int NUM_HANDLES = 10;
    /** Context under test; re-created by {@link #setUp()}. */
    private StateInitializationContextImpl initializationContext;
    /** Registry that {@link #setUp()} passes to the constructor of the context under test. */
    private CloseableRegistry closableRegistry;
    /** Count of key-group ids that {@link #setUp()} wrote into the keyed state handles. */
    private int writtenKeyGroups;
    /** Every int value that {@link #setUp()} wrote into the operator state handles. */
    private Set<Integer> writtenOperatorStates;

    /**
     * Byte-array backed state handle whose input streams refuse to be read once they have been
     * closed, so that a test can detect a stream that was closed underneath it.
     */
    static final class ByteStateHandleCloseChecking extends ByteStreamStateHandle {
        private static final long serialVersionUID = -6201941296931334140L;

        /**
         * @param handleName name forwarded unchanged to the {@link ByteStreamStateHandle} constructor
         * @param bytes content forwarded unchanged to the {@link ByteStreamStateHandle} constructor
         */
        public ByteStateHandleCloseChecking(String handleName, byte[] bytes) {
            super(handleName, bytes);
        }

        /**
         * Creates a new stream positioned at offset 0 over this handle's {@code data} array.
         *
         * @return a stream whose {@code read()} throws {@link IOException} after {@code close()}
         */
        public FSDataInputStream openInputStream() throws IOException {
            return new FSDataInputStream() {
                /** Offset of the next byte to hand out. */
                private int position;
                /** Set once {@link #close()} has completed. */
                private boolean closed;

                /** Moves the read position; the target must fit into a non-negative {@code int}. */
                public void seek(long desired) throws IOException {
                    Preconditions.checkArgument(desired >= 0 && desired < Integer.MAX_VALUE);
                    position = (int) desired;
                }

                /** Returns the current read position. */
                public long getPos() throws IOException {
                    return position;
                }

                /**
                 * Returns the next byte as an unsigned value, or {@code -1} at the end of the data.
                 *
                 * @throws IOException if this stream has already been closed
                 */
                public int read() throws IOException {
                    if (closed) {
                        throw new IOException("Stream closed");
                    }
                    return position < ByteStateHandleCloseChecking.this.data.length
                            ? ByteStateHandleCloseChecking.this.data[position++] & 0xFF
                            : -1;
                }

                /** Closes the parent stream first and only then marks this stream as closed. */
                public void close() throws IOException {
                    super.close();
                    closed = true;
                }
            };
        }
    }

    /**
     * Builds the fixture: {@link #NUM_HANDLES} keyed state handles and {@link #NUM_HANDLES}
     * operator state handles, all backed by {@link ByteStateHandleCloseChecking}, and a
     * {@link StateInitializationContextImpl} wrapping them.
     *
     * <p>Keyed handle {@code h} covers the {@code (h % 4) + 1} consecutive key groups that follow
     * the previous handle's range, except the last handle, which gets the empty range. For every
     * key group its own id is written as an int at the recorded offset.
     *
     * <p>Operator handle {@code h} holds {@code h % 4} partitions under the state name
     * {@code "_default_"}; partition {@code p} contains the int {@code h * NUM_HANDLES + p}.
     *
     * @throws Exception if writing the fixture data fails
     */
    @Before
    public void setUp() throws Exception {
        this.writtenKeyGroups = 0;
        this.writtenOperatorStates = new HashSet<>();
        this.closableRegistry = new CloseableRegistry();
        OperatorStateStore operatorStateStore = Mockito.mock(OperatorStateStore.class);
        ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(64);

        // NOTE(review): left raw on purpose. The element type the context constructor expects for
        // keyed handles is not visible from this file; parameterize once that is confirmed.
        ArrayList keyedStateHandles = new ArrayList(NUM_HANDLES);
        int rangeStart = 0;
        for (int handle = 0; handle < NUM_HANDLES; handle++) {
            out.reset();
            int rangeEnd = rangeStart + (handle % 4);
            DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
            // The last handle intentionally carries no key groups at all.
            KeyGroupRange range = handle == NUM_HANDLES - 1
                    ? KeyGroupRange.EMPTY_KEY_GROUP_RANGE
                    : new KeyGroupRange(rangeStart, rangeEnd);
            KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(range);
            // Advanced even for the empty last handle, mirroring the bookkeeping of the others.
            rangeStart = rangeEnd + 1;
            for (int keyGroup : offsets.getKeyGroupRange()) {
                offsets.setKeyGroupOffset(keyGroup, out.getPosition());
                view.writeInt(keyGroup);
                this.writtenKeyGroups++;
            }
            keyedStateHandles.add(new KeyGroupsStateHandle(
                    offsets, new ByteStateHandleCloseChecking("kg-" + handle, out.toByteArray())));
        }

        ArrayList<OperatorStateHandle> operatorStateHandles = new ArrayList<>(NUM_HANDLES);
        for (int handle = 0; handle < NUM_HANDLES; handle++) {
            int partitionCount = handle % 4;
            out.reset();
            DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
            LongArrayList partitionOffsets = new LongArrayList(partitionCount);
            for (int partition = 0; partition < partitionCount; partition++) {
                partitionOffsets.add(out.getPosition());
                int value = (handle * NUM_HANDLES) + partition;
                view.writeInt(value);
                this.writtenOperatorStates.add(value);
            }
            HashMap<String, OperatorStateHandle.StateMetaInfo> metaInfoByName = new HashMap<>();
            metaInfoByName.put("_default_", new OperatorStateHandle.StateMetaInfo(
                    partitionOffsets.toArray(), OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
            operatorStateHandles.add(new OperatorStateHandle(
                    metaInfoByName, new ByteStateHandleCloseChecking("os-" + handle, out.toByteArray())));
        }

        KeyedStateStore keyedStateStore = Mockito.mock(KeyedStateStore.class);
        this.initializationContext = new StateInitializationContextImpl(
                true, operatorStateStore, keyedStateStore, keyedStateHandles, operatorStateHandles, this.closableRegistry);
    }

    /**
     * Iterates the raw operator state inputs and checks that each stream starts with the value
     * {@code handleIndex * NUM_HANDLES + partitionIndex}, i.e. that partitions come back in the
     * order in which {@link #setUp()} wrote them.
     *
     * @throws Exception if a stream cannot be opened, read or closed
     */
    @Test
    public void getOperatorStateStreams() throws Exception {
        int handleIndex = 0;
        int partitionIndex = 0;
        for (StatePartitionStreamProvider provider : this.initializationContext.getRawOperatorStateInputs()) {
            // setUp() gives handle h exactly h % 4 partitions, so multiples of 4 contribute none.
            if (handleIndex % 4 == 0) {
                handleIndex++;
            }
            Assert.assertNotNull(provider);
            // try-with-resources closes the stream exactly once and keeps the primary failure
            // (e.g. an assertion error) if close() throws as well.
            try (InputStream stream = provider.getStream()) {
                int value = new DataInputViewStreamWrapper(stream).readInt();
                Assert.assertEquals((handleIndex * NUM_HANDLES) + partitionIndex, value);
            }
            partitionIndex++;
            // All partitions of the current handle consumed: move on to the next handle.
            if (partitionIndex == handleIndex % 4) {
                partitionIndex = 0;
                handleIndex++;
            }
        }
    }

    /**
     * Iterates the raw keyed state inputs, checks that each stream starts with the id of the key
     * group it is reported for, and finally checks that as many key groups were visited as
     * {@link #setUp()} wrote.
     *
     * @throws Exception if a stream cannot be opened, read or closed
     */
    @Test
    public void getKeyedStateStreams() throws Exception {
        int readKeyGroups = 0;
        for (KeyGroupStatePartitionStreamProvider provider : this.initializationContext.getRawKeyedStateInputs()) {
            Assert.assertNotNull(provider);
            // try-with-resources closes the stream exactly once and keeps the primary failure
            // (e.g. an assertion error) if close() throws as well.
            try (InputStream stream = provider.getStream()) {
                // Counted before the comparison, so a visited key group is tallied even if it mismatches.
                readKeyGroups++;
                int value = new DataInputViewStreamWrapper(stream).readInt();
                Assert.assertEquals(provider.getKeyGroupId(), value);
            }
        }
        Assert.assertEquals(this.writtenKeyGroups, readKeyGroups);
    }

    /**
     * Reads the first int of every raw operator state input, requires each value to be seen only
     * once, and compares the resulting set with the values {@link #setUp()} wrote.
     *
     * <p>Despite its name, this test goes through {@code getRawOperatorStateInputs()}; it does not
     * touch the mocked operator state store.
     *
     * @throws Exception if a stream cannot be opened, read or closed
     */
    @Test
    public void getOperatorStateStore() throws Exception {
        Set<Integer> readValues = new HashSet<>();
        for (StatePartitionStreamProvider provider : this.initializationContext.getRawOperatorStateInputs()) {
            Assert.assertNotNull(provider);
            // try-with-resources closes the stream exactly once and keeps the primary failure
            // (e.g. an assertion error) if close() throws as well.
            try (InputStream stream = provider.getStream()) {
                int value = new DataInputViewStreamWrapper(stream).readInt();
                // add() returning false would mean the same partition was delivered twice.
                Assert.assertTrue(readValues.add(value));
            }
        }
        Assert.assertEquals(this.writtenOperatorStates, readValues);
    }

    /**
     * Closes the context under test after five key groups have been read successfully and checks
     * that the close is observable: from then on no stream may be read successfully, and the
     * iteration as a whole must end with an {@link IOException} rather than run to completion.
     *
     * @throws Exception if an unexpected failure occurs before the context was closed
     */
    @Test
    public void close() throws Exception {
        final int readsBeforeClose = 5;
        int readCount = 0;
        boolean contextClosed = false;
        try {
            for (KeyGroupStatePartitionStreamProvider provider : this.initializationContext.getRawKeyedStateInputs()) {
                Assert.assertNotNull(provider);
                if (readCount == readsBeforeClose) {
                    this.initializationContext.close();
                    contextClosed = true;
                }
                // try-with-resources guarantees the stream is released on every path, including
                // when an exception propagates out of the loop body.
                try (InputStream stream = provider.getStream()) {
                    try {
                        int value = new DataInputViewStreamWrapper(stream).readInt();
                        Assert.assertEquals(provider.getKeyGroupId(), value);
                        if (contextClosed) {
                            Assert.fail("Close was ignored: stream");
                        }
                        readCount++;
                    } catch (IOException e) {
                        // A failing read is the expected outcome once the context has been closed;
                        // before that point it is a genuine error.
                        if (!contextClosed) {
                            throw e;
                        }
                    }
                }
            }
            Assert.fail("Close was ignored: registry");
        } catch (IOException e) {
            // Only legitimate after the close, and only if exactly the pre-close reads succeeded.
            Assert.assertTrue(contextClosed);
            Assert.assertEquals(readsBeforeClose, readCount);
        }
    }
}
