package org.apache.druid.tests.query;

import com.google.inject.Inject;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.druid.curator.discovery.ServerDiscoveryFactory;
import org.apache.druid.curator.discovery.ServerDiscoverySelector;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.EventReceiverFirehoseTestClient;
import org.apache.druid.testing.guice.DruidTestModuleFactory;
import org.apache.druid.testing.guice.TestClient;
import org.apache.druid.testing.utils.ITRetryUtil;
import org.apache.druid.testing.utils.ServerDiscoveryUtil;
import org.apache.druid.tests.TestNGGroup;
import org.apache.druid.tests.indexer.AbstractITBatchIndexTest;
import org.apache.druid.tests.indexer.AbstractIndexerTest;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Guice;
import org.testng.annotations.Test;

@Guice(moduleFactory = DruidTestModuleFactory.class)
@Test(groups = {TestNGGroup.QUERY})
/* loaded from: input_file:org/apache/druid/tests/query/ITUnionQueryTest.class */
public class ITUnionQueryTest extends AbstractIndexerTest {
    private static final Logger LOG = new Logger(ITUnionQueryTest.class);
    private static final String UNION_TASK_RESOURCE = "/indexer/wikipedia_union_index_task.json";
    private static final String EVENT_RECEIVER_SERVICE_PREFIX = "eventReceiverServiceName";
    private static final String UNION_DATA_FILE = "/data/union_query/wikipedia_index_data.json";
    private static final String UNION_QUERIES_RESOURCE = "/queries/union_queries.json";
    private static final String UNION_DATASOURCE = "wikipedia_index_test";

    @Inject
    ServerDiscoveryFactory factory;

    @Inject
    @TestClient
    HttpClient httpClient;

    @Inject
    IntegrationTestingConfig config;
    private String fullDatasourceName;

    @BeforeSuite
    public void setFullDatasourceName() {
        this.fullDatasourceName = UNION_DATASOURCE + this.config.getExtraDatasourceNameSuffix();
    }

    @Test
    public void testUnionQuery() throws IOException {
        Closer create = Closer.create();
        for (int i = 0; i < 3; i++) {
            try {
                create.register(unloader(this.fullDatasourceName + i));
            } catch (Throwable th) {
                create.close();
                throw th;
            }
        }
        try {
            String shutOffTime = setShutOffTime(getResourceAsString(UNION_TASK_RESOURCE), DateTimes.utc(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(3L)));
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < 3; i2++) {
                arrayList.add(this.indexer.submitTask(withServiceName(withDataSource(shutOffTime, this.fullDatasourceName + i2), EVENT_RECEIVER_SERVICE_PREFIX + i2)));
            }
            for (int i3 = 0; i3 < 3; i3++) {
                postEvents(i3);
            }
            ITRetryUtil.retryUntil(() -> {
                for (int i4 = 0; i4 < 3; i4++) {
                    int countRows = this.queryHelper.countRows(this.fullDatasourceName + i4, Intervals.of("2013-08-31/2013-09-01"), str -> {
                        return new LongSumAggregatorFactory(str, "count");
                    });
                    if (countRows < 5) {
                        LOG.warn("%d events have been ingested to %s so far", new Object[]{Integer.valueOf(countRows), this.fullDatasourceName + i4});
                        return false;
                    }
                }
                return true;
            }, true, 1000L, 100, "Waiting all events are ingested");
            LOG.info("Running Union Queries..", new Object[0]);
            try {
                String replace = StringUtils.replace(IOUtils.toString(AbstractITBatchIndexTest.class.getResourceAsStream(UNION_QUERIES_RESOURCE), StandardCharsets.UTF_8), "%%DATASOURCE%%", this.fullDatasourceName);
                this.queryHelper.testQueriesFromString(replace);
                for (int i4 = 0; i4 < 3; i4++) {
                    this.indexer.waitUntilTaskCompletes((String) arrayList.get(i4));
                }
                for (int i5 = 0; i5 < 3; i5++) {
                    int i6 = i5;
                    ITRetryUtil.retryUntil(() -> {
                        return Boolean.valueOf(this.coordinator.areSegmentsLoaded(this.fullDatasourceName + i6));
                    }, true, 10000L, 10, "Real-time generated segments loaded");
                }
                this.queryHelper.testQueriesFromString(replace);
                create.close();
            } catch (IOException e) {
                throw new ISE(e, "could not read query file: %s", new Object[]{UNION_QUERIES_RESOURCE});
            }
        } catch (Throwable th2) {
            throw create.rethrow(th2);
        }
    }

    private String setShutOffTime(String str, DateTime dateTime) {
        return StringUtils.replace(str, "#SHUTOFFTIME", dateTime.toString());
    }

    private String withDataSource(String str, String str2) {
        return StringUtils.replace(str, "%%DATASOURCE%%", str2);
    }

    private String withServiceName(String str, String str2) {
        return StringUtils.replace(str, EVENT_RECEIVER_SERVICE_PREFIX, str2);
    }

    private void postEvents(int i) throws Exception {
        ServerDiscoverySelector createSelector = this.factory.createSelector(EVENT_RECEIVER_SERVICE_PREFIX + i);
        createSelector.start();
        try {
            ServerDiscoveryUtil.waitUntilInstanceReady(createSelector, "Event Receiver");
            String str = this.config.getMiddleManagerHost() + ":" + createSelector.pick().getPort();
            LOG.info("Event Receiver Found at host [%s]", new Object[]{str});
            LOG.info("Checking worker /status/health for [%s]", new Object[]{str});
            ITRetryUtil.retryUntilTrue(() -> {
                try {
                    return Boolean.valueOf(((StatusResponseHolder) this.httpClient.go(new Request(HttpMethod.GET, new URL(StringUtils.format("https://%s/status/health", new Object[]{str}))), StatusResponseHandler.getInstance()).get()).getStatus().equals(HttpResponseStatus.OK));
                } catch (Throwable th) {
                    LOG.error(th, "", new Object[0]);
                    return false;
                }
            }, StringUtils.format("Checking /status/health for worker [%s]", new Object[]{str}));
            LOG.info("Finished checking worker /status/health for [%s], success", new Object[]{str});
            new EventReceiverFirehoseTestClient(str, EVENT_RECEIVER_SERVICE_PREFIX + i, this.jsonMapper, this.httpClient, this.smileMapper).postEventsFromFile(UNION_DATA_FILE);
            createSelector.stop();
        } catch (Throwable th) {
            createSelector.stop();
            throw th;
        }
    }
}
