package org.apache.gobblin.yarn;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.helix.HelixAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/yarn/HelixInstancePurgerWithMetrics.class */
public class HelixInstancePurgerWithMetrics {
    private static final Logger log = LoggerFactory.getLogger(HelixInstancePurgerWithMetrics.class);
    private final EventSubmitter eventSubmitter;
    private final long pollingRateMs;
    private static final String PREFIX = "HelixOfflineInstancePurge.";
    public static final String PURGE_FAILURE_EVENT = "HelixOfflineInstancePurge.Failure";
    public static final String PURGE_LAGGING_EVENT = "HelixOfflineInstancePurge.Lagging";
    public static final String PURGE_COMPLETED_EVENT = "HelixOfflineInstancePurge.Completed";

    public void purgeAllOfflineInstances(HelixAdmin helixAdmin, String str, long j, Map<String, String> map) {
        log.info("Finished purging offline helix instances. It took timeToPurgeMs={}", Long.valueOf(waitForPurgeCompletion(CompletableFuture.supplyAsync(() -> {
            helixAdmin.purgeOfflineInstances(str, 0L);
            return null;
        }), j, Stopwatch.createUnstarted(), map)));
    }

    @VisibleForTesting
    long waitForPurgeCompletion(CompletableFuture<Void> completableFuture, long j, Stopwatch stopwatch, Map<String, String> map) {
        stopwatch.start();
        boolean z = false;
        while (!completableFuture.isDone()) {
            try {
                long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
                log.info("Waiting for helix to purge offline instances. Cannot proceed with execution because purging is a non-thread safe call. To disable purging offline instances during startup, change the flag {} elapsedTimeMs={}, laggingThresholdMs={}", new Object[]{GobblinYarnConfigurationKeys.HELIX_PURGE_OFFLINE_INSTANCES_ENABLED, Long.valueOf(elapsed), Long.valueOf(j)});
                if (!z && elapsed > j) {
                    submitLaggingEvent(elapsed, j, map);
                    z = true;
                }
                Thread.sleep(this.pollingRateMs);
            } catch (InterruptedException | ExecutionException e) {
                log.warn("The call to purge offline helix instances failed. This is not a fatal error because it is not mandatory to clean up old helix instances. But repeated failure to purge offline helix instances will cause an accumulationof offline helix instances which may cause large delays in future helix calls.", e);
                long elapsed2 = stopwatch.elapsed(TimeUnit.MILLISECONDS);
                submitFailureEvent(elapsed2, map);
                return elapsed2;
            }
        }
        long elapsed3 = stopwatch.elapsed(TimeUnit.MILLISECONDS);
        if (!z && elapsed3 > j) {
            submitLaggingEvent(elapsed3, j, map);
        }
        completableFuture.get();
        submitCompletedEvent(elapsed3, map);
        return elapsed3;
    }

    private void submitFailureEvent(long j, Map<String, String> map) {
        if (this.eventSubmitter == null) {
            log.warn("Cannot submit {} GTE because eventSubmitter is null", PURGE_FAILURE_EVENT);
            return;
        }
        GobblinEventBuilder gobblinEventBuilder = new GobblinEventBuilder(PURGE_FAILURE_EVENT);
        gobblinEventBuilder.addAdditionalMetadata(map);
        gobblinEventBuilder.addMetadata("elapsedTimeMs", String.valueOf(j));
        log.warn("Submitting GTE because purging offline instances has failed to complete. event={}", gobblinEventBuilder);
        this.eventSubmitter.submit(gobblinEventBuilder);
    }

    private void submitCompletedEvent(long j, Map<String, String> map) {
        if (this.eventSubmitter == null) {
            log.warn("Cannot submit {} GTE because eventSubmitter is null", PURGE_COMPLETED_EVENT);
            return;
        }
        GobblinEventBuilder gobblinEventBuilder = new GobblinEventBuilder(PURGE_COMPLETED_EVENT);
        gobblinEventBuilder.addAdditionalMetadata(map);
        gobblinEventBuilder.addMetadata("timeToPurgeMs", String.valueOf(j));
        log.info("Submitting GTE because purging offline instances has completed successfully. event={}", gobblinEventBuilder);
        this.eventSubmitter.submit(gobblinEventBuilder);
    }

    private void submitLaggingEvent(long j, long j2, Map<String, String> map) {
        if (this.eventSubmitter == null) {
            log.warn("Cannot submit {} GTE because eventSubmitter is null", PURGE_LAGGING_EVENT);
            return;
        }
        GobblinEventBuilder gobblinEventBuilder = new GobblinEventBuilder(PURGE_LAGGING_EVENT);
        gobblinEventBuilder.addAdditionalMetadata(map);
        gobblinEventBuilder.addMetadata("elapsedTimeMs", String.valueOf(j));
        gobblinEventBuilder.addMetadata("laggingThresholdMs", String.valueOf(j2));
        log.info("Submitting GTE because purging offline instances is lagging and has exceeded lagging threshold. event={}", gobblinEventBuilder);
        this.eventSubmitter.submit(gobblinEventBuilder);
    }

    public HelixInstancePurgerWithMetrics(EventSubmitter eventSubmitter, long j) {
        this.eventSubmitter = eventSubmitter;
        this.pollingRateMs = j;
    }
}
