package org.apache.druid.client;

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.druid.client.selector.ConnectionCountServerSelectorStrategy;
import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
import org.apache.druid.client.selector.QueryableDruidServer;
import org.apache.druid.client.selector.ServerSelector;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryException;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.ReflectionQueryToolChestWarehouse;
import org.apache.druid.query.Result;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.timeout.ReadTimeoutException;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/client/DirectDruidClientTest.class */
/**
 * Tests for {@link DirectDruidClient} driven through a mocked {@link HttpClient}.
 *
 * <p>Every test scripts the futures returned by {@code HttpClient.go(...)} and then checks what the
 * client reports: the request it issued, its open-connection count, and the exception surfaced when
 * the response is cancelled, carries an error payload, or arrives too late.
 */
public class DirectDruidClientTest {
    private final String hostName = "localhost:8080";
    private final DataSegment dataSegment = new DataSegment(
            "test",
            Intervals.of("2013-01-01/2013-01-02"),
            DateTimes.of("2013-01-01").toString(),
            new HashMap<>(),
            new ArrayList<>(),
            new ArrayList<>(),
            NoneShardSpec.instance(),
            0,
            0);
    private ServerSelector serverSelector;
    private HttpClient httpClient;
    // Left raw: the tests only look at results through wildcard types, and parameterizing the client
    // would require a result-value type that this file does not import.
    private DirectDruidClient client;
    private QueryableDruidServer queryableDruidServer;

    /**
     * Builds a fresh mocked HTTP client, a client under test bound to {@code hostName}, and a server
     * selector that already knows about that client's server.
     */
    @Before
    public void setup() {
        httpClient = EasyMock.createMock(HttpClient.class);
        serverSelector = new ServerSelector(
                dataSegment,
                new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()));
        client = new DirectDruidClient(
                new ReflectionQueryToolChestWarehouse(),
                QueryRunnerTestHelper.NOOP_QUERYWATCHER,
                new DefaultObjectMapper(),
                httpClient,
                "http",
                hostName,
                new NoopServiceEmitter());
        queryableDruidServer = new QueryableDruidServer(
                new DruidServer("test1", "localhost", null, 0L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0),
                client);
        serverSelector.addServerAndUpdateSegment(queryableDruidServer, serverSelector.getSegment());
    }

    /**
     * Checks the request sent for a query and follows the open-connection counter as responses fail,
     * stay pending, or complete.
     */
    @Test
    public void testRun() throws Exception {
        final URL expectedUrl = new URL(StringUtils.format("http://%s/druid/v2/", hostName));
        final Capture<Request> capturedRequest = EasyMock.newCapture();

        // The first two calls get futures this test controls; every later call gets a future that is
        // never completed, so those connections stay open.
        final SettableFuture<InputStream> firstResponse = SettableFuture.create();
        EasyMock.expect(
                httpClient.go(
                        EasyMock.capture(capturedRequest),
                        EasyMock.<HttpResponseHandler>anyObject(),
                        EasyMock.anyObject(Duration.class)))
                .andReturn(firstResponse)
                .times(1);
        final SettableFuture<InputStream> secondResponse = SettableFuture.create();
        EasyMock.expect(
                httpClient.go(
                        EasyMock.capture(capturedRequest),
                        EasyMock.<HttpResponseHandler>anyObject(),
                        EasyMock.anyObject(Duration.class)))
                .andReturn(secondResponse)
                .times(1);
        EasyMock.expect(
                httpClient.go(
                        EasyMock.capture(capturedRequest),
                        EasyMock.<HttpResponseHandler>anyObject(),
                        EasyMock.anyObject(Duration.class)))
                .andReturn(SettableFuture.create())
                .atLeastOnce();
        EasyMock.replay(httpClient);

        // A second client/server pair sharing the same mocked HTTP client.
        // NOTE(review): the host argument is a constant from the SQL test utilities; it looks like an
        // inlined string literal that was mapped back to an unrelated constant - confirm and replace.
        final DirectDruidClient secondClient = new DirectDruidClient(
                new ReflectionQueryToolChestWarehouse(),
                QueryRunnerTestHelper.NOOP_QUERYWATCHER,
                new DefaultObjectMapper(),
                httpClient,
                "http",
                CalciteTests.DATASOURCE2,
                new NoopServiceEmitter());
        final QueryableDruidServer secondServer = new QueryableDruidServer(
                new DruidServer("test1", "localhost", null, 0L, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0),
                secondClient);
        serverSelector.addServerAndUpdateSegment(secondServer, serverSelector.getSegment());

        final TimeBoundaryQuery query = newQuery(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE));

        final Sequence<?> firstResults = client.run(QueryPlus.wrap(query));
        Assert.assertTrue(capturedRequest.hasCaptured());
        Assert.assertEquals(expectedUrl, capturedRequest.getValue().getUrl());
        Assert.assertEquals(HttpMethod.POST, capturedRequest.getValue().getMethod());
        Assert.assertEquals(1, client.getNumOpenConnections());

        // A failed response gives its connection back.
        client.run(QueryPlus.wrap(query));
        Assert.assertEquals(2, client.getNumOpenConnections());
        secondResponse.setException(new ReadTimeoutException());
        Assert.assertEquals(1, client.getNumOpenConnections());

        // Three more queries against never-completing futures.
        client.run(QueryPlus.wrap(query));
        client.run(QueryPlus.wrap(query));
        client.run(QueryPlus.wrap(query));
        Assert.assertEquals(4, client.getNumOpenConnections());

        // Completing the first response makes its result readable and releases one connection.
        firstResponse.set(
                new ByteArrayInputStream(
                        StringUtils.toUtf8("[{\"timestamp\":\"2014-01-01T01:02:03Z\", \"result\": 42.0}]")));
        final List<?> results = firstResults.toList();
        Assert.assertEquals(1, results.size());
        Assert.assertEquals(DateTimes.of("2014-01-01T01:02:03Z"), ((Result<?>) results.get(0)).getTimestamp());
        Assert.assertEquals(3, client.getNumOpenConnections());

        // The second client now holds 2 open connections against 3 on the first one, and the
        // selector is expected to pick the second server.
        secondClient.run(QueryPlus.wrap(query));
        secondClient.run(QueryPlus.wrap(query));
        Assert.assertEquals(2, secondClient.getNumOpenConnections());
        Assert.assertEquals(secondServer, serverSelector.pick(null));

        EasyMock.verify(httpClient);
    }

    /**
     * A query whose response future is already cancelled must leave no connection open and must
     * raise {@link QueryInterruptedException} when its results are read.
     */
    @Test
    public void testCancel() {
        final Capture<Request> capturedRequest = EasyMock.newCapture();
        final ListenableFuture<Object> cancelledFuture = Futures.immediateCancelledFuture();
        final SettableFuture<Object> cancellationFuture = SettableFuture.create();

        EasyMock.expect(
                httpClient.go(
                        EasyMock.capture(capturedRequest),
                        EasyMock.<HttpResponseHandler>anyObject(),
                        EasyMock.anyObject(Duration.class)))
                .andReturn(cancelledFuture)
                .once();
        // Any follow-up request made by the client receives an already-completed OK response.
        EasyMock.expect(
                httpClient.go(
                        EasyMock.capture(capturedRequest),
                        EasyMock.<HttpResponseHandler>anyObject(),
                        EasyMock.anyObject(Duration.class)))
                .andReturn(cancellationFuture)
                .anyTimes();
        EasyMock.replay(httpClient);

        final TimeBoundaryQuery query = newQuery(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE));
        cancellationFuture.set(new StatusResponseHolder(HttpResponseStatus.OK, new StringBuilder("cancelled")));

        final Sequence<?> results = client.run(QueryPlus.wrap(query));
        Assert.assertEquals(HttpMethod.POST, capturedRequest.getValue().getMethod());
        Assert.assertEquals(0, client.getNumOpenConnections());

        QueryInterruptedException actualException = null;
        try {
            results.toList();
        } catch (QueryInterruptedException e) {
            actualException = e;
        }
        Assert.assertNotNull(actualException);

        EasyMock.verify(httpClient);
    }

    /**
     * An error payload in the response body must surface as a {@link QueryInterruptedException}
     * carrying the payload's error code and message, plus the host the client talks to.
     */
    @Test
    public void testQueryInterruptionExceptionLogMessage() {
        final SettableFuture<InputStream> response = SettableFuture.create();
        EasyMock.expect(
                httpClient.go(
                        EasyMock.capture(EasyMock.<Request>newCapture()),
                        EasyMock.<HttpResponseHandler>anyObject(),
                        EasyMock.anyObject(Duration.class)))
                .andReturn(response)
                .anyTimes();
        EasyMock.replay(httpClient);

        final TimeBoundaryQuery query = newQuery(ImmutableMap.of(DirectDruidClient.QUERY_FAIL_TIME, Long.MAX_VALUE));
        response.set(
                new ByteArrayInputStream(
                        StringUtils.toUtf8("{\"error\":\"testing1\",\"errorMessage\":\"testing2\"}")));

        QueryInterruptedException actualException = null;
        try {
            client.run(QueryPlus.wrap(query)).toList();
        } catch (QueryInterruptedException e) {
            actualException = e;
        }
        Assert.assertNotNull(actualException);
        Assert.assertEquals("testing1", actualException.getErrorCode());
        Assert.assertEquals("testing2", actualException.getMessage());
        Assert.assertEquals(hostName, actualException.getHost());

        EasyMock.verify(httpClient);
    }

    /**
     * The response stream is only finished after the query's fail time has passed, so reading the
     * results must raise {@link QueryTimeoutException}.
     */
    @Test
    public void testQueryTimeoutBeforeFuture() throws IOException, InterruptedException {
        final SettableFuture<InputStream> response = SettableFuture.create();
        EasyMock.expect(
                httpClient.go(
                        EasyMock.capture(EasyMock.<Request>newCapture()),
                        EasyMock.<HttpResponseHandler>anyObject(),
                        EasyMock.anyObject(Duration.class)))
                .andReturn(response)
                .anyTimes();
        EasyMock.replay(httpClient);

        final TimeBoundaryQuery query = newQuery(
                ImmutableMap.of(
                        DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 250,
                        BaseQuery.QUERY_ID, "timeout-before-future"));
        final Sequence<?> results = client.run(QueryPlus.wrap(query));

        final PipedInputStream responseBody = new PipedInputStream();
        // try-with-resources closes the writing end even if one of the writes fails.
        try (PipedOutputStream responseWriter = new PipedOutputStream(responseBody)) {
            response.set(responseBody);
            responseWriter.write(StringUtils.toUtf8("[{\"timestamp\":\"2014-01-01T01:02:03Z\"}"));
            // Sleep for the full fail-time offset so the deadline has passed before the body ends.
            Thread.sleep(250L);
            responseWriter.write(StringUtils.toUtf8("]"));
        }

        QueryTimeoutException actualException = null;
        try {
            results.toList();
        } catch (QueryTimeoutException e) {
            actualException = e;
        }
        Assert.assertNotNull(actualException);
        Assert.assertEquals(QueryException.QUERY_TIMEOUT_ERROR_CODE, actualException.getErrorCode());
        Assert.assertEquals(
                StringUtils.format("url[http://%s/druid/v2/] timed out", hostName),
                actualException.getMessage());
        Assert.assertEquals(hostName, actualException.getHost());

        EasyMock.verify(httpClient);
    }

    /**
     * The response future never completes, so reading the results must raise
     * {@link QueryTimeoutException} naming the query id.
     */
    @Test
    public void testQueryTimeoutFromFuture() {
        final String queryId = "never-ending-future";
        EasyMock.expect(
                httpClient.go(
                        EasyMock.capture(EasyMock.<Request>newCapture()),
                        EasyMock.<HttpResponseHandler>anyObject(),
                        EasyMock.anyObject(Duration.class)))
                .andReturn(SettableFuture.create())
                .anyTimes();
        EasyMock.replay(httpClient);

        final TimeBoundaryQuery query = newQuery(
                ImmutableMap.of(
                        DirectDruidClient.QUERY_FAIL_TIME, System.currentTimeMillis() + 500,
                        BaseQuery.QUERY_ID, queryId));

        QueryTimeoutException actualException = null;
        try {
            client.run(QueryPlus.wrap(query)).toList();
        } catch (QueryTimeoutException e) {
            actualException = e;
        }
        Assert.assertNotNull(actualException);
        Assert.assertEquals(QueryException.QUERY_TIMEOUT_ERROR_CODE, actualException.getErrorCode());
        Assert.assertEquals(StringUtils.format("Query [%s] timed out!", queryId), actualException.getMessage());
        Assert.assertEquals(hostName, actualException.getHost());

        EasyMock.verify(httpClient);
    }

    /** Builds a time-boundary query on data source {@code "test"} with the given context overrides. */
    private static TimeBoundaryQuery newQuery(Map<String, Object> context) {
        return Druids.newTimeBoundaryQueryBuilder()
                .dataSource("test")
                .build()
                .withOverriddenContext(context);
    }
}
