package org.apache.druid.server.coordinator.loading;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.server.ServerTestHelper;
import org.apache.druid.server.coordination.DataSegmentChangeCallback;
import org.apache.druid.server.coordination.DataSegmentChangeHandler;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.druid.server.coordination.SegmentLoadDropHandler;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.coordinator.TestDruidCoordinatorConfig;
import org.apache.druid.server.coordinator.simulate.BlockingExecutorService;
import org.apache.druid.server.coordinator.simulate.WrappingScheduledExecutorService;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest.class */
public class HttpLoadQueuePeonTest {
    // Fake HTTP client handed to the peon; records every segment the peon sends out.
    private TestHttpClient httpClient;
    // Peon under test; a fresh instance is created and started in setUp().
    private HttpLoadQueuePeon httpLoadQueuePeon;
    // Executor wrapped and passed to the peon for its processing work; tests drain it explicitly.
    private BlockingExecutorService processingExecutor;
    // Executor passed to the peon as its last constructor argument; tests drain it before checking callbacks.
    private BlockingExecutorService callbackExecutor;
    // Four partitions of datasource "test" for the single day starting 2022-01-01, 100 MB each.
    private final List<DataSegment> segments = CreateDataSegments.ofDatasource("test").forIntervals(1, Granularities.DAY).startingAt("2022-01-01").withNumPartitions(4).eachOfSizeInMb(100);
    // Segments whose callback (built by markSegmentProcessed) has fired, in firing order.
    private final List<DataSegment> processedSegments = new ArrayList<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest$QueueAction.class */
    /**
     * Pairs a segment with the operation that should be applied to it, so that a list of
     * such pairs can be reordered first and executed afterwards.
     */
    public static class QueueAction {
        /** Segment handed to {@link #action} when this queue action is invoked. */
        final DataSegment segment;
        /** Operation applied to {@link #segment}. */
        final Consumer<DataSegment> action;

        QueueAction(DataSegment target, Consumer<DataSegment> operation) {
            segment = target;
            action = operation;
        }

        /** Convenience factory; equivalent to calling the constructor directly. */
        static QueueAction of(DataSegment target, Consumer<DataSegment> operation) {
            return new QueueAction(target, operation);
        }

        /** Runs the stored operation against the stored segment. */
        void invoke() {
            action.accept(segment);
        }
    }

    /* loaded from: input_file:org/apache/druid/server/coordinator/loading/HttpLoadQueuePeonTest$TestHttpClient.class */
    private static class TestHttpClient implements HttpClient, DataSegmentChangeHandler {
        private final List<DataSegment> segmentsSentToServer;

        private TestHttpClient() {
            this.segmentsSentToServer = new ArrayList();
        }

        public <Intermediate, Final> ListenableFuture<Final> go(Request request, HttpResponseHandler<Intermediate, Final> httpResponseHandler) {
            throw new UnsupportedOperationException("Not Implemented.");
        }

        public <Intermediate, Final> ListenableFuture<Final> go(Request request, HttpResponseHandler<Intermediate, Final> httpResponseHandler, Duration duration) {
            DefaultHttpResponse defaultHttpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            defaultHttpResponse.setContent(ChannelBuffers.buffer(0));
            httpResponseHandler.handleResponse(defaultHttpResponse, (HttpResponseHandler.TrafficCop) null);
            try {
                List<DataSegmentChangeRequest> list = (List) ServerTestHelper.MAPPER.readValue(request.getContent().array(), new TypeReference<List<DataSegmentChangeRequest>>() { // from class: org.apache.druid.server.coordinator.loading.HttpLoadQueuePeonTest.TestHttpClient.1
                });
                ArrayList arrayList = new ArrayList(list.size());
                for (DataSegmentChangeRequest dataSegmentChangeRequest : list) {
                    dataSegmentChangeRequest.go(this, (DataSegmentChangeCallback) null);
                    arrayList.add(new SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus(dataSegmentChangeRequest, SegmentLoadDropHandler.Status.SUCCESS));
                }
                return Futures.immediateFuture(new ByteArrayInputStream(ServerTestHelper.MAPPER.writerWithType(HttpLoadQueuePeon.RESPONSE_ENTITY_TYPE_REF).writeValueAsBytes(arrayList)));
            } catch (Exception e) {
                throw new RE(e, "Unexpected exception.", new Object[0]);
            }
        }

        public void addSegment(DataSegment dataSegment, DataSegmentChangeCallback dataSegmentChangeCallback) {
            this.segmentsSentToServer.add(dataSegment);
        }

        public void removeSegment(DataSegment dataSegment, DataSegmentChangeCallback dataSegmentChangeCallback) {
            this.segmentsSentToServer.add(dataSegment);
        }
    }

    /**
     * Creates a fresh fake client, fresh executors and a new peon with an HTTP batch size
     * of 10, clears the record of processed segments, and starts the peon.
     */
    @Before
    public void setUp() {
        httpClient = new TestHttpClient();
        processingExecutor = new BlockingExecutorService("HttpLoadQueuePeonTest-%s");
        callbackExecutor = new BlockingExecutorService("HttpLoadQueuePeonTest-cb");
        processedSegments.clear();

        httpLoadQueuePeon = new HttpLoadQueuePeon(
                "http://dummy:4000",
                ServerTestHelper.MAPPER,
                httpClient,
                new TestDruidCoordinatorConfig.Builder().withHttpLoadQueuePeonBatchSize(10).build(),
                new WrappingScheduledExecutorService("HttpLoadQueuePeonTest-%s", processingExecutor, true),
                callbackExecutor
        );
        httpLoadQueuePeon.start();
    }

    /** Stops the peon created in {@link #setUp()} after every test. */
    @After
    public void tearDown() {
        httpLoadQueuePeon.stop();
    }

    /**
     * Queues one drop, one load, one replicate and one move, then verifies that all four
     * segments reach the fake server and that all four callbacks fire, both in queueing order.
     */
    @Test
    public void testSimple() {
        final DataSegment toDrop = segments.get(0);
        final DataSegment toLoad = segments.get(1);
        final DataSegment toReplicate = segments.get(2);
        final DataSegment toMove = segments.get(3);

        httpLoadQueuePeon.dropSegment(toDrop, markSegmentProcessed(toDrop));
        httpLoadQueuePeon.loadSegment(toLoad, SegmentAction.LOAD, markSegmentProcessed(toLoad));
        httpLoadQueuePeon.loadSegment(toReplicate, SegmentAction.REPLICATE, markSegmentProcessed(toReplicate));
        httpLoadQueuePeon.loadSegment(toMove, SegmentAction.MOVE_TO, markSegmentProcessed(toMove));

        // Drain the processing executor so that the requests are sent to the fake server.
        processingExecutor.finishAllPendingTasks();
        Assert.assertEquals(segments, httpClient.segmentsSentToServer);

        // Drain the callback executor so that the callbacks record their segments.
        callbackExecutor.finishAllPendingTasks();
        Assert.assertEquals(segments, processedSegments);
    }

    /**
     * Verifies that a drop and a load requested after the peon has been stopped both invoke
     * their callback with {@code false}, without any executor having to be drained.
     */
    @Test
    public void testLoadDropAfterStop() {
        this.httpLoadQueuePeon.stop();

        // Segments whose callback was invoked with a false argument.
        final HashSet<DataSegment> failedSegments = new HashSet<>();

        final DataSegment droppedSegment = this.segments.get(0);
        this.httpLoadQueuePeon.dropSegment(droppedSegment, success -> {
            if (!success) {
                failedSegments.add(droppedSegment);
            }
        });

        final DataSegment movedSegment = this.segments.get(1);
        this.httpLoadQueuePeon.loadSegment(movedSegment, SegmentAction.MOVE_TO, success -> {
            if (!success) {
                failedSegments.add(movedSegment);
            }
        });

        Assert.assertTrue(failedSegments.contains(droppedSegment));
        Assert.assertTrue(failedSegments.contains(movedSegment));
    }

    /**
     * Verifies that requests are sent to the server ordered by action type, regardless of the
     * order in which they were queued: drop, then load, then replicate, then move.
     */
    @Test
    public void testPriorityOfSegmentAction() {
        // Randomize which segment gets which action so the result cannot depend on segment order.
        final List<DataSegment> shuffledSegments = new ArrayList<>(this.segments);
        Collections.shuffle(shuffledSegments);

        // Index i of shuffledSegments gets the action expected at position i on the server.
        final List<QueueAction> actions = Arrays.asList(
                QueueAction.of(
                        shuffledSegments.get(0),
                        segment -> this.httpLoadQueuePeon.dropSegment(segment, (LoadPeonCallback) null)),
                QueueAction.of(
                        shuffledSegments.get(1),
                        segment -> this.httpLoadQueuePeon.loadSegment(segment, SegmentAction.LOAD, (LoadPeonCallback) null)),
                QueueAction.of(
                        shuffledSegments.get(2),
                        segment -> this.httpLoadQueuePeon.loadSegment(segment, SegmentAction.REPLICATE, (LoadPeonCallback) null)),
                QueueAction.of(
                        shuffledSegments.get(3),
                        segment -> this.httpLoadQueuePeon.loadSegment(segment, SegmentAction.MOVE_TO, (LoadPeonCallback) null)));

        // Queue the actions on the peon in a random order.
        Collections.shuffle(actions);
        actions.forEach(QueueAction::invoke);

        this.processingExecutor.finishAllPendingTasks();
        Assert.assertEquals(shuffledSegments, this.httpClient.segmentsSentToServer);
    }

    /**
     * Verifies the order of requests when segments of two different days are queued: requests
     * are expected grouped by action type (drop, load, replicate, move) and, within the same
     * action type, the segment of the later day (2022-01-02) before that of the earlier day
     * (2022-01-01).
     */
    @Test
    public void testPriorityOfSegmentInterval() {
        // Shuffle the segments of each day so the result cannot depend on partition order.
        final List<DataSegment> segmentsDay1 = new ArrayList<>(this.segments);
        Collections.shuffle(segmentsDay1);
        final List<DataSegment> segmentsDay2 = new ArrayList<>(
                CreateDataSegments.ofDatasource("test")
                                  .forIntervals(1, Granularities.DAY)
                                  .startingAt("2022-01-02")
                                  .withNumPartitions(4)
                                  .eachOfSizeInMb(100L));
        Collections.shuffle(segmentsDay2);

        // Listed in the order in which the requests are expected to reach the server.
        final List<QueueAction> actions = Arrays.asList(
                QueueAction.of(
                        segmentsDay2.get(0),
                        segment -> this.httpLoadQueuePeon.dropSegment(segment, (LoadPeonCallback) null)),
                QueueAction.of(
                        segmentsDay1.get(0),
                        segment -> this.httpLoadQueuePeon.dropSegment(segment, (LoadPeonCallback) null)),
                QueueAction.of(
                        segmentsDay2.get(1),
                        segment -> this.httpLoadQueuePeon.loadSegment(segment, SegmentAction.LOAD, (LoadPeonCallback) null)),
                QueueAction.of(
                        segmentsDay1.get(1),
                        segment -> this.httpLoadQueuePeon.loadSegment(segment, SegmentAction.LOAD, (LoadPeonCallback) null)),
                QueueAction.of(
                        segmentsDay2.get(2),
                        segment -> this.httpLoadQueuePeon.loadSegment(segment, SegmentAction.REPLICATE, (LoadPeonCallback) null)),
                QueueAction.of(
                        segmentsDay1.get(2),
                        segment -> this.httpLoadQueuePeon.loadSegment(segment, SegmentAction.REPLICATE, (LoadPeonCallback) null)),
                QueueAction.of(
                        segmentsDay2.get(3),
                        segment -> this.httpLoadQueuePeon.loadSegment(segment, SegmentAction.MOVE_TO, (LoadPeonCallback) null)),
                QueueAction.of(
                        segmentsDay1.get(3),
                        segment -> this.httpLoadQueuePeon.loadSegment(segment, SegmentAction.MOVE_TO, (LoadPeonCallback) null)));

        // Capture the expected order before the actions themselves are shuffled.
        final List<DataSegment> expectedSegmentOrder = actions.stream()
                .map(queueAction -> queueAction.segment)
                .collect(Collectors.toList());

        // Queue the actions on the peon in a random order.
        Collections.shuffle(actions);
        actions.forEach(QueueAction::invoke);

        // A single processing task is enough here: all eight requests fit in one batch of 10.
        this.processingExecutor.finishNextPendingTask();
        Assert.assertEquals(expectedSegmentOrder, this.httpClient.segmentsSentToServer);
    }

    /**
     * Verifies that a queued load can be cancelled before any processing task has run, that
     * it then disappears from the load queue, and that its callback has not been recorded.
     */
    @Test
    public void testCancelLoad() {
        final DataSegment segment = segments.get(0);
        final LoadPeonCallback callback = markSegmentProcessed(segment);

        httpLoadQueuePeon.loadSegment(segment, SegmentAction.REPLICATE, callback);
        Assert.assertEquals(1L, httpLoadQueuePeon.getSegmentsToLoad().size());

        final boolean cancelled = httpLoadQueuePeon.cancelOperation(segment);
        Assert.assertTrue(cancelled);
        Assert.assertEquals(0L, httpLoadQueuePeon.getSegmentsToLoad().size());
        Assert.assertTrue(processedSegments.isEmpty());
    }

    /**
     * Verifies that a queued drop can be cancelled before any processing task has run, that
     * it then disappears from the drop queue, and that its callback has not been recorded.
     */
    @Test
    public void testCancelDrop() {
        final DataSegment segment = segments.get(0);
        final LoadPeonCallback callback = markSegmentProcessed(segment);

        httpLoadQueuePeon.dropSegment(segment, callback);
        Assert.assertEquals(1L, httpLoadQueuePeon.getSegmentsToDrop().size());

        final boolean cancelled = httpLoadQueuePeon.cancelOperation(segment);
        Assert.assertTrue(cancelled);
        Assert.assertTrue(httpLoadQueuePeon.getSegmentsToDrop().isEmpty());
        Assert.assertTrue(processedSegments.isEmpty());
    }

    /**
     * Verifies that a load can no longer be cancelled once its request has reached the
     * server, neither while it is still listed in the load queue nor after it has left it,
     * and that its callback still fires.
     */
    @Test
    public void testCannotCancelRequestSentToServer() {
        final DataSegment segment = segments.get(0);
        httpLoadQueuePeon.loadSegment(segment, SegmentAction.REPLICATE, markSegmentProcessed(segment));
        Assert.assertTrue(httpLoadQueuePeon.getSegmentsToLoad().contains(segment));

        // First processing task: the request reaches the fake server but stays in the queue.
        processingExecutor.finishNextPendingTask();
        Assert.assertTrue(httpClient.segmentsSentToServer.contains(segment));
        Assert.assertTrue(httpLoadQueuePeon.getSegmentsToLoad().contains(segment));

        final boolean cancelledWhileInFlight = httpLoadQueuePeon.cancelOperation(segment);
        Assert.assertFalse(cancelledWhileInFlight);

        // Second processing task: the segment leaves the load queue.
        processingExecutor.finishNextPendingTask();
        Assert.assertTrue(httpLoadQueuePeon.getSegmentsToLoad().isEmpty());

        final boolean cancelledAfterCompletion = httpLoadQueuePeon.cancelOperation(segment);
        Assert.assertFalse(cancelledAfterCompletion);

        // The callback is only recorded once the callback executor has been drained.
        callbackExecutor.finishAllPendingTasks();
        Assert.assertTrue(processedSegments.contains(segment));
    }

    /**
     * Verifies that cancelling the same queued load twice succeeds the first time and is
     * rejected the second time.
     */
    @Test
    public void testCannotCancelOperationMultipleTimes() {
        final DataSegment segment = segments.get(0);
        httpLoadQueuePeon.loadSegment(segment, SegmentAction.REPLICATE, markSegmentProcessed(segment));
        Assert.assertTrue(httpLoadQueuePeon.getSegmentsToLoad().contains(segment));

        final boolean firstAttempt = httpLoadQueuePeon.cancelOperation(segment);
        Assert.assertTrue(firstAttempt);

        final boolean secondAttempt = httpLoadQueuePeon.cancelOperation(segment);
        Assert.assertFalse(secondAttempt);
    }

    /**
     * Builds a callback that appends the given segment to {@code processedSegments} whenever
     * it is invoked, whatever argument it is invoked with.
     *
     * @param dataSegment segment to record when the callback fires
     * @return callback recording {@code dataSegment}
     */
    private LoadPeonCallback markSegmentProcessed(DataSegment dataSegment) {
        return ignoredResult -> processedSegments.add(dataSegment);
    }
}
