package org.apache.flink.connector.file.sink;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.DecoderBasedReader;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.connector.file.sink.compactor.FileCompactor;
import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
import org.apache.flink.connector.file.sink.utils.PartSizeAndCheckpointRollingPolicy;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Integration tests that restore a {@link FileSink} job from a savepoint after the sink's
 * compaction setting has been switched on or off.
 */
public class FileSinkCompactionSwitchITCase extends TestLogger {
    /** Number of slots offered by the single task manager of the mini cluster rule. */
    private static final int PARALLELISM = 4;

    /** Expected number of parallel source subtasks. */
    protected static final int NUM_SOURCES = 4;

    /** Parallelism applied to the sink operator. */
    protected static final int NUM_SINKS = 3;

    /** Number of records a finite source subtask emits on top of its restored position. */
    protected static final int NUM_RECORDS = 10000;

    /** Expected number of bucket directories below the sink's output path. */
    protected static final int NUM_BUCKETS = 4;

    /** Key of the current test's latch in {@link #LATCH_MAP}; regenerated before every test. */
    private String latchId;

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    /**
     * Latches keyed by latch id. The map is static so that source instances, which carry the id
     * rather than the latch itself, can look their latch up at runtime.
     */
    private static final Map<String, CountDownLatch> LATCH_MAP = new ConcurrentHashMap<>();

    @Rule
    public final MiniClusterWithClientResource miniClusterResource =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(PARALLELISM)
                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
                            .withHaLeadershipControl()
                            .build());

    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/file/sink/FileSinkCompactionSwitchITCase$CountingTestSource.class */
    public static class CountingTestSource extends RichParallelSourceFunction<Integer> implements CheckpointListener, CheckpointedFunction {
        private final String latchId;
        private final int numberOfRecords;
        private final boolean isFinite;
        private final SharedReference<ConcurrentHashMap<Integer, Integer>> sendCountMap;
        private ListState<Integer> nextValueState;
        private int nextValue;
        private volatile boolean isCanceled;
        private volatile boolean snapshottedAfterAllRecordsOutput;
        private volatile boolean isWaitingCheckpointComplete;

        public CountingTestSource(String str, int i, boolean z, SharedReference<ConcurrentHashMap<Integer, Integer>> sharedReference) {
            this.latchId = str;
            this.numberOfRecords = i;
            this.isFinite = z;
            this.sendCountMap = sharedReference;
        }

        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            this.nextValueState = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("nextValue", Integer.class));
            if (this.nextValueState.get() == null || !((Iterable) this.nextValueState.get()).iterator().hasNext()) {
                return;
            }
            this.nextValue = ((Integer) ((Iterable) this.nextValueState.get()).iterator().next()).intValue();
        }

        public void run(SourceFunction.SourceContext<Integer> sourceContext) throws Exception {
            sendRecordsUntil(this.isFinite ? this.nextValue + this.numberOfRecords : Integer.MAX_VALUE, sourceContext);
            this.isWaitingCheckpointComplete = true;
            ((CountDownLatch) FileSinkCompactionSwitchITCase.LATCH_MAP.get(this.latchId)).await();
        }

        private void sendRecordsUntil(int i, SourceFunction.SourceContext<Integer> sourceContext) throws InterruptedException {
            while (!this.isCanceled && this.nextValue < i) {
                synchronized (sourceContext.getCheckpointLock()) {
                    int i2 = this.nextValue;
                    this.nextValue = i2 + 1;
                    sourceContext.collect(Integer.valueOf(i2));
                    if (!this.isFinite && this.nextValue % 100 == 0) {
                        Thread.sleep(1L);
                    }
                }
            }
        }

        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            this.nextValueState.update(Collections.singletonList(Integer.valueOf(this.nextValue)));
            this.sendCountMap.consumeSync(concurrentHashMap -> {
            });
            if (this.isWaitingCheckpointComplete) {
                this.snapshottedAfterAllRecordsOutput = true;
            }
        }

        public void notifyCheckpointComplete(long j) throws Exception {
            if (!this.isFinite || (this.isWaitingCheckpointComplete && this.snapshottedAfterAllRecordsOutput)) {
                ((CountDownLatch) FileSinkCompactionSwitchITCase.LATCH_MAP.get(this.latchId)).countDown();
            }
        }

        public void cancel() {
            this.isCanceled = true;
        }
    }

    /** Generates a fresh latch id and registers a latch with a count of 12 under it. */
    @Before
    public void setup() {
        final String freshId = UUID.randomUUID().toString();
        latchId = freshId;
        LATCH_MAP.put(freshId, new CountDownLatch(NUM_SOURCES * NUM_SINKS));
    }

    /** Drops the latch registered for the test that just ran. */
    @After
    public void teardown() {
        final String finishedId = latchId;
        LATCH_MAP.remove(finishedId);
    }

    /**
     * Starts with a sink on which compaction was neither enabled nor explicitly disabled and
     * restores with a sink that has compaction enabled.
     */
    @Test
    public void testSwitchNeverEnabledToEnabled() throws Exception {
        final String outputDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath();
        final FileSink<Integer> sinkBeforeSavepoint = createFileSink(outputDir, null, false);
        final FileSink<Integer> sinkAfterRestore =
                createFileSink(outputDir, createFileCompactStrategy(), false);
        testSwitchingCompaction(outputDir, sinkBeforeSavepoint, sinkAfterRestore);
    }

    /**
     * Starts with a sink on which compaction was explicitly disabled and restores with a sink
     * that has compaction enabled.
     */
    @Test
    public void testSwitchDisabledToEnabled() throws Exception {
        final String outputDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath();
        final FileSink<Integer> sinkBeforeSavepoint = createFileSink(outputDir, null, true);
        final FileSink<Integer> sinkAfterRestore =
                createFileSink(outputDir, createFileCompactStrategy(), false);
        testSwitchingCompaction(outputDir, sinkBeforeSavepoint, sinkAfterRestore);
    }

    /**
     * Starts with a sink that has compaction enabled and restores with a sink on which
     * compaction was explicitly disabled.
     */
    @Test
    public void testSwitchEnabledToDisabled() throws Exception {
        final String outputDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath();
        final FileSink<Integer> sinkBeforeSavepoint =
                createFileSink(outputDir, createFileCompactStrategy(), false);
        final FileSink<Integer> sinkAfterRestore = createFileSink(outputDir, null, true);
        testSwitchingCompaction(outputDir, sinkBeforeSavepoint, sinkAfterRestore);
    }

    /**
     * Starts with a sink that has compaction enabled and restores with a sink that simply omits
     * compaction without explicitly disabling it; the run is expected to fail with a
     * {@link JobExecutionException}.
     */
    @Test
    public void testSwitchEnabledToDisabledImproperly() throws Exception {
        final String outputDir = TEMPORARY_FOLDER.newFolder().getAbsolutePath();
        try {
            final FileSink<Integer> sinkBeforeSavepoint =
                    createFileSink(outputDir, createFileCompactStrategy(), false);
            final FileSink<Integer> sinkAfterRestore = createFileSink(outputDir, null, false);
            testSwitchingCompaction(outputDir, sinkBeforeSavepoint, sinkAfterRestore);
            Assert.fail("Job is not failing when compaction is disabled improperly");
        } catch (JobExecutionException expected) {
            // This failure is the outcome the test is looking for; nothing else to verify.
        }
    }

    /**
     * Runs a job with {@code fileSink}, stops it with a savepoint, restores a second job with
     * {@code fileSink2} from that savepoint and verifies the files written below {@code str}.
     *
     * <p>The first job uses infinite sources and is stopped once the current latch is released;
     * the second job uses finite sources and is executed to completion.
     *
     * @param str output directory both sinks write to
     * @param fileSink sink used by the job that is stopped with a savepoint
     * @param fileSink2 sink used by the job restored from that savepoint
     * @throws Exception if a job fails or the written output does not match the expectation
     */
    private void testSwitchingCompaction(String str, FileSink<Integer> fileSink, FileSink<Integer> fileSink2) throws Exception {
        final String checkpointDir = "file://" + TEMPORARY_FOLDER.newFolder().getAbsolutePath();
        final SharedReference<ConcurrentHashMap<Integer, Integer>> sendCountMap =
                this.sharedObjects.add(new ConcurrentHashMap<>());
        final JobGraph initialJob = createJobGraph(checkpointDir, fileSink, false, sendCountMap);
        final JobGraph restoredJob = createJobGraph(checkpointDir, fileSink2, true, sendCountMap);

        final Configuration clusterSettings = new Configuration();
        clusterSettings.setString(RestOptions.BIND_PORT, "18081-19000");
        final MiniClusterConfiguration clusterConfiguration =
                new MiniClusterConfiguration.Builder()
                        .setNumTaskManagers(1)
                        .setNumSlotsPerTaskManager(PARALLELISM)
                        .setConfiguration(clusterSettings)
                        .build();

        // try-with-resources closes the cluster exactly once, on success and on failure alike.
        try (MiniCluster miniCluster = new MiniCluster(clusterConfiguration)) {
            miniCluster.start();
            miniCluster.submitJob(initialJob);
            LATCH_MAP.get(this.latchId).await();

            final String savepointPath =
                    miniCluster
                            .triggerSavepoint(
                                    initialJob.getJobID(),
                                    TEMPORARY_FOLDER.newFolder().getAbsolutePath(),
                                    true,
                                    SavepointFormatType.CANONICAL)
                            .get();

            // Fresh latch for the restored job's sources, which look it up under the same id.
            LATCH_MAP.put(this.latchId, new CountDownLatch(NUM_SOURCES * 2));
            restoredJob.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, false));
            miniCluster.executeJobBlocking(restoredJob);
        }

        // Verified after the cluster is closed so an assertion failure cannot trigger a second close.
        checkIntegerSequenceSinkOutput(str, sendCountMap.get(), NUM_BUCKETS, NUM_SOURCES);
    }

    /**
     * Builds a streaming job that feeds a {@link CountingTestSource} into {@code fileSink}.
     *
     * <p>Checkpointing runs every 100 ms in exactly-once mode with no restart strategy, a hash-map
     * state backend, the state changelog disabled and checkpoints stored under {@code str}.
     *
     * @param str checkpoint storage location
     * @param fileSink sink to attach under the uid "sink"
     * @param z whether the sources are finite
     * @param sharedReference shared map handed to every source instance
     * @return the job graph of the assembled pipeline
     */
    private JobGraph createJobGraph(String str, FileSink<Integer> fileSink, boolean z, SharedReference<ConcurrentHashMap<Integer, Integer>> sharedReference) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final Configuration settings = new Configuration();
        settings.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
        settings.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, false);
        env.configure(settings, getClass().getClassLoader());

        env.enableCheckpointing(100L, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.noRestart());
        env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(str));
        env.setStateBackend(new HashMapStateBackend());

        final CountingTestSource source =
                new CountingTestSource(this.latchId, NUM_RECORDS, z, sharedReference);
        env.addSource(source)
                .setParallelism(NUM_SOURCES)
                .sinkTo(fileSink)
                .uid("sink")
                .setParallelism(NUM_SINKS);

        return env.getStreamGraph().getJobGraph();
    }

    /**
     * Creates a row-format sink writing below {@code str}, bucketed by modulo, with a rolling
     * policy constructed from a part size of 1024 and a {@code false} flag.
     *
     * @param str base output directory
     * @param fileCompactStrategy compaction strategy to enable, or {@code null} for none
     * @param z when no strategy is given, whether to explicitly call {@code disableCompact()}
     * @return the configured sink
     */
    private FileSink<Integer> createFileSink(String str, FileCompactStrategy fileCompactStrategy, boolean z) {
        final FileSink.DefaultRowFormatBuilder<Integer> builder =
                FileSink.forRowFormat(new Path(str), new IntegerFileSinkTestDataUtils.IntEncoder())
                        .withBucketAssigner(new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(NUM_BUCKETS))
                        .withRollingPolicy(new PartSizeAndCheckpointRollingPolicy<>(1024L, false));

        if (fileCompactStrategy != null) {
            return builder.enableCompact(fileCompactStrategy, createFileCompactor()).build();
        }
        if (z) {
            return builder.disableCompact().build();
        }
        // Neither enabled nor explicitly disabled: the builder's default applies.
        return builder.build();
    }

    /**
     * Creates a record-wise compactor whose reader decodes records with
     * {@code IntegerFileSinkTestDataUtils.IntDecoder}.
     */
    private static FileCompactor createFileCompactor() {
        final DecoderBasedReader.Factory<Integer> readerFactory =
                new DecoderBasedReader.Factory<>(IntegerFileSinkTestDataUtils.IntDecoder::new);
        return new RecordWiseFileCompactor<>(readerFactory);
    }

    /** Creates a compaction strategy with compaction-on-checkpoint configured with the value 2. */
    private static FileCompactStrategy createFileCompactStrategy() {
        final FileCompactStrategy.Builder strategyBuilder =
                FileCompactStrategy.Builder.newBuilder().enableCompactionOnCheckpoint(2);
        return strategyBuilder.build();
    }

    /**
     * Verifies the bucket directories below {@code str} against the per-source send counts.
     *
     * <p>Expects exactly {@code i} sub-directories named {@code 0 .. i-1}. For each bucket, all
     * files whose name does not start with "." are read as a sequence of ints, and the number of
     * occurrences of every value is compared with the number of sources that emitted it.
     *
     * @param str sink output directory
     * @param map next-value-to-emit per source subtask; every source is taken to have emitted
     *     {@code 0 .. count-1}
     * @param i expected number of buckets
     * @param i2 expected number of sources, i.e. entries in {@code map}
     * @throws Exception if a file cannot be read
     */
    private static void checkIntegerSequenceSinkOutput(String str, Map<Integer, Integer> map, int i, int i2) throws Exception {
        final int numBuckets = i;
        final int numSources = i2;
        Assert.assertEquals(numSources, map.size());

        final String[] bucketNames = new File(str).list();
        Assert.assertNotNull(bucketNames);
        Arrays.sort(bucketNames, Comparator.comparingInt(Integer::parseInt));
        Assert.assertEquals(numBuckets, bucketNames.length);

        // Ascending send counts: values in [sorted[j-1], sorted[j]) were emitted by the sources j..end.
        final ArrayList<Integer> sortedCounts = new ArrayList<>(map.values());
        Collections.sort(sortedCounts);

        for (int bucket = 0; bucket < numBuckets; bucket++) {
            Assert.assertEquals(Integer.toString(bucket), bucketNames[bucket]);
            final File bucketDir = new File(str, bucketNames[bucket]);
            Assert.assertTrue(bucketDir.getAbsolutePath() + " Should be a existing directory", bucketDir.isDirectory());

            final File[] partFiles = bucketDir.listFiles(candidate -> !candidate.getName().startsWith("."));
            Assert.assertNotNull(partFiles);

            final Map<Integer, Integer> occurrences = new HashMap<>();
            for (File partFile : partFiles) {
                Assert.assertTrue(partFile.isFile());
                countRecords(partFile, occurrences);
            }

            // The source with the largest count determines how many distinct values hit this bucket.
            final int bucketId = bucket;
            final int expectedDistinct =
                    map.values().stream()
                            .mapToInt(total -> total / numBuckets + (bucketId < total % numBuckets ? 1 : 0))
                            .max()
                            .getAsInt();
            Assert.assertEquals(expectedDistinct, occurrences.size());

            for (int j = 0; j < sortedCounts.size(); j++) {
                final int previous = j == 0 ? 0 : sortedCounts.get(j - 1);
                // Round the previous count up to a multiple of numBuckets, then offset by the bucket id.
                final int aligned = previous % numBuckets == 0 ? previous : previous + numBuckets - previous % numBuckets;
                final int rangeEnd = sortedCounts.get(j);
                // NOTE(review): the expected count below uses the bucket count where the number of
                // sources looks intended; both are 4 at the only call site - confirm before changing.
                final int expectedOccurrences = numBuckets - j;
                for (int value = bucketId + aligned; value < rangeEnd; value += numBuckets) {
                    final int actualOccurrences = occurrences.getOrDefault(value, 0);
                    Assert.assertEquals("The record " + value + " should occur " + expectedOccurrences + " times,  but only occurs " + actualOccurrences + "time", expectedOccurrences, actualOccurrences);
                }
            }
        }
    }

    /** Reads {@code file} as consecutive ints until end of file, incrementing each value's count. */
    private static void countRecords(File file, Map<Integer, Integer> counts) throws Exception {
        try (DataInputStream input = new DataInputStream(new FileInputStream(file))) {
            while (true) {
                counts.merge(input.readInt(), 1, Integer::sum);
            }
        } catch (EOFException endOfFile) {
            // Reaching end of file is the regular way out of the read loop.
        }
    }

    /**
     * Resolves a serialized lambda back to the {@code IntDecoder::new} method reference used by
     * this class, rejecting anything else.
     *
     * <p>NOTE(review): this is the compiler-synthesized deserialization hook recovered by the
     * decompiler. javac normally generates it for classes containing serializable lambdas, so a
     * hand-written copy may clash when recompiling - confirm whether it should be removed.
     *
     * @param serializedLambda serialized form of the lambda being deserialized
     * @return the matching method reference
     * @throws IllegalArgumentException if the serialized form does not describe that reference
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        final boolean isIntDecoderConstructor =
                "<init>".equals(serializedLambda.getImplMethodName())
                        // Method-handle kind 8: constructor invocation.
                        && serializedLambda.getImplMethodKind() == java.lang.invoke.MethodHandleInfo.REF_newInvokeSpecial
                        && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/connector/file/sink/compactor/DecoderBasedReader$Decoder$Factory")
                        && serializedLambda.getFunctionalInterfaceMethodName().equals("create")
                        && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Lorg/apache/flink/connector/file/sink/compactor/DecoderBasedReader$Decoder;")
                        && serializedLambda.getImplClass().equals("org/apache/flink/connector/file/sink/utils/IntegerFileSinkTestDataUtils$IntDecoder")
                        && serializedLambda.getImplMethodSignature().equals("()V");
        if (isIntDecoderConstructor) {
            // A method reference needs a functional-interface target type; Object alone is not one.
            return (DecoderBasedReader.Decoder.Factory<Integer>) IntegerFileSinkTestDataUtils.IntDecoder::new;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
