package org.apache.flink.streaming.connectors.elasticsearch;

import java.lang.AutoCloseable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
import org.apache.flink.test.util.AbstractTestBase;
import org.elasticsearch.client.Client;
import org.junit.Assert;

/* loaded from: input_file:org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkTestBase.class */
public abstract class ElasticsearchSinkTestBase<C extends AutoCloseable, A> extends AbstractTestBase {
    protected abstract Client getClient();

    protected abstract String getClusterName();

    public void runElasticsearchSinkTest() throws Exception {
        runElasticSearchSinkTest("elasticsearch-sink-test-json-index", SourceSinkDataTestKit::getJsonSinkFunction);
    }

    public void runElasticsearchSinkCborTest() throws Exception {
        runElasticSearchSinkTest("elasticsearch-sink-test-cbor-index", SourceSinkDataTestKit::getCborSinkFunction);
    }

    public void runElasticsearchSinkSmileTest() throws Exception {
        runElasticSearchSinkTest("elasticsearch-sink-test-smile-index", SourceSinkDataTestKit::getSmileSinkFunction);
    }

    public void runElasticsearchSinkYamlTest() throws Exception {
        runElasticSearchSinkTest("elasticsearch-sink-test-yaml-index", SourceSinkDataTestKit::getYamlSinkFunction);
    }

    private void runElasticSearchSinkTest(String str, Function<String, ElasticsearchSinkFunction<Tuple2<Integer, String>>> function) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.addSource(new SourceSinkDataTestKit.TestDataSourceFunction()).addSink(createElasticsearchSinkForEmbeddedNode(1, getClusterName(), function.apply(str)));
        executionEnvironment.execute("Elasticsearch Sink Test");
        Client client = getClient();
        SourceSinkDataTestKit.verifyProducedSinkData(client, str);
        client.close();
    }

    public void runNullAddressesTest() {
        try {
            createElasticsearchSink(1, getClusterName(), null, SourceSinkDataTestKit.getJsonSinkFunction("test"));
            Assert.fail();
        } catch (IllegalArgumentException | NullPointerException e) {
        }
    }

    public void runEmptyAddressesTest() {
        try {
            createElasticsearchSink(1, getClusterName(), Collections.emptyList(), SourceSinkDataTestKit.getJsonSinkFunction("test"));
            Assert.fail();
        } catch (IllegalArgumentException e) {
        }
    }

    public void runInvalidElasticsearchClusterTest() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.addSource(new SourceSinkDataTestKit.TestDataSourceFunction()).addSink(createElasticsearchSinkForNode(1, "invalid-cluster-name", SourceSinkDataTestKit.getJsonSinkFunction("test"), "123.123.123.123"));
        try {
            executionEnvironment.execute("Elasticsearch Sink Test");
            Assert.fail();
        } catch (JobExecutionException e) {
        }
    }

    protected Map<String, String> createUserConfig(int i, String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("cluster.name", str);
        hashMap.put("bulk.flush.max.actions", String.valueOf(i));
        hashMap.put("transport.tcp.connect_timeout", "5s");
        return hashMap;
    }

    protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C> createElasticsearchSink(int i, String str, List<A> list, ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction);

    protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C> createElasticsearchSinkForEmbeddedNode(int i, String str, ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction) throws Exception;

    protected abstract ElasticsearchSinkBase<Tuple2<Integer, String>, C> createElasticsearchSinkForNode(int i, String str, ElasticsearchSinkFunction<Tuple2<Integer, String>> elasticsearchSinkFunction, String str2) throws Exception;
}
