package org.apache.flink.statefun.flink.core.logger;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.statefun.flink.core.di.ObjectContainer;
import org.apache.flink.statefun.flink.core.logger.UnboundedFeedbackLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest.class */
public class UnboundedFeedbackLoggerTest {
    private static IOManagerAsync IO_MANAGER;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/statefun/flink/core/logger/UnboundedFeedbackLoggerTest$NoopStreamOps.class */
    public static final class NoopStreamOps implements CheckpointedStreamOperations {
        private final int maxParallelism;

        NoopStreamOps(int i) {
            this.maxParallelism = i;
        }

        public void requireKeyedStateCheckpointed(OutputStream outputStream) {
        }

        public Iterable<Integer> keyGroupList(OutputStream outputStream) {
            IntStream range = IntStream.range(0, this.maxParallelism);
            range.getClass();
            return range::iterator;
        }

        public void startNewKeyGroup(OutputStream outputStream, int i) {
        }

        public Closeable acquireLease(OutputStream outputStream) {
            return () -> {
            };
        }
    }

    @BeforeClass
    public static void beforeClass() {
        IO_MANAGER = new IOManagerAsync();
    }

    @AfterClass
    public static void afterClass() throws Exception {
        if (IO_MANAGER != null) {
            IO_MANAGER.close();
            IO_MANAGER = null;
        }
    }

    @Test
    public void sanity() {
        UnboundedFeedbackLogger<Integer> instanceUnderTest = instanceUnderTest(128, 1L);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        instanceUnderTest.startLogging(byteArrayOutputStream);
        instanceUnderTest.commit();
        Assert.assertThat(Integer.valueOf(byteArrayOutputStream.size()), Matchers.greaterThan(0));
    }

    @Test(expected = IllegalStateException.class)
    public void commitWithoutStartLoggingShouldBeIllegal() {
        instanceUnderTest(128, 1L).commit();
    }

    @Test
    public void roundTrip() throws Exception {
        roundTrip(100, 1024);
    }

    @Test
    public void roundTripWithoutElements() throws Exception {
        roundTrip(0, 1024);
    }

    @Test
    @Ignore
    public void roundTripWithSpill() throws Exception {
        roundTrip(1000000, 0);
    }

    @Test
    public void roundTripWithHeader() throws IOException {
        DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(32);
        UnboundedFeedbackLogger.Header.writeHeader(dataOutputSerializer);
        dataOutputSerializer.writeInt(123);
        dataOutputSerializer.writeInt(456);
        DataInputViewStreamWrapper dataInputViewStreamWrapper = new DataInputViewStreamWrapper(UnboundedFeedbackLogger.Header.skipHeaderSilently(new ByteArrayInputStream(dataOutputSerializer.getCopyOfBuffer())));
        Assert.assertThat(Integer.valueOf(dataInputViewStreamWrapper.readInt()), CoreMatchers.is(123));
        Assert.assertThat(Integer.valueOf(dataInputViewStreamWrapper.readInt()), CoreMatchers.is(456));
    }

    @Test
    public void roundTripWithoutHeader() throws IOException {
        DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(32);
        dataOutputSerializer.writeInt(123);
        dataOutputSerializer.writeInt(456);
        DataInputViewStreamWrapper dataInputViewStreamWrapper = new DataInputViewStreamWrapper(UnboundedFeedbackLogger.Header.skipHeaderSilently(new ByteArrayInputStream(dataOutputSerializer.getCopyOfBuffer())));
        Assert.assertThat(Integer.valueOf(dataInputViewStreamWrapper.readInt()), CoreMatchers.is(123));
        Assert.assertThat(Integer.valueOf(dataInputViewStreamWrapper.readInt()), CoreMatchers.is(456));
    }

    @Test
    public void emptyKeyGroupWithHeader() throws IOException {
        DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(32);
        UnboundedFeedbackLogger.Header.writeHeader(dataOutputSerializer);
        Assert.assertThat(Integer.valueOf(new DataInputViewStreamWrapper(UnboundedFeedbackLogger.Header.skipHeaderSilently(new ByteArrayInputStream(dataOutputSerializer.getCopyOfBuffer()))).read()), CoreMatchers.is(-1));
    }

    @Test
    public void emptyKeyGroupWithoutHeader() throws IOException {
        Assert.assertThat(Integer.valueOf(new DataInputViewStreamWrapper(UnboundedFeedbackLogger.Header.skipHeaderSilently(new ByteArrayInputStream(new byte[0]))).read()), CoreMatchers.is(-1));
    }

    private void roundTrip(int i, int i2) throws Exception {
        ByteArrayInputStream serializeKeyGroup = serializeKeyGroup(1, i2, i);
        ArrayList arrayList = new ArrayList();
        UnboundedFeedbackLogger<Integer> instanceUnderTest = instanceUnderTest(1, 0L);
        arrayList.getClass();
        instanceUnderTest.replyLoggedEnvelops(serializeKeyGroup, (v1) -> {
            r2.add(v1);
        });
        for (int i3 = 0; i3 < i; i3++) {
            Assert.assertThat((Integer) arrayList.get(i3), CoreMatchers.is(Integer.valueOf(i3)));
        }
    }

    private ByteArrayInputStream serializeKeyGroup(int i, long j, int i2) {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        UnboundedFeedbackLogger<Integer> instanceUnderTest = instanceUnderTest(i, j);
        instanceUnderTest.startLogging(byteArrayOutputStream);
        for (int i3 = 0; i3 < i2; i3++) {
            instanceUnderTest.append(Integer.valueOf(i3));
        }
        instanceUnderTest.commit();
        return new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
    }

    private UnboundedFeedbackLogger<Integer> instanceUnderTest(int i, long j) {
        ObjectContainer unboundedSpillableLoggerContainer = Loggers.unboundedSpillableLoggerContainer(IO_MANAGER, i, j, IntSerializer.INSTANCE, Function.identity());
        unboundedSpillableLoggerContainer.add("checkpoint-stream-ops", CheckpointedStreamOperations.class, new NoopStreamOps(i));
        return ((UnboundedFeedbackLoggerFactory) unboundedSpillableLoggerContainer.get(UnboundedFeedbackLoggerFactory.class)).create();
    }
}
