package org.apache.flink.test.streaming.runtime;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.CachedDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/test/streaming/runtime/CacheITCase.class */
public class CacheITCase extends AbstractTestBase {
    private StreamExecutionEnvironment env;
    private MiniClusterWithClientResource miniClusterWithClientResource;

    @BeforeEach
    void setUp() throws Exception {
        Configuration configuration = new Configuration();
        this.miniClusterWithClientResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(8).setRpcServiceSharing(RpcServiceSharing.DEDICATED).withHaLeadershipControl().build());
        this.miniClusterWithClientResource.before();
        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED, false);
        this.env = new TestStreamEnvironment(this.miniClusterWithClientResource.getMiniCluster(), configuration, 8, Collections.emptyList(), Collections.emptyList());
        this.env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    }

    @AfterEach
    void tearDown() {
        this.miniClusterWithClientResource.after();
    }

    @Test
    void testCacheProduceAndConsume(@TempDir Path path) throws Exception {
        CachedDataStream cache = this.env.fromSource(FileSource.forRecordStreamFormat(new TextLineInputFormat(), new org.apache.flink.core.fs.Path[]{new org.apache.flink.core.fs.Path(prepareTestData(path).getPath())}).build(), WatermarkStrategy.noWatermarks(), "source").map(str -> {
            return Integer.valueOf(Integer.parseInt(str) + 1);
        }).cache();
        executeAndVerifyResult(path, cache, "2", "3", "4");
        executeAndVerifyResult(path, cache, "2", "3", "4");
    }

    @Test
    void testInvalidateCache(@TempDir Path path) throws Exception {
        File prepareTestData = prepareTestData(path);
        CachedDataStream cache = this.env.fromSource(FileSource.forRecordStreamFormat(new TextLineInputFormat(), new org.apache.flink.core.fs.Path[]{new org.apache.flink.core.fs.Path(prepareTestData.getPath())}).build(), WatermarkStrategy.noWatermarks(), "source").map(str -> {
            return Integer.valueOf(Integer.parseInt(str) + 1);
        }).cache();
        executeAndVerifyResult(path, cache, "2", "3", "4");
        cache.invalidate();
        Assertions.assertThat(prepareTestData.delete()).isTrue();
        FileWriter fileWriter = new FileWriter(prepareTestData);
        Throwable th = null;
        try {
            try {
                fileWriter.write("4\n5\n6\n");
                if (fileWriter != null) {
                    if (0 != 0) {
                        try {
                            fileWriter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        fileWriter.close();
                    }
                }
                executeAndVerifyResult(path, cache, "5", "6", "7");
            } finally {
            }
        } catch (Throwable th3) {
            if (fileWriter != null) {
                if (th != null) {
                    try {
                        fileWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fileWriter.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testBatchProduceCacheStreamConsume(@TempDir Path path) throws Exception {
        CachedDataStream cache = this.env.fromSource(FileSource.forRecordStreamFormat(new TextLineInputFormat(), new org.apache.flink.core.fs.Path[]{new org.apache.flink.core.fs.Path(prepareTestData(path).getPath())}).build(), WatermarkStrategy.noWatermarks(), "source").map(Integer::parseInt).map(num -> {
            return Integer.valueOf(num.intValue() + 1);
        }).cache();
        executeAndVerifyResult(path, cache, "2", "3", "4");
        this.env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        executeAndVerifyResult(path, cache.map(num2 -> {
            return Integer.valueOf(num2.intValue() + 1);
        }), "3", "4", "5");
    }

    @Test
    void testCacheProduceAndConsumeWithDifferentPartitioner(@TempDir Path path) throws Exception {
        CachedDataStream cache = this.env.fromData(new Tuple2[]{new Tuple2(1, 1), new Tuple2(2, 1), new Tuple2(2, 1)}).cache();
        executeAndVerifyResult(path, cache.keyBy(tuple2 -> {
            return (Integer) tuple2.f0;
        }).reduce((tuple22, tuple23) -> {
            return new Tuple2(tuple22.f0, Integer.valueOf(((Integer) tuple22.f1).intValue() + ((Integer) tuple23.f1).intValue()));
        }), "(1,1)", "(2,2)");
        executeAndVerifyResult(path, cache.keyBy(tuple24 -> {
            return (Integer) tuple24.f1;
        }).reduce((tuple25, tuple26) -> {
            return new Tuple2(Integer.valueOf(((Integer) tuple25.f0).intValue() + ((Integer) tuple26.f0).intValue()), tuple25.f1);
        }), "(5,1)");
    }

    @Test
    void testCacheSideOutput(@TempDir Path path) throws Exception {
        final OutputTag<Integer> outputTag = new OutputTag<Integer>("2") { // from class: org.apache.flink.test.streaming.runtime.CacheITCase.1
        };
        CachedDataStream cache = this.env.fromData(new Tuple2[]{new Tuple2(1, 1), new Tuple2(2, 1), new Tuple2(2, 2)}).process(new ProcessFunction<Tuple2<Integer, Integer>, Integer>() { // from class: org.apache.flink.test.streaming.runtime.CacheITCase.2
            public void processElement(Tuple2<Integer, Integer> tuple2, ProcessFunction<Tuple2<Integer, Integer>, Integer>.Context context, Collector<Integer> collector) {
                if (((Integer) tuple2.f0).intValue() == 2) {
                    context.output(outputTag, tuple2.f1);
                } else {
                    collector.collect(tuple2.f1);
                }
            }

            public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
                processElement((Tuple2<Integer, Integer>) obj, (ProcessFunction<Tuple2<Integer, Integer>, Integer>.Context) context, (Collector<Integer>) collector);
            }
        }).getSideOutput(outputTag).cache();
        executeAndVerifyResult(path, cache, "1", "2");
        executeAndVerifyResult(path, cache, "1", "2");
    }

    @Test
    void testRetryOnCorruptedClusterDataset(@TempDir Path path) throws Exception {
        File prepareTestData = prepareTestData(path);
        CachedDataStream cache = this.env.fromSource(FileSource.forRecordStreamFormat(new TextLineInputFormat(), new org.apache.flink.core.fs.Path[]{new org.apache.flink.core.fs.Path(prepareTestData.getPath())}).build(), WatermarkStrategy.noWatermarks(), "source").map(str -> {
            return Integer.valueOf(Integer.parseInt(str) + 1);
        }).cache();
        executeAndVerifyResult(path, cache, "2", "3", "4");
        AbstractID datasetId = cache.getTransformation().getDatasetId();
        Assertions.assertThat(prepareTestData.delete()).isTrue();
        FileWriter fileWriter = new FileWriter(prepareTestData);
        Throwable th = null;
        try {
            try {
                fileWriter.write("4\n5\n6\n");
                if (fileWriter != null) {
                    if (0 != 0) {
                        try {
                            fileWriter.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        fileWriter.close();
                    }
                }
                executeAndVerifyResult(path, cache.flatMap((num, collector) -> {
                    if (num.intValue() < 5) {
                        throw new ClusterDatasetCorruptedException((Throwable) null, Collections.singletonList(new IntermediateDataSetID(datasetId)));
                    }
                    collector.collect(num);
                }).returns(Integer.class), "5", "6", "7");
            } finally {
            }
        } catch (Throwable th3) {
            if (fileWriter != null) {
                if (th != null) {
                    try {
                        fileWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fileWriter.close();
                }
            }
            throw th3;
        }
    }

    private <T> void executeAndVerifyResult(Path path, DataStream<T> dataStream, String... strArr) throws Exception {
        File file = new File(path.toFile(), UUID.randomUUID().toString());
        dataStream.sinkTo(getFileSink(file));
        this.env.execute();
        Assertions.assertThat(getFileContent(file)).containsExactlyInAnyOrder(strArr);
    }

    private <T> FileSink<T> getFileSink(File file) {
        return FileSink.forRowFormat(new org.apache.flink.core.fs.Path(file.getPath()), new SimpleStringEncoder()).build();
    }

    private static List<String> getFileContent(File file) throws IOException {
        ArrayList arrayList = new ArrayList();
        Iterator it = FileUtils.listFiles(file, (String[]) null, true).iterator();
        while (it.hasNext()) {
            arrayList.addAll(Arrays.asList(FileUtils.readFileToString((File) it.next()).split("\n")));
        }
        return arrayList;
    }

    private File prepareTestData(Path path) throws IOException {
        File file = new File(path.toFile(), UUID.randomUUID().toString());
        FileWriter fileWriter = new FileWriter(file);
        Throwable th = null;
        try {
            fileWriter.write("1\n2\n3\n");
            if (fileWriter != null) {
                if (0 != 0) {
                    try {
                        fileWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    fileWriter.close();
                }
            }
            return file;
        } catch (Throwable th3) {
            if (fileWriter != null) {
                if (0 != 0) {
                    try {
                        fileWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fileWriter.close();
                }
            }
            throw th3;
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1819860080:
                if (implMethodName.equals("lambda$testCacheProduceAndConsumeWithDifferentPartitioner$6a25f162$1")) {
                    z = 3;
                    break;
                }
                break;
            case -1780778342:
                if (implMethodName.equals("lambda$testBatchProduceCacheStreamConsume$b011a281$1")) {
                    z = true;
                    break;
                }
                break;
            case -1672960873:
                if (implMethodName.equals("lambda$testRetryOnCorruptedClusterDataset$b011a281$1")) {
                    z = 10;
                    break;
                }
                break;
            case -680349476:
                if (implMethodName.equals("lambda$testCacheProduceAndConsumeWithDifferentPartitioner$119b7c9e$1")) {
                    z = 8;
                    break;
                }
                break;
            case 372908369:
                if (implMethodName.equals("lambda$testInvalidateCache$b011a281$1")) {
                    z = 6;
                    break;
                }
                break;
            case 767396434:
                if (implMethodName.equals("lambda$testBatchProduceCacheStreamConsume$4771a103$1")) {
                    z = 9;
                    break;
                }
                break;
            case 1187783740:
                if (implMethodName.equals("parseInt")) {
                    z = false;
                    break;
                }
                break;
            case 1328902770:
                if (implMethodName.equals("lambda$testCacheProduceAndConsumeWithDifferentPartitioner$e170774f$1")) {
                    z = 4;
                    break;
                }
                break;
            case 1537856859:
                if (implMethodName.equals("lambda$testRetryOnCorruptedClusterDataset$96f20b45$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1656752797:
                if (implMethodName.equals("lambda$testCacheProduceAndConsumeWithDifferentPartitioner$9c53bd91$1")) {
                    z = 7;
                    break;
                }
                break;
            case 2128512727:
                if (implMethodName.equals("lambda$testCacheProduceAndConsume$b011a281$1")) {
                    z = 5;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)I")) {
                    return Integer::parseInt;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/CacheITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num -> {
                        return Integer.valueOf(num.intValue() + 1);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("flatMap") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Lorg/apache/flink/util/Collector;)V") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/CacheITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/util/AbstractID;Ljava/lang/Integer;Lorg/apache/flink/util/Collector;)V")) {
                    AbstractID abstractID = (AbstractID) serializedLambda.getCapturedArg(0);
                    return (num2, collector) -> {
                        if (num2.intValue() < 5) {
                            throw new ClusterDatasetCorruptedException((Throwable) null, Collections.singletonList(new IntermediateDataSetID(abstractID)));
                        }
                        collector.collect(num2);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/ReduceFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("reduce") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/CacheITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;Lorg/apache/flink/api/java/tuple/Tuple2;)Lorg/apache/flink/api/java/tuple/Tuple2;")) {
                    return (tuple22, tuple23) -> {
                        return new Tuple2(tuple22.f0, Integer.valueOf(((Integer) tuple22.f1).intValue() + ((Integer) tuple23.f1).intValue()));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/CacheITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/Integer;")) {
                    return tuple24 -> {
                        return (Integer) tuple24.f1;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/CacheITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/Integer;")) {
                    return str -> {
                        return Integer.valueOf(Integer.parseInt(str) + 1);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/CacheITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/Integer;")) {
                    return str2 -> {
                        return Integer.valueOf(Integer.parseInt(str2) + 1);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/CacheITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/Integer;")) {
                    return tuple2 -> {
                        return (Integer) tuple2.f0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/ReduceFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("reduce") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/CacheITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;Lorg/apache/flink/api/java/tuple/Tuple2;)Lorg/apache/flink/api/java/tuple/Tuple2;")) {
                    return (tuple25, tuple26) -> {
                        return new Tuple2(Integer.valueOf(((Integer) tuple25.f0).intValue() + ((Integer) tuple26.f0).intValue()), tuple25.f1);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/CacheITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)Ljava/lang/Integer;")) {
                    return num22 -> {
                        return Integer.valueOf(num22.intValue() + 1);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("map") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/streaming/runtime/CacheITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/Integer;")) {
                    return str3 -> {
                        return Integer.valueOf(Integer.parseInt(str3) + 1);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
