/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.elasticsearch;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class ElasticsearchSinkBaseTest {
    @Test
    public void testCollectionArgumentNotModified() {
        HashMap<String, String> userConfig = new HashMap<String, String>();
        userConfig.put("bulk.flush.backoff.delay", "1");
        userConfig.put("bulk.flush.backoff.enable", "true");
        userConfig.put("bulk.flush.backoff.retries", "1");
        userConfig.put("bulk.flush.backoff.type", "CONSTANT");
        userConfig.put("bulk.flush.interval.ms", "1");
        userConfig.put("bulk.flush.max.actions", "1");
        userConfig.put("bulk.flush.max.size.mb", "1");
        new DummyElasticsearchSink(Collections.unmodifiableMap(userConfig), new SimpleSinkFunction(), (ActionRequestFailureHandler)new NoOpFailureHandler());
    }

    @Test
    public void testItemFailureRethrownOnInvoke() throws Throwable {
        DummyElasticsearchSink sink = new DummyElasticsearchSink(new HashMap<String, String>(), new SimpleSinkFunction(), (ActionRequestFailureHandler)new NoOpFailureHandler());
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)new StreamSink(sink));
        testHarness.open();
        sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
        testHarness.processElement(new StreamRecord((Object)"msg"));
        ((BulkProcessor)Mockito.verify((Object)sink.getMockBulkProcessor(), (VerificationMode)Mockito.times((int)1))).add((IndexRequest)Matchers.any(IndexRequest.class));
        sink.manualBulkRequestWithAllPendingRequests();
        try {
            testHarness.processElement(new StreamRecord((Object)"next msg"));
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)e.getCause().getMessage().contains("artificial failure for record"));
            return;
        }
        Assert.fail();
    }

    @Test
    public void testItemFailureRethrownOnCheckpoint() throws Throwable {
        DummyElasticsearchSink sink = new DummyElasticsearchSink(new HashMap<String, String>(), new SimpleSinkFunction(), (ActionRequestFailureHandler)new NoOpFailureHandler());
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)new StreamSink(sink));
        testHarness.open();
        sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
        testHarness.processElement(new StreamRecord((Object)"msg"));
        ((BulkProcessor)Mockito.verify((Object)sink.getMockBulkProcessor(), (VerificationMode)Mockito.times((int)1))).add((IndexRequest)Matchers.any(IndexRequest.class));
        sink.manualBulkRequestWithAllPendingRequests();
        try {
            testHarness.snapshot(1L, 1000L);
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)e.getCause().getCause().getMessage().contains("artificial failure for record"));
            return;
        }
        Assert.fail();
    }

    @Test(timeout=5000L)
    public void testItemFailureRethrownOnCheckpointAfterFlush() throws Throwable {
        DummyElasticsearchSink sink = new DummyElasticsearchSink(new HashMap<String, String>(), new SimpleSinkFunction(), (ActionRequestFailureHandler)new NoOpFailureHandler());
        final OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)new StreamSink(sink));
        testHarness.open();
        ArrayList<Exception> mockResponsesList = new ArrayList<Exception>(2);
        mockResponsesList.add(null);
        mockResponsesList.add(new Exception("artificial failure for record"));
        sink.setMockItemFailuresListForNextBulkItemResponses(mockResponsesList);
        testHarness.processElement(new StreamRecord((Object)"msg-1"));
        ((BulkProcessor)Mockito.verify((Object)sink.getMockBulkProcessor(), (VerificationMode)Mockito.times((int)1))).add((IndexRequest)Matchers.any(IndexRequest.class));
        sink.manualBulkRequestWithAllPendingRequests();
        testHarness.processElement(new StreamRecord((Object)"msg-2"));
        testHarness.processElement(new StreamRecord((Object)"msg-3"));
        ((BulkProcessor)Mockito.verify((Object)sink.getMockBulkProcessor(), (VerificationMode)Mockito.times((int)3))).add((IndexRequest)Matchers.any(IndexRequest.class));
        CheckedThread snapshotThread = new CheckedThread(){

            public void go() throws Exception {
                testHarness.snapshot(1L, 1000L);
            }
        };
        snapshotThread.start();
        while (snapshotThread.getState() != Thread.State.WAITING) {
            Thread.sleep(10L);
        }
        sink.continueFlush();
        try {
            snapshotThread.sync();
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)e.getCause().getCause().getMessage().contains("artificial failure for record"));
            return;
        }
        Assert.fail();
    }

    @Test
    public void testBulkFailureRethrownOnInvoke() throws Throwable {
        DummyElasticsearchSink sink = new DummyElasticsearchSink(new HashMap<String, String>(), new SimpleSinkFunction(), (ActionRequestFailureHandler)new NoOpFailureHandler());
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)new StreamSink(sink));
        testHarness.open();
        sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request"));
        testHarness.processElement(new StreamRecord((Object)"msg"));
        ((BulkProcessor)Mockito.verify((Object)sink.getMockBulkProcessor(), (VerificationMode)Mockito.times((int)1))).add((IndexRequest)Matchers.any(IndexRequest.class));
        sink.manualBulkRequestWithAllPendingRequests();
        try {
            testHarness.processElement(new StreamRecord((Object)"next msg"));
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)e.getCause().getMessage().contains("artificial failure for bulk request"));
            return;
        }
        Assert.fail();
    }

    @Test
    public void testBulkFailureRethrownOnCheckpoint() throws Throwable {
        DummyElasticsearchSink sink = new DummyElasticsearchSink(new HashMap<String, String>(), new SimpleSinkFunction(), (ActionRequestFailureHandler)new NoOpFailureHandler());
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)new StreamSink(sink));
        testHarness.open();
        sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request"));
        testHarness.processElement(new StreamRecord((Object)"msg"));
        ((BulkProcessor)Mockito.verify((Object)sink.getMockBulkProcessor(), (VerificationMode)Mockito.times((int)1))).add((IndexRequest)Matchers.any(IndexRequest.class));
        sink.manualBulkRequestWithAllPendingRequests();
        try {
            testHarness.snapshot(1L, 1000L);
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)e.getCause().getCause().getMessage().contains("artificial failure for bulk request"));
            return;
        }
        Assert.fail();
    }

    @Test(timeout=5000L)
    public void testBulkFailureRethrownOnOnCheckpointAfterFlush() throws Throwable {
        DummyElasticsearchSink sink = new DummyElasticsearchSink(new HashMap<String, String>(), new SimpleSinkFunction(), (ActionRequestFailureHandler)new NoOpFailureHandler());
        final OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)new StreamSink(sink));
        testHarness.open();
        sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(null));
        testHarness.processElement(new StreamRecord((Object)"msg-1"));
        ((BulkProcessor)Mockito.verify((Object)sink.getMockBulkProcessor(), (VerificationMode)Mockito.times((int)1))).add((IndexRequest)Matchers.any(IndexRequest.class));
        sink.manualBulkRequestWithAllPendingRequests();
        testHarness.processElement(new StreamRecord((Object)"msg-2"));
        testHarness.processElement(new StreamRecord((Object)"msg-3"));
        ((BulkProcessor)Mockito.verify((Object)sink.getMockBulkProcessor(), (VerificationMode)Mockito.times((int)3))).add((IndexRequest)Matchers.any(IndexRequest.class));
        CheckedThread snapshotThread = new CheckedThread(){

            public void go() throws Exception {
                testHarness.snapshot(1L, 1000L);
            }
        };
        snapshotThread.start();
        while (snapshotThread.getState() != Thread.State.WAITING) {
            Thread.sleep(10L);
        }
        sink.setFailNextBulkRequestCompletely(new Exception("artificial failure for bulk request"));
        sink.continueFlush();
        try {
            snapshotThread.sync();
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)e.getCause().getCause().getMessage().contains("artificial failure for bulk request"));
            return;
        }
        Assert.fail();
    }

    @Test(timeout=5000L)
    public void testAtLeastOnceSink() throws Throwable {
        DummyElasticsearchSink sink = new DummyElasticsearchSink(new HashMap<String, String>(), new SimpleSinkFunction(), new DummyRetryFailureHandler());
        final OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)new StreamSink(sink));
        testHarness.open();
        sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
        testHarness.processElement(new StreamRecord((Object)"msg"));
        ((BulkProcessor)Mockito.verify((Object)sink.getMockBulkProcessor(), (VerificationMode)Mockito.times((int)1))).add((IndexRequest)Matchers.any(IndexRequest.class));
        CheckedThread snapshotThread = new CheckedThread(){

            public void go() throws Exception {
                testHarness.snapshot(1L, 1000L);
            }
        };
        snapshotThread.start();
        while (snapshotThread.getState() != Thread.State.WAITING) {
            Thread.sleep(10L);
        }
        sink.continueFlush();
        while (snapshotThread.getState() != Thread.State.WAITING) {
            Thread.sleep(10L);
        }
        Assert.assertEquals((long)1L, (long)sink.getNumPendingRequests());
        sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(null));
        sink.continueFlush();
        snapshotThread.sync();
        testHarness.close();
    }

    @Test(timeout=5000L)
    public void testDoesNotWaitForPendingRequestsIfFlushingDisabled() throws Exception {
        DummyElasticsearchSink sink = new DummyElasticsearchSink(new HashMap<String, String>(), new SimpleSinkFunction(), new DummyRetryFailureHandler());
        sink.disableFlushOnCheckpoint();
        OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)new StreamSink(sink));
        testHarness.open();
        sink.setMockItemFailuresListForNextBulkItemResponses(Collections.singletonList(new Exception("artificial failure for record")));
        testHarness.processElement(new StreamRecord((Object)"msg-1"));
        ((BulkProcessor)Mockito.verify((Object)sink.getMockBulkProcessor(), (VerificationMode)Mockito.times((int)1))).add((IndexRequest)Matchers.any(IndexRequest.class));
        testHarness.snapshot(1L, 1000L);
        testHarness.close();
    }

    private static class DummyRetryFailureHandler
    implements ActionRequestFailureHandler {
        private static final long serialVersionUID = 5400023700099200745L;

        private DummyRetryFailureHandler() {
        }

        public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
            indexer.add(new ActionRequest[]{action});
        }
    }

    private static class SimpleSinkFunction<String>
    implements ElasticsearchSinkFunction<String> {
        private static final long serialVersionUID = -176739293659135148L;

        private SimpleSinkFunction() {
        }

        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
            HashMap<String, String> json = new HashMap<String, String>();
            json.put("data", element);
            indexer.add(new IndexRequest[]{((IndexRequest)Requests.indexRequest().index("index")).type("type").id("id").source(json)});
        }
    }

    private static class DummyElasticsearchApiCallBridge
    implements ElasticsearchApiCallBridge<Client> {
        private static final long serialVersionUID = -4272760730959041699L;

        private DummyElasticsearchApiCallBridge() {
        }

        public Client createClient(Map<String, String> clientConfig) {
            return (Client)Mockito.mock(Client.class);
        }

        public BulkProcessor.Builder createBulkProcessorBuilder(Client client, BulkProcessor.Listener listener) {
            return null;
        }

        @Nullable
        public Throwable extractFailureCauseFromBulkItemResponse(BulkItemResponse bulkItemResponse) {
            if (bulkItemResponse.isFailed()) {
                return new Exception(bulkItemResponse.getFailure().getMessage());
            }
            return null;
        }

        public void configureBulkProcessorBackoff(BulkProcessor.Builder builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy) {
        }
    }

    private static class DummyElasticsearchSink<T>
    extends ElasticsearchSinkBase<T, Client> {
        private static final long serialVersionUID = 5051907841570096991L;
        private transient BulkProcessor mockBulkProcessor;
        private transient BulkRequest nextBulkRequest = new BulkRequest();
        private transient MultiShotLatch flushLatch = new MultiShotLatch();
        private List<? extends Throwable> mockItemFailuresList;
        private Throwable nextBulkFailure;

        public DummyElasticsearchSink(Map<String, String> userConfig, ElasticsearchSinkFunction<T> sinkFunction, ActionRequestFailureHandler failureHandler) {
            super((ElasticsearchApiCallBridge)new DummyElasticsearchApiCallBridge(), userConfig, sinkFunction, failureHandler);
        }

        public void manualBulkRequestWithAllPendingRequests() {
            this.flushLatch.trigger();
            this.mockBulkProcessor.flush();
        }

        public void continueFlush() {
            this.flushLatch.trigger();
        }

        public void setMockItemFailuresListForNextBulkItemResponses(List<? extends Throwable> mockItemFailuresList) {
            this.mockItemFailuresList = mockItemFailuresList;
        }

        public void setFailNextBulkRequestCompletely(Throwable failure) {
            this.nextBulkFailure = failure;
        }

        public BulkProcessor getMockBulkProcessor() {
            return this.mockBulkProcessor;
        }

        protected BulkProcessor buildBulkProcessor(final BulkProcessor.Listener listener) {
            this.mockBulkProcessor = (BulkProcessor)Mockito.mock(BulkProcessor.class);
            Mockito.when((Object)this.mockBulkProcessor.add((IndexRequest)Matchers.any(IndexRequest.class))).thenAnswer((Answer)new Answer<Object>(){

                public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                    nextBulkRequest.add((IndexRequest)invocationOnMock.getArgument(0));
                    return null;
                }
            });
            ((BulkProcessor)Mockito.doAnswer((Answer)new Answer(){

                public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                    while (nextBulkRequest.numberOfActions() > 0) {
                        flushLatch.await();
                        BulkRequest currentBulkRequest = nextBulkRequest;
                        nextBulkRequest = new BulkRequest();
                        listener.beforeBulk(123L, currentBulkRequest);
                        if (nextBulkFailure == null) {
                            BulkItemResponse[] mockResponses = new BulkItemResponse[currentBulkRequest.requests().size()];
                            for (int i = 0; i < currentBulkRequest.requests().size(); ++i) {
                                Throwable mockItemFailure = (Throwable)mockItemFailuresList.get(i);
                                mockResponses[i] = mockItemFailure == null ? new BulkItemResponse(i, "opType", (ActionResponse)Mockito.mock(ActionResponse.class)) : new BulkItemResponse(i, "opType", new BulkItemResponse.Failure("index", "type", "id", mockItemFailure));
                            }
                            listener.afterBulk(123L, currentBulkRequest, new BulkResponse(mockResponses, 1000L));
                            continue;
                        }
                        listener.afterBulk(123L, currentBulkRequest, nextBulkFailure);
                    }
                    return null;
                }
            }).when((Object)this.mockBulkProcessor)).flush();
            return this.mockBulkProcessor;
        }
    }
}

