package org.apache.flink.connector.elasticsearch.sink;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Shared integration-test scenarios for the Elasticsearch sink.
 *
 * <p>Concrete subclasses supply the HTTP address of the Elasticsearch instance under test, a
 * {@link TestClientBase} used to verify what was written, and the sink builder to exercise. Every
 * scenario runs a small local Flink job that writes the ids 1 to 5 into an index and then asserts
 * through the test client that all five ids are present.
 */
@ExtendWith({TestLoggerExtension.class})
abstract class ElasticsearchSinkBaseITCase {
    protected static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBaseITCase.class);
    protected static final String ELASTICSEARCH_PASSWORD = "test-password";
    protected static final String ELASTICSEARCH_USER = "elastic";

    /**
     * Set once {@link FailingMapper} has thrown its single intentional failure. Static so that every
     * mapper instance sees the same flag; reset in {@link #setUp()}. Declared {@code volatile} as a
     * defensive measure because it is written from the mapper callback and read by the test method,
     * which may run on different threads.
     */
    private static volatile boolean failed;

    private RestHighLevelClient client;
    private TestClientBase context;

    /**
     * Pass-through mapper that throws exactly one exception from a checkpoint-complete notification,
     * and only after it has forwarded at least one record.
     */
    private static class FailingMapper implements MapFunction<Long, Long>, CheckpointListener {
        private int emittedRecords = 0;

        @Override
        public Long map(Long value) throws Exception {
            // Slow the stream down so that a checkpoint can complete while records are in flight.
            Thread.sleep(50L);
            this.emittedRecords++;
            return value;
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (failed || this.emittedRecords == 0) {
                return;
            }
            failed = true;
            throw new Exception("Expected failure");
        }
    }

    /** Returns the HTTP address of the Elasticsearch instance, in a form {@link HttpHost#create} accepts. */
    abstract String getElasticsearchHttpHostAddress();

    /** Creates the verification client that wraps the given REST client. */
    abstract TestClientBase createTestClient(RestHighLevelClient restHighLevelClient);

    /** Returns a fresh builder for the sink under test. */
    abstract ElasticsearchSinkBuilderBase<Tuple2<Integer, String>, ? extends ElasticsearchSinkBuilderBase> getSinkBuilder();

    /** Builds a REST client that authenticates with the test user and password. */
    private RestHighLevelClient createRestHighLevelClient() {
        final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(
                AuthScope.ANY,
                new UsernamePasswordCredentials(ELASTICSEARCH_USER, ELASTICSEARCH_PASSWORD));
        return new RestHighLevelClient(
                RestClient.builder(new HttpHost[] {HttpHost.create(getElasticsearchHttpHostAddress())})
                        .setHttpClientConfigCallback(
                                httpClientBuilder ->
                                        httpClientBuilder.setDefaultCredentialsProvider(
                                                credentialsProvider)));
    }

    @BeforeEach
    void setUp() {
        failed = false;
        this.client = createRestHighLevelClient();
        this.context = createTestClient(this.client);
    }

    @AfterEach
    void tearDown() throws IOException {
        if (this.client != null) {
            this.client.close();
        }
    }

    /**
     * Runs the write scenario once per {@link DeliveryGuarantee}. {@code EXACTLY_ONCE} must be
     * rejected with an {@link IllegalStateException}; every other guarantee must complete normally.
     */
    @ParameterizedTest
    @EnumSource(DeliveryGuarantee.class)
    void testWriteToElasticSearchWithDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) throws Exception {
        final String index = "test-es-with-delivery-" + deliveryGuarantee;
        if (deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE) {
            // assertThrows keeps an unexpected exception type as the cause of the failure instead
            // of hiding it behind a secondary assertion.
            Assertions.assertThrows(
                    IllegalStateException.class,
                    () -> runTest(index, false, TestEmitter::jsonEmitter, deliveryGuarantee, null));
        } else {
            runTest(index, false, TestEmitter::jsonEmitter, deliveryGuarantee, null);
        }
    }

    /** Runs the write scenario once per emitter returned by {@link #elasticsearchEmitters()}. */
    @ParameterizedTest
    @MethodSource({"elasticsearchEmitters"})
    void testWriteJsonToElasticsearch(BiFunction<String, String, ElasticsearchEmitter<Tuple2<Integer, String>>> emitterProvider) throws Exception {
        runTest("test-elasticsearch-sink-" + UUID.randomUUID(), false, emitterProvider, null);
    }

    /**
     * Runs the write scenario with restarts allowed and a mapper that fails once, then checks that
     * the failure was actually triggered.
     */
    @Test
    void testRecovery() throws Exception {
        runTest("test-recovery-elasticsearch-sink", true, TestEmitter::jsonEmitter, new FailingMapper());
        Assertions.assertTrue(failed);
    }

    /** Same as the five-argument overload, using {@link DeliveryGuarantee#AT_LEAST_ONCE}. */
    private void runTest(
            String index,
            boolean allowRestarts,
            BiFunction<String, String, ElasticsearchEmitter<Tuple2<Integer, String>>> emitterProvider,
            @Nullable MapFunction<Long, Long> additionalMapper)
            throws Exception {
        runTest(index, allowRestarts, emitterProvider, DeliveryGuarantee.AT_LEAST_ONCE, additionalMapper);
    }

    /**
     * Builds the sink, runs a local job that writes the ids 1 to 5 into {@code index}, and asserts
     * that all five ids were written.
     *
     * @param index name of the target index
     * @param allowRestarts when {@code false}, the job is configured with a no-restart strategy
     * @param emitterProvider creates the emitter from the index name and the data field name
     * @param deliveryGuarantee delivery guarantee configured on the sink
     * @param additionalMapper optional mapper inserted in front of the conversion step, or
     *     {@code null} for none
     * @throws Exception if building the sink, running the job or the verification fails
     */
    private void runTest(
            String index,
            boolean allowRestarts,
            BiFunction<String, String, ElasticsearchEmitter<Tuple2<Integer, String>>> emitterProvider,
            DeliveryGuarantee deliveryGuarantee,
            @Nullable MapFunction<Long, Long> additionalMapper)
            throws Exception {
        // getSinkBuilder() declares the builder's self-type with a raw bound, so the chained calls
        // produce a raw sink; the element type is fixed by the builder's first type argument.
        @SuppressWarnings("unchecked")
        final ElasticsearchSink<Tuple2<Integer, String>> sink =
                getSinkBuilder()
                        .setHosts(new HttpHost[] {HttpHost.create(getElasticsearchHttpHostAddress())})
                        .setEmitter(emitterProvider.apply(index, this.context.getDataFieldName()))
                        .setBulkFlushMaxActions(5)
                        .setConnectionUsername(ELASTICSEARCH_USER)
                        .setConnectionPassword(ELASTICSEARCH_PASSWORD)
                        .setDeliveryGuarantee(deliveryGuarantee)
                        .build();

        final LocalStreamEnvironment env = new LocalStreamEnvironment();
        env.enableCheckpointing(100L);
        if (!allowRestarts) {
            env.setRestartStrategy(RestartStrategies.noRestart());
        }

        DataStream<Long> stream = env.fromSequence(1L, 5L);
        if (additionalMapper != null) {
            stream = stream.map(additionalMapper);
        }

        // Kept as an anonymous class rather than a lambda so that the Tuple2 type arguments remain
        // available to Flink's type extraction.
        stream.map(
                        new MapFunction<Long, Tuple2<Integer, String>>() {
                            @Override
                            public Tuple2<Integer, String> map(Long value) throws Exception {
                                return Tuple2.of(
                                        value.intValue(),
                                        TestClientBase.buildMessage(value.intValue()));
                            }
                        })
                .sinkTo(sink);
        env.execute();
        this.context.assertThatIdsAreWritten(index, 1, 2, 3, 4, 5);
    }

    /** Argument source for {@link #testWriteJsonToElasticsearch}: the JSON and the SMILE emitter. */
    private static List<BiFunction<String, String, ElasticsearchEmitter<Tuple2<Integer, String>>>> elasticsearchEmitters() {
        return Lists.newArrayList(TestEmitter::jsonEmitter, TestEmitter::smileEmitter);
    }
}
