package org.apache.flink.connector.file.sink.writer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/connector/file/sink/writer/FileWriterTest.class */
public class FileWriterTest {

    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/file/sink/writer/FileWriterTest$ContextImpl.class */
    public static class ContextImpl implements SinkWriter.Context {
        private final long watermark;
        private final Long timestamp;

        public ContextImpl() {
            this(0L, 0L);
        }

        private ContextImpl(long j, Long l) {
            this.watermark = j;
            this.timestamp = l;
        }

        public long currentWatermark() {
            return this.watermark;
        }

        public Long timestamp() {
            return this.timestamp;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/file/sink/writer/FileWriterTest$ManuallyTriggeredProcessingTimeService.class */
    public static class ManuallyTriggeredProcessingTimeService implements Sink.ProcessingTimeService {
        private long now;
        private final Queue<Tuple2<Long, Sink.ProcessingTimeService.ProcessingTimeCallback>> timers;

        private ManuallyTriggeredProcessingTimeService() {
            this.timers = new PriorityQueue(Comparator.comparingLong(tuple2 -> {
                return ((Long) tuple2.f0).longValue();
            }));
        }

        public long getCurrentProcessingTime() {
            return this.now;
        }

        public void registerProcessingTimer(long j, Sink.ProcessingTimeService.ProcessingTimeCallback processingTimeCallback) {
            if (j > this.now) {
                this.timers.add(new Tuple2<>(Long.valueOf(j), processingTimeCallback));
                return;
            }
            try {
                processingTimeCallback.onProcessingTime(this.now);
            } catch (IOException | InterruptedException e) {
                ExceptionUtils.rethrow(e);
            }
        }

        public void advanceTo(long j) throws IOException, InterruptedException {
            if (j <= this.now) {
                return;
            }
            this.now = j;
            while (true) {
                Tuple2<Long, Sink.ProcessingTimeService.ProcessingTimeCallback> peek = this.timers.peek();
                if (peek == null || ((Long) peek.f0).longValue() > this.now) {
                    return;
                }
                ((Sink.ProcessingTimeService.ProcessingTimeCallback) peek.f1).onProcessingTime(this.now);
                this.timers.poll();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/file/sink/writer/FileWriterTest$VerifyingBucketAssigner.class */
    public static class VerifyingBucketAssigner implements BucketAssigner<String, String> {
        private static final long serialVersionUID = 7729086510972377578L;
        private final Long expectedTimestamp;
        private final long expectedWatermark;
        private final long expectedProcessingTime;

        VerifyingBucketAssigner(Long l, long j, long j2) {
            this.expectedTimestamp = l;
            this.expectedWatermark = j;
            this.expectedProcessingTime = j2;
        }

        public String getBucketId(String str, BucketAssigner.Context context) {
            Long timestamp = context.timestamp();
            long currentWatermark = context.currentWatermark();
            long currentProcessingTime = context.currentProcessingTime();
            Assert.assertEquals(this.expectedTimestamp, timestamp);
            Assert.assertEquals(this.expectedProcessingTime, currentProcessingTime);
            Assert.assertEquals(this.expectedWatermark, currentWatermark);
            return str;
        }

        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

    @Test
    public void testPreCommit() throws Exception {
        FileWriter<String> createWriter = createWriter(new Path(TEMP_FOLDER.newFolder().toURI()), OnCheckpointRollingPolicy.build(), new OutputFileConfig("part-", ""));
        createWriter.write("test1", new ContextImpl());
        createWriter.write("test1", new ContextImpl());
        createWriter.write("test2", new ContextImpl());
        createWriter.write("test2", new ContextImpl());
        createWriter.write("test3", new ContextImpl());
        Assert.assertEquals(3L, createWriter.prepareCommit(false).size());
    }

    @Test
    public void testSnapshotAndRestore() throws Exception {
        Path path = new Path(TEMP_FOLDER.newFolder().toURI());
        FileWriter<String> createWriter = createWriter(path, DefaultRollingPolicy.builder().build(), new OutputFileConfig("part-", ""));
        createWriter.write("test1", new ContextImpl());
        createWriter.write("test2", new ContextImpl());
        createWriter.write("test3", new ContextImpl());
        Assert.assertEquals(3L, createWriter.getActiveBuckets().size());
        createWriter.prepareCommit(false);
        List snapshotState = createWriter.snapshotState(1L);
        Assert.assertEquals(3L, snapshotState.size());
        FileWriter<String> restoreWriter = restoreWriter(snapshotState, path, OnCheckpointRollingPolicy.build(), new OutputFileConfig("part-", ""));
        Assert.assertEquals(restoreWriter.getActiveBuckets().keySet(), new HashSet(Arrays.asList("test1", "test2", "test3")));
        Iterator it = restoreWriter.getActiveBuckets().values().iterator();
        while (it.hasNext()) {
            Assert.assertNotNull("The in-progress file should be recovered", ((FileWriterBucket) it.next()).getInProgressPart());
        }
    }

    @Test
    public void testMergingForRescaling() throws Exception {
        Path path = new Path(TEMP_FOLDER.newFolder().toURI());
        FileWriter<String> createWriter = createWriter(path, DefaultRollingPolicy.builder().build(), new OutputFileConfig("part-", ""));
        createWriter.write("test1", new ContextImpl());
        createWriter.write("test2", new ContextImpl());
        createWriter.write("test3", new ContextImpl());
        createWriter.prepareCommit(false);
        List snapshotState = createWriter.snapshotState(1L);
        FileWriter<String> createWriter2 = createWriter(path, DefaultRollingPolicy.builder().build(), new OutputFileConfig("part-", ""));
        createWriter2.write("test1", new ContextImpl());
        createWriter2.write("test2", new ContextImpl());
        createWriter2.prepareCommit(false);
        List snapshotState2 = createWriter2.snapshotState(1L);
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(snapshotState);
        arrayList.addAll(snapshotState2);
        FileWriter<String> restoreWriter = restoreWriter(arrayList, path, DefaultRollingPolicy.builder().build(), new OutputFileConfig("part-", ""));
        Assert.assertEquals(3L, restoreWriter.getActiveBuckets().size());
        Iterator it = Arrays.asList("test1", "test2").iterator();
        while (it.hasNext()) {
            Assert.assertNotNull("The in-progress file should be recovered", ((FileWriterBucket) restoreWriter.getActiveBuckets().get((String) it.next())).getInProgressPart());
            Assert.assertEquals(1L, r0.getPendingFiles().size());
        }
        Iterator it2 = Collections.singletonList("test3").iterator();
        while (it2.hasNext()) {
            Assert.assertNotNull("The in-progress file should be recovered", ((FileWriterBucket) restoreWriter.getActiveBuckets().get((String) it2.next())).getInProgressPart());
            Assert.assertEquals(0L, r0.getPendingFiles().size());
        }
    }

    @Test
    public void testBucketIsRemovedWhenNotActive() throws Exception {
        FileWriter<String> createWriter = createWriter(new Path(TEMP_FOLDER.newFolder().toURI()), OnCheckpointRollingPolicy.build(), new OutputFileConfig("part-", ""));
        createWriter.write("test", new ContextImpl());
        createWriter.prepareCommit(false);
        createWriter.snapshotState(1L);
        createWriter.prepareCommit(false);
        Assert.assertTrue(createWriter.getActiveBuckets().isEmpty());
    }

    @Test
    public void testOnProcessingTime() throws IOException, InterruptedException {
        Path path = new Path(TEMP_FOLDER.newFolder().toURI());
        ManuallyTriggeredProcessingTimeService manuallyTriggeredProcessingTimeService = new ManuallyTriggeredProcessingTimeService();
        manuallyTriggeredProcessingTimeService.advanceTo(10L);
        FileWriter<String> createWriter = createWriter(path, new FileSinkTestUtils.StringIdentityBucketAssigner(), DefaultRollingPolicy.builder().withRolloverInterval(10L).build(), new OutputFileConfig("part-", ""), manuallyTriggeredProcessingTimeService, 5L);
        createWriter.initializeState(Collections.emptyList());
        createWriter.write("test1", new ContextImpl());
        manuallyTriggeredProcessingTimeService.advanceTo(15L);
        createWriter.write("test2", new ContextImpl());
        manuallyTriggeredProcessingTimeService.advanceTo(20L);
        Assert.assertNull("The in-progress part of test1 should be rolled", ((FileWriterBucket) createWriter.getActiveBuckets().get("test1")).getInProgressPart());
        Assert.assertEquals(1L, r0.getPendingFiles().size());
        Assert.assertNotNull("The in-progress part of test2 should not be rolled", ((FileWriterBucket) createWriter.getActiveBuckets().get("test2")).getInProgressPart());
        Assert.assertEquals(0L, r0.getPendingFiles().size());
        manuallyTriggeredProcessingTimeService.advanceTo(30L);
        createWriter.prepareCommit(false);
        createWriter.write("test1", new ContextImpl());
        manuallyTriggeredProcessingTimeService.advanceTo(35L);
        createWriter.write("test2", new ContextImpl());
        manuallyTriggeredProcessingTimeService.advanceTo(40L);
        Assert.assertNull("The in-progress part of test1 should be rolled", ((FileWriterBucket) createWriter.getActiveBuckets().get("test1")).getInProgressPart());
        Assert.assertEquals(1L, r0.getPendingFiles().size());
        Assert.assertNotNull("The in-progress part of test2 should not be rolled", ((FileWriterBucket) createWriter.getActiveBuckets().get("test2")).getInProgressPart());
        Assert.assertEquals(0L, r0.getPendingFiles().size());
    }

    @Test
    public void testContextPassingNormalExecution() throws Exception {
        testCorrectTimestampPassingInContext(1L, 2L, 3L);
    }

    @Test
    public void testContextPassingNullTimestamp() throws Exception {
        testCorrectTimestampPassingInContext(null, 4L, 5L);
    }

    private void testCorrectTimestampPassingInContext(Long l, long j, long j2) throws Exception {
        Path path = new Path(TEMP_FOLDER.newFolder().toURI());
        ManuallyTriggeredProcessingTimeService manuallyTriggeredProcessingTimeService = new ManuallyTriggeredProcessingTimeService();
        manuallyTriggeredProcessingTimeService.advanceTo(j2);
        FileWriter<String> createWriter = createWriter(path, new VerifyingBucketAssigner(l, j, j2), DefaultRollingPolicy.builder().withRolloverInterval(10L).build(), new OutputFileConfig("part-", ""), manuallyTriggeredProcessingTimeService, 5L);
        createWriter.initializeState(Collections.emptyList());
        createWriter.write("test", new ContextImpl(j, l));
    }

    private static FileWriter<String> createWriter(Path path, RollingPolicy<String, String> rollingPolicy, OutputFileConfig outputFileConfig) throws IOException {
        return new FileWriter<>(path, new FileSinkTestUtils.StringIdentityBucketAssigner(), new DefaultFileWriterBucketFactory(), new RowWiseBucketWriter(FileSystem.get(path.toUri()).createRecoverableWriter(), new SimpleStringEncoder()), rollingPolicy, outputFileConfig, new ManuallyTriggeredProcessingTimeService(), 10L);
    }

    private static FileWriter<String> createWriter(Path path, BucketAssigner<String, String> bucketAssigner, RollingPolicy<String, String> rollingPolicy, OutputFileConfig outputFileConfig, Sink.ProcessingTimeService processingTimeService, long j) throws IOException {
        return new FileWriter<>(path, bucketAssigner, new DefaultFileWriterBucketFactory(), new RowWiseBucketWriter(FileSystem.get(path.toUri()).createRecoverableWriter(), new SimpleStringEncoder()), rollingPolicy, outputFileConfig, processingTimeService, j);
    }

    private static FileWriter<String> restoreWriter(List<FileWriterBucketState> list, Path path, RollingPolicy<String, String> rollingPolicy, OutputFileConfig outputFileConfig) throws IOException {
        FileWriter<String> createWriter = createWriter(path, rollingPolicy, outputFileConfig);
        createWriter.initializeState(list);
        return createWriter;
    }
}
