package org.apache.flink.statefun.flink.core.feedback;

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import org.apache.flink.statefun.flink.core.logger.FeedbackLogger;
import org.apache.flink.util.Preconditions;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/feedback/CheckpointsTest.class */
public class CheckpointsTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/statefun/flink/core/feedback/CheckpointsTest$FakeLogger.class */
    public static final class FakeLogger implements FeedbackLogger<String> {
        List<String> items;
        LoggerState state;

        private FakeLogger() {
            this.items = new ArrayList();
            this.state = LoggerState.IDLE;
        }

        public void startLogging(OutputStream outputStream) {
            Preconditions.checkState(this.state == LoggerState.IDLE);
            this.state = LoggerState.LOGGING;
        }

        public void append(String str) {
            Preconditions.checkState(this.state != LoggerState.COMMITTED);
            Preconditions.checkState(this.state != LoggerState.CLOSED);
            this.items.add(str);
        }

        public void commit() {
            Preconditions.checkState(this.state == LoggerState.LOGGING);
            this.state = LoggerState.COMMITTED;
        }

        public void close() {
            this.state = LoggerState.CLOSED;
        }
    }

    /**
     * Lifecycle states tracked by the fake logger used in these tests.
     *
     * <p>NOTE(review): decompiler metadata on this enum stated that its access modifier was
     * changed from {@code private}; the widened modifier is kept as-is here.
     */
    public enum LoggerState {
        /** Initial state of a freshly created fake logger. */
        IDLE,

        /** Set by the fake logger's {@code startLogging}. */
        LOGGING,

        /** Set by the fake logger's {@code commit}. */
        COMMITTED,

        /** Set by the fake logger's {@code close}. */
        CLOSED
    }

    /**
     * {@link Supplier} of fake loggers that remembers every logger it has handed out, in
     * creation order, so tests can inspect a logger's recorded items and state by index.
     */
    private static final class Loggers implements Supplier<FeedbackLogger<String>> {
        /** Every logger created by {@link #get()}, in creation order. */
        private final List<FakeLogger> loggers = new ArrayList<>();

        private Loggers() {
        }

        /**
         * Creates a new {@link FakeLogger}, remembers it, and returns it.
         *
         * @return the newly created logger
         */
        @Override
        public FeedbackLogger<String> get() {
            FakeLogger created = new FakeLogger();
            this.loggers.add(created);
            return created;
        }

        /**
         * Returns the items recorded by the {@code i}-th logger created so far.
         *
         * @param i zero-based creation index; validated against the number of loggers created
         * @return the live item list of that logger (not a copy)
         */
        List<String> items(int i) {
            Preconditions.checkElementIndex(i, this.loggers.size());
            return this.loggers.get(i).items;
        }

        /**
         * Returns the current state of the {@code i}-th logger created so far.
         *
         * @param i zero-based creation index; validated against the number of loggers created
         * @return that logger's current {@link LoggerState}
         */
        LoggerState state(int i) {
            Preconditions.checkElementIndex(i, this.loggers.size());
            return this.loggers.get(i).state;
        }
    }

    /**
     * Basic lifecycle: start logging for one checkpoint, append two messages, then commit
     * up to that checkpoint.
     *
     * <p>Expects the first logger created by the supplier to hold both messages in order
     * and to be in {@link LoggerState#COMMITTED}.
     */
    @Test
    public void usageExample() {
        final Loggers fakeLoggers = new Loggers();
        final Checkpoints checkpoints = new Checkpoints(fakeLoggers);
        final long checkpointId = 1L;

        checkpoints.startLogging(checkpointId, new ByteArrayOutputStream());
        for (String message : new String[] {"hello", "world"}) {
            checkpoints.append(message);
        }
        checkpoints.commitCheckpointsUntil(checkpointId);

        final List<String> logged = fakeLoggers.items(0);
        final LoggerState finalState = fakeLoggers.state(0);
        Assert.assertThat(logged, Matchers.contains("hello", "world"));
        Assert.assertThat(finalState, Matchers.is(LoggerState.COMMITTED));
    }

    /**
     * Interleaves appends with starting and committing two checkpoints.
     *
     * <p>Expects the first logger to have recorded {@code "a", "b"} and the second logger
     * {@code "b", "c"}.
     */
    @Test
    public void dataIsAppendedToMultipleLoggers() {
        final Loggers fakeLoggers = new Loggers();
        final Checkpoints checkpoints = new Checkpoints(fakeLoggers);
        final long firstCheckpoint = 1L;
        final long secondCheckpoint = 2L;

        // Logging has been started only for the first checkpoint when "a" is appended.
        checkpoints.startLogging(firstCheckpoint, new ByteArrayOutputStream());
        checkpoints.append("a");

        // Logging has been started for both checkpoints when "b" is appended.
        checkpoints.startLogging(secondCheckpoint, new ByteArrayOutputStream());
        checkpoints.append("b");

        // "c" is appended after committing up to the first checkpoint.
        checkpoints.commitCheckpointsUntil(firstCheckpoint);
        checkpoints.append("c");
        checkpoints.commitCheckpointsUntil(secondCheckpoint);

        final List<String> firstLogged = fakeLoggers.items(0);
        final List<String> secondLogged = fakeLoggers.items(1);
        Assert.assertThat(firstLogged, Matchers.contains("a", "b"));
        Assert.assertThat(secondLogged, Matchers.contains("b", "c"));
    }

    /**
     * Starts logging for checkpoints 1 and 2, then commits up to checkpoint 2 only.
     *
     * <p>Expects both created loggers to be in {@link LoggerState#COMMITTED}.
     */
    @Test
    public void committingALaterCheckpointCommitsPreviousCheckpoints() {
        final Loggers fakeLoggers = new Loggers();
        final Checkpoints checkpoints = new Checkpoints(fakeLoggers);
        final long[] checkpointIds = {1L, 2L};

        for (long checkpointId : checkpointIds) {
            checkpoints.startLogging(checkpointId, new ByteArrayOutputStream());
        }
        checkpoints.commitCheckpointsUntil(2L);

        // Loggers are inspected in creation order: index 0 first, then index 1.
        for (int index = 0; index < checkpointIds.length; index++) {
            Assert.assertThat(fakeLoggers.state(index), Matchers.is(LoggerState.COMMITTED));
        }
    }
}
