package org.apache.flink.test.distributedCache;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.util.Collector;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/distributedCache/DistributedCacheTest.class */
public class DistributedCacheTest extends AbstractTestBase {
    public static final String data = "machen\nzeit\nheerscharen\nkeiner\nmeine\n";
    private static final int PARALLELISM = 4;
    private static LocalFlinkMiniCluster cluster;

    /* loaded from: input_file:org/apache/flink/test/distributedCache/DistributedCacheTest$WordChecker.class */
    public static class WordChecker extends RichFlatMapFunction<String, Tuple1<String>> {
        private static final long serialVersionUID = 1;
        private final List<String> wordList = new ArrayList();

        public void open(Configuration configuration) throws IOException {
            BufferedReader bufferedReader = new BufferedReader(new FileReader(getRuntimeContext().getDistributedCache().getFile("cache_test")));
            Throwable th = null;
            while (true) {
                try {
                    try {
                        String readLine = bufferedReader.readLine();
                        if (readLine == null) {
                            break;
                        } else {
                            this.wordList.add(readLine);
                        }
                    } catch (Throwable th2) {
                        th = th2;
                        throw th2;
                    }
                } catch (Throwable th3) {
                    if (bufferedReader != null) {
                        if (th != null) {
                            try {
                                bufferedReader.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            bufferedReader.close();
                        }
                    }
                    throw th3;
                }
            }
            if (bufferedReader != null) {
                if (0 == 0) {
                    bufferedReader.close();
                    return;
                }
                try {
                    bufferedReader.close();
                } catch (Throwable th5) {
                    th.addSuppressed(th5);
                }
            }
        }

        public void flatMap(String str, Collector<Tuple1<String>> collector) throws Exception {
            Assert.assertTrue("Unexpected word in stream! wordFromStream: " + str + ", shouldBeOneOf: " + this.wordList.toString(), this.wordList.contains(str));
            collector.collect(new Tuple1(str));
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((String) obj, (Collector<Tuple1<String>>) collector);
        }
    }

    @BeforeClass
    public static void setup() throws Exception {
        cluster = TestBaseUtils.startCluster(1, PARALLELISM, false, false, true);
        TestStreamEnvironment.setAsContext(cluster, PARALLELISM);
        TestEnvironment.setAsContext(cluster, PARALLELISM);
    }

    @AfterClass
    public static void teardown() throws Exception {
        TestStreamEnvironment.unsetAsContext();
        TestEnvironment.unsetAsContext();
        TestBaseUtils.stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
    }

    public DistributedCacheTest() {
        super(new Configuration());
    }

    @Test
    public void testStreamingDistributedCache() throws Exception {
        String createTempFile = createTempFile("count.txt", data);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.registerCachedFile(createTempFile, "cache_test");
        executionEnvironment.readTextFile(createTempFile).flatMap(new WordChecker());
        executionEnvironment.execute();
    }

    @Test
    public void testBatchDistributedCache() throws Exception {
        String createTempFile = createTempFile("count.txt", data);
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.registerCachedFile(createTempFile, "cache_test");
        executionEnvironment.readTextFile(createTempFile).flatMap(new WordChecker()).count();
    }
}
