package org.apache.flink.connector.elasticsearch.sink;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Optional;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.elasticsearch.ElasticsearchUtil;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.runtime.metrics.groups.InternalOperatorIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.http.HttpHost;
import org.assertj.core.api.Assertions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
@ExtendWith({TestLoggerExtension.class})
/* loaded from: input_file:org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase.class */
class ElasticsearchWriterITCase {
    // Logger for this test class; also handed to the Elasticsearch container factory below.
    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchWriterITCase.class);

    // Elasticsearch 7.10.2 container that every test in this class talks to. It is a static
    // @Container field, so its lifecycle is handled by the Testcontainers JUnit 5 extension
    // enabled through @Testcontainers on the class.
    @Container
    private static final ElasticsearchContainer ES_CONTAINER = ElasticsearchUtil.createElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.10.2", LOG);
    // REST client pointed at ES_CONTAINER; created in setUp() and closed in tearDown().
    private RestHighLevelClient client;
    // Verification helper wrapping the client (assigned a TestClient in setUp()).
    private TestClientBase context;
    // Metric listener whose metric group is handed to the writers under test; recreated in setUp().
    private MetricListener metricListener;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriterITCase$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        /**
         * Lookup table indexed by {@code FlushBackoffType.ordinal()}. It holds dense case labels:
         * 1 = CONSTANT, 2 = EXPONENTIAL, 3 = NONE. A slot that stays at 0 has no label assigned.
         */
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$connector$elasticsearch$sink$FlushBackoffType;

        static {
            final int[] caseLabels = new int[FlushBackoffType.values().length];
            try {
                caseLabels[FlushBackoffType.CONSTANT.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant not present at runtime: deliberately leave its slot unassigned.
            }
            try {
                caseLabels[FlushBackoffType.EXPONENTIAL.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant not present at runtime: deliberately leave its slot unassigned.
            }
            try {
                caseLabels[FlushBackoffType.NONE.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // Constant not present at runtime: deliberately leave its slot unassigned.
            }
            $SwitchMap$org$apache$flink$connector$elasticsearch$sink$FlushBackoffType = caseLabels;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase$TestBulkProcessorBuilderFactory.class */
    /**
     * Test-only {@link BulkProcessorBuilderFactory} that assembles a {@link BulkProcessor.Builder}
     * from a {@link BulkProcessorConfig}. Bulk requests are dispatched asynchronously through the
     * supplied {@link RestHighLevelClient}.
     */
    public static class TestBulkProcessorBuilderFactory implements BulkProcessorBuilderFactory {
        private TestBulkProcessorBuilderFactory() {
        }

        /**
         * Builds a bulk processor builder configured from {@code bulkProcessorConfig}.
         *
         * <p>Max actions, max size (MB) and flush interval are only applied when the configured
         * value is not {@code -1}; otherwise the builder's own setting is left untouched. The
         * backoff policy is always set.
         *
         * @param restHighLevelClient client whose {@code bulkAsync} sends each bulk request
         * @param bulkProcessorConfig flush thresholds and backoff settings
         * @param listener bulk listener handed to the builder
         * @return the configured builder
         * @throws IllegalArgumentException if the configured backoff type is not handled
         */
        public BulkProcessor.Builder apply(final RestHighLevelClient restHighLevelClient, BulkProcessorConfig bulkProcessorConfig, BulkProcessor.Listener listener) {
            // Kept as an anonymous class (not a lambda) to mirror the original construct exactly.
            BulkProcessor.Builder builder = BulkProcessor.builder(new BulkRequestConsumerFactory() {
                public void accept(BulkRequest bulkRequest, ActionListener<BulkResponse> actionListener) {
                    restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, actionListener);
                }
            }, listener);

            if (bulkProcessorConfig.getBulkFlushMaxActions() != -1) {
                builder.setBulkActions(bulkProcessorConfig.getBulkFlushMaxActions());
            }
            if (bulkProcessorConfig.getBulkFlushMaxMb() != -1) {
                builder.setBulkSize(new ByteSizeValue(bulkProcessorConfig.getBulkFlushMaxMb(), ByteSizeUnit.MB));
            }
            if (bulkProcessorConfig.getBulkFlushInterval() != -1) {
                builder.setFlushInterval(new TimeValue(bulkProcessorConfig.getBulkFlushInterval()));
            }

            builder.setBackoffPolicy(createBackoffPolicy(bulkProcessorConfig));
            return builder;
        }

        /**
         * Translates the configured {@link FlushBackoffType} into an Elasticsearch
         * {@link BackoffPolicy}, switching on the enum itself rather than on an ordinal table.
         */
        private static BackoffPolicy createBackoffPolicy(BulkProcessorConfig config) {
            final TimeValue delay = new TimeValue(config.getBulkFlushBackOffDelay());
            final int maxRetries = config.getBulkFlushBackoffRetries();
            final FlushBackoffType backoffType = config.getFlushBackoffType();
            switch (backoffType) {
                case CONSTANT:
                    return BackoffPolicy.constantBackoff(delay, maxRetries);
                case EXPONENTIAL:
                    return BackoffPolicy.exponentialBackoff(delay, maxRetries);
                case NONE:
                    return BackoffPolicy.noBackoff();
                default:
                    throw new IllegalArgumentException("Received unknown backoff policy type " + backoffType);
            }
        }

        /* synthetic */ TestBulkProcessorBuilderFactory(AnonymousClass1 anonymousClass1) {
            // Accessor constructor retained so existing `new TestBulkProcessorBuilderFactory(null)` calls keep compiling.
            this();
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase$TestClient.class */
    /** {@link TestClientBase} implementation that looks documents up through the REST high-level client. */
    private static class TestClient extends TestClientBase {
        TestClient(RestHighLevelClient restHighLevelClient) {
            super(restHighLevelClient);
        }

        /**
         * Fetches a single document using the default request options.
         *
         * @param str name of the index to read from
         * @param i numeric document id; sent to Elasticsearch in its decimal string form
         * @return the raw get response
         * @throws IOException if the underlying client call fails
         */
        @Override // org.apache.flink.connector.elasticsearch.sink.TestClientBase
        GetResponse getResponse(String str, int i) throws IOException {
            final String documentId = Integer.toString(i);
            final GetRequest lookup = new GetRequest(str, documentId);
            return this.client.get(lookup, RequestOptions.DEFAULT);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase$TestMailbox.class */
    /**
     * Minimal {@link MailboxExecutor} for tests: submitted actions run immediately on the calling
     * thread instead of being queued.
     */
    public static class TestMailbox implements MailboxExecutor {
        private TestMailbox() {
        }

        /**
         * Runs the action inline. The description arguments are ignored.
         *
         * @throws RuntimeException wrapping any exception thrown by the action
         */
        public void execute(ThrowingRunnable<? extends Exception> throwingRunnable, String str, Object... objArr) {
            try {
                throwingRunnable.run();
            } catch (Exception failure) {
                throw new RuntimeException("Unexpected error", failure);
            }
        }

        /** Nothing is ever queued here, so yielding just pauses the caller for 100 ms. */
        public void yield() throws InterruptedException, FlinkRuntimeException {
            final long pauseMillis = 100L;
            Thread.sleep(pauseMillis);
        }

        /** Always reports that no mail was processed. */
        public boolean tryYield() throws FlinkRuntimeException {
            return false;
        }

        /* synthetic */ TestMailbox(AnonymousClass1 anonymousClass1) {
            // Accessor constructor; delegates to the private no-arg constructor.
            this();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/elasticsearch/sink/ElasticsearchWriterITCase$UpdatingEmitter.class */
    /**
     * Emitter that turns a {@code (id, message)} tuple into one Elasticsearch request. The first
     * character of the message selects the request kind: {@code 'd'} deletes, {@code 'u'}
     * updates, anything else indexes.
     */
    public static class UpdatingEmitter implements ElasticsearchEmitter<Tuple2<Integer, String>> {
        private final String dataFieldName;
        private final String index;

        /**
         * @param str name of the target index
         * @param str2 name of the document field that stores the message
         */
        UpdatingEmitter(String str, String str2) {
            this.index = str;
            this.dataFieldName = str2;
        }

        /**
         * Adds exactly one request for {@code tuple2} to {@code requestIndexer}.
         *
         * @param tuple2 {@code f0} is the document id, {@code f1} the message; the message must be
         *     non-empty because its first character is inspected
         * @param context unused
         * @param requestIndexer receives the delete, update or index request
         */
        public void emit(Tuple2<Integer, String> tuple2, SinkWriter.Context context, RequestIndexer requestIndexer) {
            // Typed map (instead of a raw HashMap) so the document payload is checked at compile time.
            final HashMap<String, Object> document = new HashMap<>();
            document.put(this.dataFieldName, tuple2.f1);

            final char action = tuple2.f1.charAt(0);
            final String id = tuple2.f0.toString();
            switch (action) {
                case 'd':
                    requestIndexer.add(new DeleteRequest(this.index).id(id));
                    return;
                case 'u':
                    requestIndexer.add(new UpdateRequest().index(this.index).id(id).doc(document));
                    return;
                default:
                    requestIndexer.add(new IndexRequest(this.index).id(id).type("test-document-type").source(document));
                    return;
            }
        }
    }

    // Package-private no-arg constructor; all per-test state is initialised in setUp().
    ElasticsearchWriterITCase() {
    }

    /**
     * Creates fresh per-test fixtures: a metric listener, a REST client pointed at the shared
     * Elasticsearch container, and a verification helper wrapping that client.
     */
    @BeforeEach
    void setUp() {
        this.metricListener = new MetricListener();

        final HttpHost containerHost = HttpHost.create(ES_CONTAINER.getHttpHostAddress());
        this.client = new RestHighLevelClient(RestClient.builder(containerHost));

        this.context = new TestClient(this.client);
    }

    /**
     * Closes the REST client created in {@code setUp()}, if there is one.
     *
     * @throws IOException if closing the client fails
     */
    @AfterEach
    void tearDown() throws IOException {
        final RestHighLevelClient openClient = this.client;
        if (openClient == null) {
            return;
        }
        openClient.close();
    }

    /**
     * Checks that buffered records only reach Elasticsearch once the bulk size threshold is hit
     * or a blocking flush is requested.
     *
     * <p>The writer is managed by try-with-resources, so it is always closed. If both the test
     * body and {@code close()} fail, the body's exception is the one propagated and the close
     * failure is attached as suppressed.
     */
    @Test
    void testWriteOnBulkFlush() throws Exception {
        final String index = "test-bulk-flush-without-checkpoint";
        // First argument 5 is presumably the max-actions threshold (the fifth write below
        // triggers the flush); the -1 values leave the other flush triggers unset.
        final BulkProcessorConfig bulkProcessorConfig = new BulkProcessorConfig(5, -1, -1L, FlushBackoffType.NONE, 0, 0L);

        try (ElasticsearchWriter<Tuple2<Integer, String>> writer = createWriter(index, false, bulkProcessorConfig)) {
            writer.write(Tuple2.of(1, TestClientBase.buildMessage(1)), null);
            writer.write(Tuple2.of(2, TestClientBase.buildMessage(2)), null);
            writer.write(Tuple2.of(3, TestClientBase.buildMessage(3)), null);
            writer.write(Tuple2.of(4, TestClientBase.buildMessage(4)), null);

            // This writer was created with its boolean flag set to false; flush(false) is
            // expected to leave the four buffered records unwritten.
            writer.flush(false);
            this.context.assertThatIdsAreNotWritten(index, 1, 2, 3, 4);

            // The fifth record reaches the threshold and the whole batch is written.
            writer.write(Tuple2.of(5, "test-5"), null);
            this.context.assertThatIdsAreWritten(index, 1, 2, 3, 4, 5);

            // A single new record stays buffered until explicitly flushed.
            writer.write(Tuple2.of(6, "test-6"), null);
            this.context.assertThatIdsAreNotWritten(index, 6);

            writer.blockingFlushAllActions();
            this.context.assertThatIdsAreWritten(index, 1, 2, 3, 4, 5, 6);
        }
    }

    /**
     * Writes four records with a writer whose bulk config carries a 1000 (presumably
     * milliseconds) flush interval, flushes and closes it, then checks that all four documents
     * exist.
     *
     * <p>The writer is managed by try-with-resources. The previous hand-rolled cleanup called
     * {@code close()} unguarded on the failure path, so a failing close replaced the original
     * test failure; now the close failure is attached as a suppressed exception instead.
     */
    @Test
    void testWriteOnBulkIntervalFlush() throws Exception {
        final String index = "test-bulk-flush-with-interval";
        final BulkProcessorConfig bulkProcessorConfig = new BulkProcessorConfig(-1, -1, 1000L, FlushBackoffType.NONE, 0, 0L);

        try (ElasticsearchWriter<Tuple2<Integer, String>> writer = createWriter(index, false, bulkProcessorConfig)) {
            writer.write(Tuple2.of(1, TestClientBase.buildMessage(1)), null);
            writer.write(Tuple2.of(2, TestClientBase.buildMessage(2)), null);
            writer.write(Tuple2.of(3, TestClientBase.buildMessage(3)), null);
            writer.write(Tuple2.of(4, TestClientBase.buildMessage(4)), null);
            writer.blockingFlushAllActions();
        }

        // Verified only after the writer has been closed, matching the original ordering.
        this.context.assertThatIdsAreWritten(index, 1, 2, 3, 4);
    }

    /**
     * Checks that a writer created with its boolean flag set to {@code true} (presumably
     * flush-on-checkpoint) and with all bulk thresholds at -1 holds records back until
     * {@code flush} is called.
     *
     * <p>The writer is managed by try-with-resources, so a failure in {@code close()} can no
     * longer replace a failure thrown by the test body.
     */
    @Test
    void testWriteOnCheckpoint() throws Exception {
        final String index = "test-bulk-flush-with-checkpoint";
        final BulkProcessorConfig bulkProcessorConfig = new BulkProcessorConfig(-1, -1, -1L, FlushBackoffType.NONE, 0, 0L);

        try (ElasticsearchWriter<Tuple2<Integer, String>> writer = createWriter(index, true, bulkProcessorConfig)) {
            writer.write(Tuple2.of(1, TestClientBase.buildMessage(1)), null);
            writer.write(Tuple2.of(2, TestClientBase.buildMessage(2)), null);
            writer.write(Tuple2.of(3, TestClientBase.buildMessage(3)), null);
            // Nothing may be visible before the flush.
            this.context.assertThatIdsAreNotWritten(index, 1, 2, 3);

            writer.flush(false);
            this.context.assertThatIdsAreWritten(index, 1, 2, 3);
        }
    }

    /**
     * Checks that the operator's {@code numBytesOut} counter starts at zero and grows with each
     * flushed batch.
     *
     * <p>The writer is managed by try-with-resources, so a failure in {@code close()} can no
     * longer replace a failure thrown by the test body.
     */
    @Test
    void testIncrementByteOutMetric() throws Exception {
        final String index = "test-inc-byte-out";
        final InternalOperatorIOMetricGroup operatorIOMetricGroup = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
        final BulkProcessorConfig bulkProcessorConfig = new BulkProcessorConfig(2, -1, -1L, FlushBackoffType.NONE, 0, 0L);

        try (ElasticsearchWriter<Tuple2<Integer, String>> writer = createWriter(index, false, bulkProcessorConfig, InternalSinkWriterMetricGroup.mock(this.metricListener.getMetricGroup(), operatorIOMetricGroup))) {
            final Counter numBytesOut = operatorIOMetricGroup.getNumBytesOutCounter();
            Assertions.assertThat(numBytesOut.getCount()).isEqualTo(0L);

            writer.write(Tuple2.of(1, TestClientBase.buildMessage(1)), null);
            writer.write(Tuple2.of(2, TestClientBase.buildMessage(2)), null);
            writer.blockingFlushAllActions();
            final long bytesAfterFirstBatch = numBytesOut.getCount();
            Assertions.assertThat(bytesAfterFirstBatch).isGreaterThan(0L);

            // A second batch must push the counter beyond its previous value.
            writer.write(Tuple2.of(1, TestClientBase.buildMessage(1)), null);
            writer.write(Tuple2.of(2, TestClientBase.buildMessage(2)), null);
            writer.blockingFlushAllActions();
            Assertions.assertThat(numBytesOut.getCount()).isGreaterThan(bytesAfterFirstBatch);
        }
    }

    /**
     * Checks that the {@code numRecordsSend} counter reads 3 after three records for id 1 are
     * written and flushed. Their messages are the plain message, the same form prefixed with
     * {@code u}, and the same form prefixed with {@code d}.
     *
     * <p>The writer is managed by try-with-resources, so a failure in {@code close()} can no
     * longer replace a failure thrown by the test body. The counter lookup uses a typed
     * {@code Optional<Counter>} rather than a raw {@code Optional} plus cast.
     */
    @Test
    void testIncrementRecordsSendMetric() throws Exception {
        final String index = "test-inc-records-send";
        final BulkProcessorConfig bulkProcessorConfig = new BulkProcessorConfig(2, -1, -1L, FlushBackoffType.NONE, 0, 0L);

        try (ElasticsearchWriter<Tuple2<Integer, String>> writer = createWriter(index, false, bulkProcessorConfig)) {
            final Optional<Counter> recordsSend = this.metricListener.getCounter("numRecordsSend");

            writer.write(Tuple2.of(1, TestClientBase.buildMessage(1)), null);
            writer.write(Tuple2.of(1, "u" + TestClientBase.buildMessage(2)), null);
            writer.write(Tuple2.of(1, "d" + TestClientBase.buildMessage(3)), null);
            writer.blockingFlushAllActions();

            Assertions.assertThat(recordsSend).isPresent();
            Assertions.assertThat(recordsSend.get().getCount()).isEqualTo(3L);
        }
    }

    /**
     * Checks that the {@code currentSendTime} gauge is registered and reports a value greater
     * than zero after a batch has been flushed.
     *
     * <p>The writer is managed by try-with-resources in place of the hand-expanded cleanup. The
     * gauge lookup uses a typed {@code Optional<Gauge<Long>>} rather than raw types plus casts.
     */
    @Test
    void testCurrentSendTime() throws Exception {
        final String index = "test-current-send-time";
        final BulkProcessorConfig bulkProcessorConfig = new BulkProcessorConfig(2, -1, -1L, FlushBackoffType.NONE, 0, 0L);

        try (ElasticsearchWriter<Tuple2<Integer, String>> writer = createWriter(index, false, bulkProcessorConfig)) {
            final Optional<Gauge<Long>> currentSendTime = this.metricListener.getGauge("currentSendTime");

            writer.write(Tuple2.of(1, TestClientBase.buildMessage(1)), null);
            writer.write(Tuple2.of(2, TestClientBase.buildMessage(2)), null);
            writer.blockingFlushAllActions();

            Assertions.assertThat(currentSendTime).isPresent();
            Assertions.assertThat(currentSendTime.get().getValue()).isGreaterThan(0L);
        }
    }

    /**
     * Convenience overload that creates a writer reporting into a mocked sink writer metric group
     * built from this test's metric listener.
     *
     * @param str name of the target index
     * @param z boolean flag forwarded unchanged to the writer (presumably flush-on-checkpoint)
     * @param bulkProcessorConfig bulk flush and backoff settings
     * @return a new writer; the caller is responsible for closing it
     */
    private ElasticsearchWriter<Tuple2<Integer, String>> createWriter(String str, boolean z, BulkProcessorConfig bulkProcessorConfig) {
        final SinkWriterMetricGroup listenerBackedGroup = InternalSinkWriterMetricGroup.mock(this.metricListener.getMetricGroup());
        return createWriter(str, z, bulkProcessorConfig, listenerBackedGroup);
    }

    /**
     * Creates a writer against the shared Elasticsearch container. It uses an
     * {@link UpdatingEmitter} for the given index, the test bulk processor factory, an all-null
     * network client config and the inline test mailbox.
     *
     * @param str name of the target index
     * @param z boolean flag forwarded unchanged to the writer (presumably flush-on-checkpoint)
     * @param bulkProcessorConfig bulk flush and backoff settings
     * @param sinkWriterMetricGroup metric group the writer reports into
     * @return a new writer; the caller is responsible for closing it
     */
    private ElasticsearchWriter<Tuple2<Integer, String>> createWriter(String str, boolean z, BulkProcessorConfig bulkProcessorConfig, SinkWriterMetricGroup sinkWriterMetricGroup) {
        final HttpHost containerHost = HttpHost.create(ES_CONTAINER.getHttpHostAddress());
        final UpdatingEmitter emitter = new UpdatingEmitter(str, this.context.getDataFieldName());
        final NetworkClientConfig networkClientConfig = new NetworkClientConfig((String) null, (String) null, (String) null, (Integer) null, (Integer) null, (Integer) null);
        return new ElasticsearchWriter<>(
                Collections.singletonList(containerHost),
                emitter,
                z,
                bulkProcessorConfig,
                new TestBulkProcessorBuilderFactory(),
                networkClientConfig,
                sinkWriterMetricGroup,
                new TestMailbox());
    }
}
