package org.apache.flink.connector.opensearch.sink;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Optional;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.opensearch.OpensearchUtil;
import org.apache.flink.connector.opensearch.test.DockerImageVersions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.runtime.metrics.groups.InternalOperatorIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.http.HttpHost;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.testcontainers.OpensearchContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
@ExtendWith({TestLoggerExtension.class})
/* loaded from: input_file:org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.class */
class OpensearchWriterITCase {
    private static final Logger LOG = LoggerFactory.getLogger(OpensearchWriterITCase.class);

    @Container
    private static final OpensearchContainer OS_CONTAINER = OpensearchUtil.createOpensearchContainer(DockerImageVersions.OPENSEARCH_1, LOG);
    private RestHighLevelClient client;
    private OpensearchTestClient context;
    private MetricListener metricListener;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase$TestMailbox.class */
    public static class TestMailbox implements MailboxExecutor {
        private TestMailbox() {
        }

        public void execute(ThrowingRunnable<? extends Exception> throwingRunnable, String str, Object... objArr) {
            try {
                throwingRunnable.run();
            } catch (Exception e) {
                throw new RuntimeException("Unexpected error", e);
            }
        }

        public void yield() throws InterruptedException, FlinkRuntimeException {
            Thread.sleep(100L);
        }

        public boolean tryYield() throws FlinkRuntimeException {
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase$UpdatingEmitter.class */
    public static class UpdatingEmitter implements OpensearchEmitter<Tuple2<Integer, String>> {
        private static final long serialVersionUID = 1;
        private final String dataFieldName;
        private final String index;

        UpdatingEmitter(String str, String str2) {
            this.index = str;
            this.dataFieldName = str2;
        }

        public void emit(Tuple2<Integer, String> tuple2, SinkWriter.Context context, RequestIndexer requestIndexer) {
            HashMap hashMap = new HashMap();
            hashMap.put(this.dataFieldName, tuple2.f1);
            char charAt = ((String) tuple2.f1).charAt(0);
            String num = ((Integer) tuple2.f0).toString();
            switch (charAt) {
                case 'd':
                    requestIndexer.add(new DeleteRequest[]{new DeleteRequest(this.index).id(num)});
                    return;
                case 'u':
                    requestIndexer.add(new UpdateRequest[]{new UpdateRequest().index(this.index).id(num).doc(hashMap)});
                    return;
                default:
                    requestIndexer.add(new IndexRequest[]{new IndexRequest(this.index).id(num).source(hashMap)});
                    return;
            }
        }
    }

    OpensearchWriterITCase() {
    }

    @BeforeEach
    void setUp() {
        this.metricListener = new MetricListener();
        this.client = OpensearchUtil.createClient(OS_CONTAINER);
        this.context = new OpensearchTestClient(this.client);
    }

    @AfterEach
    void tearDown() throws IOException {
        if (this.client != null) {
            this.client.close();
        }
    }

    @Test
    void testWriteOnBulkFlush() throws Exception {
        OpensearchWriter<Tuple2<Integer, String>> createWriter = createWriter("test-bulk-flush-without-checkpoint", false, new BulkProcessorConfig(5, -1, -1L, FlushBackoffType.NONE, 0, 0L));
        Throwable th = null;
        try {
            try {
                createWriter.write(Tuple2.of(1, OpensearchTestClient.buildMessage(1)), (SinkWriter.Context) null);
                createWriter.write(Tuple2.of(2, OpensearchTestClient.buildMessage(2)), (SinkWriter.Context) null);
                createWriter.write(Tuple2.of(3, OpensearchTestClient.buildMessage(3)), (SinkWriter.Context) null);
                createWriter.write(Tuple2.of(4, OpensearchTestClient.buildMessage(4)), (SinkWriter.Context) null);
                createWriter.flush(false);
                this.context.assertThatIdsAreNotWritten("test-bulk-flush-without-checkpoint", 1, 2, 3, 4);
                createWriter.write(Tuple2.of(5, "test-5"), (SinkWriter.Context) null);
                this.context.assertThatIdsAreWritten("test-bulk-flush-without-checkpoint", 1, 2, 3, 4, 5);
                createWriter.write(Tuple2.of(6, "test-6"), (SinkWriter.Context) null);
                this.context.assertThatIdsAreNotWritten("test-bulk-flush-without-checkpoint", 6);
                createWriter.blockingFlushAllActions();
                this.context.assertThatIdsAreWritten("test-bulk-flush-without-checkpoint", 1, 2, 3, 4, 5, 6);
                if (createWriter != null) {
                    if (0 == 0) {
                        createWriter.close();
                        return;
                    }
                    try {
                        createWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createWriter != null) {
                if (th != null) {
                    try {
                        createWriter.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createWriter.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testWriteOnBulkIntervalFlush() throws Exception {
        OpensearchWriter<Tuple2<Integer, String>> createWriter = createWriter("test-bulk-flush-with-interval", false, new BulkProcessorConfig(-1, -1, 1000L, FlushBackoffType.NONE, 0, 0L));
        Throwable th = null;
        try {
            createWriter.write(Tuple2.of(1, OpensearchTestClient.buildMessage(1)), (SinkWriter.Context) null);
            createWriter.write(Tuple2.of(2, OpensearchTestClient.buildMessage(2)), (SinkWriter.Context) null);
            createWriter.write(Tuple2.of(3, OpensearchTestClient.buildMessage(3)), (SinkWriter.Context) null);
            createWriter.write(Tuple2.of(4, OpensearchTestClient.buildMessage(4)), (SinkWriter.Context) null);
            createWriter.blockingFlushAllActions();
            if (createWriter != null) {
                if (0 != 0) {
                    try {
                        createWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    createWriter.close();
                }
            }
            this.context.assertThatIdsAreWritten("test-bulk-flush-with-interval", 1, 2, 3, 4);
        } catch (Throwable th3) {
            if (createWriter != null) {
                if (0 != 0) {
                    try {
                        createWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createWriter.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testWriteOnCheckpoint() throws Exception {
        OpensearchWriter<Tuple2<Integer, String>> createWriter = createWriter("test-bulk-flush-with-checkpoint", true, new BulkProcessorConfig(-1, -1, -1L, FlushBackoffType.NONE, 0, 0L));
        Throwable th = null;
        try {
            createWriter.write(Tuple2.of(1, OpensearchTestClient.buildMessage(1)), (SinkWriter.Context) null);
            createWriter.write(Tuple2.of(2, OpensearchTestClient.buildMessage(2)), (SinkWriter.Context) null);
            createWriter.write(Tuple2.of(3, OpensearchTestClient.buildMessage(3)), (SinkWriter.Context) null);
            this.context.assertThatIdsAreNotWritten("test-bulk-flush-with-checkpoint", 1, 2, 3);
            createWriter.flush(false);
            this.context.assertThatIdsAreWritten("test-bulk-flush-with-checkpoint", 1, 2, 3);
            if (createWriter != null) {
                if (0 == 0) {
                    createWriter.close();
                    return;
                }
                try {
                    createWriter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createWriter != null) {
                if (0 != 0) {
                    try {
                        createWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createWriter.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testIncrementByteOutMetric() throws Exception {
        InternalOperatorIOMetricGroup iOMetricGroup = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
        OpensearchWriter<Tuple2<Integer, String>> createWriter = createWriter("test-inc-byte-out", false, new BulkProcessorConfig(2, -1, -1L, FlushBackoffType.NONE, 0, 0L), InternalSinkWriterMetricGroup.mock(this.metricListener.getMetricGroup(), iOMetricGroup));
        Throwable th = null;
        try {
            Counter numBytesOutCounter = iOMetricGroup.getNumBytesOutCounter();
            Assertions.assertThat(numBytesOutCounter.getCount()).isEqualTo(0L);
            createWriter.write(Tuple2.of(1, OpensearchTestClient.buildMessage(1)), (SinkWriter.Context) null);
            createWriter.write(Tuple2.of(2, OpensearchTestClient.buildMessage(2)), (SinkWriter.Context) null);
            createWriter.blockingFlushAllActions();
            long count = numBytesOutCounter.getCount();
            Assertions.assertThat(count).isGreaterThan(0L);
            createWriter.write(Tuple2.of(1, OpensearchTestClient.buildMessage(1)), (SinkWriter.Context) null);
            createWriter.write(Tuple2.of(2, OpensearchTestClient.buildMessage(2)), (SinkWriter.Context) null);
            createWriter.blockingFlushAllActions();
            Assertions.assertThat(numBytesOutCounter.getCount()).isGreaterThan(count);
            if (createWriter != null) {
                if (0 == 0) {
                    createWriter.close();
                    return;
                }
                try {
                    createWriter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createWriter != null) {
                if (0 != 0) {
                    try {
                        createWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createWriter.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testIncrementRecordsSendMetric() throws Exception {
        OpensearchWriter<Tuple2<Integer, String>> createWriter = createWriter("test-inc-records-send", false, new BulkProcessorConfig(2, -1, -1L, FlushBackoffType.NONE, 0, 0L));
        Throwable th = null;
        try {
            Optional counter = this.metricListener.getCounter(new String[]{"numRecordsSend"});
            createWriter.write(Tuple2.of(1, OpensearchTestClient.buildMessage(1)), (SinkWriter.Context) null);
            createWriter.write(Tuple2.of(1, "u" + OpensearchTestClient.buildMessage(2)), (SinkWriter.Context) null);
            createWriter.write(Tuple2.of(1, "d" + OpensearchTestClient.buildMessage(3)), (SinkWriter.Context) null);
            createWriter.blockingFlushAllActions();
            Assertions.assertThat(counter).isPresent();
            Assertions.assertThat(((Counter) counter.get()).getCount()).isEqualTo(3L);
            if (createWriter != null) {
                if (0 == 0) {
                    createWriter.close();
                    return;
                }
                try {
                    createWriter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createWriter != null) {
                if (0 != 0) {
                    try {
                        createWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createWriter.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testCurrentSendTime() throws Exception {
        OpensearchWriter<Tuple2<Integer, String>> createWriter = createWriter("test-current-send-time", false, new BulkProcessorConfig(2, -1, -1L, FlushBackoffType.NONE, 0, 0L));
        Throwable th = null;
        try {
            try {
                Optional gauge = this.metricListener.getGauge(new String[]{"currentSendTime"});
                createWriter.write(Tuple2.of(1, OpensearchTestClient.buildMessage(1)), (SinkWriter.Context) null);
                createWriter.write(Tuple2.of(2, OpensearchTestClient.buildMessage(2)), (SinkWriter.Context) null);
                createWriter.blockingFlushAllActions();
                Assertions.assertThat(gauge).isPresent();
                Assertions.assertThat((Long) ((Gauge) gauge.get()).getValue()).isGreaterThan(0L);
                if (createWriter != null) {
                    if (0 == 0) {
                        createWriter.close();
                        return;
                    }
                    try {
                        createWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createWriter != null) {
                if (th != null) {
                    try {
                        createWriter.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createWriter.close();
                }
            }
            throw th4;
        }
    }

    private OpensearchWriter<Tuple2<Integer, String>> createWriter(String str, boolean z, BulkProcessorConfig bulkProcessorConfig) {
        return createWriter(str, z, bulkProcessorConfig, InternalSinkWriterMetricGroup.mock(this.metricListener.getMetricGroup()));
    }

    private OpensearchWriter<Tuple2<Integer, String>> createWriter(String str, boolean z, BulkProcessorConfig bulkProcessorConfig, SinkWriterMetricGroup sinkWriterMetricGroup) {
        return new OpensearchWriter<>(Collections.singletonList(HttpHost.create(OS_CONTAINER.getHttpHostAddress())), new UpdatingEmitter(str, this.context.getDataFieldName()), z, bulkProcessorConfig, new NetworkClientConfig(OS_CONTAINER.getUsername(), OS_CONTAINER.getPassword(), (String) null, (Integer) null, (Integer) null, (Integer) null, true), sinkWriterMetricGroup, new TestMailbox());
    }
}
