package org.apache.druid.indexing.seekablestream;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import javax.annotation.Nullable;
import org.apache.druid.indexing.common.IndexTaskClient;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.query.DruidMetrics;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.joda.time.Duration;

/* loaded from: input_file:org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.class */
public abstract class SeekableStreamIndexTaskClient<PartitionIdType, SequenceOffsetType> extends IndexTaskClient {
    private static final EmittingLogger log = new EmittingLogger(SeekableStreamIndexTaskClient.class);

    public SeekableStreamIndexTaskClient(HttpClient httpClient, ObjectMapper objectMapper, TaskInfoProvider taskInfoProvider, String str, int i, Duration duration, long j) {
        super(httpClient, objectMapper, taskInfoProvider, duration, str, i, j);
    }

    public boolean stop(String str, boolean z) {
        log.debug("Stop task[%s] publish[%s]", str, Boolean.valueOf(z));
        try {
            return isSuccess(submitRequestWithEmptyContent(str, HttpMethod.POST, "stop", z ? "publish=true" : null, true));
        } catch (IndexTaskClient.NoTaskLocationException e) {
            return false;
        } catch (IndexTaskClient.TaskNotRunnableException e2) {
            log.info("Task [%s] couldn't be stopped because it is no longer running", str);
            return true;
        } catch (Exception e3) {
            log.warn(e3, "Exception while stopping task [%s]", str);
            return false;
        }
    }

    public boolean resume(String str) {
        log.debug("Resume task[%s]", str);
        try {
            return isSuccess(submitRequestWithEmptyContent(str, HttpMethod.POST, "resume", null, true));
        } catch (IOException | IndexTaskClient.NoTaskLocationException e) {
            log.warn(e, "Exception while stopping task [%s]", str);
            return false;
        }
    }

    public Map<PartitionIdType, SequenceOffsetType> pause(String str) {
        log.debug("Pause task[%s]", str);
        try {
            try {
                StringFullResponseHolder submitRequestWithEmptyContent = submitRequestWithEmptyContent(str, HttpMethod.POST, "pause", null, true);
                HttpResponseStatus status = submitRequestWithEmptyContent.getStatus();
                String content = submitRequestWithEmptyContent.getContent();
                if (status.equals(HttpResponseStatus.OK)) {
                    log.info("Task [%s] paused successfully", str);
                    return (Map) deserializeMap(content, Map.class, getPartitionType(), getSequenceType());
                }
                if (!status.equals(HttpResponseStatus.ACCEPTED)) {
                    throw new ISE("Pause request for task [%s] failed with response [%s] : [%s]", str, status, content);
                }
                while (true) {
                    SeekableStreamIndexTaskRunner.Status status2 = getStatus(str);
                    if (status2 == SeekableStreamIndexTaskRunner.Status.PAUSED) {
                        return getCurrentOffsets(str, true);
                    }
                    Duration andIncrementRetryDelay = newRetryPolicy().getAndIncrementRetryDelay();
                    if (andIncrementRetryDelay == null) {
                        throw new ISE("Task [%s] failed to change its status from [%s] to [%s], aborting", str, status2, SeekableStreamIndexTaskRunner.Status.PAUSED);
                    }
                    long millis = andIncrementRetryDelay.getMillis();
                    log.info("Still waiting for task [%s] to change its status to [%s]; will try again in [%s]", str, SeekableStreamIndexTaskRunner.Status.PAUSED, new Duration(millis).toString());
                    Thread.sleep(millis);
                }
            } catch (IOException | InterruptedException e) {
                throw new RE(e, "Exception [%s] while pausing Task [%s]", e.getMessage(), str);
            }
        } catch (IndexTaskClient.NoTaskLocationException e2) {
            log.error("Exception [%s] while pausing Task [%s]", e2.getMessage(), str);
            return ImmutableMap.of();
        }
    }

    public SeekableStreamIndexTaskRunner.Status getStatus(String str) {
        log.debug("GetStatus task[%s]", str);
        try {
            return (SeekableStreamIndexTaskRunner.Status) deserialize(submitRequestWithEmptyContent(str, HttpMethod.GET, DruidMetrics.STATUS, null, true).getContent(), SeekableStreamIndexTaskRunner.Status.class);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (IndexTaskClient.NoTaskLocationException e2) {
            return SeekableStreamIndexTaskRunner.Status.NOT_STARTED;
        }
    }

    @Nullable
    public DateTime getStartTime(String str) {
        log.debug("GetStartTime task[%s]", str);
        try {
            StringFullResponseHolder submitRequestWithEmptyContent = submitRequestWithEmptyContent(str, HttpMethod.GET, "time/start", null, true);
            if (submitRequestWithEmptyContent.getContent() == null || submitRequestWithEmptyContent.getContent().isEmpty()) {
                return null;
            }
            return (DateTime) deserialize(submitRequestWithEmptyContent.getContent(), DateTime.class);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (IndexTaskClient.NoTaskLocationException e2) {
            return null;
        }
    }

    public Map<String, Object> getMovingAverages(String str) {
        log.debug("GetMovingAverages task[%s]", str);
        try {
            StringFullResponseHolder submitRequestWithEmptyContent = submitRequestWithEmptyContent(str, HttpMethod.GET, "rowStats", null, true);
            return (submitRequestWithEmptyContent.getContent() == null || submitRequestWithEmptyContent.getContent().isEmpty()) ? Collections.emptyMap() : (Map) deserialize(submitRequestWithEmptyContent.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (IndexTaskClient.NoTaskLocationException e2) {
            return Collections.emptyMap();
        }
    }

    public Map<PartitionIdType, SequenceOffsetType> getCurrentOffsets(String str, boolean z) {
        log.debug("GetCurrentOffsets task[%s] retry[%s]", str, Boolean.valueOf(z));
        try {
            return (Map) deserializeMap(submitRequestWithEmptyContent(str, HttpMethod.GET, "offsets/current", null, z).getContent(), Map.class, getPartitionType(), getSequenceType());
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (IndexTaskClient.NoTaskLocationException e2) {
            return ImmutableMap.of();
        }
    }

    public TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>> getCheckpoints(String str, boolean z) {
        log.debug("GetCheckpoints task[%s] retry[%s]", str, Boolean.valueOf(z));
        try {
            return (TreeMap) deserializeNestedValueMap(submitRequestWithEmptyContent(str, HttpMethod.GET, SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY, null, z).getContent(), TreeMap.class, Integer.class, Map.class, getPartitionType(), getSequenceType());
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (IndexTaskClient.NoTaskLocationException e2) {
            return new TreeMap<>();
        }
    }

    public ListenableFuture<TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>> getCheckpointsAsync(String str, boolean z) {
        return (ListenableFuture<TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>>) doAsync(() -> {
            return getCheckpoints(str, z);
        });
    }

    public Map<PartitionIdType, SequenceOffsetType> getEndOffsets(String str) {
        log.debug("GetEndOffsets task[%s]", str);
        try {
            return (Map) deserializeMap(submitRequestWithEmptyContent(str, HttpMethod.GET, "offsets/end", null, true).getContent(), Map.class, getPartitionType(), getSequenceType());
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (IndexTaskClient.NoTaskLocationException e2) {
            return ImmutableMap.of();
        }
    }

    public boolean setEndOffsets(String str, Map<PartitionIdType, SequenceOffsetType> map, boolean z) throws IOException {
        log.debug("SetEndOffsets task[%s] endOffsets[%s] finalize[%s]", str, map, Boolean.valueOf(z));
        try {
            return isSuccess(submitJsonRequest(str, HttpMethod.POST, "offsets/end", StringUtils.format("finish=%s", Boolean.valueOf(z)), serialize(map), true));
        } catch (IndexTaskClient.NoTaskLocationException e) {
            return false;
        }
    }

    public ListenableFuture<Boolean> stopAsync(String str, boolean z) {
        return doAsync(() -> {
            return Boolean.valueOf(stop(str, z));
        });
    }

    public ListenableFuture<Boolean> resumeAsync(String str) {
        return doAsync(() -> {
            return Boolean.valueOf(resume(str));
        });
    }

    public ListenableFuture<DateTime> getStartTimeAsync(String str) {
        return doAsync(() -> {
            return getStartTime(str);
        });
    }

    public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> pauseAsync(String str) {
        return (ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>) doAsync(() -> {
            return pause(str);
        });
    }

    public ListenableFuture<Boolean> setEndOffsetsAsync(String str, Map<PartitionIdType, SequenceOffsetType> map, boolean z) {
        return doAsync(() -> {
            return Boolean.valueOf(setEndOffsets(str, map, z));
        });
    }

    public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> getCurrentOffsetsAsync(String str, boolean z) {
        return (ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>) doAsync(() -> {
            return getCurrentOffsets(str, z);
        });
    }

    public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> getEndOffsetsAsync(String str) {
        return (ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>) doAsync(() -> {
            return getEndOffsets(str);
        });
    }

    public ListenableFuture<Map<String, Object>> getMovingAveragesAsync(String str) {
        return doAsync(() -> {
            return getMovingAverages(str);
        });
    }

    public ListenableFuture<SeekableStreamIndexTaskRunner.Status> getStatusAsync(String str) {
        return doAsync(() -> {
            return getStatus(str);
        });
    }

    protected abstract Class<PartitionIdType> getPartitionType();

    protected abstract Class<SequenceOffsetType> getSequenceType();
}
