/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.client;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.druid.client.BrokerViewOfCoordinatorConfig;
import org.apache.druid.client.DirectDruidClient;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.QueryableDruidServer;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.selector.ConnectionCountServerSelectorStrategy;
import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
import org.apache.druid.client.selector.HistoricalFilter;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.client.selector.ServerSelectorStrategy;
import org.apache.druid.client.selector.TierSelectorStrategy;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.CloneQueryMode;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.Result;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordination.TestCoordinatorClient;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.timeout.ReadTimeoutException;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

public class DirectDruidClientTest {
    @ClassRule
    public static QueryStackTests.Junit4ConglomerateRule conglomerateRule = new QueryStackTests.Junit4ConglomerateRule();
    private final String hostName = "localhost:8080";
    private final DataSegment dataSegment = new DataSegment("test", Intervals.of((String)"2013-01-01/2013-01-02"), DateTimes.of((String)"2013-01-01").toString(), new HashMap(), new ArrayList(), new ArrayList(), (ShardSpec)NoneShardSpec.instance(), Integer.valueOf(0), 0L);
    private ServerSelector serverSelector;
    private HttpClient httpClient;
    private DirectDruidClient client;
    private QueryableDruidServer queryableDruidServer;
    private ScheduledExecutorService queryCancellationExecutor;

    @Before
    public void setup() {
        BrokerViewOfCoordinatorConfig filter = new BrokerViewOfCoordinatorConfig((CoordinatorClient)new TestCoordinatorClient());
        filter.start();
        this.httpClient = (HttpClient)EasyMock.createMock(HttpClient.class);
        this.serverSelector = new ServerSelector(this.dataSegment, (TierSelectorStrategy)new HighestPriorityTierSelectorStrategy((ServerSelectorStrategy)new ConnectionCountServerSelectorStrategy()), (HistoricalFilter)filter);
        this.queryCancellationExecutor = Execs.scheduledSingleThreaded((String)"query-cancellation-executor");
        this.client = new DirectDruidClient(conglomerateRule.getConglomerate(), QueryRunnerTestHelper.NOOP_QUERYWATCHER, (ObjectMapper)new DefaultObjectMapper(), this.httpClient, "http", "localhost:8080", (ServiceEmitter)new NoopServiceEmitter(), this.queryCancellationExecutor);
        this.queryableDruidServer = new QueryableDruidServer(new DruidServer("test1", "localhost", null, 0L, ServerType.HISTORICAL, "_default_tier", 0), (QueryRunner)this.client);
        this.serverSelector.addServerAndUpdateSegment(this.queryableDruidServer, this.serverSelector.getSegment());
    }

    @After
    public void teardown() throws InterruptedException {
        this.queryCancellationExecutor.shutdown();
        this.queryCancellationExecutor.awaitTermination(1L, TimeUnit.SECONDS);
    }

    @Test
    public void testRun() throws Exception {
        URL url = new URL(StringUtils.format((String)"http://%s/druid/v2/", (Object[])new Object[]{"localhost:8080"}));
        SettableFuture futureResult = SettableFuture.create();
        Capture capturedRequest = EasyMock.newCapture();
        EasyMock.expect((Object)this.httpClient.go((Request)EasyMock.capture((Capture)capturedRequest), (HttpResponseHandler)EasyMock.anyObject(), (Duration)EasyMock.anyObject(Duration.class))).andReturn((Object)futureResult).times(1);
        SettableFuture futureException = SettableFuture.create();
        EasyMock.expect((Object)this.httpClient.go((Request)EasyMock.capture((Capture)capturedRequest), (HttpResponseHandler)EasyMock.anyObject(), (Duration)EasyMock.anyObject(Duration.class))).andReturn((Object)futureException).times(1);
        EasyMock.expect((Object)this.httpClient.go((Request)EasyMock.capture((Capture)capturedRequest), (HttpResponseHandler)EasyMock.anyObject(), (Duration)EasyMock.anyObject(Duration.class))).andReturn((Object)SettableFuture.create()).atLeastOnce();
        EasyMock.replay((Object[])new Object[]{this.httpClient});
        DirectDruidClient client2 = new DirectDruidClient(conglomerateRule.getConglomerate(), QueryRunnerTestHelper.NOOP_QUERYWATCHER, (ObjectMapper)new DefaultObjectMapper(), this.httpClient, "http", "foo2", (ServiceEmitter)new NoopServiceEmitter(), this.queryCancellationExecutor);
        QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer(new DruidServer("test1", "localhost", null, 0L, ServerType.HISTORICAL, "_default_tier", 0), (QueryRunner)client2);
        this.serverSelector.addServerAndUpdateSegment(queryableDruidServer2, this.serverSelector.getSegment());
        TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
        query = query.withOverriddenContext((Map)ImmutableMap.of((Object)"queryFailTime", (Object)Long.MAX_VALUE));
        Sequence s1 = this.client.run(QueryPlus.wrap((Query)query));
        Assert.assertTrue((boolean)capturedRequest.hasCaptured());
        Assert.assertEquals((Object)url, (Object)((Request)capturedRequest.getValue()).getUrl());
        Assert.assertEquals((Object)HttpMethod.POST, (Object)((Request)capturedRequest.getValue()).getMethod());
        Assert.assertEquals((long)1L, (long)this.client.getNumOpenConnections());
        this.client.run(QueryPlus.wrap((Query)query));
        Assert.assertEquals((long)2L, (long)this.client.getNumOpenConnections());
        futureException.setException((Throwable)new ReadTimeoutException());
        Assert.assertEquals((long)1L, (long)this.client.getNumOpenConnections());
        this.client.run(QueryPlus.wrap((Query)query));
        this.client.run(QueryPlus.wrap((Query)query));
        this.client.run(QueryPlus.wrap((Query)query));
        Assert.assertTrue((this.client.getNumOpenConnections() == 4 ? 1 : 0) != 0);
        futureResult.set((Object)new ByteArrayInputStream(StringUtils.toUtf8((String)"[{\"timestamp\":\"2014-01-01T01:02:03Z\", \"result\": 42.0}]")));
        List results = s1.toList();
        Assert.assertEquals((long)1L, (long)results.size());
        Assert.assertEquals((Object)DateTimes.of((String)"2014-01-01T01:02:03Z"), (Object)((Result)results.get(0)).getTimestamp());
        Assert.assertEquals((long)3L, (long)this.client.getNumOpenConnections());
        client2.run(QueryPlus.wrap((Query)query));
        client2.run(QueryPlus.wrap((Query)query));
        Assert.assertEquals((long)2L, (long)client2.getNumOpenConnections());
        Assert.assertEquals((Object)this.serverSelector.pick(null, CloneQueryMode.EXCLUDECLONES), (Object)queryableDruidServer2);
        EasyMock.verify((Object[])new Object[]{this.httpClient});
    }

    @Test
    public void testCancel() {
        Capture capturedRequest = EasyMock.newCapture();
        ListenableFuture cancelledFuture = Futures.immediateCancelledFuture();
        SettableFuture cancellationFuture = SettableFuture.create();
        EasyMock.expect((Object)this.httpClient.go((Request)EasyMock.capture((Capture)capturedRequest), (HttpResponseHandler)EasyMock.anyObject(), (Duration)EasyMock.anyObject(Duration.class))).andReturn((Object)cancelledFuture).once();
        EasyMock.expect((Object)this.httpClient.go((Request)EasyMock.capture((Capture)capturedRequest), (HttpResponseHandler)EasyMock.anyObject(), (Duration)EasyMock.anyObject(Duration.class))).andReturn((Object)cancellationFuture).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.httpClient});
        TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
        query = query.withOverriddenContext((Map)ImmutableMap.of((Object)"queryFailTime", (Object)Long.MAX_VALUE));
        cancellationFuture.set((Object)new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled")));
        Sequence results = this.client.run(QueryPlus.wrap((Query)query));
        Assert.assertEquals((Object)HttpMethod.POST, (Object)((Request)capturedRequest.getValue()).getMethod());
        Assert.assertEquals((long)0L, (long)this.client.getNumOpenConnections());
        QueryInterruptedException exception = null;
        try {
            results.toList();
        }
        catch (QueryInterruptedException e) {
            exception = e;
        }
        Assert.assertNotNull((Object)((Object)exception));
        EasyMock.verify((Object[])new Object[]{this.httpClient});
    }

    @Test
    public void testQueryInterruptionExceptionLogMessage() {
        SettableFuture interruptionFuture = SettableFuture.create();
        Capture capturedRequest = EasyMock.newCapture();
        String hostName = "localhost:8080";
        EasyMock.expect((Object)this.httpClient.go((Request)EasyMock.capture((Capture)capturedRequest), (HttpResponseHandler)EasyMock.anyObject(), (Duration)EasyMock.anyObject(Duration.class))).andReturn((Object)interruptionFuture).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.httpClient});
        TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
        query = query.withOverriddenContext((Map)ImmutableMap.of((Object)"queryFailTime", (Object)Long.MAX_VALUE));
        interruptionFuture.set((Object)new ByteArrayInputStream(StringUtils.toUtf8((String)"{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}")));
        Sequence results = this.client.run(QueryPlus.wrap((Query)query));
        QueryInterruptedException actualException = null;
        try {
            results.toList();
        }
        catch (QueryInterruptedException e) {
            actualException = e;
        }
        Assert.assertNotNull((Object)((Object)actualException));
        Assert.assertEquals((Object)"testing1", (Object)actualException.getErrorCode());
        Assert.assertEquals((Object)"testing2", (Object)actualException.getMessage());
        Assert.assertEquals((Object)"localhost:8080", (Object)actualException.getHost());
        EasyMock.verify((Object[])new Object[]{this.httpClient});
    }

    @Test
    public void testQueryTimeoutBeforeFuture() throws IOException, InterruptedException {
        SettableFuture timeoutFuture = SettableFuture.create();
        Capture capturedRequest = EasyMock.newCapture();
        String queryId = "timeout-before-future";
        EasyMock.expect((Object)this.httpClient.go((Request)EasyMock.capture((Capture)capturedRequest), (HttpResponseHandler)EasyMock.anyObject(), (Duration)EasyMock.anyObject(Duration.class))).andReturn((Object)timeoutFuture).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.httpClient});
        TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
        query = query.withOverriddenContext((Map)ImmutableMap.of((Object)"queryFailTime", (Object)(System.currentTimeMillis() + 250L), (Object)"queryId", (Object)"timeout-before-future"));
        Sequence results = this.client.run(QueryPlus.wrap((Query)query));
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream out = new PipedOutputStream(in);
        timeoutFuture.set((Object)in);
        QueryTimeoutException actualException = null;
        try {
            out.write(StringUtils.toUtf8((String)"[{\"timestamp\":\"2014-01-01T01:02:03Z\"}"));
            Thread.sleep(250L);
            out.write(StringUtils.toUtf8((String)"]"));
            out.close();
            results.toList();
        }
        catch (QueryTimeoutException e) {
            actualException = e;
        }
        Assert.assertNotNull((Object)((Object)actualException));
        Assert.assertEquals((Object)"Query timeout", (Object)actualException.getErrorCode());
        Assert.assertEquals((Object)"url[http://localhost:8080/druid/v2/] timed out", (Object)actualException.getMessage());
        Assert.assertEquals((Object)"localhost:8080", (Object)actualException.getHost());
        EasyMock.verify((Object[])new Object[]{this.httpClient});
    }

    @Test
    public void testQueryTimeoutFromFuture() {
        SettableFuture noFuture = SettableFuture.create();
        Capture capturedRequest = EasyMock.newCapture();
        String queryId = "never-ending-future";
        EasyMock.expect((Object)this.httpClient.go((Request)EasyMock.capture((Capture)capturedRequest), (HttpResponseHandler)EasyMock.anyObject(), (Duration)EasyMock.anyObject(Duration.class))).andReturn((Object)noFuture).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.httpClient});
        TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
        query = query.withOverriddenContext((Map)ImmutableMap.of((Object)"queryFailTime", (Object)(System.currentTimeMillis() + 500L), (Object)"queryId", (Object)"never-ending-future"));
        Sequence results = this.client.run(QueryPlus.wrap((Query)query));
        QueryTimeoutException actualException = null;
        try {
            results.toList();
        }
        catch (QueryTimeoutException e) {
            actualException = e;
        }
        Assert.assertNotNull((Object)((Object)actualException));
        Assert.assertEquals((Object)"Query timeout", (Object)actualException.getErrorCode());
        Assert.assertEquals((Object)StringUtils.format((String)"Query [%s] timed out!", (Object[])new Object[]{"never-ending-future"}), (Object)actualException.getMessage());
        Assert.assertEquals((Object)"localhost:8080", (Object)actualException.getHost());
        EasyMock.verify((Object[])new Object[]{this.httpClient});
    }

    @Test
    public void testConnectionCountAfterException() throws JsonProcessingException {
        ObjectMapper mockObjectMapper = (ObjectMapper)EasyMock.createMock(ObjectMapper.class);
        EasyMock.expect((Object)mockObjectMapper.writeValueAsBytes(Query.class)).andThrow((Throwable)new JsonProcessingException("Error"){});
        DirectDruidClient client2 = new DirectDruidClient(conglomerateRule.getConglomerate(), QueryRunnerTestHelper.NOOP_QUERYWATCHER, mockObjectMapper, this.httpClient, "http", "localhost:8080", (ServiceEmitter)new NoopServiceEmitter(), this.queryCancellationExecutor);
        QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer(new DruidServer("test1", "localhost", null, 0L, ServerType.HISTORICAL, "_default_tier", 0), (QueryRunner)client2);
        this.serverSelector.addServerAndUpdateSegment(queryableDruidServer2, this.serverSelector.getSegment());
        TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
        TimeBoundaryQuery finalQuery = query = query.withOverriddenContext((Map)ImmutableMap.of((Object)"queryFailTime", (Object)Long.MAX_VALUE));
        Assert.assertThrows(RuntimeException.class, () -> client2.run(QueryPlus.wrap((Query)finalQuery)));
        Assert.assertEquals((long)0L, (long)client2.getNumOpenConnections());
    }
}

