package org.apache.atlas.notification;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.consumer.ConsumerTimeoutException;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.LocalAtlasClient;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.service.Service;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
/* loaded from: input_file:org/apache/atlas/notification/NotificationHookConsumer.class */
public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
    private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
    private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
    private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
    public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
    public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
    public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize";
    public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval";
    public static final int SERVER_READY_WAIT_TIME_MS = 1000;
    private final LocalAtlasClient atlasClient;
    private NotificationInterface notificationInterface;
    private ExecutorService executors;
    private List<HookConsumer> consumers;
    private Configuration applicationProperties = ApplicationProperties.get();
    private final int maxRetries = this.applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
    private final int failedMsgCacheSize = this.applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 20);
    private final int consumerRetryInterval = this.applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.atlas.notification.NotificationHookConsumer$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/atlas/notification/NotificationHookConsumer$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$atlas$notification$hook$HookNotification$HookNotificationType = new int[HookNotification.HookNotificationType.values().length];

        static {
            try {
                $SwitchMap$org$apache$atlas$notification$hook$HookNotification$HookNotificationType[HookNotification.HookNotificationType.ENTITY_CREATE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$atlas$notification$hook$HookNotification$HookNotificationType[HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$atlas$notification$hook$HookNotification$HookNotificationType[HookNotification.HookNotificationType.ENTITY_DELETE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$atlas$notification$hook$HookNotification$HookNotificationType[HookNotification.HookNotificationType.ENTITY_FULL_UPDATE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/atlas/notification/NotificationHookConsumer$HookConsumer.class */
    public class HookConsumer implements Runnable {
        private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
        private final AtomicBoolean shouldRun = new AtomicBoolean(false);
        private List<HookNotification.HookNotificationMessage> failedMessages = new ArrayList();

        public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> notificationConsumer) {
            this.consumer = notificationConsumer;
        }

        private boolean hasNext() {
            try {
                return this.consumer.hasNext();
            } catch (ConsumerTimeoutException e) {
                return false;
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            this.shouldRun.set(true);
            if (serverAvailable(new Timer())) {
                while (this.shouldRun.get()) {
                    try {
                        if (hasNext()) {
                            handleMessage((HookNotification.HookNotificationMessage) this.consumer.next());
                        }
                    } catch (Throwable th) {
                        NotificationHookConsumer.LOG.warn("Failure in NotificationHookConsumer", th);
                    }
                }
            }
        }

        /* JADX WARN: Code restructure failed: missing block: B:10:0x016b, code lost:
        
            commit();
         */
        /* JADX WARN: Code restructure failed: missing block: B:11:0x016f, code lost:
        
            return;
         */
        /* JADX WARN: Failed to find 'out' block for switch in B:7:0x0034. Please report as an issue. */
        @com.google.common.annotations.VisibleForTesting
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        void handleMessage(org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage r7) throws org.apache.atlas.AtlasServiceException, org.apache.atlas.AtlasException {
            /*
                Method dump skipped, instructions count: 368
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.atlas.notification.NotificationHookConsumer.HookConsumer.handleMessage(org.apache.atlas.notification.hook.HookNotification$HookNotificationMessage):void");
        }

        private void recordFailedMessages() {
            Iterator<HookNotification.HookNotificationMessage> it = this.failedMessages.iterator();
            while (it.hasNext()) {
                NotificationHookConsumer.FAILED_LOG.error("[DROPPED_NOTIFICATION] " + AbstractNotification.getMessageJson(it.next()));
            }
            this.failedMessages.clear();
        }

        private void commit() {
            recordFailedMessages();
            this.consumer.commit();
        }

        boolean serverAvailable(Timer timer) {
            while (!NotificationHookConsumer.this.atlasClient.isServerReady()) {
                try {
                    try {
                        NotificationHookConsumer.LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...", Integer.valueOf(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS));
                        timer.sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
                    } catch (InterruptedException e) {
                        NotificationHookConsumer.LOG.info("Interrupted while waiting for Atlas Server to become ready, exiting consumer thread.", e);
                        return false;
                    }
                } catch (Throwable th) {
                    NotificationHookConsumer.LOG.info("Handled AtlasServiceException while waiting for Atlas Server to become ready, exiting consumer thread.", th);
                    return false;
                }
            }
            NotificationHookConsumer.LOG.info("Atlas Server is ready, can start reading Kafka events.");
            return true;
        }

        public void stop() {
            this.shouldRun.set(false);
            this.consumer.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/atlas/notification/NotificationHookConsumer$Timer.class */
    public static class Timer {
        Timer() {
        }

        public void sleep(int i) throws InterruptedException {
            Thread.sleep(i);
        }
    }

    @Inject
    public NotificationHookConsumer(NotificationInterface notificationInterface, LocalAtlasClient localAtlasClient) throws AtlasException {
        this.notificationInterface = notificationInterface;
        this.atlasClient = localAtlasClient;
    }

    public void start() throws AtlasException {
        startInternal(this.applicationProperties, null);
    }

    void startInternal(Configuration configuration, ExecutorService executorService) {
        if (this.consumers == null) {
            this.consumers = new ArrayList();
        }
        if (executorService != null) {
            this.executors = executorService;
        }
        if (HAConfiguration.isHAEnabled(configuration)) {
            return;
        }
        LOG.info("HA is disabled, starting consumers inline.");
        startConsumers(executorService);
    }

    private void startConsumers(ExecutorService executorService) {
        List createConsumers = this.notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, this.applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1));
        if (executorService == null) {
            executorService = Executors.newFixedThreadPool(createConsumers.size(), new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
        }
        this.executors = executorService;
        Iterator it = createConsumers.iterator();
        while (it.hasNext()) {
            HookConsumer hookConsumer = new HookConsumer((NotificationConsumer) it.next());
            this.consumers.add(hookConsumer);
            this.executors.submit(hookConsumer);
        }
    }

    public void stop() {
        try {
            stopConsumerThreads();
            if (this.executors != null) {
                this.executors.shutdown();
                if (!this.executors.awaitTermination(5000L, TimeUnit.MILLISECONDS)) {
                    LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                }
                this.executors = null;
            }
            this.notificationInterface.close();
        } catch (InterruptedException e) {
            LOG.error("Failure in shutting down consumers");
        }
    }

    private void stopConsumerThreads() {
        if (this.consumers != null) {
            Iterator<HookConsumer> it = this.consumers.iterator();
            while (it.hasNext()) {
                it.next().stop();
            }
            this.consumers.clear();
        }
    }

    public void instanceIsActive() {
        LOG.info("Reacting to active state: initializing Kafka consumers");
        startConsumers(this.executors);
    }

    public void instanceIsPassive() {
        LOG.info("Reacting to passive state: shutting down Kafka consumers.");
        stop();
    }
}
