package org.apache.flink.streaming.connectors.opensearch;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.opensearch.OpensearchSink;
import org.apache.flink.streaming.connectors.opensearch.util.NoOpFailureHandler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.bootstrap.HttpServer;
import org.apache.http.impl.bootstrap.ServerBootstrap;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.opensearch.action.ActionRequest;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.bulk.BulkItemResponse;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.client.Requests;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.index.shard.ShardId;

/* loaded from: input_file:org/apache/flink/streaming/connectors/opensearch/OpensearchSinkTest.class */
public class OpensearchSinkTest {
    /** Local HTTP server the sinks under test connect to; assigned in {@code setUp()}. */
    private HttpServer server;

    /** Canned responses; the POST handler takes one from the head for every request it serves. */
    private final Deque<Consumer<HttpResponse>> responses = new ConcurrentLinkedDeque<>();

    /** Lock that owns the {@link #flushed} condition. */
    private final Lock lock = new ReentrantLock();

    /** Signalled by the POST handler once a queued response has been applied. */
    private final Condition flushed = lock.newCondition();

    /** Failure handler that answers every failure by handing the failed request back to the indexer. */
    private static class DummyRetryFailureHandler implements ActionRequestFailureHandler {
        private static final long serialVersionUID = 5400023700099200745L;

        private DummyRetryFailureHandler() {}

        /**
         * Re-adds the failed request, regardless of the failure cause or status code.
         *
         * @param action the request that failed
         * @param failure the reported failure (ignored)
         * @param restStatusCode the reported status code (ignored)
         * @param indexer the indexer the request is re-added to
         */
        public void onFailure(
                ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
                throws Throwable {
            indexer.add(action);
        }
    }

    /**
     * Sink function that emits no requests and only records whether {@link #open()} and
     * {@link #close()} were invoked.
     *
     * @param <T> type of the elements passed to {@code process}
     */
    private static class SimpleClosableSinkFunction<T> implements OpensearchSinkFunction<T> {
        private static final long serialVersionUID = 1872065917794006848L;

        /** Becomes {@code true} once {@link #open()} has been called. */
        private boolean openCalled;

        /** Becomes {@code true} once {@link #close()} has been called. */
        private boolean closeCalled;

        private SimpleClosableSinkFunction() {}

        public void open() {
            openCalled = true;
        }

        public void close() {
            closeCalled = true;
        }

        public void process(T element, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
            // Intentionally empty: this function exists only to observe the lifecycle calls.
        }
    }

    /**
     * Sink function that turns every element into one index request for index {@code "index"},
     * type {@code "type"} and id {@code "id"}, carrying the element under the {@code "data"} key.
     *
     * @param <T> type of the elements passed to {@code process}
     */
    private static class SimpleSinkFunction<T> implements OpensearchSinkFunction<T> {
        private static final long serialVersionUID = -176739293659135148L;

        private SimpleSinkFunction() {}

        public void process(T element, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
            HashMap<String, Object> document = new HashMap<>();
            document.put("data", element);

            requestIndexer.add(
                    Requests.indexRequest().index("index").type("type").id("id").source(document));
        }
    }

    /**
     * Creates and starts the local HTTP server used by every test.
     *
     * <p>{@code HEAD} requests are answered with status 200. Every {@code POST} request is
     * answered by the entry at the head of {@link #responses}; afterwards {@link #flushed} is
     * signalled so that a test blocked in {@code awaitForFlushToFinish()} can continue. Requests
     * using any other method get no handler.
     *
     * @throws IOException if the server cannot be started
     */
    @BeforeEach
    public void setUp() throws IOException {
        server =
                ServerBootstrap.bootstrap()
                        .setHandlerMapper(
                                incoming -> {
                                    String method = incoming.getRequestLine().getMethod();
                                    if (method.equalsIgnoreCase("HEAD")) {
                                        return (request, response, context) ->
                                                response.setStatusCode(200);
                                    }
                                    if (method.equalsIgnoreCase("POST")) {
                                        return (request, response, context) -> {
                                            lock.lock();
                                            try {
                                                // Each test queues its responses up front; an
                                                // empty queue means the test under-provisioned.
                                                Consumer<HttpResponse> canned =
                                                        Objects.requireNonNull(
                                                                responses.poll(),
                                                                "no response queued for POST request");
                                                canned.accept(response);
                                                flushed.signalAll();
                                            } finally {
                                                lock.unlock();
                                            }
                                        };
                                    }
                                    return null;
                                })
                        .create();
        server.start();
    }

    /** Stops the HTTP server, if one was created, and discards any responses left in the queue. */
    @AfterEach
    public void tearDown() {
        // setUp may have thrown before assigning the server; do not mask that failure with an NPE.
        if (server != null) {
            server.stop();
            server = null;
        }
        responses.clear();
    }

    /**
     * Queues a bulk response whose single item is a failure, processes one element, and expects
     * processing the next element to throw with a cause mentioning the artificial failure.
     */
    @Test
    public void testItemFailureRethrownOnInvoke() throws Throwable {
        OpensearchSink.Builder<String> builder =
                new OpensearchSink.Builder<>(
                        Arrays.asList(new HttpHost("localhost", server.getLocalPort())),
                        new SimpleSinkFunction<String>());
        builder.setBulkFlushMaxActions(1);
        builder.setFailureHandler(new NoOpFailureHandler());

        OneInputStreamOperatorTestHarness<String, Object> harness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(builder.build()));

        responses.add(
                createResponse(
                        new BulkItemResponse(
                                1,
                                DocWriteRequest.OpType.INDEX,
                                new BulkItemResponse.Failure(
                                        "test",
                                        "_doc",
                                        "1",
                                        new Exception("artificial failure for record")))));
        harness.open();

        harness.processElement(new StreamRecord<>("msg"));

        Assertions.assertThatThrownBy(() -> harness.processElement(new StreamRecord<>("next msg")))
                .getCause()
                .hasMessageContaining("artificial failure for record");
    }

    /**
     * Queues a bulk response whose single item is a failure, processes one element, and expects
     * the following snapshot to throw with a nested cause mentioning the artificial failure.
     */
    @Test
    public void testItemFailureRethrownOnCheckpoint() throws Throwable {
        OpensearchSink.Builder<String> builder =
                new OpensearchSink.Builder<>(
                        Arrays.asList(new HttpHost("localhost", server.getLocalPort())),
                        new SimpleSinkFunction<String>());
        builder.setBulkFlushMaxActions(1);
        builder.setFailureHandler(new NoOpFailureHandler());

        OneInputStreamOperatorTestHarness<String, Object> harness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(builder.build()));
        harness.open();

        responses.add(
                createResponse(
                        new BulkItemResponse(
                                1,
                                DocWriteRequest.OpType.INDEX,
                                new BulkItemResponse.Failure(
                                        "test",
                                        "_doc",
                                        "1",
                                        new Exception("artificial failure for record")))));
        harness.processElement(new StreamRecord<>("msg"));

        Assertions.assertThatThrownBy(() -> harness.snapshot(1L, 1000L))
                .getCause()
                .getCause()
                .hasMessageContaining("artificial failure for record");
    }

    /**
     * Queues a successful bulk response followed by one with a failed item. After the first
     * response has been served, two more elements are processed and a snapshot is taken on a
     * separate thread; that snapshot is expected to fail with a nested cause mentioning the
     * artificial failure.
     */
    @Timeout(5)
    @Test
    public void testItemFailureRethrownOnCheckpointAfterFlush() throws Throwable {
        OpensearchSink.Builder<String> builder =
                new OpensearchSink.Builder<>(
                        Arrays.asList(new HttpHost("localhost", server.getLocalPort())),
                        new SimpleSinkFunction<String>());
        builder.setBulkFlushInterval(1000L);
        builder.setFailureHandler(new NoOpFailureHandler());

        OneInputStreamOperatorTestHarness<String, Object> harness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(builder.build()));
        harness.open();

        // First bulk request succeeds, the second one reports an item failure.
        responses.add(
                createResponse(
                        new BulkItemResponse(
                                1,
                                DocWriteRequest.OpType.INDEX,
                                new IndexResponse(
                                        new ShardId("test", "-", 0), "_doc", "1", 0L, 0L, 1L, true))));
        responses.add(
                createResponse(
                        new BulkItemResponse(
                                2,
                                DocWriteRequest.OpType.INDEX,
                                new BulkItemResponse.Failure(
                                        "test",
                                        "_doc",
                                        "2",
                                        new Exception("artificial failure for record")))));

        harness.processElement(new StreamRecord<>("msg-1"));
        awaitForFlushToFinish();

        harness.processElement(new StreamRecord<>("msg-2"));
        harness.processElement(new StreamRecord<>("msg-3"));

        // The snapshot runs on its own thread so this thread can wait for the second response.
        CheckedThread snapshotThread =
                new CheckedThread() {
                    public void go() throws Exception {
                        harness.snapshot(1L, 1000L);
                    }
                };
        snapshotThread.start();
        awaitForFlushToFinish();

        Assertions.assertThatThrownBy(snapshotThread::sync)
                .getCause()
                .getCause()
                .hasMessageContaining("artificial failure for record");
    }

    /**
     * Queues a bare status-500 response, processes one element, and expects processing the next
     * element to throw with a cause mentioning the unparsable response body.
     */
    @Test
    public void testBulkFailureRethrownOnInvoke() throws Throwable {
        OpensearchSink.Builder<String> builder =
                new OpensearchSink.Builder<>(
                        Arrays.asList(new HttpHost("localhost", server.getLocalPort())),
                        new SimpleSinkFunction<String>());
        builder.setBulkFlushMaxActions(1);
        builder.setFailureHandler(new NoOpFailureHandler());

        OneInputStreamOperatorTestHarness<String, Object> harness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(builder.build()));
        harness.open();

        // The whole bulk request fails: status 500 and no entity.
        responses.add(response -> response.setStatusCode(500));
        harness.processElement(new StreamRecord<>("msg"));

        Assertions.assertThatThrownBy(() -> harness.processElement(new StreamRecord<>("next msg")))
                .getCause()
                .hasMessageContaining("Unable to parse response body");
    }

    /**
     * Queues a bare status-500 response, processes one element, and expects the following
     * snapshot to throw with a nested cause mentioning the unparsable response body.
     */
    @Test
    public void testBulkFailureRethrownOnCheckpoint() throws Throwable {
        OpensearchSink.Builder<String> builder =
                new OpensearchSink.Builder<>(
                        Arrays.asList(new HttpHost("localhost", server.getLocalPort())),
                        new SimpleSinkFunction<String>());
        builder.setBulkFlushMaxActions(1);
        builder.setFailureHandler(new NoOpFailureHandler());

        OneInputStreamOperatorTestHarness<String, Object> harness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(builder.build()));
        harness.open();

        // The whole bulk request fails: status 500 and no entity.
        responses.add(response -> response.setStatusCode(500));
        harness.processElement(new StreamRecord<>("msg"));

        Assertions.assertThatThrownBy(() -> harness.snapshot(1L, 1000L))
                .getCause()
                .getCause()
                .hasMessageContaining("Unable to parse response body");
    }

    /**
     * Queues a successful bulk response followed by a bare status-500 response. After the first
     * response has been served, two more elements are processed and a snapshot is taken on a
     * separate thread; that snapshot is expected to fail with a nested cause mentioning the
     * unparsable response body.
     */
    @Timeout(5)
    @Test
    public void testBulkFailureRethrownOnOnCheckpointAfterFlush() throws Throwable {
        OpensearchSink.Builder<String> builder =
                new OpensearchSink.Builder<>(
                        Arrays.asList(new HttpHost("localhost", server.getLocalPort())),
                        new SimpleSinkFunction<String>());
        builder.setBulkFlushInterval(500L);
        builder.setFailureHandler(new NoOpFailureHandler());

        OneInputStreamOperatorTestHarness<String, Object> harness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(builder.build()));
        harness.open();

        // First bulk request succeeds, the second one fails as a whole.
        responses.add(
                createResponse(
                        new BulkItemResponse(
                                1,
                                DocWriteRequest.OpType.INDEX,
                                new IndexResponse(
                                        new ShardId("test", "-", 0), "_doc", "1", 0L, 0L, 1L, true))));
        responses.add(response -> response.setStatusCode(500));

        harness.processElement(new StreamRecord<>("msg-1"));
        awaitForFlushToFinish();

        harness.processElement(new StreamRecord<>("msg-2"));
        harness.processElement(new StreamRecord<>("msg-3"));

        // The snapshot runs on its own thread so this thread can wait for the second response.
        CheckedThread snapshotThread =
                new CheckedThread() {
                    public void go() throws Exception {
                        harness.snapshot(1L, 1000L);
                    }
                };
        snapshotThread.start();
        awaitForFlushToFinish();

        Assertions.assertThatThrownBy(snapshotThread::sync)
                .getCause()
                .getCause()
                .hasMessageContaining("Unable to parse response body");
    }

    /**
     * Uses {@link DummyRetryFailureHandler} with a failing response followed by a successful one.
     *
     * <p>After one element is processed the sink must report one pending request. A snapshot is
     * then started on a separate thread; the test waits for the first response to be served, for
     * the pending count to read one again, and for the second response to be served, before
     * requiring the snapshot thread to finish without error.
     */
    @Timeout(5)
    @Test
    public void testAtLeastOnceSink() throws Throwable {
        OpensearchSink.Builder<String> builder =
                new OpensearchSink.Builder<>(
                        Arrays.asList(new HttpHost("localhost", server.getLocalPort())),
                        new SimpleSinkFunction<String>());
        builder.setBulkFlushInterval(500L);
        builder.setFailureHandler(new DummyRetryFailureHandler());

        OpensearchSink<String> sink = builder.build();
        OneInputStreamOperatorTestHarness<String, Object> harness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
        harness.open();

        // First attempt fails the item, the second attempt succeeds.
        responses.add(
                createResponse(
                        new BulkItemResponse(
                                1,
                                DocWriteRequest.OpType.INDEX,
                                new BulkItemResponse.Failure(
                                        "test",
                                        "_doc",
                                        "1",
                                        new Exception("artificial failure for record")))));
        responses.add(
                createResponse(
                        new BulkItemResponse(
                                2,
                                DocWriteRequest.OpType.INDEX,
                                new IndexResponse(
                                        new ShardId("test", "-", 0), "_doc", "2", 0L, 0L, 1L, true))));

        harness.processElement(new StreamRecord<>("msg"));
        Assertions.assertThat(sink.getNumPendingRequests()).isEqualTo(1L);

        CheckedThread snapshotThread =
                new CheckedThread() {
                    public void go() throws Exception {
                        harness.snapshot(1L, 1000L);
                    }
                };
        snapshotThread.start();

        awaitForFlushToFinish();
        awaitForCondition(() -> sink.getNumPendingRequests() == 1);
        awaitForFlushToFinish();

        snapshotThread.sync();
        harness.close();
    }

    /**
     * With flush-on-checkpoint disabled, a snapshot taken right after processing an element must
     * return normally; the queued item failure is expected to surface only when the harness is
     * closed.
     */
    @Timeout(5)
    @Test
    public void testDoesNotWaitForPendingRequestsIfFlushingDisabled() throws Exception {
        OpensearchSink<String> sink =
                new OpensearchSink.Builder<String>(
                                Arrays.asList(new HttpHost("localhost", server.getLocalPort())),
                                new SimpleSinkFunction<String>())
                        .build();
        sink.disableFlushOnCheckpoint();

        OneInputStreamOperatorTestHarness<String, Object> harness =
                new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));
        harness.open();

        responses.add(
                createResponse(
                        new BulkItemResponse(
                                1,
                                DocWriteRequest.OpType.INDEX,
                                new BulkItemResponse.Failure(
                                        "test",
                                        "_doc",
                                        "1",
                                        new Exception("artificial failure for record")))));
        harness.processElement(new StreamRecord<>("msg-1"));

        // Must not block or throw: the test's 5 second timeout guards against waiting here.
        harness.snapshot(1L, 1000L);

        Assertions.assertThatThrownBy(harness::close)
                .getCause()
                .hasMessageContaining("artificial failure for record");
    }

    /**
     * Opens and closes a sink built around {@link SimpleClosableSinkFunction} and checks that
     * both lifecycle calls reached the sink function.
     */
    @Test
    public void testOpenAndCloseInSinkFunction() throws Exception {
        SimpleClosableSinkFunction<String> sinkFunction = new SimpleClosableSinkFunction<>();
        OpensearchSink.Builder<String> builder =
                new OpensearchSink.Builder<>(
                        Arrays.asList(new HttpHost("localhost", server.getLocalPort())),
                        sinkFunction);
        builder.setFailureHandler(new DummyRetryFailureHandler());

        OpensearchSink<String> sink = builder.build();
        sink.setRuntimeContext(new MockStreamingRuntimeContext(false, 1, 0));
        sink.open(new Configuration());
        sink.close();

        Assertions.assertThat(sinkFunction.openCalled).isTrue();
        Assertions.assertThat(sinkFunction.closeCalled).isTrue();
    }

    /**
     * Builds a response writer that answers with status 200 and a JSON bulk response containing
     * exactly the given item.
     *
     * <p>If serializing the bulk response raises an {@link IOException}, the status is set to 500
     * instead and no entity is attached.
     *
     * @param bulkItemResponse the single item to wrap into the bulk response
     * @return a consumer that fills in the HTTP response it is given
     */
    private static Consumer<HttpResponse> createResponse(BulkItemResponse bulkItemResponse) {
        return httpResponse -> {
            try (ByteArrayOutputStream body = new ByteArrayOutputStream()) {
                httpResponse.setStatusCode(200);
                // The builder must be closed before the bytes are read from the stream.
                try (XContentBuilder json = new XContentBuilder(JsonXContent.jsonXContent, body)) {
                    BulkResponse bulkResponse =
                            new BulkResponse(new BulkItemResponse[] {bulkItemResponse}, 200L);
                    bulkResponse.toXContent(json, ToXContent.EMPTY_PARAMS);
                }
                httpResponse.setEntity(
                        new ByteArrayEntity(body.toByteArray(), ContentType.APPLICATION_JSON));
            } catch (IOException e) {
                httpResponse.setStatusCode(500);
            }
        };
    }

    /**
     * Polls the supplier until it yields {@code true}, sleeping 10 ms between polls.
     *
     * @param supplier condition to evaluate; it is evaluated at least once
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private static void awaitForCondition(Supplier<Boolean> supplier) throws InterruptedException {
        for (;;) {
            boolean satisfied = supplier.get();
            if (satisfied) {
                return;
            }
            Thread.sleep(10L);
        }
    }

    /**
     * Blocks the calling thread on the {@code flushed} condition until it is signalled.
     *
     * <p>NOTE(review): the wait has no predicate, so a signal sent before this method is entered
     * is not observed, and {@code Condition.await()} is documented as allowing spurious wakeups.
     * Callers appear to rely on timing here; confirm before reusing this elsewhere.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void awaitForFlushToFinish() throws InterruptedException {
        final Lock guard = lock;
        guard.lock();
        try {
            flushed.await();
        } finally {
            guard.unlock();
        }
    }
}
