/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.environment;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.WritableConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@code StreamExecutionEnvironment.configure(ReadableConfig, ClassLoader)} covering
 * options whose values are not plain scalars: the state backend, distributed-cache files,
 * default Kryo serializers and job listeners.
 *
 * <p>Each test obtains an environment via
 * {@code StreamExecutionEnvironment.getExecutionEnvironment()}, applies a {@link Configuration}
 * and asserts on the state that is observable through the environment afterwards.
 */
public class StreamExecutionEnvironmentComplexConfigurationTest {

    /** Expects {@code state.backend=jobmanager} to yield a {@link MemoryStateBackend}. */
    @Test
    public void testLoadingStateBackendFromConfiguration() {
        StreamExecutionEnvironment envFromConfiguration = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration configuration = new Configuration();
        configuration.setString("state.backend", "jobmanager");

        envFromConfiguration.configure(configuration, Thread.currentThread().getContextClassLoader());

        StateBackend actualStateBackend = envFromConfiguration.getStateBackend();
        Assert.assertThat(actualStateBackend, CoreMatchers.instanceOf(MemoryStateBackend.class));
    }

    /**
     * Expects the entries of {@code pipeline.cached-files} to be parsed into cached-file
     * registrations, and the file registered programmatically beforehand ({@code file4}) to be
     * absent from the result.
     */
    @Test
    public void testLoadingCachedFilesFromConfiguration() {
        StreamExecutionEnvironment envFromConfiguration = StreamExecutionEnvironment.getExecutionEnvironment();
        // Registered up front on purpose: the assertion below lists only the configured files.
        envFromConfiguration.registerCachedFile("/tmp4", "file4", true);
        Configuration configuration = new Configuration();
        configuration.setString("pipeline.cached-files", "name:file1,path:/tmp1,executable:true;name:file2,path:/tmp2;name:file3,path:'oss://bucket/file1'");

        envFromConfiguration.configure(configuration, Thread.currentThread().getContextClassLoader());

        Assert.assertThat(
                envFromConfiguration.getCachedFiles(),
                CoreMatchers.equalTo(
                        Arrays.asList(
                                Tuple2.of("file1", new DistributedCache.DistributedCacheEntry("/tmp1", true)),
                                Tuple2.of("file2", new DistributedCache.DistributedCacheEntry("/tmp2", false)),
                                Tuple2.of("file3", new DistributedCache.DistributedCacheEntry("oss://bucket/file1", false)))));
    }

    /**
     * Expects {@code pipeline.default-kryo-serializers} to register {@link CustomPojoSerializer}
     * as the default Kryo serializer class for {@link CustomPojo}.
     */
    @Test
    public void testLoadingKryoSerializersFromConfiguration() {
        StreamExecutionEnvironment envFromConfiguration = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration configuration = new Configuration();
        configuration.setString("pipeline.default-kryo-serializers", "class:'org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojo',serializer:'org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentComplexConfigurationTest$CustomPojoSerializer'");

        envFromConfiguration.configure(configuration, Thread.currentThread().getContextClassLoader());

        LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> expectedSerializers = new LinkedHashMap<>();
        expectedSerializers.put(CustomPojo.class, CustomPojoSerializer.class);
        Assert.assertThat(
                envFromConfiguration.getConfig().getDefaultKryoSerializerClasses(),
                CoreMatchers.equalTo(expectedSerializers));
    }

    /**
     * Expects a state backend set programmatically to still be in place after configuring the
     * environment with an empty {@link Configuration}.
     */
    @Test
    public void testNotOverridingStateBackendWithDefaultsFromConfiguration() {
        StreamExecutionEnvironment envFromConfiguration = StreamExecutionEnvironment.getExecutionEnvironment();
        envFromConfiguration.setStateBackend(new MemoryStateBackend());

        envFromConfiguration.configure(new Configuration(), Thread.currentThread().getContextClassLoader());

        StateBackend actualStateBackend = envFromConfiguration.getStateBackend();
        Assert.assertThat(actualStateBackend, CoreMatchers.instanceOf(MemoryStateBackend.class));
    }

    /**
     * Expects a cached file registered programmatically to still be registered after configuring
     * the environment with an empty {@link Configuration}.
     */
    @Test
    public void testNotOverridingCachedFilesFromConfiguration() {
        StreamExecutionEnvironment envFromConfiguration = StreamExecutionEnvironment.getExecutionEnvironment();
        envFromConfiguration.registerCachedFile("/tmp3", "file3", true);
        Configuration configuration = new Configuration();

        envFromConfiguration.configure(configuration, Thread.currentThread().getContextClassLoader());

        Assert.assertThat(
                envFromConfiguration.getCachedFiles(),
                CoreMatchers.equalTo(
                        Arrays.asList(
                                Tuple2.of("file3", new DistributedCache.DistributedCacheEntry("/tmp3", true)))));
    }

    /**
     * Expects the listener classes encoded under {@code DeploymentOptions.JOB_LISTENERS} to be
     * instantiated and exposed by the environment in the same order they were configured.
     */
    @Test
    public void testLoadingListenersFromConfiguration() {
        StreamExecutionEnvironment envFromConfiguration = StreamExecutionEnvironment.getExecutionEnvironment();
        List<Class<? extends JobListener>> listenerClasses =
                Arrays.asList(BasicJobSubmittedCounter.class, BasicJobExecutedCounter.class);
        Configuration configuration = new Configuration();
        ConfigUtils.encodeCollectionToConfig(configuration, DeploymentOptions.JOB_LISTENERS, listenerClasses, Class::getName);

        envFromConfiguration.configure(configuration, Thread.currentThread().getContextClassLoader());

        List<JobListener> listeners = envFromConfiguration.getJobListeners();
        // JUnit's contract is (expected, actual); the reverse order produces misleading failures.
        Assert.assertEquals(2L, listeners.size());
        Assert.assertThat(listeners.get(0), CoreMatchers.instanceOf(BasicJobSubmittedCounter.class));
        Assert.assertThat(listeners.get(1), CoreMatchers.instanceOf(BasicJobExecutedCounter.class));
    }

    /**
     * No-op Kryo serializer for {@link CustomPojo}. It is referenced by class name from the
     * configuration in {@link #testLoadingKryoSerializersFromConfiguration()}; it writes nothing
     * and always reads {@code null}.
     */
    public static class CustomPojoSerializer
    extends Serializer<CustomPojo> {
        @Override
        public void write(Kryo kryo, Output output, CustomPojo object) {
        }

        @Override
        public CustomPojo read(Kryo kryo, Input input, Class<CustomPojo> type) {
            return null;
        }
    }

    /** Empty marker type used as the key of the Kryo serializer registration. */
    public static class CustomPojo {
    }

    /**
     * {@link JobListener} that counts {@code onJobExecuted} callbacks. The count is private and
     * not read anywhere in this class; the listener exists so that it can be loaded by class name.
     */
    public static class BasicJobExecutedCounter
    implements JobListener {
        private int count = 0;

        @Override
        public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
        }

        @Override
        public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
            // Count executions here, as the class name promises, rather than submissions.
            ++this.count;
        }
    }

    /**
     * {@link JobListener} that counts {@code onJobSubmitted} callbacks. The count is private and
     * not read anywhere in this class; the listener exists so that it can be loaded by class name.
     */
    public static class BasicJobSubmittedCounter
    implements JobListener {
        private int count = 0;

        @Override
        public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
            ++this.count;
        }

        @Override
        public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
        }
    }
}

