/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.elasticsearch.sink;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Optional;
import java.util.function.BiConsumer;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.elasticsearch.ElasticsearchUtil;
import org.apache.flink.connector.elasticsearch.sink.BulkProcessorBuilderFactory;
import org.apache.flink.connector.elasticsearch.sink.BulkProcessorConfig;
import org.apache.flink.connector.elasticsearch.sink.BulkRequestConsumerFactory;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchEmitter;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter;
import org.apache.flink.connector.elasticsearch.sink.FlushBackoffType;
import org.apache.flink.connector.elasticsearch.sink.NetworkClientConfig;
import org.apache.flink.connector.elasticsearch.sink.RequestIndexer;
import org.apache.flink.connector.elasticsearch.sink.TestClientBase;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.runtime.metrics.groups.InternalOperatorIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
@ExtendWith(value={TestLoggerExtension.class})
class ElasticsearchWriterITCase {
    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchWriterITCase.class);
    @Container
    private static final ElasticsearchContainer ES_CONTAINER = ElasticsearchUtil.createElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.10.2", LOG);
    private RestHighLevelClient client;
    private TestClientBase context;
    private MetricListener metricListener;

    ElasticsearchWriterITCase() {
    }

    @BeforeEach
    void setUp() {
        this.metricListener = new MetricListener();
        this.client = new RestHighLevelClient(RestClient.builder((HttpHost[])new HttpHost[]{HttpHost.create((String)ES_CONTAINER.getHttpHostAddress())}));
        this.context = new TestClient(this.client);
    }

    @AfterEach
    void tearDown() throws IOException {
        if (this.client != null) {
            this.client.close();
        }
    }

    @Test
    void testWriteOnBulkFlush() throws Exception {
        String index = "test-bulk-flush-without-checkpoint";
        int flushAfterNActions = 5;
        BulkProcessorConfig bulkProcessorConfig = new BulkProcessorConfig(5, -1, -1L, FlushBackoffType.NONE, 0, 0L);
        try (ElasticsearchWriter<Tuple2<Integer, String>> writer = this.createWriter("test-bulk-flush-without-checkpoint", false, bulkProcessorConfig);){
            writer.write((Object)Tuple2.of((Object)1, (Object)TestClientBase.buildMessage(1)), null);
            writer.write((Object)Tuple2.of((Object)2, (Object)TestClientBase.buildMessage(2)), null);
            writer.write((Object)Tuple2.of((Object)3, (Object)TestClientBase.buildMessage(3)), null);
            writer.write((Object)Tuple2.of((Object)4, (Object)TestClientBase.buildMessage(4)), null);
            writer.flush(false);
            this.context.assertThatIdsAreNotWritten("test-bulk-flush-without-checkpoint", 1, 2, 3, 4);
            writer.write((Object)Tuple2.of((Object)5, (Object)"test-5"), null);
            this.context.assertThatIdsAreWritten("test-bulk-flush-without-checkpoint", 1, 2, 3, 4, 5);
            writer.write((Object)Tuple2.of((Object)6, (Object)"test-6"), null);
            this.context.assertThatIdsAreNotWritten("test-bulk-flush-without-checkpoint", 6);
            writer.blockingFlushAllActions();
            this.context.assertThatIdsAreWritten("test-bulk-flush-without-checkpoint", 1, 2, 3, 4, 5, 6);
        }
    }

    @Test
    void testWriteOnBulkIntervalFlush() throws Exception {
        String index = "test-bulk-flush-with-interval";
        BulkProcessorConfig bulkProcessorConfig = new BulkProcessorConfig(-1, -1, 1000L, FlushBackoffType.NONE, 0, 0L);
        try (ElasticsearchWriter<Tuple2<Integer, String>> writer = this.createWriter("test-bulk-flush-with-interval", false, bulkProcessorConfig);){
            writer.write((Object)Tuple2.of((Object)1, (Object)TestClientBase.buildMessage(1)), null);
            writer.write((Object)Tuple2.of((Object)2, (Object)TestClientBase.buildMessage(2)), null);
            writer.write((Object)Tuple2.of((Object)3, (Object)TestClientBase.buildMessage(3)), null);
            writer.write((Object)Tuple2.of((Object)4, (Object)TestClientBase.buildMessage(4)), null);
            writer.blockingFlushAllActions();
        }
        this.context.assertThatIdsAreWritten("test-bulk-flush-with-interval", 1, 2, 3, 4);
    }

    @Test
    void testWriteOnCheckpoint() throws Exception {
        String index = "test-bulk-flush-with-checkpoint";
        BulkProcessorConfig bulkProcessorConfig = new BulkProcessorConfig(-1, -1, -1L, FlushBackoffType.NONE, 0, 0L);
        try (ElasticsearchWriter<Tuple2<Integer, String>> writer = this.createWriter("test-bulk-flush-with-checkpoint", true, bulkProcessorConfig);){
            writer.write((Object)Tuple2.of((Object)1, (Object)TestClientBase.buildMessage(1)), null);
            writer.write((Object)Tuple2.of((Object)2, (Object)TestClientBase.buildMessage(2)), null);
            writer.write((Object)Tuple2.of((Object)3, (Object)TestClientBase.buildMessage(3)), null);
            this.context.assertThatIdsAreNotWritten("test-bulk-flush-with-checkpoint", 1, 2, 3);
            writer.flush(false);
            this.context.assertThatIdsAreWritten("test-bulk-flush-with-checkpoint", 1, 2, 3);
        }
    }

    @Test
    void testIncrementByteOutMetric() throws Exception {
        String index = "test-inc-byte-out";
        InternalOperatorIOMetricGroup operatorIOMetricGroup = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
        InternalSinkWriterMetricGroup metricGroup = InternalSinkWriterMetricGroup.mock((MetricGroup)this.metricListener.getMetricGroup(), (OperatorIOMetricGroup)operatorIOMetricGroup);
        int flushAfterNActions = 2;
        BulkProcessorConfig bulkProcessorConfig = new BulkProcessorConfig(2, -1, -1L, FlushBackoffType.NONE, 0, 0L);
        try (ElasticsearchWriter<Tuple2<Integer, String>> writer = this.createWriter("test-inc-byte-out", false, bulkProcessorConfig, (SinkWriterMetricGroup)metricGroup);){
            Counter numBytesOut = operatorIOMetricGroup.getNumBytesOutCounter();
            Assertions.assertEquals((long)numBytesOut.getCount(), (long)0L);
            writer.write((Object)Tuple2.of((Object)1, (Object)TestClientBase.buildMessage(1)), null);
            writer.write((Object)Tuple2.of((Object)2, (Object)TestClientBase.buildMessage(2)), null);
            writer.blockingFlushAllActions();
            long first = numBytesOut.getCount();
            Assertions.assertTrue((first > 0L ? 1 : 0) != 0);
            writer.write((Object)Tuple2.of((Object)1, (Object)TestClientBase.buildMessage(1)), null);
            writer.write((Object)Tuple2.of((Object)2, (Object)TestClientBase.buildMessage(2)), null);
            writer.blockingFlushAllActions();
            Assertions.assertTrue((numBytesOut.getCount() > first ? 1 : 0) != 0);
        }
    }

    @Test
    void testIncrementRecordsSendMetric() throws Exception {
        String index = "test-inc-records-send";
        int flushAfterNActions = 2;
        BulkProcessorConfig bulkProcessorConfig = new BulkProcessorConfig(2, -1, -1L, FlushBackoffType.NONE, 0, 0L);
        try (ElasticsearchWriter<Tuple2<Integer, String>> writer = this.createWriter("test-inc-records-send", false, bulkProcessorConfig);){
            Optional recordsSend = this.metricListener.getCounter(new String[]{"numRecordsSend"});
            writer.write((Object)Tuple2.of((Object)1, (Object)TestClientBase.buildMessage(1)), null);
            writer.write((Object)Tuple2.of((Object)1, (Object)("u" + TestClientBase.buildMessage(2))), null);
            writer.write((Object)Tuple2.of((Object)1, (Object)("d" + TestClientBase.buildMessage(3))), null);
            writer.blockingFlushAllActions();
            Assertions.assertTrue((boolean)recordsSend.isPresent());
            Assertions.assertEquals((long)((Counter)recordsSend.get()).getCount(), (long)3L);
        }
    }

    @Test
    void testCurrentSendTime() throws Exception {
        String index = "test-current-send-time";
        int flushAfterNActions = 2;
        BulkProcessorConfig bulkProcessorConfig = new BulkProcessorConfig(2, -1, -1L, FlushBackoffType.NONE, 0, 0L);
        try (ElasticsearchWriter<Tuple2<Integer, String>> writer = this.createWriter("test-current-send-time", false, bulkProcessorConfig);){
            Optional currentSendTime = this.metricListener.getGauge(new String[]{"currentSendTime"});
            writer.write((Object)Tuple2.of((Object)1, (Object)TestClientBase.buildMessage(1)), null);
            writer.write((Object)Tuple2.of((Object)2, (Object)TestClientBase.buildMessage(2)), null);
            writer.blockingFlushAllActions();
            Assertions.assertTrue((boolean)currentSendTime.isPresent());
            MatcherAssert.assertThat((Object)((Gauge)currentSendTime.get()).getValue(), (Matcher)Matchers.greaterThan((Comparable)Long.valueOf(0L)));
        }
    }

    private ElasticsearchWriter<Tuple2<Integer, String>> createWriter(String index, boolean flushOnCheckpoint, BulkProcessorConfig bulkProcessorConfig) {
        return this.createWriter(index, flushOnCheckpoint, bulkProcessorConfig, (SinkWriterMetricGroup)InternalSinkWriterMetricGroup.mock((MetricGroup)this.metricListener.getMetricGroup()));
    }

    private ElasticsearchWriter<Tuple2<Integer, String>> createWriter(String index, boolean flushOnCheckpoint, BulkProcessorConfig bulkProcessorConfig, SinkWriterMetricGroup metricGroup) {
        return new ElasticsearchWriter(Collections.singletonList(HttpHost.create((String)ES_CONTAINER.getHttpHostAddress())), (ElasticsearchEmitter)new UpdatingEmitter(index, this.context.getDataFieldName()), flushOnCheckpoint, bulkProcessorConfig, (BulkProcessorBuilderFactory)new TestBulkProcessorBuilderFactory(), new NetworkClientConfig(null, null, null, null, null, null), metricGroup, (MailboxExecutor)new TestMailbox());
    }

    private static class TestMailbox
    implements MailboxExecutor {
        private TestMailbox() {
        }

        public void execute(ThrowingRunnable<? extends Exception> command, String descriptionFormat, Object ... descriptionArgs) {
            try {
                command.run();
            }
            catch (Exception e) {
                throw new RuntimeException("Unexpected error", e);
            }
        }

        public void yield() throws InterruptedException, FlinkRuntimeException {
            Thread.sleep(100L);
        }

        public boolean tryYield() throws FlinkRuntimeException {
            return false;
        }
    }

    private static class TestClient
    extends TestClientBase {
        TestClient(RestHighLevelClient client) {
            super(client);
        }

        @Override
        GetResponse getResponse(String index, int id) throws IOException {
            return this.client.get(new GetRequest(index, Integer.toString(id)), RequestOptions.DEFAULT);
        }
    }

    private static class UpdatingEmitter
    implements ElasticsearchEmitter<Tuple2<Integer, String>> {
        private final String dataFieldName;
        private final String index;

        UpdatingEmitter(String index, String dataFieldName) {
            this.index = index;
            this.dataFieldName = dataFieldName;
        }

        public void emit(Tuple2<Integer, String> element, SinkWriter.Context context, RequestIndexer indexer) {
            HashMap<String, Object> document = new HashMap<String, Object>();
            document.put(this.dataFieldName, element.f1);
            char action = ((String)element.f1).charAt(0);
            String id = ((Integer)element.f0).toString();
            switch (action) {
                case 'd': {
                    indexer.add(new DeleteRequest[]{new DeleteRequest(this.index).id(id)});
                    break;
                }
                case 'u': {
                    indexer.add(new UpdateRequest[]{((UpdateRequest)new UpdateRequest().index(this.index)).id(id).doc(document)});
                    break;
                }
                default: {
                    indexer.add(new IndexRequest[]{new IndexRequest(this.index).id(id).type("test-document-type").source(document)});
                }
            }
        }
    }

    private static class TestBulkProcessorBuilderFactory
    implements BulkProcessorBuilderFactory {
        private TestBulkProcessorBuilderFactory() {
        }

        public BulkProcessor.Builder apply(final RestHighLevelClient client, BulkProcessorConfig bulkProcessorConfig, BulkProcessor.Listener listener) {
            BackoffPolicy backoffPolicy;
            BulkProcessor.Builder builder = BulkProcessor.builder((BiConsumer)new BulkRequestConsumerFactory(){

                public void accept(BulkRequest bulkRequest, ActionListener<BulkResponse> bulkResponseActionListener) {
                    client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);
                }
            }, (BulkProcessor.Listener)listener);
            if (bulkProcessorConfig.getBulkFlushMaxActions() != -1) {
                builder.setBulkActions(bulkProcessorConfig.getBulkFlushMaxActions());
            }
            if (bulkProcessorConfig.getBulkFlushMaxMb() != -1) {
                builder.setBulkSize(new ByteSizeValue((long)bulkProcessorConfig.getBulkFlushMaxMb(), ByteSizeUnit.MB));
            }
            if (bulkProcessorConfig.getBulkFlushInterval() != -1L) {
                builder.setFlushInterval(new TimeValue(bulkProcessorConfig.getBulkFlushInterval()));
            }
            TimeValue backoffDelay = new TimeValue(bulkProcessorConfig.getBulkFlushBackOffDelay());
            int maxRetryCount = bulkProcessorConfig.getBulkFlushBackoffRetries();
            switch (bulkProcessorConfig.getFlushBackoffType()) {
                case CONSTANT: {
                    backoffPolicy = BackoffPolicy.constantBackoff((TimeValue)backoffDelay, (int)maxRetryCount);
                    break;
                }
                case EXPONENTIAL: {
                    backoffPolicy = BackoffPolicy.exponentialBackoff((TimeValue)backoffDelay, (int)maxRetryCount);
                    break;
                }
                case NONE: {
                    backoffPolicy = BackoffPolicy.noBackoff();
                    break;
                }
                default: {
                    throw new IllegalArgumentException("Received unknown backoff policy type " + bulkProcessorConfig.getFlushBackoffType());
                }
            }
            builder.setBackoffPolicy(backoffPolicy);
            return builder;
        }
    }
}

