package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.IOException;
import java.time.Duration;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests how {@link Buckets} consults a {@link RollingPolicy} when elements arrive, when
 * processing-time callbacks fire and when state is snapshotted.
 */
public class BucketsRollingPolicyTest {

    /** Class-level JUnit rule; each test calls {@code newFolder()} on it to obtain its bucket base path. */
    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/BucketsRollingPolicyTest$MethodCallCountingPolicyWrapper.class */
    private static class MethodCallCountingPolicyWrapper<IN, BucketID> implements RollingPolicy<IN, BucketID> {
        private static final long serialVersionUID = 1;
        private final RollingPolicy<IN, BucketID> originalPolicy;
        private long onCheckpointCallCounter = 0;
        private long onCheckpointRollCounter = 0;
        private long onEventCallCounter = 0;
        private long onEventRollCounter = 0;
        private long onProcessingTimeCallCounter = 0;
        private long onProcessingTimeRollCounter = 0;

        MethodCallCountingPolicyWrapper(RollingPolicy<IN, BucketID> rollingPolicy) {
            this.originalPolicy = (RollingPolicy) Preconditions.checkNotNull(rollingPolicy);
        }

        public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileInfo) throws IOException {
            boolean shouldRollOnCheckpoint = this.originalPolicy.shouldRollOnCheckpoint(partFileInfo);
            this.onCheckpointCallCounter++;
            if (shouldRollOnCheckpoint) {
                this.onCheckpointRollCounter++;
            }
            return shouldRollOnCheckpoint;
        }

        public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileInfo, IN in) throws IOException {
            boolean shouldRollOnEvent = this.originalPolicy.shouldRollOnEvent(partFileInfo, in);
            this.onEventCallCounter++;
            if (shouldRollOnEvent) {
                this.onEventRollCounter++;
            }
            return shouldRollOnEvent;
        }

        public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileInfo, long j) throws IOException {
            boolean shouldRollOnProcessingTime = this.originalPolicy.shouldRollOnProcessingTime(partFileInfo, j);
            this.onProcessingTimeCallCounter++;
            if (shouldRollOnProcessingTime) {
                this.onProcessingTimeRollCounter++;
            }
            return shouldRollOnProcessingTime;
        }

        void verifyCallCounters(long j, long j2, long j3, long j4, long j5, long j6) {
            Assert.assertEquals(j, this.onCheckpointCallCounter);
            Assert.assertEquals(j2, this.onCheckpointRollCounter);
            Assert.assertEquals(j3, this.onEventCallCounter);
            Assert.assertEquals(j4, this.onEventRollCounter);
            Assert.assertEquals(j5, this.onProcessingTimeCallCounter);
            Assert.assertEquals(j6, this.onProcessingTimeRollCounter);
        }
    }

    /**
     * Drives a {@link DefaultRollingPolicy} (max part size 10 bytes, inactivity interval 4 ms,
     * rollover interval 11 ms) through element, processing-time and checkpoint callbacks and
     * checks after each step how often the policy was consulted and how often it rolled.
     */
    @Test
    public void testDefaultRollingPolicy() throws Exception {
        final Path path = new Path(TEMP_FOLDER.newFolder().toURI());

        final RollingPolicy<String, String> originalRollingPolicy =
                DefaultRollingPolicy.builder()
                        .withMaxPartSize(new MemorySize(10L))
                        .withInactivityInterval(Duration.ofMillis(4L))
                        .withRolloverInterval(Duration.ofMillis(11L))
                        .build();
        final MethodCallCountingPolicyWrapper<String, String> rollingPolicy =
                new MethodCallCountingPolicyWrapper<>(originalRollingPolicy);

        final Buckets<String, String> buckets = createBuckets(path, rollingPolicy);
        rollingPolicy.verifyCallCounters(0L, 0L, 0L, 0L, 0L, 0L);

        // After two elements the policy is expected to have been consulted once, without rolling.
        buckets.onElement("test1", new TestUtils.MockSinkContext(1L, 1L, 1L));
        buckets.onElement("test1", new TestUtils.MockSinkContext(2L, 1L, 2L));
        rollingPolicy.verifyCallCounters(0L, 0L, 1L, 0L, 0L, 0L);

        // The third element is expected to trigger the first roll-on-event.
        buckets.onElement("test1", new TestUtils.MockSinkContext(3L, 1L, 3L));
        rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 0L, 0L);

        // First timer: consulted, but no roll expected yet.
        buckets.onProcessingTime(5L);
        rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 1L, 0L);

        // Second timer: a roll is expected.
        buckets.onProcessingTime(7L);
        rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 2L, 1L);

        // A new element followed by a much later timer: another processing-time roll is expected.
        buckets.onElement("test1", new TestUtils.MockSinkContext(3L, 1L, 3L));
        buckets.onProcessingTime(20L);
        rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 3L, 2L);

        // Taking a checkpoint here is expected to leave every counter untouched.
        buckets.snapshotState(1L, new TestUtils.MockListState<>(), new TestUtils.MockListState<>());
        rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 3L, 2L);

        // Release the buckets, as the other tests in this class do.
        buckets.close();
    }

    /**
     * Checks that the values configured on the {@link DefaultRollingPolicy} builder are the ones
     * reported by the built policy's getters.
     *
     * <p>NOTE(review): despite the "DeprecatedCreate" in its name, this test goes through
     * {@code DefaultRollingPolicy.builder()}; the name is kept so existing test reports still match.
     */
    @Test
    public void testDefaultRollingPolicyDeprecatedCreate() throws Exception {
        final DefaultRollingPolicy<String, String> policy =
                DefaultRollingPolicy.builder()
                        .withInactivityInterval(Duration.ofMillis(10L))
                        .withMaxPartSize(new MemorySize(20L))
                        .withRolloverInterval(Duration.ofMillis(30L))
                        .build();

        Assert.assertEquals(10L, policy.getInactivityInterval());
        Assert.assertEquals(20L, policy.getMaxPartSize());
        Assert.assertEquals(30L, policy.getRolloverInterval());
    }

    /**
     * Drives an {@link OnCheckpointRollingPolicy} through elements and two checkpoints and checks
     * that it only ever rolls on checkpoint.
     */
    @Test
    public void testRollOnCheckpointPolicy() throws Exception {
        final Path path = new Path(TEMP_FOLDER.newFolder().toURI());

        final RollingPolicy<String, String> originalRollingPolicy = OnCheckpointRollingPolicy.build();
        final MethodCallCountingPolicyWrapper<String, String> rollingPolicy =
                new MethodCallCountingPolicyWrapper<>(originalRollingPolicy);

        final Buckets<String, String> buckets = createBuckets(path, rollingPolicy);
        rollingPolicy.verifyCallCounters(0L, 0L, 0L, 0L, 0L, 0L);

        buckets.onElement("test1", new TestUtils.MockSinkContext(1L, 1L, 2L));
        buckets.onElement("test1", new TestUtils.MockSinkContext(2L, 1L, 2L));
        buckets.onElement("test1", new TestUtils.MockSinkContext(3L, 1L, 3L));

        // First checkpoint: one checkpoint call that rolls; the three elements above are
        // expected to have caused two on-event calls, none of which rolled.
        buckets.snapshotState(1L, new TestUtils.MockListState<>(), new TestUtils.MockListState<>());
        rollingPolicy.verifyCallCounters(1L, 1L, 2L, 0L, 0L, 0L);

        // One more element and a second checkpoint: a second checkpoint roll is expected, while
        // the on-event call count is expected to stay at two.
        buckets.onElement("test1", new TestUtils.MockSinkContext(5L, 1L, 5L));
        buckets.snapshotState(2L, new TestUtils.MockListState<>(), new TestUtils.MockListState<>());
        rollingPolicy.verifyCallCounters(2L, 2L, 2L, 0L, 0L, 0L);

        buckets.close();
    }

    /**
     * Drives a hand-written {@link RollingPolicy} through element, checkpoint and processing-time
     * callbacks. The policy always rolls on checkpoint, rolls on event once the part file is
     * larger than 9 bytes, and rolls on processing time once at least 10 time units have passed
     * since the part file's last update.
     */
    @Test
    public void testCustomRollingPolicy() throws Exception {
        final Path path = new Path(TEMP_FOLDER.newFolder().toURI());

        final RollingPolicy<String, String> customPolicy =
                new RollingPolicy<String, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileInfo) {
                        return true;
                    }

                    @Override
                    public boolean shouldRollOnEvent(PartFileInfo<String> partFileInfo, String element) throws IOException {
                        // Roll as soon as the part file exceeds 9 bytes.
                        return partFileInfo.getSize() > 9;
                    }

                    @Override
                    public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileInfo, long currentTime) {
                        // Roll after at least 10 time units without an update.
                        return currentTime - partFileInfo.getLastUpdateTime() >= 10;
                    }
                };
        final MethodCallCountingPolicyWrapper<String, String> rollingPolicy =
                new MethodCallCountingPolicyWrapper<>(customPolicy);

        final Buckets<String, String> buckets = createBuckets(path, rollingPolicy);
        rollingPolicy.verifyCallCounters(0L, 0L, 0L, 0L, 0L, 0L);

        // After two elements the policy is expected to have been consulted once, without rolling.
        buckets.onElement("test1", new TestUtils.MockSinkContext(1L, 1L, 2L));
        buckets.onElement("test1", new TestUtils.MockSinkContext(2L, 1L, 2L));
        rollingPolicy.verifyCallCounters(0L, 0L, 1L, 0L, 0L, 0L);

        // The third element is expected to trigger the roll-on-event.
        buckets.onElement("test1", new TestUtils.MockSinkContext(2L, 1L, 2L));
        rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 0L, 0L);

        // The policy always rolls on checkpoint.
        buckets.snapshotState(1L, new TestUtils.MockListState<>(), new TestUtils.MockListState<>());
        rollingPolicy.verifyCallCounters(1L, 1L, 2L, 1L, 0L, 0L);

        // A new element, then a first timer that is expected not to roll yet ...
        buckets.onElement("test1", new TestUtils.MockSinkContext(2L, 1L, 5L));
        buckets.onProcessingTime(12L);
        rollingPolicy.verifyCallCounters(1L, 1L, 2L, 1L, 1L, 0L);

        // ... and a later timer that is expected to roll.
        buckets.onProcessingTime(16L);
        rollingPolicy.verifyCallCounters(1L, 1L, 2L, 1L, 2L, 1L);

        buckets.close();
    }

    /**
     * Builds the {@link Buckets} instance under test: a string-identity bucket assigner, the
     * default bucket factory, a row-wise writer with a {@link SimpleStringEncoder} on the file
     * system of {@code path}, subtask index 0 and a default {@link OutputFileConfig}.
     *
     * @param path base path under which the buckets are created
     * @param methodCallCountingPolicyWrapper the counting rolling policy the buckets consult
     * @return the configured buckets
     * @throws IOException if the file system or its recoverable writer cannot be obtained
     */
    private static Buckets<String, String> createBuckets(Path path, MethodCallCountingPolicyWrapper<String, String> methodCallCountingPolicyWrapper) throws IOException {
        return new Buckets<>(
                path,
                new TestUtils.StringIdentityBucketAssigner(),
                new DefaultBucketFactoryImpl<>(),
                new RowWiseBucketWriter<>(
                        FileSystem.get(path.toUri()).createRecoverableWriter(),
                        new SimpleStringEncoder<>()),
                methodCallCountingPolicyWrapper,
                0,
                OutputFileConfig.builder().build());
    }
}
