package org.apache.druid.indexing.seekablestream;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.IndexTaskClient;
import org.apache.druid.indexing.common.RetryPolicy;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.response.BytesFullResponseHandler;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.rpc.IgnoreHttpResponseHandler;
import org.apache.druid.rpc.RequestBuilder;
import org.apache.druid.rpc.ServiceClient;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceClosedException;
import org.apache.druid.rpc.ServiceLocation;
import org.apache.druid.rpc.ServiceLocations;
import org.apache.druid.rpc.ServiceLocator;
import org.apache.druid.rpc.ServiceNotAvailableException;
import org.apache.druid.rpc.ServiceRetryPolicy;
import org.apache.druid.rpc.StandardRetryPolicy;
import org.apache.druid.rpc.indexing.SpecificTaskRetryPolicy;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.joda.time.Duration;

/* loaded from: input_file:org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.class */
public abstract class SeekableStreamIndexTaskClientAsyncImpl<PartitionIdType, SequenceOffsetType> implements SeekableStreamIndexTaskClient<PartitionIdType, SequenceOffsetType> {
    private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTaskClientAsyncImpl.class);
    private final ServiceClientFactory serviceClientFactory;
    private final TaskInfoProvider taskInfoProvider;
    private final ObjectMapper jsonMapper;
    private final Duration httpTimeout;
    private final long httpRetries;
    private final ScheduledExecutorService retryExec;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl$SeekableStreamRequestBuilder.class */
    public class SeekableStreamRequestBuilder<IntermediateType, FinalType, T> {
        private final String taskId;
        private final RequestBuilder requestBuilder;
        private HttpResponseHandler<IntermediateType, FinalType> responseHandler;
        private Function<FinalType, T> responseTransformer;
        private final List<Function<Throwable, Either<Throwable, T>>> exceptionMappers = new ArrayList();
        private boolean retry = true;

        SeekableStreamRequestBuilder(String str, RequestBuilder requestBuilder, HttpResponseHandler<IntermediateType, FinalType> httpResponseHandler, Function<FinalType, T> function) {
            this.taskId = str;
            this.requestBuilder = requestBuilder;
            this.responseHandler = httpResponseHandler;
            this.responseTransformer = function;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public <NewIntermediateType, NewFinalType> SeekableStreamIndexTaskClientAsyncImpl<PartitionIdType, SequenceOffsetType>.SeekableStreamRequestBuilder<NewIntermediateType, NewFinalType, T> handler(HttpResponseHandler<NewIntermediateType, NewFinalType> httpResponseHandler) {
            this.responseHandler = httpResponseHandler;
            return this;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public <NewT> SeekableStreamIndexTaskClientAsyncImpl<PartitionIdType, SequenceOffsetType>.SeekableStreamRequestBuilder<IntermediateType, FinalType, NewT> onSuccess(Function<FinalType, NewT> function) {
            this.responseTransformer = function;
            return this;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public SeekableStreamIndexTaskClientAsyncImpl<PartitionIdType, SequenceOffsetType>.SeekableStreamRequestBuilder<IntermediateType, FinalType, T> retry(boolean z) {
            this.retry = z;
            return this;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public SeekableStreamIndexTaskClientAsyncImpl<PartitionIdType, SequenceOffsetType>.SeekableStreamRequestBuilder<IntermediateType, FinalType, T> onException(Function<Throwable, Either<Throwable, T>> function) {
            this.exceptionMappers.add(function);
            return this;
        }

        public SeekableStreamIndexTaskClientAsyncImpl<PartitionIdType, SequenceOffsetType>.SeekableStreamRequestBuilder<IntermediateType, FinalType, T> onHttpError(Function<HttpResponseException, Either<Throwable, T>> function) {
            return onException(th -> {
                return th instanceof HttpResponseException ? (Either) function.apply((HttpResponseException) th) : Either.error(th);
            });
        }

        public SeekableStreamIndexTaskClientAsyncImpl<PartitionIdType, SequenceOffsetType>.SeekableStreamRequestBuilder<IntermediateType, FinalType, T> onNotAvailable(Function<ServiceNotAvailableException, Either<Throwable, T>> function) {
            return onException(th -> {
                return th instanceof ServiceNotAvailableException ? (Either) function.apply((ServiceNotAvailableException) th) : Either.error(th);
            });
        }

        public SeekableStreamIndexTaskClientAsyncImpl<PartitionIdType, SequenceOffsetType>.SeekableStreamRequestBuilder<IntermediateType, FinalType, T> onClosed(Function<ServiceClosedException, Either<Throwable, T>> function) {
            return onException(th -> {
                return th instanceof ServiceClosedException ? (Either) function.apply((ServiceClosedException) th) : Either.error(th);
            });
        }

        public ListenableFuture<T> go() {
            ServiceClient makeClient = makeClient(this.taskId, this.retry);
            final SettableFuture create = SettableFuture.create();
            Futures.addCallback(FutureUtils.transform(makeClient.asyncRequest(this.requestBuilder.timeout(SeekableStreamIndexTaskClientAsyncImpl.this.httpTimeout), this.responseHandler), this.responseTransformer), new FutureCallback<T>() { // from class: org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientAsyncImpl.SeekableStreamRequestBuilder.1
                public void onSuccess(@Nullable T t) {
                    create.set(t);
                }

                /* JADX WARN: Multi-variable type inference failed */
                public void onFailure(Throwable th) {
                    Either error = Either.error(th);
                    for (Function function : SeekableStreamRequestBuilder.this.exceptionMappers) {
                        if (!error.isError()) {
                            break;
                        }
                        try {
                            Either either = (Either) function.apply(error.error());
                            if (either != null) {
                                error = either;
                            }
                        } catch (Throwable th2) {
                            SeekableStreamIndexTaskClientAsyncImpl.log.warn(th2, "Failed to map exception encountered while contacting task [%s]", new Object[]{SeekableStreamRequestBuilder.this.taskId});
                        }
                    }
                    if (error.isError()) {
                        create.setException((Throwable) error.error());
                    } else {
                        create.set(error.valueOrThrow());
                    }
                }
            });
            return create;
        }

        private ServiceClient makeClient(String str, boolean z) {
            ServiceRetryPolicy makeRetryPolicy = makeRetryPolicy(str, z);
            return SeekableStreamIndexTaskClientAsyncImpl.this.serviceClientFactory.makeClient(str, new SeekableStreamTaskLocator(SeekableStreamIndexTaskClientAsyncImpl.this.taskInfoProvider, str), makeRetryPolicy);
        }

        private ServiceRetryPolicy makeRetryPolicy(String str, boolean z) {
            return new SpecificTaskRetryPolicy(str, z ? StandardRetryPolicy.builder().maxAttempts(SeekableStreamIndexTaskClientAsyncImpl.this.httpRetries + 1).minWaitMillis(2000L).maxWaitMillis(10000L).retryNotAvailable(false).build() : StandardRetryPolicy.noRetries());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl$SeekableStreamTaskLocator.class */
    public static class SeekableStreamTaskLocator implements ServiceLocator {
        private static final String BASE_PATH = "/druid/worker/v1/chat";
        private final TaskInfoProvider taskInfoProvider;
        private final String taskId;

        SeekableStreamTaskLocator(TaskInfoProvider taskInfoProvider, String str) {
            this.taskInfoProvider = taskInfoProvider;
            this.taskId = str;
        }

        public ListenableFuture<ServiceLocations> locate() {
            Optional<TaskStatus> taskStatus = this.taskInfoProvider.getTaskStatus(this.taskId);
            if (!taskStatus.isPresent() || !((TaskStatus) taskStatus.get()).isRunnable()) {
                return Futures.immediateFuture(ServiceLocations.closed());
            }
            TaskLocation taskLocation = this.taskInfoProvider.getTaskLocation(this.taskId);
            return taskLocation.getHost() == null ? Futures.immediateFuture(ServiceLocations.forLocations(Collections.emptySet())) : Futures.immediateFuture(ServiceLocations.forLocation(new ServiceLocation(taskLocation.getHost(), taskLocation.getPort(), taskLocation.getTlsPort(), StringUtils.format("%s/%s", new Object[]{BASE_PATH, StringUtils.urlEncode(this.taskId)}))));
        }

        public void close() {
        }
    }

    public SeekableStreamIndexTaskClientAsyncImpl(String str, ServiceClientFactory serviceClientFactory, TaskInfoProvider taskInfoProvider, ObjectMapper objectMapper, Duration duration, long j) {
        this.serviceClientFactory = serviceClientFactory;
        this.taskInfoProvider = taskInfoProvider;
        this.jsonMapper = objectMapper;
        this.httpTimeout = duration;
        this.httpRetries = j;
        this.retryExec = Execs.scheduledSingleThreaded(StringUtils.format("%s-%s-%%d", new Object[]{getClass().getSimpleName(), StringUtils.encodeForFormat(str)}));
    }

    @Override // org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient
    public ListenableFuture<TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>> getCheckpointsAsync(String str, boolean z) {
        return makeRequest(str, new RequestBuilder(HttpMethod.GET, "/checkpoints")).handler(new BytesFullResponseHandler()).onSuccess(bytesFullResponseHolder -> {
            TypeFactory typeFactory = this.jsonMapper.getTypeFactory();
            return (TreeMap) JacksonUtils.readValue(this.jsonMapper, bytesFullResponseHolder.getContent(), typeFactory.constructMapType(TreeMap.class, typeFactory.constructType(Integer.class), typeFactory.constructMapType(Map.class, getPartitionType(), getSequenceType())));
        }).onNotAvailable(serviceNotAvailableException -> {
            return Either.value(new TreeMap());
        }).retry(z).go();
    }

    @Override // org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient
    public ListenableFuture<Boolean> stopAsync(String str, boolean z) {
        return makeRequest(str, new RequestBuilder(HttpMethod.POST, "/stop" + (z ? "?publish=true" : ""))).onSuccess(r2 -> {
            return true;
        }).onHttpError(httpResponseException -> {
            log.warn("Task [%s] coundln't be stopped because of http request failure [%s].", new Object[]{str, httpResponseException.getMessage()});
            return Either.value(false);
        }).onNotAvailable(serviceNotAvailableException -> {
            log.warn("Task [%s] coundln't be stopped because it is not available.", new Object[]{str});
            return Either.value(false);
        }).onClosed(serviceClosedException -> {
            log.warn("Task [%s] couldn't be stopped because it is no longer running.", new Object[]{str});
            return Either.value(true);
        }).go();
    }

    @Override // org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient
    public ListenableFuture<Boolean> resumeAsync(String str) {
        return makeRequest(str, new RequestBuilder(HttpMethod.POST, "/resume")).onSuccess(r2 -> {
            return true;
        }).onException(th -> {
            return Either.value(false);
        }).go();
    }

    @Override // org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient
    public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> getCurrentOffsetsAsync(String str, boolean z) {
        return makeRequest(str, new RequestBuilder(HttpMethod.GET, "/offsets/current")).handler(new BytesFullResponseHandler()).onSuccess(bytesFullResponseHolder -> {
            return deserializeOffsetsMap(bytesFullResponseHolder.getContent());
        }).onNotAvailable(serviceNotAvailableException -> {
            return Either.value(Collections.emptyMap());
        }).retry(z).go();
    }

    @Override // org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient
    public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> getEndOffsetsAsync(String str) {
        return makeRequest(str, new RequestBuilder(HttpMethod.GET, "/offsets/end")).handler(new BytesFullResponseHandler()).onSuccess(bytesFullResponseHolder -> {
            return deserializeOffsetsMap(bytesFullResponseHolder.getContent());
        }).onNotAvailable(serviceNotAvailableException -> {
            return Either.value(Collections.emptyMap());
        }).go();
    }

    @Override // org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient
    public ListenableFuture<Boolean> setEndOffsetsAsync(String str, Map<PartitionIdType, SequenceOffsetType> map, boolean z) {
        return makeRequest(str, new RequestBuilder(HttpMethod.POST, StringUtils.format("/offsets/end?finish=%s", new Object[]{Boolean.valueOf(z)})).jsonContent(this.jsonMapper, map)).handler(IgnoreHttpResponseHandler.INSTANCE).onSuccess(r2 -> {
            return true;
        }).go();
    }

    @Override // org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient
    public ListenableFuture<SeekableStreamIndexTaskRunner.Status> getStatusAsync(String str) {
        return makeRequest(str, new RequestBuilder(HttpMethod.GET, "/status")).handler(new BytesFullResponseHandler()).onSuccess(bytesFullResponseHolder -> {
            return (SeekableStreamIndexTaskRunner.Status) JacksonUtils.readValue(this.jsonMapper, bytesFullResponseHolder.getContent(), SeekableStreamIndexTaskRunner.Status.class);
        }).onNotAvailable(serviceNotAvailableException -> {
            return Either.value(SeekableStreamIndexTaskRunner.Status.NOT_STARTED);
        }).go();
    }

    @Override // org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient
    public ListenableFuture<DateTime> getStartTimeAsync(String str) {
        return makeRequest(str, new RequestBuilder(HttpMethod.GET, "/time/start")).handler(new BytesFullResponseHandler()).onSuccess(bytesFullResponseHolder -> {
            if (isNullOrEmpty(bytesFullResponseHolder.getContent())) {
                return null;
            }
            return (DateTime) JacksonUtils.readValue(this.jsonMapper, bytesFullResponseHolder.getContent(), DateTime.class);
        }).onNotAvailable(serviceNotAvailableException -> {
            return Either.value((Object) null);
        }).go();
    }

    @Override // org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient
    public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> pauseAsync(String str) {
        return FutureUtils.transformAsync(makeRequest(str, new RequestBuilder(HttpMethod.POST, "/pause")).handler(new BytesFullResponseHandler()).onSuccess(bytesFullResponseHolder -> {
            if (bytesFullResponseHolder.getStatus().equals(HttpResponseStatus.OK)) {
                log.info("Task [%s] paused successfully", new Object[]{str});
                return deserializeOffsetsMap(bytesFullResponseHolder.getContent());
            }
            if (bytesFullResponseHolder.getStatus().equals(HttpResponseStatus.ACCEPTED)) {
                return null;
            }
            throw new ISE("Pause request for task [%s] failed with response [%s]", new Object[]{str, bytesFullResponseHolder.getStatus()});
        }).onNotAvailable(serviceNotAvailableException -> {
            return Either.value(Collections.emptyMap());
        }).go(), map -> {
            return map != null ? Futures.immediateFuture(map) : getOffsetsWhenPaused(str, IndexTaskClient.makeRetryPolicyFactory(this.httpRetries).makeRetryPolicy());
        });
    }

    @Override // org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient
    public ListenableFuture<Map<String, Object>> getMovingAveragesAsync(String str) {
        return makeRequest(str, new RequestBuilder(HttpMethod.GET, "/rowStats")).handler(new BytesFullResponseHandler()).onSuccess(bytesFullResponseHolder -> {
            if (!isNullOrEmpty(bytesFullResponseHolder.getContent())) {
                return (Map) JacksonUtils.readValue(this.jsonMapper, bytesFullResponseHolder.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT);
            }
            log.warn("Got empty response when calling getMovingAverages, id[%s]", new Object[]{str});
            return null;
        }).onNotAvailable(serviceNotAvailableException -> {
            return Either.value(Collections.emptyMap());
        }).go();
    }

    @Override // org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient
    public ListenableFuture<List<ParseExceptionReport>> getParseErrorsAsync(String str) {
        return makeRequest(str, new RequestBuilder(HttpMethod.GET, "/unparseableEvents")).handler(new BytesFullResponseHandler()).onSuccess(bytesFullResponseHolder -> {
            if (!isNullOrEmpty(bytesFullResponseHolder.getContent())) {
                return (List) JacksonUtils.readValue(this.jsonMapper, bytesFullResponseHolder.getContent(), TYPE_REFERENCE_LIST_PARSE_EXCEPTION_REPORT);
            }
            log.warn("Got empty response when calling getParseErrors, id[%s]", new Object[]{str});
            return null;
        }).onNotAvailable(serviceNotAvailableException -> {
            return Either.value(Collections.emptyList());
        }).go();
    }

    @Override // org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient
    public void close() {
        this.retryExec.shutdownNow();
    }

    private SeekableStreamIndexTaskClientAsyncImpl<PartitionIdType, SequenceOffsetType>.SeekableStreamRequestBuilder<Void, Void, Void> makeRequest(String str, RequestBuilder requestBuilder) {
        return new SeekableStreamRequestBuilder<>(str, requestBuilder, IgnoreHttpResponseHandler.INSTANCE, Function.identity());
    }

    private Map<PartitionIdType, SequenceOffsetType> deserializeOffsetsMap(byte[] bArr) {
        return (Map) JacksonUtils.readValue(this.jsonMapper, bArr, this.jsonMapper.getTypeFactory().constructMapType(Map.class, getPartitionType(), getSequenceType()));
    }

    private ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> getOffsetsWhenPaused(String str, RetryPolicy retryPolicy) {
        return FutureUtils.transformAsync(getStatusAsync(str), status -> {
            Duration andIncrementRetryDelay;
            if (status == SeekableStreamIndexTaskRunner.Status.PAUSED) {
                return getCurrentOffsetsAsync(str, true);
            }
            synchronized (retryPolicy) {
                andIncrementRetryDelay = retryPolicy.getAndIncrementRetryDelay();
            }
            if (andIncrementRetryDelay == null) {
                return Futures.immediateFailedFuture(new ISE("Task [%s] failed to change its status from [%s] to [%s], aborting", new Object[]{str, status, SeekableStreamIndexTaskRunner.Status.PAUSED}));
            }
            long millis = andIncrementRetryDelay.getMillis();
            SettableFuture create = SettableFuture.create();
            this.retryExec.schedule(() -> {
                Futures.addCallback(getOffsetsWhenPaused(str, retryPolicy), new FutureCallback<Map<PartitionIdType, SequenceOffsetType>>() { // from class: org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientAsyncImpl.1
                    public void onSuccess(@Nullable Map<PartitionIdType, SequenceOffsetType> map) {
                        create.set(map);
                    }

                    public void onFailure(Throwable th) {
                        create.setException(th);
                    }
                });
            }, millis, TimeUnit.MILLISECONDS);
            return create;
        });
    }

    private static boolean isNullOrEmpty(@Nullable byte[] bArr) {
        return bArr == null || bArr.length == 0;
    }
}
