/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.elasticsearch;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
import org.apache.flink.test.util.AbstractTestBase;
import org.elasticsearch.client.Client;
import org.junit.Assert;

/**
 * Shared scenarios for Elasticsearch sink integration tests.
 *
 * <p>Concrete subclasses provide the client, the cluster name and the sink factories; the
 * {@code run*} methods here build a small streaming job around
 * {@link SourceSinkDataTestKit.TestDataSourceFunction}, execute it, and check the outcome.
 *
 * @param <C> client type handled by the sink under test
 * @param <A> address type accepted by {@link #createElasticsearchSink}
 */
public abstract class ElasticsearchSinkTestBase<C extends AutoCloseable, A>
extends AbstractTestBase {
    /**
     * Returns the client used to verify what the sink wrote.
     *
     * <p>The client is closed by this class after each verification.
     */
    protected abstract Client getClient();

    /** Returns the cluster name handed to the sink factories. */
    protected abstract String getClusterName();

    /** Runs the sink round-trip using the JSON sink function. */
    public void runElasticsearchSinkTest() throws Exception {
        this.runElasticSearchSinkTest("elasticsearch-sink-test-json-index", SourceSinkDataTestKit::getJsonSinkFunction);
    }

    /** Runs the sink round-trip using the CBOR sink function. */
    public void runElasticsearchSinkCborTest() throws Exception {
        this.runElasticSearchSinkTest("elasticsearch-sink-test-cbor-index", SourceSinkDataTestKit::getCborSinkFunction);
    }

    /** Runs the sink round-trip using the Smile sink function. */
    public void runElasticsearchSinkSmileTest() throws Exception {
        this.runElasticSearchSinkTest("elasticsearch-sink-test-smile-index", SourceSinkDataTestKit::getSmileSinkFunction);
    }

    /** Runs the sink round-trip using the YAML sink function. */
    public void runElasticsearchSinkYamlTest() throws Exception {
        this.runElasticSearchSinkTest("elasticsearch-sink-test-yaml-index", SourceSinkDataTestKit::getYamlSinkFunction);
    }

    /**
     * Executes a job that feeds the test source into an embedded-node sink, then verifies the
     * data produced for {@code index}.
     *
     * @param index index name passed to both the sink function factory and the verifier
     * @param functionFactory builds the sink function for the given index name
     * @throws Exception if sink creation, job execution, verification or client shutdown fails
     */
    private void runElasticSearchSinkTest(String index, Function<String, ElasticsearchSinkFunction<Tuple2<Integer, String>>> functionFactory) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.addSource((SourceFunction)new SourceSinkDataTestKit.TestDataSourceFunction());
        source.addSink(this.createElasticsearchSinkForEmbeddedNode(1, this.getClusterName(), functionFactory.apply(index)));
        env.execute("Elasticsearch Sink Test");
        Client client = this.getClient();
        try {
            SourceSinkDataTestKit.verifyProducedSinkData(client, index);
        }
        finally {
            // Close even when verification fails so a failing test does not leak the client.
            client.close();
        }
    }

    /**
     * Asserts that creating a sink with a {@code null} address list is rejected with an
     * {@link IllegalArgumentException} or a {@link NullPointerException}.
     */
    public void runNullAddressesTest() {
        try {
            this.createElasticsearchSink(1, this.getClusterName(), null, SourceSinkDataTestKit.getJsonSinkFunction("test"));
        }
        catch (IllegalArgumentException | NullPointerException expectedException) {
            return;
        }
        Assert.fail("Expected IllegalArgumentException or NullPointerException when creating a sink with null addresses");
    }

    /**
     * Asserts that creating a sink with an empty address list is rejected with an
     * {@link IllegalArgumentException}.
     */
    public void runEmptyAddressesTest() {
        try {
            this.createElasticsearchSink(1, this.getClusterName(), Collections.emptyList(), SourceSinkDataTestKit.getJsonSinkFunction("test"));
        }
        catch (IllegalArgumentException expectedException) {
            return;
        }
        Assert.fail("Expected IllegalArgumentException when creating a sink with an empty address list");
    }

    /**
     * Asserts that a job whose sink is created for cluster name {@code "invalid-cluster-name"}
     * and node string {@code "123.123.123.123"} fails with a {@link JobExecutionException}.
     *
     * @throws Exception if sink creation fails or the job fails with a different exception type
     */
    public void runInvalidElasticsearchClusterTest() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.addSource((SourceFunction)new SourceSinkDataTestKit.TestDataSourceFunction());
        source.addSink(this.createElasticsearchSinkForNode(1, "invalid-cluster-name", SourceSinkDataTestKit.getJsonSinkFunction("test"), "123.123.123.123"));
        try {
            env.execute("Elasticsearch Sink Test");
        }
        catch (JobExecutionException expectedException) {
            return;
        }
        Assert.fail("Expected JobExecutionException when executing a job against an invalid Elasticsearch cluster");
    }

    /**
     * Builds a mutable sink configuration map.
     *
     * @param bulkFlushMaxActions value stored (as a string) under {@code bulk.flush.max.actions}
     * @param clusterName value stored under {@code cluster.name}
     * @return a new map with those two entries plus {@code transport.tcp.connect_timeout}
     *     set to {@code "5s"}
     */
    protected Map<String, String> createUserConfig(int bulkFlushMaxActions, String clusterName) {
        Map<String, String> userConfig = new HashMap<>();
        userConfig.put("cluster.name", clusterName);
        userConfig.put("bulk.flush.max.actions", String.valueOf(bulkFlushMaxActions));
        userConfig.put("transport.tcp.connect_timeout", "5s");
        return userConfig;
    }

    /**
     * Creates the sink under test from an explicit address list.
     *
     * <p>Callers in this class pass {@code 1}, the cluster name, the address list (possibly
     * {@code null} or empty) and the sink function, in that order. The first argument is
     * presumably the bulk-flush action limit used by {@link #createUserConfig} - confirm in
     * the subclass.
     */
    protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C> createElasticsearchSink(int var1, String var2, List<A> var3, ElasticsearchSinkFunction<Tuple2<Integer, String>> var4);

    /**
     * Creates the sink used by the round-trip tests.
     *
     * <p>Callers in this class pass {@code 1}, the cluster name and the sink function, in that
     * order.
     */
    protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C> createElasticsearchSinkForEmbeddedNode(int var1, String var2, ElasticsearchSinkFunction<Tuple2<Integer, String>> var3) throws Exception;

    /**
     * Creates a sink targeting a specific node.
     *
     * <p>The only caller in this class passes {@code 1}, a cluster name, the sink function and
     * a dotted-quad string (presumably the node's IP address - confirm in the subclass), in
     * that order.
     */
    protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C> createElasticsearchSinkForNode(int var1, String var2, ElasticsearchSinkFunction<Tuple2<Integer, String>> var3, String var4) throws Exception;
}

