package org.apache.flink.streaming.connectors.opensearch;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.opensearch.OpensearchUtil;
import org.apache.flink.connector.opensearch.test.DockerImageVersions;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.opensearch.OpensearchSink;
import org.apache.flink.streaming.connectors.opensearch.testutils.SourceSinkDataTestKit;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.http.HttpHost;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.testcontainers.OpensearchContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
/* loaded from: input_file:org/apache/flink/streaming/connectors/opensearch/OpensearchSinkITCase.class */
public class OpensearchSinkITCase extends AbstractTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(OpensearchSinkITCase.class);

    @Container
    private static final OpensearchContainer OS_CONTAINER = OpensearchUtil.createOpensearchContainer(DockerImageVersions.OPENSEARCH_1, LOG);

    @Test
    public void testOpensearchSink() throws Exception {
        runOpensearchSinkTest("opensearch-sink-test-json-index", SourceSinkDataTestKit::getJsonSinkFunction);
    }

    @Test
    public void testOpensearchSinkWithSmile() throws Exception {
        runOpensearchSinkTest("opensearch-sink-test-smile-index", SourceSinkDataTestKit::getSmileSinkFunction);
    }

    @Test
    public void testNullAddresses() {
        Assertions.assertThatThrownBy(() -> {
            createOpensearchSink(1, null, SourceSinkDataTestKit.getJsonSinkFunction("test"));
        }).isInstanceOfAny(new Class[]{IllegalArgumentException.class, NullPointerException.class});
    }

    @Test
    public void testEmptyAddresses() {
        Assertions.assertThatThrownBy(() -> {
            createOpensearchSink(1, Collections.emptyList(), SourceSinkDataTestKit.getJsonSinkFunction("test"));
        }).isInstanceOf(IllegalArgumentException.class);
    }

    @Test
    public void testInvalidOpensearchCluster() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.addSource(new SourceSinkDataTestKit.TestDataSourceFunction()).addSink(createOpensearchSinkForNode(1, SourceSinkDataTestKit.getJsonSinkFunction("test"), "123.123.123.123"));
        Assertions.assertThatThrownBy(() -> {
            executionEnvironment.execute("Opensearch Sink Test");
        }).isInstanceOf(JobExecutionException.class).hasCauseInstanceOf(JobException.class);
    }

    private OpensearchSink<Tuple2<Integer, String>> createOpensearchSink(int i, List<HttpHost> list, OpensearchSinkFunction<Tuple2<Integer, String>> opensearchSinkFunction) {
        OpensearchSink.Builder builder = new OpensearchSink.Builder(list, opensearchSinkFunction);
        builder.setBulkFlushMaxActions(i);
        return builder.build();
    }

    private OpensearchSink<Tuple2<Integer, String>> createOpensearchSinkForNode(int i, OpensearchSinkFunction<Tuple2<Integer, String>> opensearchSinkFunction, String str) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(HttpHost.create(str));
        OpensearchSink.Builder builder = new OpensearchSink.Builder(arrayList, opensearchSinkFunction);
        builder.setBulkFlushMaxActions(i);
        builder.setRestClientFactory(OpensearchUtil.createClientFactory(OS_CONTAINER));
        return builder.build();
    }

    private void runOpensearchSinkTest(String str, Function<String, OpensearchSinkFunction<Tuple2<Integer, String>>> function) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.addSource(new SourceSinkDataTestKit.TestDataSourceFunction()).addSink(createOpensearchSinkForNode(1, function.apply(str), OS_CONTAINER.getHttpHostAddress()));
        executionEnvironment.execute("Opensearch Sink Test");
        RestHighLevelClient createClient = OpensearchUtil.createClient(OS_CONTAINER);
        SourceSinkDataTestKit.verifyProducedSinkData(createClient, str);
        createClient.close();
    }
}
