/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.File;
import java.io.IOException;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketFactory;
import org.apache.flink.streaming.api.functions.sink.filesystem.Buckets;
import org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWisePartWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests that {@link Buckets} consults the configured {@link RollingPolicy} on elements,
 * processing-time ticks and checkpoints.
 *
 * <p>Every test wraps the policy under test in a {@link MethodCallCountingPolicyWrapper} and,
 * after each interaction with the buckets, asserts how often each policy hook was invoked and
 * how often it answered "roll".
 */
public class RollingPolicyTest {
    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    /**
     * Exercises a {@link DefaultRollingPolicy} configured with a max part size of 10, an
     * inactivity interval of 4 and a rollover interval of 11.
     */
    @Test
    public void testDefaultRollingPolicy() throws Exception {
        File outDir = TEMP_FOLDER.newFolder();
        Path path = new Path(outDir.toURI());
        RollingPolicy<String, String> originalRollingPolicy = DefaultRollingPolicy.create()
                .withMaxPartSize(10L)
                .withInactivityInterval(4L)
                .withRolloverInterval(11L)
                .build();
        MethodCallCountingPolicyWrapper<String, String> rollingPolicy =
                new MethodCallCountingPolicyWrapper<>(originalRollingPolicy);
        Buckets<String, String> buckets = RollingPolicyTest.createBuckets(path, rollingPolicy);
        rollingPolicy.verifyCallCounters(0L, 0L, 0L, 0L, 0L, 0L);

        // After two elements the event hook has been consulted exactly once, without rolling
        // (presumably the very first element opens a part file without asking the policy).
        buckets.onElement("test1", new TestUtils.MockSinkContext(1L, 1L, 1L));
        buckets.onElement("test1", new TestUtils.MockSinkContext(2L, 1L, 2L));
        rollingPolicy.verifyCallCounters(0L, 0L, 1L, 0L, 0L, 0L);

        // The third element is expected to trigger the first event-based roll.
        buckets.onElement("test1", new TestUtils.MockSinkContext(3L, 1L, 3L));
        rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 0L, 0L);

        // Processing-time tick at 5: the hook is consulted but must not roll yet.
        buckets.onProcessingTime(5L);
        rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 1L, 0L);

        // Processing-time tick at 7: expected to roll (presumably due to the inactivity interval).
        buckets.onProcessingTime(7L);
        rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 2L, 1L);

        buckets.onElement("test1", new TestUtils.MockSinkContext(3L, 1L, 3L));

        // Processing-time tick at 20: expected to roll again (presumably due to the rollover
        // interval). The element written just before did not consult the event hook.
        buckets.onProcessingTime(20L);
        rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 3L, 2L);

        // Taking a checkpoint here must leave every counter unchanged.
        buckets.snapshotState(1L, new TestUtils.MockListState<>(), new TestUtils.MockListState<>());
        rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 3L, 2L);

        // Release the buckets like the sibling tests do; previously this test never closed them.
        buckets.close();
    }

    /**
     * Exercises an {@link OnCheckpointRollingPolicy}: only checkpoints are expected to roll,
     * and every consulted checkpoint is expected to do so.
     */
    @Test
    public void testRollOnCheckpointPolicy() throws Exception {
        File outDir = TEMP_FOLDER.newFolder();
        Path path = new Path(outDir.toURI());
        MethodCallCountingPolicyWrapper<String, String> rollingPolicy =
                new MethodCallCountingPolicyWrapper<>(OnCheckpointRollingPolicy.build());
        Buckets<String, String> buckets = RollingPolicyTest.createBuckets(path, rollingPolicy);
        rollingPolicy.verifyCallCounters(0L, 0L, 0L, 0L, 0L, 0L);

        buckets.onElement("test1", new TestUtils.MockSinkContext(1L, 1L, 2L));
        buckets.onElement("test1", new TestUtils.MockSinkContext(2L, 1L, 2L));
        buckets.onElement("test1", new TestUtils.MockSinkContext(3L, 1L, 3L));

        // First checkpoint: consulted once and rolled once; the three elements produced two
        // event checks and no event-based roll.
        buckets.snapshotState(1L, new TestUtils.MockListState<>(), new TestUtils.MockListState<>());
        rollingPolicy.verifyCallCounters(1L, 1L, 2L, 0L, 0L, 0L);

        // The element written right after the roll does not add an event check (count stays 2);
        // the second checkpoint is consulted and rolls again.
        buckets.onElement("test1", new TestUtils.MockSinkContext(5L, 1L, 5L));
        buckets.snapshotState(2L, new TestUtils.MockListState<>(), new TestUtils.MockListState<>());
        rollingPolicy.verifyCallCounters(2L, 2L, 2L, 0L, 0L, 0L);

        buckets.close();
    }

    /**
     * Exercises a hand-written policy that always rolls on checkpoint, rolls on an event once
     * the part file size exceeds 9, and rolls on processing time once at least 10 time units
     * have passed since the part file's last update.
     */
    @Test
    public void testCustomRollingPolicy() throws Exception {
        File outDir = TEMP_FOLDER.newFolder();
        Path path = new Path(outDir.toURI());
        MethodCallCountingPolicyWrapper<String, String> rollingPolicy =
                new MethodCallCountingPolicyWrapper<String, String>(new RollingPolicy<String, String>() {
                    private static final long serialVersionUID = 1L;

                    public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) {
                        return true;
                    }

                    public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, String element)
                            throws IOException {
                        return partFileState.getSize() > 9L;
                    }

                    public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) {
                        return currentTime - partFileState.getLastUpdateTime() >= 10L;
                    }
                });
        Buckets<String, String> buckets = RollingPolicyTest.createBuckets(path, rollingPolicy);
        rollingPolicy.verifyCallCounters(0L, 0L, 0L, 0L, 0L, 0L);

        // Two elements: one event check so far, no roll.
        buckets.onElement("test1", new TestUtils.MockSinkContext(1L, 1L, 2L));
        buckets.onElement("test1", new TestUtils.MockSinkContext(2L, 1L, 2L));
        rollingPolicy.verifyCallCounters(0L, 0L, 1L, 0L, 0L, 0L);

        // Third element: second event check, which is expected to roll.
        buckets.onElement("test1", new TestUtils.MockSinkContext(2L, 1L, 2L));
        rollingPolicy.verifyCallCounters(0L, 0L, 2L, 1L, 0L, 0L);

        // Checkpoint: consulted once, and the custom policy always answers "roll".
        buckets.snapshotState(1L, new TestUtils.MockListState<>(), new TestUtils.MockListState<>());
        rollingPolicy.verifyCallCounters(1L, 1L, 2L, 1L, 0L, 0L);

        buckets.onElement("test1", new TestUtils.MockSinkContext(2L, 1L, 5L));

        // Processing-time tick at 12: consulted, but expected not to roll yet.
        buckets.onProcessingTime(12L);
        rollingPolicy.verifyCallCounters(1L, 1L, 2L, 1L, 1L, 0L);

        // Processing-time tick at 16: consulted again and expected to roll.
        buckets.onProcessingTime(16L);
        rollingPolicy.verifyCallCounters(1L, 1L, 2L, 1L, 2L, 1L);

        buckets.close();
    }

    /**
     * Builds the {@link Buckets} under test: string-identity bucket assignment, the default
     * bucket factory, a row-wise part writer using {@link SimpleStringEncoder}, and subtask
     * index 0.
     *
     * @param basePath directory under which the buckets write their part files
     * @param rollingPolicyToTest counting wrapper around the policy being exercised
     * @return the freshly created buckets
     * @throws IOException if the buckets cannot be created
     */
    private static Buckets<String, String> createBuckets(
            Path basePath,
            MethodCallCountingPolicyWrapper<String, String> rollingPolicyToTest) throws IOException {
        return new Buckets<>(
                basePath,
                new TestUtils.StringIdentityBucketAssigner(),
                new DefaultBucketFactoryImpl<>(),
                new RowWisePartWriter.Factory<>(new SimpleStringEncoder<>()),
                rollingPolicyToTest,
                0);
    }

    /**
     * A {@link RollingPolicy} decorator that delegates every decision to the wrapped policy
     * and counts, per hook, how often it was called and how often it answered {@code true}.
     *
     * <p>A call is only counted after the delegate has returned, so a delegate call that throws
     * is not reflected in the counters.
     */
    private static class MethodCallCountingPolicyWrapper<IN, BucketID>
    implements RollingPolicy<IN, BucketID> {
        private static final long serialVersionUID = 1L;
        private final RollingPolicy<IN, BucketID> originalPolicy;
        // All counters start at zero (the default value of a long field).
        private long onCheckpointCallCounter;
        private long onCheckpointRollCounter;
        private long onEventCallCounter;
        private long onEventRollCounter;
        private long onProcessingTimeCallCounter;
        private long onProcessingTimeRollCounter;

        /**
         * @param policy the policy to delegate to; must not be {@code null}
         */
        MethodCallCountingPolicyWrapper(RollingPolicy<IN, BucketID> policy) {
            this.originalPolicy = Preconditions.checkNotNull(policy);
        }

        /** Delegates to the wrapped policy and records the call and, if applicable, the roll. */
        public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileState) throws IOException {
            boolean shouldRoll = this.originalPolicy.shouldRollOnCheckpoint(partFileState);
            ++this.onCheckpointCallCounter;
            if (shouldRoll) {
                ++this.onCheckpointRollCounter;
            }
            return shouldRoll;
        }

        /** Delegates to the wrapped policy and records the call and, if applicable, the roll. */
        public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileState, IN element) throws IOException {
            boolean shouldRoll = this.originalPolicy.shouldRollOnEvent(partFileState, element);
            ++this.onEventCallCounter;
            if (shouldRoll) {
                ++this.onEventRollCounter;
            }
            return shouldRoll;
        }

        /** Delegates to the wrapped policy and records the call and, if applicable, the roll. */
        public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileState, long currentTime) throws IOException {
            boolean shouldRoll = this.originalPolicy.shouldRollOnProcessingTime(partFileState, currentTime);
            ++this.onProcessingTimeCallCounter;
            if (shouldRoll) {
                ++this.onProcessingTimeRollCounter;
            }
            return shouldRoll;
        }

        /**
         * Asserts the exact value of all six counters. Each assertion carries a message naming
         * the counter so a failure identifies which hook deviated.
         *
         * @param onCheckpointCalls expected number of {@code shouldRollOnCheckpoint} calls
         * @param onCheckpointRolls expected number of those calls that returned {@code true}
         * @param onEventCalls expected number of {@code shouldRollOnEvent} calls
         * @param onEventRolls expected number of those calls that returned {@code true}
         * @param onProcessingTimeCalls expected number of {@code shouldRollOnProcessingTime} calls
         * @param onProcessingTimeRolls expected number of those calls that returned {@code true}
         */
        void verifyCallCounters(long onCheckpointCalls, long onCheckpointRolls, long onEventCalls, long onEventRolls, long onProcessingTimeCalls, long onProcessingTimeRolls) {
            Assert.assertEquals("shouldRollOnCheckpoint call count", onCheckpointCalls, this.onCheckpointCallCounter);
            Assert.assertEquals("shouldRollOnCheckpoint roll count", onCheckpointRolls, this.onCheckpointRollCounter);
            Assert.assertEquals("shouldRollOnEvent call count", onEventCalls, this.onEventCallCounter);
            Assert.assertEquals("shouldRollOnEvent roll count", onEventRolls, this.onEventRollCounter);
            Assert.assertEquals("shouldRollOnProcessingTime call count", onProcessingTimeCalls, this.onProcessingTimeCallCounter);
            Assert.assertEquals("shouldRollOnProcessingTime roll count", onProcessingTimeRolls, this.onProcessingTimeRollCounter);
        }
    }
}

