package org.apache.flink.connector.file.sink.writer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.connector.file.sink.utils.NoOpCommitter;
import org.apache.flink.connector.file.sink.utils.NoOpRecoverable;
import org.apache.flink.connector.file.sink.utils.NoOpRecoverableFsDataOutputStream;
import org.apache.flink.connector.file.sink.utils.NoOpRecoverableWriter;
import org.apache.flink.connector.file.table.FileSystemTableSink;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.fs.local.LocalRecoverableWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/connector/file/sink/writer/FileWriterBucketTest.class */
public class FileWriterBucketTest {
    /** Bucket id passed to {@code FileWriterBucket.getNew} by the bucket-creating helpers of this test. */
    private static final String BUCKET_ID = "testing-bucket";

    /** Class-level temporary directory; tests call {@code newFolder()} on it to obtain a bucket path. */
    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    // Diamond operator instead of raw types, so the element type is checked by the compiler.
    private static final Encoder<String> ENCODER = new SimpleStringEncoder<>();
    private static final Encoder<RowData> rowDataENCODER = new SimpleStringEncoder<>();

    /** Rolling policy built from {@code DefaultRollingPolicy.builder()} without any customization. */
    private static final RollingPolicy<String, String> DEFAULT_ROLLING_POLICY = DefaultRollingPolicy.builder().build();

    /** Rolling policy obtained from {@code OnCheckpointRollingPolicy.build()}. */
    private static final RollingPolicy<String, String> ON_CHECKPOING_ROLLING_POLICY = OnCheckpointRollingPolicy.build();

    /** Rolling policy that asks for a roll on every written element and never otherwise. */
    private static final EachElementRollingPolicy EACH_ELEMENT_ROLLING_POLICY = new EachElementRollingPolicy();

    /**
     * Stub {@link NoOpRecoverableWriter} whose {@link #supportsResume()} answer is fixed at
     * construction time and whose recovery methods hand back no-op objects.
     */
    private static class BaseStubWriter extends NoOpRecoverableWriter {
        /** Value reported by {@link #supportsResume()}. */
        private final boolean supportsResume;

        private BaseStubWriter(boolean supportsResume) {
            this.supportsResume = supportsResume;
        }

        @Override
        public boolean supportsResume() {
            return supportsResume;
        }

        /** Accepts only {@link NoOpRecoverable} instances and returns a fresh {@link NoOpCommitter}. */
        @Override
        public RecoverableFsDataOutputStream.Committer recoverForCommit(RecoverableWriter.CommitRecoverable commitRecoverable) throws IOException {
            Preconditions.checkArgument(commitRecoverable instanceof NoOpRecoverable);
            return new NoOpCommitter();
        }

        /** Returns a no-op stream whose {@code closeForCommit()} yields a {@link NoOpCommitter}. */
        @Override
        public RecoverableFsDataOutputStream recover(RecoverableWriter.ResumeRecoverable resumeRecoverable) throws IOException {
            return new NoOpRecoverableFsDataOutputStream() {
                @Override
                public RecoverableFsDataOutputStream.Committer closeForCommit() throws IOException {
                    return new NoOpCommitter();
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/file/sink/writer/FileWriterBucketTest$EachElementRollingPolicy.class */
    private static class EachElementRollingPolicy implements RollingPolicy<String, String> {
        private EachElementRollingPolicy() {
        }

        public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileInfo) throws IOException {
            return false;
        }

        public boolean shouldRollOnEvent(PartFileInfo<String> partFileInfo, String str) throws IOException {
            return true;
        }

        public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileInfo, long j) throws IOException {
            return false;
        }

        public /* bridge */ /* synthetic */ boolean shouldRollOnEvent(PartFileInfo partFileInfo, Object obj) throws IOException {
            return shouldRollOnEvent((PartFileInfo<String>) partFileInfo, (String) obj);
        }
    }

    /** {@link BaseStubWriter} constructed with {@code false}, i.e. its {@code supportsResume()} returns {@code false}. */
    private static class StubNonResumableWriter extends BaseStubWriter {
        StubNonResumableWriter() {
            super(false);
        }
    }

    /** {@link BaseStubWriter} constructed with {@code true}, i.e. its {@code supportsResume()} returns {@code true}. */
    private static class StubResumableWriter extends BaseStubWriter {
        StubResumableWriter() {
            super(true);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/file/sink/writer/FileWriterBucketTest$TestRecoverableWriter.class */
    public static class TestRecoverableWriter extends LocalRecoverableWriter {
        private int cleanupCallCounter;

        TestRecoverableWriter(LocalFileSystem localFileSystem) {
            super(localFileSystem);
            this.cleanupCallCounter = 0;
        }

        int getCleanupCallCounter() {
            return this.cleanupCallCounter;
        }

        public boolean requiresCleanupOfRecoverableState() {
            return true;
        }

        public boolean cleanupRecoverableState(RecoverableWriter.ResumeRecoverable resumeRecoverable) throws IOException {
            this.cleanupCallCounter++;
            return false;
        }

        public String toString() {
            return "TestRecoverableWriter has called discardRecoverableState() " + this.cleanupCallCounter + " times.";
        }
    }

    /**
     * Checks that with the default rolling policy a non-flushing {@code prepareCommit} produces no
     * committables and that the snapshot still carries an in-progress recoverable.
     */
    @Test
    public void testOnCheckpointNoPendingRecoverable() throws IOException {
        final Path bucketPath = new Path(TEMP_FOLDER.newFolder().toURI());
        final TestRecoverableWriter writer = getRecoverableWriter(bucketPath);
        final OutputFileConfig fileConfig = OutputFileConfig.builder().build();
        final FileWriterBucket<String> bucket = createBucket(writer, bucketPath, DEFAULT_ROLLING_POLICY, fileConfig);

        bucket.write("test-element", 0L);
        final List<FileSinkCommittable> committables = bucket.prepareCommit(false);
        final FileWriterBucketState state = bucket.snapshotState();

        // Neither a pending file nor an in-progress file to clean up is expected.
        compareNumberOfPendingAndInProgress(committables, 0, 0);
        Assert.assertEquals(BUCKET_ID, state.getBucketId());
        Assert.assertEquals(bucketPath, state.getBucketPath());
        Assert.assertNotNull("The bucket should have in-progress recoverable", state.getInProgressFileRecoverable());
    }

    /**
     * Checks that with the on-checkpoint rolling policy {@code prepareCommit} yields one pending
     * file and that the snapshot has no in-progress recoverable.
     */
    @Test
    public void testOnCheckpointRollingOnCheckpoint() throws IOException {
        final Path bucketPath = new Path(TEMP_FOLDER.newFolder().toURI());
        final TestRecoverableWriter writer = getRecoverableWriter(bucketPath);
        final OutputFileConfig fileConfig = OutputFileConfig.builder().build();
        final FileWriterBucket<String> bucket = createBucket(writer, bucketPath, ON_CHECKPOING_ROLLING_POLICY, fileConfig);

        bucket.write("test-element", 0L);
        final List<FileSinkCommittable> committables = bucket.prepareCommit(false);
        final FileWriterBucketState state = bucket.snapshotState();

        // Exactly one pending file, nothing to clean up.
        compareNumberOfPendingAndInProgress(committables, 1, 0);
        Assert.assertEquals(BUCKET_ID, state.getBucketId());
        Assert.assertEquals(bucketPath, state.getBucketPath());
        Assert.assertNull("The bucket should not have in-progress recoverable", state.getInProgressFileRecoverable());
    }

    /**
     * Writes three elements with a policy that rolls on every element and checks that
     * {@code prepareCommit} reports two pending files while the snapshot still holds an
     * in-progress recoverable.
     */
    @Test
    public void testOnCheckpointMultiplePendingFiles() throws IOException {
        Path path = new Path(TEMP_FOLDER.newFolder().toURI());
        FileWriterBucket<String> bucket = createBucket(getRecoverableWriter(path), path, EACH_ELEMENT_ROLLING_POLICY, OutputFileConfig.builder().build());
        bucket.write("test-element", 0L);
        bucket.write("test-element", 0L);
        bucket.write("test-element", 0L);
        List<FileSinkCommittable> committables = bucket.prepareCommit(false);
        FileWriterBucketState bucketState = bucket.snapshotState();
        compareNumberOfPendingAndInProgress(committables, 2, 0);
        Assert.assertEquals(BUCKET_ID, bucketState.getBucketId());
        Assert.assertEquals(path, bucketState.getBucketPath());
        // The failure message now agrees with the assertion (the recoverable must be present).
        Assert.assertNotNull("The bucket should have in-progress recoverable", bucketState.getInProgressFileRecoverable());
    }

    /**
     * Runs two write / prepareCommit / snapshot rounds with the default rolling policy and checks
     * that the second round reports one in-progress file to clean up, no pending file, and a
     * snapshot that still holds an in-progress recoverable.
     */
    @Test
    public void testOnCheckpointWithInProgressFileToCleanup() throws IOException {
        Path path = new Path(TEMP_FOLDER.newFolder().toURI());
        FileWriterBucket<String> bucket = createBucket(getRecoverableWriter(path), path, DEFAULT_ROLLING_POLICY, OutputFileConfig.builder().build());

        // First round: results are intentionally ignored.
        bucket.write("test-element", 0L);
        bucket.prepareCommit(false);
        bucket.snapshotState();

        // Second round: this is the one being verified.
        bucket.write("test-element", 0L);
        List<FileSinkCommittable> committables = bucket.prepareCommit(false);
        FileWriterBucketState bucketState = bucket.snapshotState();
        compareNumberOfPendingAndInProgress(committables, 0, 1);
        Assert.assertEquals(BUCKET_ID, bucketState.getBucketId());
        Assert.assertEquals(path, bucketState.getBucketPath());
        // The failure message now agrees with the assertion (the recoverable must be present).
        Assert.assertNotNull("The bucket should have in-progress recoverable", bucketState.getInProgressFileRecoverable());
    }

    /**
     * Checks that {@code prepareCommit(true)} yields one pending file and leaves the bucket
     * without an in-progress part.
     */
    @Test
    public void testFlush() throws IOException {
        final Path bucketPath = new Path(TEMP_FOLDER.newFolder().toURI());
        final TestRecoverableWriter writer = getRecoverableWriter(bucketPath);
        final OutputFileConfig fileConfig = OutputFileConfig.builder().build();
        final FileWriterBucket<String> bucket = createBucket(writer, bucketPath, DEFAULT_ROLLING_POLICY, fileConfig);

        bucket.write("test-element", 0L);
        final List<FileSinkCommittable> committables = bucket.prepareCommit(true);

        compareNumberOfPendingAndInProgress(committables, 1, 0);
        Assert.assertNull("The bucket should not have in-progress part after flushed", bucket.getInProgressPart());
    }

    /**
     * Uses a default rolling policy with a 10 ms rollover interval and checks that the part file
     * is kept at processing time 20 and rolled at processing time 21, resulting in one pending
     * file.
     */
    @Test
    public void testRollingOnProcessingTime() throws IOException {
        final Path bucketPath = new Path(TEMP_FOLDER.newFolder().toURI());
        final TestRecoverableWriter writer = getRecoverableWriter(bucketPath);
        final RollingPolicy<String, String> policy =
                DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMillis(10L)).build();
        final OutputFileConfig fileConfig = OutputFileConfig.builder().build();
        final FileWriterBucket<String> bucket = createBucket(writer, bucketPath, policy, fileConfig);

        bucket.write("test-element", 11L);
        bucket.write("test-element", 12L);
        bucket.onProcessingTime(20L);
        Assert.assertNotNull("The bucket should not roll since interval is not reached", bucket.getInProgressPart());

        bucket.write("test-element", 21L);
        bucket.onProcessingTime(21L);
        Assert.assertNull("The bucket should roll since interval is reached", bucket.getInProgressPart());

        final List<FileSinkCommittable> committables = bucket.prepareCommit(false);
        compareNumberOfPendingAndInProgress(committables, 1, 0);
    }

    /**
     * Exercises {@code FileSystemTableSink.TableRollingPolicy} on a {@code RowData} bucket: the
     * part file is expected to survive processing times 21 and 30 and to be rolled at 22 and 31,
     * leaving two pending files.
     */
    @Test
    public void testTableRollingOnProcessingTime() throws IOException {
        final Path bucketPath = new Path(TEMP_FOLDER.newFolder().toURI());
        final TestRecoverableWriter writer = getRecoverableWriter(bucketPath);
        final FileSystemTableSink.TableRollingPolicy policy =
                new FileSystemTableSink.TableRollingPolicy(
                        false,
                        Long.MAX_VALUE,
                        Duration.ofMillis(20L).toMillis(),
                        Duration.ofMillis(10L).toMillis());
        final OutputFileConfig fileConfig = OutputFileConfig.builder().build();
        final FileWriterBucket<RowData> bucket = createRowDataBucket(writer, bucketPath, policy, fileConfig);

        // First part file.
        bucket.write(GenericRowData.of(StringData.fromString("test-element")), 11L);
        bucket.write(GenericRowData.of(StringData.fromString("test-element")), 12L);
        bucket.onProcessingTime(21L);
        Assert.assertNotNull("The bucket should not roll since interval and inactivity not reached", bucket.getInProgressPart());
        bucket.onProcessingTime(22L);
        Assert.assertNull("The bucket should roll since inactivity is reached", bucket.getInProgressPart());

        // Second part file.
        bucket.write(GenericRowData.of(StringData.fromString("test-element")), 11L);
        bucket.write(GenericRowData.of(StringData.fromString("test-element")), 21L);
        bucket.onProcessingTime(30L);
        Assert.assertNotNull("The bucket should not roll since interval and inactivity not reached", bucket.getInProgressPart());
        bucket.onProcessingTime(31L);
        Assert.assertNull("The bucket should roll since interval is reached", bucket.getInProgressPart());

        final List<FileSinkCommittable> committables = bucket.prepareCommit(false);
        compareNumberOfPendingAndInProgress(committables, 2, 0);
    }

    /**
     * Restores a bucket through a writer that does not support resume and checks that there is no
     * in-progress part, one pending file, and no in-progress recoverable in the snapshot.
     */
    @Test
    public void testRestoreWithInprogressFileNotSupportResume() throws IOException {
        final StubNonResumableWriter writer = new StubNonResumableWriter();
        final FileWriterBucket<String> bucket = getRestoredBucketWithOnlyInProgressPart(writer, DEFAULT_ROLLING_POLICY);

        Assert.assertNull("The in-progress file should be pre-committed", bucket.getInProgressPart());

        final List<FileSinkCommittable> committables = bucket.prepareCommit(false);
        final FileWriterBucketState state = bucket.snapshotState();

        compareNumberOfPendingAndInProgress(committables, 1, 0);
        Assert.assertNull("The bucket should not have in-progress recoverable", state.getInProgressFileRecoverable());
    }

    /**
     * Restores a bucket through a writer that supports resume and checks that the in-progress part
     * is present, no committables are produced, and the snapshot carries an in-progress
     * recoverable.
     */
    @Test
    public void testRestoreWithInprogressFileSupportResume() throws IOException {
        final StubResumableWriter writer = new StubResumableWriter();
        final FileWriterBucket<String> bucket = getRestoredBucketWithOnlyInProgressPart(writer, DEFAULT_ROLLING_POLICY);

        Assert.assertNotNull("The in-progress file should be recovered", bucket.getInProgressPart());

        final List<FileSinkCommittable> committables = bucket.prepareCommit(false);
        final FileWriterBucketState state = bucket.snapshotState();

        compareNumberOfPendingAndInProgress(committables, 0, 0);
        Assert.assertNotNull("The bucket should have in-progress recoverable", state.getInProgressFileRecoverable());
    }

    /**
     * Restores a bucket from a state holding four pending files and no in-progress file, and
     * checks that all four are exposed by the bucket and reported by {@code prepareCommit}.
     */
    @Test
    public void testRestoringWithOnlyPendingFiles() throws IOException {
        final int expectedPendingFiles = 4;
        final StubResumableWriter writer = new StubResumableWriter();
        final FileWriterBucket<String> bucket =
                getRestoredBucketWithOnlyPendingFiles(writer, DEFAULT_ROLLING_POLICY, expectedPendingFiles);

        Assert.assertNull("There should be no in-progress file", bucket.getInProgressPart());
        Assert.assertEquals((long) expectedPendingFiles, bucket.getPendingFiles().size());

        final List<FileSinkCommittable> committables = bucket.prepareCommit(false);
        // The snapshot result itself is not inspected; the call is kept for its effect on the bucket.
        bucket.snapshotState();

        compareNumberOfPendingAndInProgress(committables, expectedPendingFiles, 0);
    }

    /**
     * Merges two buckets restored through a writer that does not support resume and checks that
     * the result has no in-progress part, two pending files, and no in-progress recoverable in the
     * snapshot.
     */
    @Test
    public void testMergeWithInprogressFileNotSupportResume() throws IOException {
        FileWriterBucket<String> bucket = getRestoredBucketWithOnlyInProgressPart(new StubNonResumableWriter(), DEFAULT_ROLLING_POLICY);
        bucket.merge(getRestoredBucketWithOnlyInProgressPart(new StubNonResumableWriter(), DEFAULT_ROLLING_POLICY));
        Assert.assertNull("The in-progress file should be pre-committed", bucket.getInProgressPart());
        List<FileSinkCommittable> committables = bucket.prepareCommit(false);
        FileWriterBucketState bucketState = bucket.snapshotState();
        compareNumberOfPendingAndInProgress(committables, 2, 0);
        // The failure message now agrees with the assertion (the recoverable must be absent).
        Assert.assertNull("The bucket should not have in-progress recoverable", bucketState.getInProgressFileRecoverable());
    }

    /**
     * Merges two buckets restored through a writer that supports resume and checks that the result
     * keeps an in-progress part, reports one pending file, and has an in-progress recoverable in
     * the snapshot.
     */
    @Test
    public void testMergeWithInprogressFileSupportResume() throws IOException {
        FileWriterBucket<String> bucket = getRestoredBucketWithOnlyInProgressPart(new StubResumableWriter(), DEFAULT_ROLLING_POLICY);
        bucket.merge(getRestoredBucketWithOnlyInProgressPart(new StubResumableWriter(), DEFAULT_ROLLING_POLICY));
        Assert.assertNotNull("The in-progress file should be recovered", bucket.getInProgressPart());
        List<FileSinkCommittable> committables = bucket.prepareCommit(false);
        FileWriterBucketState bucketState = bucket.snapshotState();
        compareNumberOfPendingAndInProgress(committables, 1, 0);
        // The failure message now agrees with the assertion (the recoverable must be present).
        Assert.assertNotNull("The bucket should have in-progress recoverable", bucketState.getInProgressFileRecoverable());
    }

    /**
     * Creates a new {@code String} bucket with id {@code BUCKET_ID} at the given path.
     *
     * @param recoverableWriter writer wrapped in a {@link RowWiseBucketWriter} together with {@code ENCODER}
     * @param path bucket path
     * @param rollingPolicy rolling policy handed to the bucket
     * @param outputFileConfig output file configuration handed to the bucket
     * @return the bucket returned by {@code FileWriterBucket.getNew}
     */
    private static FileWriterBucket<String> createBucket(RecoverableWriter recoverableWriter, Path path, RollingPolicy<String, String> rollingPolicy, OutputFileConfig outputFileConfig) {
        // Diamond instead of a raw type, so the writer's type arguments are inferred and checked.
        return FileWriterBucket.getNew(BUCKET_ID, path, new RowWiseBucketWriter<>(recoverableWriter, ENCODER), rollingPolicy, outputFileConfig);
    }

    /**
     * Creates a new {@code RowData} bucket with id {@code BUCKET_ID} at the given path.
     *
     * @param recoverableWriter writer wrapped in a {@link RowWiseBucketWriter} together with {@code rowDataENCODER}
     * @param path bucket path
     * @param rollingPolicy rolling policy handed to the bucket
     * @param outputFileConfig output file configuration handed to the bucket
     * @return the bucket returned by {@code FileWriterBucket.getNew}
     */
    private static FileWriterBucket<RowData> createRowDataBucket(RecoverableWriter recoverableWriter, Path path, RollingPolicy<RowData, String> rollingPolicy, OutputFileConfig outputFileConfig) {
        // Diamond instead of a raw type, so the writer's type arguments are inferred and checked.
        return FileWriterBucket.getNew(BUCKET_ID, path, new RowWiseBucketWriter<>(recoverableWriter, rowDataENCODER), rollingPolicy, outputFileConfig);
    }

    /**
     * Resolves the file system for {@code path} and wraps it in a {@link TestRecoverableWriter}.
     *
     * <p>The test fails if the file system cannot be obtained or is not a {@link LocalFileSystem}.
     *
     * @param path path whose URI is used to look up the file system
     * @return a new writer backed by the local file system; never {@code null}
     */
    private static TestRecoverableWriter getRecoverableWriter(Path path) {
        final FileSystem fs;
        try {
            fs = FileSystem.get(path.toUri());
        } catch (IOException e) {
            // Fail with the cause attached instead of a message-less failure that hides the reason.
            throw new AssertionError("Could not obtain a file system for path: " + path, e);
        }
        if (!(fs instanceof LocalFileSystem)) {
            Assert.fail("Expected Local FS but got a " + fs.getClass().getName() + " for path: " + path);
        }
        // FileSystem.get is typed as FileSystem, so an explicit cast is needed after the check above.
        return new TestRecoverableWriter((LocalFileSystem) fs);
    }

    /**
     * Counts the committables that carry a pending file and those that carry an in-progress file
     * to clean up, then asserts both counts.
     *
     * @param list committables to inspect
     * @param i expected number of committables with a non-null pending file
     * @param i2 expected number of committables with a non-null in-progress file to clean up
     */
    private void compareNumberOfPendingAndInProgress(List<FileSinkCommittable> list, int i, int i2) {
        int pendingFiles = 0;
        int inProgressFilesToCleanup = 0;
        for (FileSinkCommittable committable : list) {
            pendingFiles += committable.getPendingFile() == null ? 0 : 1;
            inProgressFilesToCleanup += committable.getInProgressFileToCleanup() == null ? 0 : 1;
        }
        Assert.assertEquals(i, pendingFiles);
        Assert.assertEquals(i2, inProgressFilesToCleanup);
    }

    /**
     * Restores a bucket from a state that contains only an in-progress file recoverable (backed by
     * a {@link NoOpRecoverable}).
     *
     * @param baseStubWriter stub writer wrapped in a {@link RowWiseBucketWriter} together with {@code ENCODER}
     * @param rollingPolicy rolling policy handed to the restored bucket
     * @return the bucket returned by {@code FileWriterBucket.restore}
     * @throws IOException if restoring the bucket fails
     */
    private FileWriterBucket<String> getRestoredBucketWithOnlyInProgressPart(BaseStubWriter baseStubWriter, RollingPolicy<String, String> rollingPolicy) throws IOException {
        // Explicit type arguments instead of a raw RowWiseBucketWriter.
        final RowWiseBucketWriter<String, String> bucketWriter = new RowWiseBucketWriter<>(baseStubWriter, ENCODER);
        final FileWriterBucketState stateToRestore = new FileWriterBucketState(
                "test",
                new Path("file:///fake/fakefile"),
                12345L,
                new OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverable(new NoOpRecoverable()));
        return FileWriterBucket.restore(bucketWriter, rollingPolicy, stateToRestore, OutputFileConfig.builder().build());
    }

    /**
     * Restores a bucket from a state that has no in-progress file recoverable and {@code i}
     * pending-file entries created by {@code createPendingPartsPerCheckpoint}.
     *
     * @param baseStubWriter stub writer wrapped in a {@link RowWiseBucketWriter} together with {@code ENCODER}
     * @param rollingPolicy rolling policy handed to the restored bucket
     * @param i number of pending-file entries to put into the restored state
     * @return the bucket returned by {@code FileWriterBucket.restore}
     * @throws IOException if restoring the bucket fails
     */
    private FileWriterBucket<String> getRestoredBucketWithOnlyPendingFiles(BaseStubWriter baseStubWriter, RollingPolicy<String, String> rollingPolicy, int i) throws IOException {
        // Explicit type arguments instead of a raw RowWiseBucketWriter.
        final RowWiseBucketWriter<String, String> bucketWriter = new RowWiseBucketWriter<>(baseStubWriter, ENCODER);
        final FileWriterBucketState stateToRestore = new FileWriterBucketState(
                "test",
                new Path("file:///fake/fakefile"),
                12345L,
                (InProgressFileWriter.InProgressFileRecoverable) null,
                createPendingPartsPerCheckpoint(i));
        return FileWriterBucket.restore(bucketWriter, rollingPolicy, stateToRestore, OutputFileConfig.builder().build());
    }

    /**
     * Builds a map with keys {@code 0 .. i-1}, each mapped to a fresh single-element list holding
     * a pending-file recoverable backed by a {@link NoOpRecoverable}.
     *
     * @param i number of map entries to create; a value of zero or less yields an empty map
     * @return a new, mutable map of pending-file recoverables keyed by {@code Long}
     */
    private Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> createPendingPartsPerCheckpoint(int i) {
        // Parameterized collections instead of raw ones, so the return needs no unchecked conversion.
        final Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingPartsPerCheckpoint = new HashMap<>();
        for (long checkpointId = 0; checkpointId < i; checkpointId++) {
            final List<InProgressFileWriter.PendingFileRecoverable> pendingParts = new ArrayList<>();
            pendingParts.add(new OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable(new NoOpRecoverable()));
            pendingPartsPerCheckpoint.put(checkpointId, pendingParts);
        }
        return pendingPartsPerCheckpoint;
    }
}
