package org.apache.flink.connector.opensearch.sink;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.opensearch.OpensearchUtil;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.http.HttpHost;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.testcontainers.OpensearchContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
@ExtendWith({TestLoggerExtension.class})
/* loaded from: input_file:org/apache/flink/connector/opensearch/sink/OpensearchSinkITCase.class */
class OpensearchSinkITCase {
    private static boolean failed;
    private RestHighLevelClient client;
    private OpensearchTestClient context;
    protected static final Logger LOG = LoggerFactory.getLogger(OpensearchSinkITCase.class);

    @Container
    private static final OpensearchContainer OS_CONTAINER = OpensearchUtil.createOpensearchContainer("opensearchproject/opensearch:1.3.15", LOG);

    /* loaded from: input_file:org/apache/flink/connector/opensearch/sink/OpensearchSinkITCase$FailingMapper.class */
    private static class FailingMapper implements MapFunction<Long, Long>, CheckpointListener {
        private int emittedRecords;

        private FailingMapper() {
            this.emittedRecords = 0;
        }

        public Long map(Long l) throws Exception {
            Thread.sleep(50L);
            this.emittedRecords++;
            return l;
        }

        public void notifyCheckpointComplete(long j) throws Exception {
            if (OpensearchSinkITCase.failed || this.emittedRecords == 0) {
                return;
            }
            boolean unused = OpensearchSinkITCase.failed = true;
            throw new Exception("Expected failure");
        }
    }

    OpensearchSinkITCase() {
    }

    @BeforeEach
    void setUp() {
        failed = false;
        this.client = OpensearchUtil.createClient(OS_CONTAINER);
        this.context = new OpensearchTestClient(this.client);
    }

    @AfterEach
    void tearDown() throws IOException {
        if (this.client != null) {
            this.client.close();
        }
    }

    @EnumSource(DeliveryGuarantee.class)
    @ParameterizedTest
    void testWriteToOpensearchWithDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) throws Exception {
        boolean z = false;
        try {
            try {
                runTest("test-opensearch-with-delivery-" + deliveryGuarantee, false, TestEmitter::jsonEmitter, deliveryGuarantee, null);
                Assertions.assertThat(false).isEqualTo(deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE);
            } catch (IllegalStateException e) {
                z = true;
                Assertions.assertThat(deliveryGuarantee).isSameAs(DeliveryGuarantee.EXACTLY_ONCE);
                Assertions.assertThat(true).isEqualTo(deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE);
            }
        } catch (Throwable th) {
            Assertions.assertThat(z).isEqualTo(deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE);
            throw th;
        }
    }

    @MethodSource({"opensearchEmitters"})
    @ParameterizedTest
    void testWriteJsonToOpensearch(BiFunction<String, String, OpensearchEmitter<Tuple2<Integer, String>>> biFunction) throws Exception {
        runTest("test-opensearch-sink-" + UUID.randomUUID(), false, biFunction, null);
    }

    @Test
    void testRecovery() throws Exception {
        runTest("test-recovery-opensearch-sink", true, TestEmitter::jsonEmitter, new FailingMapper());
        Assertions.assertThat(failed).isTrue();
    }

    private void runTest(String str, boolean z, BiFunction<String, String, OpensearchEmitter<Tuple2<Integer, String>>> biFunction, @Nullable MapFunction<Long, Long> mapFunction) throws Exception {
        runTest(str, z, biFunction, DeliveryGuarantee.AT_LEAST_ONCE, mapFunction);
    }

    private void runTest(String str, boolean z, BiFunction<String, String, OpensearchEmitter<Tuple2<Integer, String>>> biFunction, DeliveryGuarantee deliveryGuarantee, @Nullable MapFunction<Long, Long> mapFunction) throws Exception {
        OpensearchSink build = new OpensearchSinkBuilder().setHosts(new HttpHost[]{HttpHost.create(OS_CONTAINER.getHttpHostAddress())}).setEmitter(biFunction.apply(str, this.context.getDataFieldName())).setBulkFlushMaxActions(5).setConnectionUsername(OS_CONTAINER.getUsername()).setConnectionPassword(OS_CONTAINER.getPassword()).setDeliveryGuarantee(deliveryGuarantee).setAllowInsecure(true).build();
        LocalStreamEnvironment localStreamEnvironment = new LocalStreamEnvironment();
        localStreamEnvironment.enableCheckpointing(100L);
        if (!z) {
            localStreamEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        }
        DataStream fromSequence = localStreamEnvironment.fromSequence(1L, 5L);
        if (mapFunction != null) {
            fromSequence = fromSequence.map(mapFunction);
        }
        fromSequence.map(new MapFunction<Long, Tuple2<Integer, String>>() { // from class: org.apache.flink.connector.opensearch.sink.OpensearchSinkITCase.1
            public Tuple2<Integer, String> map(Long l) throws Exception {
                return Tuple2.of(Integer.valueOf(l.intValue()), OpensearchTestClient.buildMessage(l.intValue()));
            }
        }).sinkTo(build);
        localStreamEnvironment.execute();
        this.context.assertThatIdsAreWritten(str, 1, 2, 3, 4, 5);
    }

    private static List<BiFunction<String, String, OpensearchEmitter<Tuple2<Integer, String>>>> opensearchEmitters() {
        return Arrays.asList(TestEmitter::jsonEmitter, TestEmitter::smileEmitter);
    }
}
