package org.apache.druid.server.coordinator;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.server.coordination.ChangeRequestHttpSyncer;
import org.apache.druid.server.coordination.DataSegmentChangeCallback;
import org.apache.druid.server.coordination.DataSegmentChangeHandler;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.druid.server.coordination.SegmentChangeRequestDrop;
import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
import org.apache.druid.server.coordination.SegmentLoadDropHandler;
import org.apache.druid.timeline.DataSegment;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration;

/* loaded from: input_file:org/apache/druid/server/coordinator/HttpLoadQueuePeon.class */
public class HttpLoadQueuePeon extends LoadQueuePeon {
    public static final TypeReference<List<DataSegmentChangeRequest>> REQUEST_ENTITY_TYPE_REF = new TypeReference<List<DataSegmentChangeRequest>>() { // from class: org.apache.druid.server.coordinator.HttpLoadQueuePeon.1
    };
    public static final TypeReference<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>> RESPONSE_ENTITY_TYPE_REF = new TypeReference<List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus>>() { // from class: org.apache.druid.server.coordinator.HttpLoadQueuePeon.2
    };
    private static final EmittingLogger log = new EmittingLogger(HttpLoadQueuePeon.class);
    private final ScheduledExecutorService processingExecutor;
    private final DruidCoordinatorConfig config;
    private final ObjectMapper jsonMapper;
    private final HttpClient httpClient;
    private final URL changeRequestURL;
    private final String serverId;
    private final ExecutorService callBackExecutor;
    private final ObjectWriter requestBodyWriter;
    private final AtomicLong queuedSize = new AtomicLong(0);
    private final AtomicInteger failedAssignCount = new AtomicInteger(0);
    private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST);
    private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST);
    private final ConcurrentSkipListSet<DataSegment> segmentsMarkedToDrop = new ConcurrentSkipListSet<>(DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST);
    private volatile boolean stopped = false;
    private final Object lock = new Object();
    private final AtomicBoolean mainLoopInProgress = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.druid.server.coordinator.HttpLoadQueuePeon$5, reason: invalid class name */
    /* loaded from: input_file:org/apache/druid/server/coordinator/HttpLoadQueuePeon$5.class */
    public static /* synthetic */ class AnonymousClass5 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$druid$server$coordination$SegmentLoadDropHandler$Status$STATE = new int[SegmentLoadDropHandler.Status.STATE.values().length];

        static {
            try {
                $SwitchMap$org$apache$druid$server$coordination$SegmentLoadDropHandler$Status$STATE[SegmentLoadDropHandler.Status.STATE.SUCCESS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$druid$server$coordination$SegmentLoadDropHandler$Status$STATE[SegmentLoadDropHandler.Status.STATE.FAILED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$druid$server$coordination$SegmentLoadDropHandler$Status$STATE[SegmentLoadDropHandler.Status.STATE.PENDING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:org/apache/druid/server/coordinator/HttpLoadQueuePeon$DropSegmentHolder.class */
    private class DropSegmentHolder extends SegmentHolder {
        public DropSegmentHolder(DataSegment dataSegment, LoadPeonCallback loadPeonCallback) {
            super(dataSegment, new SegmentChangeRequestDrop(dataSegment), loadPeonCallback);
        }
    }

    /* loaded from: input_file:org/apache/druid/server/coordinator/HttpLoadQueuePeon$LoadSegmentHolder.class */
    private class LoadSegmentHolder extends SegmentHolder {
        public LoadSegmentHolder(DataSegment dataSegment, LoadPeonCallback loadPeonCallback) {
            super(dataSegment, new SegmentChangeRequestLoad(dataSegment), loadPeonCallback);
            HttpLoadQueuePeon.this.queuedSize.addAndGet(dataSegment.getSize());
        }

        @Override // org.apache.druid.server.coordinator.HttpLoadQueuePeon.SegmentHolder
        public void requestSucceeded() {
            HttpLoadQueuePeon.this.queuedSize.addAndGet(-getSegment().getSize());
            super.requestSucceeded();
        }

        @Override // org.apache.druid.server.coordinator.HttpLoadQueuePeon.SegmentHolder
        public void requestFailed(String str) {
            HttpLoadQueuePeon.this.queuedSize.addAndGet(-getSegment().getSize());
            super.requestFailed(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/server/coordinator/HttpLoadQueuePeon$SegmentHolder.class */
    public abstract class SegmentHolder {
        private final DataSegment segment;
        private final DataSegmentChangeRequest changeRequest;
        private final List<LoadPeonCallback> callbacks;
        private volatile long scheduleTime;

        private SegmentHolder(DataSegment dataSegment, DataSegmentChangeRequest dataSegmentChangeRequest, LoadPeonCallback loadPeonCallback) {
            this.callbacks = new ArrayList();
            this.scheduleTime = -1L;
            this.segment = dataSegment;
            this.changeRequest = dataSegmentChangeRequest;
            if (loadPeonCallback != null) {
                this.callbacks.add(loadPeonCallback);
            }
        }

        public void addCallback(LoadPeonCallback loadPeonCallback) {
            synchronized (this.callbacks) {
                if (loadPeonCallback != null) {
                    this.callbacks.add(loadPeonCallback);
                }
            }
        }

        public DataSegment getSegment() {
            return this.segment;
        }

        public DataSegmentChangeRequest getChangeRequest() {
            return this.changeRequest;
        }

        public boolean hasTimedOut() {
            if (this.scheduleTime >= 0) {
                return System.currentTimeMillis() - this.scheduleTime > HttpLoadQueuePeon.this.config.getLoadTimeoutDelay().getMillis();
            }
            this.scheduleTime = System.currentTimeMillis();
            return false;
        }

        public void requestSucceeded() {
            HttpLoadQueuePeon.log.trace("Server[%s] Successfully processed segment[%s] request[%s].", HttpLoadQueuePeon.this.serverId, this.segment.getId(), this.changeRequest.getClass().getSimpleName());
            HttpLoadQueuePeon.this.callBackExecutor.execute(() -> {
                for (LoadPeonCallback loadPeonCallback : this.callbacks) {
                    if (loadPeonCallback != null) {
                        loadPeonCallback.execute();
                    }
                }
            });
        }

        public void requestFailed(String str) {
            HttpLoadQueuePeon.log.error("Server[%s] Failed segment[%s] request[%s] with cause [%s].", HttpLoadQueuePeon.this.serverId, this.segment.getId(), this.changeRequest.getClass().getSimpleName(), str);
            HttpLoadQueuePeon.this.failedAssignCount.getAndIncrement();
            HttpLoadQueuePeon.this.callBackExecutor.execute(() -> {
                for (LoadPeonCallback loadPeonCallback : this.callbacks) {
                    if (loadPeonCallback != null) {
                        loadPeonCallback.execute();
                    }
                }
            });
        }

        public String toString() {
            return this.changeRequest.toString();
        }
    }

    public HttpLoadQueuePeon(String str, ObjectMapper objectMapper, HttpClient httpClient, DruidCoordinatorConfig druidCoordinatorConfig, ScheduledExecutorService scheduledExecutorService, ExecutorService executorService) {
        this.jsonMapper = objectMapper;
        this.requestBodyWriter = objectMapper.writerWithType(REQUEST_ENTITY_TYPE_REF);
        this.httpClient = httpClient;
        this.config = druidCoordinatorConfig;
        this.processingExecutor = scheduledExecutorService;
        this.callBackExecutor = executorService;
        this.serverId = str;
        try {
            this.changeRequestURL = new URL(new URL(str), StringUtils.nonStrictFormat("druid-internal/v1/segments/changeRequests?timeout=%d", Long.valueOf(druidCoordinatorConfig.getHttpLoadQueuePeonHostTimeout().getMillis())));
        } catch (MalformedURLException e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doSegmentManagement() {
        if (this.stopped || !this.mainLoopInProgress.compareAndSet(false, true)) {
            log.trace("[%s]Ignoring tick. Either in-progress already or stopped.", this.serverId);
            return;
        }
        int httpLoadQueuePeonBatchSize = this.config.getHttpLoadQueuePeonBatchSize();
        ArrayList arrayList = new ArrayList(httpLoadQueuePeonBatchSize);
        synchronized (this.lock) {
            Iterator concat = Iterators.concat(this.segmentsToDrop.entrySet().iterator(), this.segmentsToLoad.entrySet().iterator());
            while (arrayList.size() < httpLoadQueuePeonBatchSize && concat.hasNext()) {
                Map.Entry entry = (Map.Entry) concat.next();
                if (((SegmentHolder) entry.getValue()).hasTimedOut()) {
                    ((SegmentHolder) entry.getValue()).requestFailed("timed out");
                    concat.remove();
                } else {
                    arrayList.add(((SegmentHolder) entry.getValue()).getChangeRequest());
                }
            }
        }
        if (arrayList.size() == 0) {
            log.trace("[%s]Found no load/drop requests. SegmentsToLoad[%d], SegmentsToDrop[%d], batchSize[%d].", this.serverId, Integer.valueOf(this.segmentsToLoad.size()), Integer.valueOf(this.segmentsToDrop.size()), Integer.valueOf(this.config.getHttpLoadQueuePeonBatchSize()));
            this.mainLoopInProgress.set(false);
            return;
        }
        try {
            log.trace("Sending [%d] load/drop requests to Server[%s].", Integer.valueOf(arrayList.size()), this.serverId);
            final BytesAccumulatingResponseHandler bytesAccumulatingResponseHandler = new BytesAccumulatingResponseHandler();
            Futures.addCallback(this.httpClient.go(new Request(HttpMethod.POST, this.changeRequestURL).addHeader("Accept", "application/json").addHeader("Content-Type", "application/json").setContent(this.requestBodyWriter.writeValueAsBytes(arrayList)), bytesAccumulatingResponseHandler, new Duration(this.config.getHttpLoadQueuePeonHostTimeout().getMillis() + ChangeRequestHttpSyncer.HTTP_TIMEOUT_EXTRA_MS)), new FutureCallback<InputStream>() { // from class: org.apache.druid.server.coordinator.HttpLoadQueuePeon.3
                /* JADX WARN: Finally extract failed */
                @Override // com.google.common.util.concurrent.FutureCallback
                public void onSuccess(InputStream inputStream) {
                    boolean z = true;
                    try {
                        if (bytesAccumulatingResponseHandler.getStatus() == 204) {
                            HttpLoadQueuePeon.log.trace("Received NO CONTENT reseponse from [%s]", HttpLoadQueuePeon.this.serverId);
                        } else if (200 == bytesAccumulatingResponseHandler.getStatus()) {
                            try {
                                List<SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus> list = (List) HttpLoadQueuePeon.this.jsonMapper.readValue(inputStream, HttpLoadQueuePeon.RESPONSE_ENTITY_TYPE_REF);
                                HttpLoadQueuePeon.log.trace("Server[%s] returned status response [%s].", HttpLoadQueuePeon.this.serverId, list);
                                synchronized (HttpLoadQueuePeon.this.lock) {
                                    if (HttpLoadQueuePeon.this.stopped) {
                                        HttpLoadQueuePeon.log.trace("Ignoring response from Server[%s]. We are already stopped.", HttpLoadQueuePeon.this.serverId);
                                        HttpLoadQueuePeon.this.mainLoopInProgress.set(false);
                                        if (0 != 0) {
                                            ScheduledExecutorService scheduledExecutorService = HttpLoadQueuePeon.this.processingExecutor;
                                            HttpLoadQueuePeon httpLoadQueuePeon = HttpLoadQueuePeon.this;
                                            scheduledExecutorService.execute(() -> {
                                                httpLoadQueuePeon.doSegmentManagement();
                                            });
                                            return;
                                        }
                                        return;
                                    }
                                    for (SegmentLoadDropHandler.DataSegmentChangeRequestAndStatus dataSegmentChangeRequestAndStatus : list) {
                                        switch (AnonymousClass5.$SwitchMap$org$apache$druid$server$coordination$SegmentLoadDropHandler$Status$STATE[dataSegmentChangeRequestAndStatus.getStatus().getState().ordinal()]) {
                                            case 1:
                                            case 2:
                                                HttpLoadQueuePeon.this.handleResponseStatus(dataSegmentChangeRequestAndStatus.getRequest(), dataSegmentChangeRequestAndStatus.getStatus());
                                                break;
                                            case 3:
                                                HttpLoadQueuePeon.log.trace("Request[%s] is still pending on server[%s].", dataSegmentChangeRequestAndStatus.getRequest(), HttpLoadQueuePeon.this.serverId);
                                                break;
                                            default:
                                                z = false;
                                                HttpLoadQueuePeon.log.error("Server[%s] returned unknown state in status[%s].", HttpLoadQueuePeon.this.serverId, dataSegmentChangeRequestAndStatus.getStatus());
                                                break;
                                        }
                                    }
                                }
                            } catch (Exception e) {
                                z = false;
                                logRequestFailure(e);
                            }
                        } else {
                            z = false;
                            logRequestFailure(new RE("Unexpected Response Status.", new Object[0]));
                        }
                        HttpLoadQueuePeon.this.mainLoopInProgress.set(false);
                        if (z) {
                            ScheduledExecutorService scheduledExecutorService2 = HttpLoadQueuePeon.this.processingExecutor;
                            HttpLoadQueuePeon httpLoadQueuePeon2 = HttpLoadQueuePeon.this;
                            scheduledExecutorService2.execute(() -> {
                                httpLoadQueuePeon2.doSegmentManagement();
                            });
                        }
                    } catch (Throwable th) {
                        HttpLoadQueuePeon.this.mainLoopInProgress.set(false);
                        if (1 != 0) {
                            ScheduledExecutorService scheduledExecutorService3 = HttpLoadQueuePeon.this.processingExecutor;
                            HttpLoadQueuePeon httpLoadQueuePeon3 = HttpLoadQueuePeon.this;
                            scheduledExecutorService3.execute(() -> {
                                httpLoadQueuePeon3.doSegmentManagement();
                            });
                        }
                        throw th;
                    }
                }

                @Override // com.google.common.util.concurrent.FutureCallback
                public void onFailure(Throwable th) {
                    try {
                        logRequestFailure(th);
                    } finally {
                        HttpLoadQueuePeon.this.mainLoopInProgress.set(false);
                    }
                }

                private void logRequestFailure(Throwable th) {
                    HttpLoadQueuePeon.log.error(th, "Request[%s] Failed with status[%s]. Reason[%s].", HttpLoadQueuePeon.this.changeRequestURL, Integer.valueOf(bytesAccumulatingResponseHandler.getStatus()), bytesAccumulatingResponseHandler.getDescription());
                }
            }, this.processingExecutor);
        } catch (Throwable th) {
            log.error(th, "Error sending load/drop request to [%s].", this.serverId);
            this.mainLoopInProgress.set(false);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleResponseStatus(DataSegmentChangeRequest dataSegmentChangeRequest, final SegmentLoadDropHandler.Status status) {
        dataSegmentChangeRequest.go(new DataSegmentChangeHandler() { // from class: org.apache.druid.server.coordinator.HttpLoadQueuePeon.4
            @Override // org.apache.druid.server.coordination.DataSegmentChangeHandler
            public void addSegment(DataSegment dataSegment, DataSegmentChangeCallback dataSegmentChangeCallback) {
                updateSuccessOrFailureInHolder((SegmentHolder) HttpLoadQueuePeon.this.segmentsToLoad.remove(dataSegment), status);
            }

            @Override // org.apache.druid.server.coordination.DataSegmentChangeHandler
            public void removeSegment(DataSegment dataSegment, DataSegmentChangeCallback dataSegmentChangeCallback) {
                updateSuccessOrFailureInHolder((SegmentHolder) HttpLoadQueuePeon.this.segmentsToDrop.remove(dataSegment), status);
            }

            private void updateSuccessOrFailureInHolder(SegmentHolder segmentHolder, SegmentLoadDropHandler.Status status2) {
                if (segmentHolder == null) {
                    return;
                }
                if (status2.getState() == SegmentLoadDropHandler.Status.STATE.FAILED) {
                    segmentHolder.requestFailed(status2.getFailureCause());
                } else {
                    segmentHolder.requestSucceeded();
                }
            }
        }, null);
    }

    @Override // org.apache.druid.server.coordinator.LoadQueuePeon
    public void start() {
        synchronized (this.lock) {
            if (this.stopped) {
                throw new ISE("Can't start.", new Object[0]);
            }
            ScheduledExecutors.scheduleAtFixedRate(this.processingExecutor, new Duration(this.config.getHttpLoadQueuePeonRepeatDelay()), () -> {
                if (!this.stopped) {
                    doSegmentManagement();
                }
                return this.stopped ? ScheduledExecutors.Signal.STOP : ScheduledExecutors.Signal.REPEAT;
            });
        }
    }

    @Override // org.apache.druid.server.coordinator.LoadQueuePeon
    public void stop() {
        synchronized (this.lock) {
            if (this.stopped) {
                return;
            }
            this.stopped = true;
            Iterator<SegmentHolder> it2 = this.segmentsToDrop.values().iterator();
            while (it2.hasNext()) {
                it2.next().requestFailed("Stopping load queue peon.");
            }
            Iterator<SegmentHolder> it3 = this.segmentsToLoad.values().iterator();
            while (it3.hasNext()) {
                it3.next().requestFailed("Stopping load queue peon.");
            }
            this.segmentsToDrop.clear();
            this.segmentsToLoad.clear();
            this.queuedSize.set(0L);
            this.failedAssignCount.set(0);
        }
    }

    @Override // org.apache.druid.server.coordinator.LoadQueuePeon
    public void loadSegment(DataSegment dataSegment, LoadPeonCallback loadPeonCallback) {
        synchronized (this.lock) {
            if (this.stopped) {
                log.warn("Server[%s] cannot load segment[%s] because load queue peon is stopped.", this.serverId, dataSegment.getId());
                loadPeonCallback.execute();
                return;
            }
            SegmentHolder segmentHolder = this.segmentsToLoad.get(dataSegment);
            if (segmentHolder == null) {
                log.trace("Server[%s] to load segment[%s] queued.", this.serverId, dataSegment.getId());
                this.segmentsToLoad.put(dataSegment, new LoadSegmentHolder(dataSegment, loadPeonCallback));
                this.processingExecutor.execute(this::doSegmentManagement);
            } else {
                segmentHolder.addCallback(loadPeonCallback);
            }
        }
    }

    @Override // org.apache.druid.server.coordinator.LoadQueuePeon
    public void dropSegment(DataSegment dataSegment, LoadPeonCallback loadPeonCallback) {
        synchronized (this.lock) {
            if (this.stopped) {
                log.warn("Server[%s] cannot drop segment[%s] because load queue peon is stopped.", this.serverId, dataSegment.getId());
                loadPeonCallback.execute();
                return;
            }
            SegmentHolder segmentHolder = this.segmentsToDrop.get(dataSegment);
            if (segmentHolder == null) {
                log.trace("Server[%s] to drop segment[%s] queued.", this.serverId, dataSegment.getId());
                this.segmentsToDrop.put(dataSegment, new DropSegmentHolder(dataSegment, loadPeonCallback));
                this.processingExecutor.execute(this::doSegmentManagement);
            } else {
                segmentHolder.addCallback(loadPeonCallback);
            }
        }
    }

    @Override // org.apache.druid.server.coordinator.LoadQueuePeon
    public Set<DataSegment> getSegmentsToLoad() {
        return Collections.unmodifiableSet(this.segmentsToLoad.keySet());
    }

    @Override // org.apache.druid.server.coordinator.LoadQueuePeon
    public Set<DataSegment> getSegmentsToDrop() {
        return Collections.unmodifiableSet(this.segmentsToDrop.keySet());
    }

    @Override // org.apache.druid.server.coordinator.LoadQueuePeon
    public long getLoadQueueSize() {
        return this.queuedSize.get();
    }

    @Override // org.apache.druid.server.coordinator.LoadQueuePeon
    public int getAndResetFailedAssignCount() {
        return this.failedAssignCount.getAndSet(0);
    }

    @Override // org.apache.druid.server.coordinator.LoadQueuePeon
    public void markSegmentToDrop(DataSegment dataSegment) {
        this.segmentsMarkedToDrop.add(dataSegment);
    }

    @Override // org.apache.druid.server.coordinator.LoadQueuePeon
    public void unmarkSegmentToDrop(DataSegment dataSegment) {
        this.segmentsMarkedToDrop.remove(dataSegment);
    }

    @Override // org.apache.druid.server.coordinator.LoadQueuePeon
    public int getNumberOfSegmentsInQueue() {
        return this.segmentsToLoad.size();
    }

    @Override // org.apache.druid.server.coordinator.LoadQueuePeon
    public Set<DataSegment> getSegmentsMarkedToDrop() {
        return Collections.unmodifiableSet(this.segmentsMarkedToDrop);
    }
}
