/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.elasticsearch.sink;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSinkBuilderBase;
import org.apache.flink.connector.elasticsearch.sink.TestClientBase;
import org.apache.flink.connector.elasticsearch.sink.TestEmitter;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith(value={TestLoggerExtension.class})
abstract class ElasticsearchSinkBaseITCase {
    protected static final Logger LOG = LoggerFactory.getLogger(ElasticsearchSinkBaseITCase.class);
    protected static final String ELASTICSEARCH_PASSWORD = "test-password";
    protected static final String ELASTICSEARCH_USER = "elastic";
    private static boolean failed;
    private RestHighLevelClient client;
    private TestClientBase context;

    ElasticsearchSinkBaseITCase() {
    }

    abstract String getElasticsearchHttpHostAddress();

    abstract TestClientBase createTestClient(RestHighLevelClient var1);

    abstract ElasticsearchSinkBuilderBase<Tuple2<Integer, String>, ? extends ElasticsearchSinkBuilderBase> getSinkBuilder();

    private RestHighLevelClient createRestHighLevelClient() {
        BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, (Credentials)new UsernamePasswordCredentials(ELASTICSEARCH_USER, ELASTICSEARCH_PASSWORD));
        return new RestHighLevelClient(RestClient.builder((HttpHost[])new HttpHost[]{HttpHost.create((String)this.getElasticsearchHttpHostAddress())}).setHttpClientConfigCallback(arg_0 -> ElasticsearchSinkBaseITCase.lambda$createRestHighLevelClient$0((CredentialsProvider)credentialsProvider, arg_0)));
    }

    @BeforeEach
    void setUp() {
        failed = false;
        this.client = this.createRestHighLevelClient();
        this.context = this.createTestClient(this.client);
    }

    @AfterEach
    void tearDown() throws IOException {
        if (this.client != null) {
            this.client.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ParameterizedTest
    @EnumSource(value=DeliveryGuarantee.class)
    void testWriteToElasticSearchWithDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) throws Exception {
        String index = "test-es-with-delivery-" + deliveryGuarantee;
        boolean failure = false;
        try {
            this.runTest(index, false, TestEmitter::jsonEmitter, deliveryGuarantee, null);
        }
        catch (IllegalStateException e) {
            try {
                failure = true;
                Assertions.assertSame((Object)deliveryGuarantee, (Object)DeliveryGuarantee.EXACTLY_ONCE);
            }
            catch (Throwable throwable) {
                Assertions.assertEquals((Object)failure, (Object)(deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE ? 1 : 0));
                throw throwable;
            }
            Assertions.assertEquals((Object)failure, (Object)(deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE ? 1 : 0));
        }
        Assertions.assertEquals((Object)failure, (Object)(deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE ? 1 : 0));
    }

    @ParameterizedTest
    @MethodSource(value={"elasticsearchEmitters"})
    void testWriteJsonToElasticsearch(BiFunction<String, String, ElasticsearchEmitter<Tuple2<Integer, String>>> emitterProvider) throws Exception {
        String index = "test-elasticsearch-sink-" + UUID.randomUUID();
        this.runTest(index, false, emitterProvider, null);
    }

    @Test
    void testRecovery() throws Exception {
        String index = "test-recovery-elasticsearch-sink";
        this.runTest("test-recovery-elasticsearch-sink", true, TestEmitter::jsonEmitter, new FailingMapper());
        Assertions.assertTrue((boolean)failed);
    }

    private void runTest(String index, boolean allowRestarts, BiFunction<String, String, ElasticsearchEmitter<Tuple2<Integer, String>>> emitterProvider, @Nullable MapFunction<Long, Long> additionalMapper) throws Exception {
        this.runTest(index, allowRestarts, emitterProvider, DeliveryGuarantee.AT_LEAST_ONCE, additionalMapper);
    }

    private void runTest(String index, boolean allowRestarts, BiFunction<String, String, ElasticsearchEmitter<Tuple2<Integer, String>>> emitterProvider, DeliveryGuarantee deliveryGuarantee, @Nullable MapFunction<Long, Long> additionalMapper) throws Exception {
        ElasticsearchSink sink = this.getSinkBuilder().setHosts(new HttpHost[]{HttpHost.create((String)this.getElasticsearchHttpHostAddress())}).setEmitter(emitterProvider.apply(index, this.context.getDataFieldName())).setBulkFlushMaxActions(5).setConnectionUsername(ELASTICSEARCH_USER).setConnectionPassword(ELASTICSEARCH_PASSWORD).setDeliveryGuarantee(deliveryGuarantee).build();
        LocalStreamEnvironment env = new LocalStreamEnvironment();
        env.enableCheckpointing(100L);
        if (!allowRestarts) {
            env.setRestartStrategy(RestartStrategies.noRestart());
        }
        DataStreamSource stream = env.fromSequence(1L, 5L);
        if (additionalMapper != null) {
            stream = stream.map(additionalMapper);
        }
        stream.map((MapFunction)new MapFunction<Long, Tuple2<Integer, String>>(){

            public Tuple2<Integer, String> map(Long value) throws Exception {
                return Tuple2.of((Object)value.intValue(), (Object)TestClientBase.buildMessage(value.intValue()));
            }
        }).sinkTo((Sink)sink);
        env.execute();
        this.context.assertThatIdsAreWritten(index, 1, 2, 3, 4, 5);
    }

    private static List<BiFunction<String, String, ElasticsearchEmitter<Tuple2<Integer, String>>>> elasticsearchEmitters() {
        return Lists.newArrayList((Object[])new BiFunction[]{TestEmitter::jsonEmitter, TestEmitter::smileEmitter});
    }

    private static /* synthetic */ HttpAsyncClientBuilder lambda$createRestHighLevelClient$0(CredentialsProvider credentialsProvider, HttpAsyncClientBuilder httpClientBuilder) {
        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
    }

    private static class FailingMapper
    implements MapFunction<Long, Long>,
    CheckpointListener {
        private int emittedRecords = 0;

        private FailingMapper() {
        }

        public Long map(Long value) throws Exception {
            Thread.sleep(50L);
            ++this.emittedRecords;
            return value;
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (failed || this.emittedRecords == 0) {
                return;
            }
            failed = true;
            throw new Exception("Expected failure");
        }
    }
}

