package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.TestUtils;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.hamcrest.Description;
import org.hamcrest.MatcherAssert;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest.class */
public class BucketsTest {

    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest$OnProcessingTimePolicy.class */
    private static class OnProcessingTimePolicy<IN, BucketID> implements RollingPolicy<IN, BucketID> {
        private static final long serialVersionUID = 1;
        private int onProcessingTimeRollCounter = 0;
        private final long rolloverInterval;

        OnProcessingTimePolicy(long j) {
            this.rolloverInterval = j;
        }

        public int getOnProcessingTimeRollCounter() {
            return this.onProcessingTimeRollCounter;
        }

        public boolean shouldRollOnCheckpoint(PartFileInfo<BucketID> partFileInfo) {
            return false;
        }

        public boolean shouldRollOnEvent(PartFileInfo<BucketID> partFileInfo, IN in) {
            return false;
        }

        public boolean shouldRollOnProcessingTime(PartFileInfo<BucketID> partFileInfo, long j) {
            boolean z = j - partFileInfo.getCreationTime() >= this.rolloverInterval;
            if (z) {
                this.onProcessingTimeRollCounter++;
            }
            return z;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest$RecordBucketLifeCycleListener.class */
    private static class RecordBucketLifeCycleListener implements BucketLifeCycleListener<String, String> {
        private List<Tuple2<EventType, String>> events;

        /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest$RecordBucketLifeCycleListener$EventType.class */
        public enum EventType {
            CREATED,
            INACTIVE
        }

        private RecordBucketLifeCycleListener() {
            this.events = new ArrayList();
        }

        public void bucketCreated(Bucket<String, String> bucket) {
            this.events.add(new Tuple2<>(EventType.CREATED, bucket.getBucketId()));
        }

        public void bucketInactive(Bucket<String, String> bucket) {
            this.events.add(new Tuple2<>(EventType.INACTIVE, bucket.getBucketId()));
        }

        public List<Tuple2<EventType, String>> getEvents() {
            return this.events;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest$TestFileLifeCycleListener.class */
    private static class TestFileLifeCycleListener implements FileLifeCycleListener<String> {
        private final Map<String, List<String>> files;

        private TestFileLifeCycleListener() {
            this.files = new HashMap();
        }

        public void onPartFileOpened(String str, Path path) {
            this.files.computeIfAbsent(str, str2 -> {
                return new ArrayList();
            }).add(path.getName());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/sink/filesystem/BucketsTest$VerifyingBucketAssigner.class */
    public static class VerifyingBucketAssigner implements BucketAssigner<String, String> {
        private static final long serialVersionUID = 7729086510972377578L;
        private final Long expectedTimestamp;
        private final long expectedWatermark;
        private final long expectedProcessingTime;

        VerifyingBucketAssigner(Long l, long j, long j2) {
            this.expectedTimestamp = l;
            this.expectedWatermark = j;
            this.expectedProcessingTime = j2;
        }

        public String getBucketId(String str, BucketAssigner.Context context) {
            Long timestamp = context.timestamp();
            long currentWatermark = context.currentWatermark();
            long currentProcessingTime = context.currentProcessingTime();
            Assert.assertEquals(this.expectedTimestamp, timestamp);
            Assert.assertEquals(this.expectedProcessingTime, currentProcessingTime);
            Assert.assertEquals(this.expectedWatermark, currentWatermark);
            return str;
        }

        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

    @Test
    public void testSnapshotAndRestore() throws Exception {
        Path path = new Path(TEMP_FOLDER.newFolder().toURI());
        OnCheckpointRollingPolicy build = OnCheckpointRollingPolicy.build();
        Buckets<String, String> createBuckets = createBuckets(path, build, 0);
        TestUtils.MockListState mockListState = new TestUtils.MockListState();
        TestUtils.MockListState mockListState2 = new TestUtils.MockListState();
        createBuckets.onElement("test1", new TestUtils.MockSinkContext(null, 1L, 2L));
        createBuckets.snapshotState(0L, mockListState, mockListState2);
        MatcherAssert.assertThat(createBuckets.getActiveBuckets().get("test1"), hasSinglePartFileToBeCommittedOnCheckpointAck(path, "test1"));
        createBuckets.onElement("test2", new TestUtils.MockSinkContext(null, 1L, 2L));
        createBuckets.snapshotState(1L, mockListState, mockListState2);
        MatcherAssert.assertThat(createBuckets.getActiveBuckets().get("test1"), hasSinglePartFileToBeCommittedOnCheckpointAck(path, "test1"));
        MatcherAssert.assertThat(createBuckets.getActiveBuckets().get("test2"), hasSinglePartFileToBeCommittedOnCheckpointAck(path, "test2"));
        Assert.assertTrue(restoreBuckets(path, build, 0, mockListState, mockListState2).getActiveBuckets().isEmpty());
    }

    private static TypeSafeMatcher<Bucket<String, String>> hasSinglePartFileToBeCommittedOnCheckpointAck(final Path path, final String str) {
        return new TypeSafeMatcher<Bucket<String, String>>() { // from class: org.apache.flink.streaming.api.functions.sink.filesystem.BucketsTest.1
            /* JADX INFO: Access modifiers changed from: protected */
            public boolean matchesSafely(Bucket<String, String> bucket) {
                return ((String) bucket.getBucketId()).equals(str) && bucket.getBucketPath().equals(new Path(path, str)) && bucket.getInProgressPart() == null && bucket.getPendingFileRecoverablesForCurrentCheckpoint().isEmpty() && bucket.getPendingFileRecoverablesPerCheckpoint().size() == 1;
            }

            public void describeTo(Description description) {
                description.appendText("a Bucket with a single pending part file @ ").appendValue(new Path(path, str)).appendText("'");
            }
        };
    }

    @Test
    public void testMergeAtScaleInAndMaxCounterAtRecovery() throws Exception {
        Path path = new Path(TEMP_FOLDER.newFolder().toURI());
        DefaultRollingPolicy build = DefaultRollingPolicy.builder().withMaxPartSize(7L).build();
        TestUtils.MockListState mockListState = new TestUtils.MockListState();
        TestUtils.MockListState mockListState2 = new TestUtils.MockListState();
        TestUtils.MockListState mockListState3 = new TestUtils.MockListState();
        TestUtils.MockListState mockListState4 = new TestUtils.MockListState();
        Buckets<String, String> createBuckets = createBuckets(path, build, 0);
        Buckets<String, String> createBuckets2 = createBuckets(path, build, 1);
        createBuckets.onElement("test1", new TestUtils.MockSinkContext(null, 1L, 2L));
        createBuckets.snapshotState(0L, mockListState, mockListState3);
        Assert.assertEquals(1L, createBuckets.getMaxPartCounter());
        Assert.assertNotNull(((Bucket) createBuckets.getActiveBuckets().get("test1")).getInProgressPart());
        createBuckets2.onElement("test1", new TestUtils.MockSinkContext(null, 1L, 2L));
        createBuckets2.onElement("test1", new TestUtils.MockSinkContext(null, 1L, 2L));
        createBuckets2.onElement("test1", new TestUtils.MockSinkContext(null, 1L, 2L));
        createBuckets2.snapshotState(0L, mockListState2, mockListState4);
        Assert.assertEquals(2L, createBuckets2.getMaxPartCounter());
        Assert.assertEquals(1L, ((Bucket) createBuckets2.getActiveBuckets().get("test1")).getPendingFileRecoverablesPerCheckpoint().size());
        Assert.assertNotNull(((Bucket) createBuckets2.getActiveBuckets().get("test1")).getInProgressPart());
        TestUtils.MockListState mockListState5 = new TestUtils.MockListState();
        TestUtils.MockListState mockListState6 = new TestUtils.MockListState();
        mockListState5.addAll(mockListState.getBackingList());
        mockListState5.addAll(mockListState2.getBackingList());
        mockListState6.addAll(mockListState3.getBackingList());
        mockListState6.addAll(mockListState4.getBackingList());
        Buckets<String, String> restoreBuckets = restoreBuckets(path, build, 0, mockListState5, mockListState6);
        Assert.assertEquals(2L, restoreBuckets.getMaxPartCounter());
        Map activeBuckets = restoreBuckets.getActiveBuckets();
        Assert.assertEquals(1L, activeBuckets.size());
        Assert.assertTrue(activeBuckets.keySet().contains("test1"));
        Bucket bucket = (Bucket) activeBuckets.get("test1");
        Assert.assertEquals("test1", bucket.getBucketId());
        Assert.assertEquals(new Path(path, "test1"), bucket.getBucketPath());
        Assert.assertNotNull(bucket.getInProgressPart());
        Assert.assertEquals(1L, bucket.getPendingFileRecoverablesForCurrentCheckpoint().size());
        Assert.assertTrue(bucket.getPendingFileRecoverablesPerCheckpoint().isEmpty());
    }

    @Test
    public void testOnProcessingTime() throws Exception {
        Path path = new Path(TEMP_FOLDER.newFolder().toURI());
        Buckets<String, String> createBuckets = createBuckets(path, new OnProcessingTimePolicy(2L), 0);
        createBuckets.onElement("test", new TestUtils.MockSinkContext(1L, 2L, 3L));
        createBuckets.onProcessingTime(7L);
        Assert.assertEquals(1L, r0.getOnProcessingTimeRollCounter());
        Map activeBuckets = createBuckets.getActiveBuckets();
        Assert.assertEquals(1L, activeBuckets.size());
        Assert.assertTrue(activeBuckets.keySet().contains("test"));
        Bucket bucket = (Bucket) activeBuckets.get("test");
        Assert.assertEquals("test", bucket.getBucketId());
        Assert.assertEquals(new Path(path, "test"), bucket.getBucketPath());
        Assert.assertEquals("test", bucket.getBucketId());
        Assert.assertNull(bucket.getInProgressPart());
        Assert.assertEquals(1L, bucket.getPendingFileRecoverablesForCurrentCheckpoint().size());
        Assert.assertTrue(bucket.getPendingFileRecoverablesPerCheckpoint().isEmpty());
    }

    @Test
    public void testBucketIsRemovedWhenNotActive() throws Exception {
        Buckets<String, String> createBuckets = createBuckets(new Path(TEMP_FOLDER.newFolder().toURI()), new OnProcessingTimePolicy(2L), 0);
        createBuckets.onElement("test", new TestUtils.MockSinkContext(1L, 2L, 3L));
        createBuckets.onProcessingTime(7L);
        Assert.assertEquals(1L, r0.getOnProcessingTimeRollCounter());
        createBuckets.snapshotState(0L, new TestUtils.MockListState(), new TestUtils.MockListState());
        createBuckets.commitUpToCheckpoint(0L);
        Assert.assertTrue(createBuckets.getActiveBuckets().isEmpty());
    }

    @Test
    public void testPartCounterAfterBucketResurrection() throws Exception {
        Buckets<String, String> createBuckets = createBuckets(new Path(TEMP_FOLDER.newFolder().toURI()), new OnProcessingTimePolicy(2L), 0);
        createBuckets.onElement("test", new TestUtils.MockSinkContext(1L, 2L, 3L));
        Assert.assertEquals(1L, ((Bucket) createBuckets.getActiveBuckets().get("test")).getPartCounter());
        createBuckets.onProcessingTime(7L);
        Assert.assertEquals(1L, r0.getOnProcessingTimeRollCounter());
        Assert.assertEquals(1L, ((Bucket) createBuckets.getActiveBuckets().get("test")).getPartCounter());
        createBuckets.snapshotState(0L, new TestUtils.MockListState(), new TestUtils.MockListState());
        createBuckets.commitUpToCheckpoint(0L);
        Assert.assertTrue(createBuckets.getActiveBuckets().isEmpty());
        createBuckets.onElement("test", new TestUtils.MockSinkContext(2L, 3L, 4L));
        Assert.assertEquals(2L, ((Bucket) createBuckets.getActiveBuckets().get("test")).getPartCounter());
    }

    @Test
    public void testContextPassingNormalExecution() throws Exception {
        testCorrectTimestampPassingInContext(1L, 2L, 3L);
    }

    @Test
    public void testContextPassingNullTimestamp() throws Exception {
        testCorrectTimestampPassingInContext(null, 2L, 3L);
    }

    private void testCorrectTimestampPassingInContext(Long l, long j, long j2) throws Exception {
        Path path = new Path(TEMP_FOLDER.newFolder().toURI());
        new Buckets(path, new VerifyingBucketAssigner(l, j, j2), new DefaultBucketFactoryImpl(), new RowWiseBucketWriter(FileSystem.get(path.toUri()).createRecoverableWriter(), new SimpleStringEncoder()), DefaultRollingPolicy.builder().build(), 2, OutputFileConfig.builder().build()).onElement("test", new TestUtils.MockSinkContext(l, j, j2));
    }

    @Test
    public void testBucketLifeCycleListenerOnCreatingAndInactive() throws Exception {
        Path path = new Path(TEMP_FOLDER.newFolder().toURI());
        OnProcessingTimePolicy onProcessingTimePolicy = new OnProcessingTimePolicy(2L);
        RecordBucketLifeCycleListener recordBucketLifeCycleListener = new RecordBucketLifeCycleListener();
        Buckets<String, String> createBuckets = createBuckets(path, onProcessingTimePolicy, recordBucketLifeCycleListener, null, 0, OutputFileConfig.builder().build());
        TestUtils.MockListState mockListState = new TestUtils.MockListState();
        TestUtils.MockListState mockListState2 = new TestUtils.MockListState();
        createBuckets.onElement("test1", new TestUtils.MockSinkContext(null, 1L, 2L));
        createBuckets.onElement("test2", new TestUtils.MockSinkContext(null, 1L, 3L));
        createBuckets.onProcessingTime(4L);
        createBuckets.snapshotState(0L, mockListState, mockListState2);
        createBuckets.commitUpToCheckpoint(0L);
        createBuckets.onProcessingTime(6L);
        createBuckets.snapshotState(1L, mockListState, mockListState2);
        createBuckets.commitUpToCheckpoint(1L);
        Assert.assertEquals(Arrays.asList(new Tuple2(RecordBucketLifeCycleListener.EventType.CREATED, "test1"), new Tuple2(RecordBucketLifeCycleListener.EventType.CREATED, "test2"), new Tuple2(RecordBucketLifeCycleListener.EventType.INACTIVE, "test1"), new Tuple2(RecordBucketLifeCycleListener.EventType.INACTIVE, "test2")), recordBucketLifeCycleListener.getEvents());
    }

    @Test
    public void testBucketLifeCycleListenerOnRestoring() throws Exception {
        Path path = new Path(TEMP_FOLDER.newFolder().toURI());
        OnProcessingTimePolicy onProcessingTimePolicy = new OnProcessingTimePolicy(2L);
        RecordBucketLifeCycleListener recordBucketLifeCycleListener = new RecordBucketLifeCycleListener();
        Buckets<String, String> createBuckets = createBuckets(path, onProcessingTimePolicy, recordBucketLifeCycleListener, null, 0, OutputFileConfig.builder().build());
        TestUtils.MockListState mockListState = new TestUtils.MockListState();
        TestUtils.MockListState mockListState2 = new TestUtils.MockListState();
        createBuckets.onElement("test1", new TestUtils.MockSinkContext(null, 1L, 2L));
        createBuckets.onElement("test2", new TestUtils.MockSinkContext(null, 1L, 3L));
        createBuckets.onProcessingTime(4L);
        createBuckets.snapshotState(0L, mockListState, mockListState2);
        Assert.assertEquals(new HashSet(Collections.singletonList("test2")), restoreBuckets(path, onProcessingTimePolicy, recordBucketLifeCycleListener, null, 0, mockListState, mockListState2, OutputFileConfig.builder().build()).getActiveBuckets().keySet());
        Assert.assertEquals(Arrays.asList(new Tuple2(RecordBucketLifeCycleListener.EventType.CREATED, "test1"), new Tuple2(RecordBucketLifeCycleListener.EventType.CREATED, "test2"), new Tuple2(RecordBucketLifeCycleListener.EventType.INACTIVE, "test1")), recordBucketLifeCycleListener.getEvents());
    }

    @Test
    public void testFileLifeCycleListener() throws Exception {
        Path path = new Path(TEMP_FOLDER.newFolder().toURI());
        OnProcessingTimePolicy onProcessingTimePolicy = new OnProcessingTimePolicy(2L);
        TestFileLifeCycleListener testFileLifeCycleListener = new TestFileLifeCycleListener();
        Buckets<String, String> createBuckets = createBuckets(path, onProcessingTimePolicy, null, testFileLifeCycleListener, 0, OutputFileConfig.builder().build());
        createBuckets.onElement("test1", new TestUtils.MockSinkContext(null, 1L, 2L));
        createBuckets.onElement("test2", new TestUtils.MockSinkContext(null, 1L, 3L));
        createBuckets.onProcessingTime(4L);
        createBuckets.onElement("test1", new TestUtils.MockSinkContext(null, 1L, 5L));
        createBuckets.onElement("test2", new TestUtils.MockSinkContext(null, 1L, 6L));
        Assert.assertEquals(2L, testFileLifeCycleListener.files.size());
        Assert.assertEquals(Arrays.asList("part-0-0", "part-0-1"), testFileLifeCycleListener.files.get("test1"));
        Assert.assertEquals(Collections.singletonList("part-0-1"), testFileLifeCycleListener.files.get("test2"));
    }

    private static Buckets<String, String> createBuckets(Path path, RollingPolicy<String, String> rollingPolicy, int i) throws IOException {
        return createBuckets(path, rollingPolicy, null, null, i, OutputFileConfig.builder().build());
    }

    private static Buckets<String, String> createBuckets(Path path, RollingPolicy<String, String> rollingPolicy, BucketLifeCycleListener<String, String> bucketLifeCycleListener, FileLifeCycleListener<String> fileLifeCycleListener, int i, OutputFileConfig outputFileConfig) throws IOException {
        Buckets<String, String> buckets = new Buckets<>(path, new TestUtils.StringIdentityBucketAssigner(), new DefaultBucketFactoryImpl(), new RowWiseBucketWriter(FileSystem.get(path.toUri()).createRecoverableWriter(), new SimpleStringEncoder()), rollingPolicy, i, outputFileConfig);
        if (bucketLifeCycleListener != null) {
            buckets.setBucketLifeCycleListener(bucketLifeCycleListener);
        }
        if (fileLifeCycleListener != null) {
            buckets.setFileLifeCycleListener(fileLifeCycleListener);
        }
        return buckets;
    }

    private static Buckets<String, String> restoreBuckets(Path path, RollingPolicy<String, String> rollingPolicy, int i, ListState<byte[]> listState, ListState<Long> listState2) throws Exception {
        return restoreBuckets(path, rollingPolicy, null, null, i, listState, listState2, OutputFileConfig.builder().build());
    }

    private static Buckets<String, String> restoreBuckets(Path path, RollingPolicy<String, String> rollingPolicy, BucketLifeCycleListener<String, String> bucketLifeCycleListener, FileLifeCycleListener<String> fileLifeCycleListener, int i, ListState<byte[]> listState, ListState<Long> listState2, OutputFileConfig outputFileConfig) throws Exception {
        Buckets<String, String> createBuckets = createBuckets(path, rollingPolicy, bucketLifeCycleListener, fileLifeCycleListener, i, outputFileConfig);
        createBuckets.initializeState(listState, listState2);
        return createBuckets;
    }
}
