package org.apache.atlas.notification;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.inject.Inject;
import kafka.utils.ShutdownableThread;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasClient;
import org.apache.atlas.AtlasClientV2;
import org.apache.atlas.AtlasException;
import org.apache.atlas.AtlasServiceException;
import org.apache.atlas.RequestContext;
import org.apache.atlas.RequestContextV1;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.kafka.AtlasKafkaMessage;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.notification.NotificationInterface;
import org.apache.atlas.notification.hook.HookNotification;
import org.apache.atlas.repository.converters.AtlasInstanceConverter;
import org.apache.atlas.repository.store.graph.AtlasEntityStore;
import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
import org.apache.atlas.repository.store.graph.v1.AtlasGraphUtilsV1;
import org.apache.atlas.service.Service;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.atlas.utils.AtlasPerfTracer;
import org.apache.atlas.web.filters.AuditFilter;
import org.apache.atlas.web.service.ServiceState;
import org.apache.atlas.web.util.DateTimeHelper;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

@Component
@Order(4)
/* loaded from: input_file:org/apache/atlas/notification/NotificationHookConsumer.class */
public class NotificationHookConsumer implements Service, ActiveStateChangeHandler {
    // NOTE(review): the next three constants are not referenced in the visible part of this file;
    // presumably they are used by the hive_column_lineage filtering further down - confirm.
    private static final String LOCALHOST = "localhost";
    private static final String TYPE_HIVE_COLUMN_LINEAGE = "hive_column_lineage";
    private static final String ATTRIBUTE_INPUTS = "inputs";

    // Configuration keys read from the application properties (see the field initializers below).
    public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
    public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
    public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize";
    public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval";
    public static final String CONSUMER_MIN_RETRY_INTERVAL = "atlas.notification.consumer.min.retry.interval";
    public static final String CONSUMER_MAX_RETRY_INTERVAL = "atlas.notification.consumer.max.retry.interval";
    // Sleep, in milliseconds, between checks of the service state in HookConsumer.serverAvailable().
    public static final int SERVER_READY_WAIT_TIME_MS = 1000;
    public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633 = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633";
    public static final String CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD = "atlas.notification.consumer.skip.hive_column_lineage.hive-20633.inputs.threshold";

    // Collaborators supplied through the @Inject constructor.
    private final AtlasEntityStore atlasEntityStore;
    private final ServiceState serviceState;
    private final AtlasInstanceConverter instanceConverter;
    private final AtlasTypeRegistry typeRegistry;
    private NotificationInterface notificationInterface;
    // Executor running the HookConsumer threads; set in startInternal()/startConsumers(), reset to null by stop().
    private ExecutorService executors;

    // Consumers currently submitted to the executor; created lazily in startInternal().
    @VisibleForTesting
    List<HookConsumer> consumers;
    private static final Logger LOG = LoggerFactory.getLogger(NotificationHookConsumer.class);
    private static final Logger PERF_LOG = AtlasPerfTracer.getPerfLogger(NotificationHookConsumer.class);
    // Dedicated "FAILED" logger that receives notifications dropped after the last retry.
    private static Logger FAILED_LOG = LoggerFactory.getLogger("FAILED");
    private static final String THREADNAME_PREFIX = NotificationHookConsumer.class.getSimpleName();
    // Must stay ahead of the fields below: their initializers read from it, in declaration order.
    private Configuration applicationProperties = ApplicationProperties.get();
    // Number of processing attempts per message (default 3).
    private final int maxRetries = this.applicationProperties.getInt(CONSUMER_RETRIES_PROPERTY, 3);
    // Failed messages are flushed to FAILED_LOG once this many have accumulated (default 1).
    private final int failedMsgCacheSize = this.applicationProperties.getInt(CONSUMER_FAILEDCACHESIZE_PROPERTY, 1);

    // Sleep, in milliseconds, between two attempts for the same message (default 500).
    @VisibleForTesting
    final int consumerRetryInterval = this.applicationProperties.getInt(CONSUMER_RETRY_INTERVAL, 500);
    // Bounds handed to AdaptiveWaiter: the minimum defaults to consumerRetryInterval, the maximum to 60 times the minimum.
    private final int minWaitDuration = this.applicationProperties.getInt(CONSUMER_MIN_RETRY_INTERVAL, this.consumerRetryInterval);
    private final int maxWaitDuration = this.applicationProperties.getInt(CONSUMER_MAX_RETRY_INTERVAL, this.minWaitDuration * 60);
    // Switch and threshold for the HIVE-20633 column-lineage skipping (defaults: true and 15).
    private final boolean skipHiveColumnLineageHive20633 = this.applicationProperties.getBoolean(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, true);
    private final int skipHiveColumnLineageHive20633InputsThreshold = this.applicationProperties.getInt(CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, 15);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.atlas.notification.NotificationHookConsumer$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/atlas/notification/NotificationHookConsumer$1.class */
    /**
     * Switch map for {@code HookNotification.HookNotificationType}.
     *
     * <p>The array is indexed by the enum constant's {@code ordinal()} and holds the integer case
     * label (1-8) used by the {@code switch} statements in this file. Entries that are never
     * assigned keep the array default of 0, which matches no case label and therefore reaches the
     * {@code default} branch of those switches.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$atlas$notification$hook$HookNotification$HookNotificationType = new int[HookNotification.HookNotificationType.values().length];

        // Each constant is registered in its own try block: a NoSuchFieldError raised while
        // resolving one constant is ignored and does not prevent the remaining ones from being mapped.
        static {
            try {
                $SwitchMap$org$apache$atlas$notification$hook$HookNotification$HookNotificationType[HookNotification.HookNotificationType.ENTITY_CREATE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$atlas$notification$hook$HookNotification$HookNotificationType[HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$atlas$notification$hook$HookNotification$HookNotificationType[HookNotification.HookNotificationType.ENTITY_DELETE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$atlas$notification$hook$HookNotification$HookNotificationType[HookNotification.HookNotificationType.ENTITY_FULL_UPDATE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$atlas$notification$hook$HookNotification$HookNotificationType[HookNotification.HookNotificationType.ENTITY_CREATE_V2.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$atlas$notification$hook$HookNotification$HookNotificationType[HookNotification.HookNotificationType.ENTITY_PARTIAL_UPDATE_V2.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$atlas$notification$hook$HookNotification$HookNotificationType[HookNotification.HookNotificationType.ENTITY_FULL_UPDATE_V2.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$atlas$notification$hook$HookNotification$HookNotificationType[HookNotification.HookNotificationType.ENTITY_DELETE_V2.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
        }
    }

    /* loaded from: input_file:org/apache/atlas/notification/NotificationHookConsumer$AdaptiveWaiter.class */
    /**
     * Sleeps for a duration that grows with every consecutive call and falls back to the
     * minimum after a sufficiently long quiet period.
     *
     * <p>Each call to {@link #pause(Exception)} first adjusts {@code waitDuration}: if more than
     * {@code resetInterval} (twice the maximum duration) has elapsed since the previous pause,
     * the duration is reset to the minimum; otherwise it is raised by {@code increment} and
     * capped at the maximum. The very first pause counts as "no time elapsed" and therefore
     * already adds one increment to the minimum.
     */
    static class AdaptiveWaiter {
        private final long increment;
        private final long maxDuration;
        private final long minDuration;
        private final long resetInterval;
        private long lastWaitAt = 0;

        @VisibleForTesting
        long waitDuration;

        /**
         * @param minDuration initial and post-reset wait, in milliseconds
         * @param maxDuration upper bound for the wait, in milliseconds
         * @param increment   amount added to the wait on each consecutive pause, in milliseconds
         */
        public AdaptiveWaiter(long minDuration, long maxDuration, long increment) {
            this.increment = increment;
            this.minDuration = minDuration;
            this.maxDuration = maxDuration;
            this.resetInterval = maxDuration * 2;
            this.waitDuration = minDuration;
        }

        /**
         * Recomputes the wait duration and sleeps for it. An interruption ends the sleep early
         * and is only reported at debug level.
         *
         * @param exc the failure that triggered the pause; used for logging only
         */
        public void pause(Exception exc) {
            setWaitDurations();
            try {
                if (NotificationHookConsumer.LOG.isDebugEnabled()) {
                    final Object[] logArguments = {exc.getClass().getName(), Long.valueOf(this.waitDuration), exc};
                    NotificationHookConsumer.LOG.debug("{} in NotificationHookConsumer. Waiting for {} ms for recovery.", logArguments);
                }
                Thread.sleep(this.waitDuration);
            } catch (InterruptedException interrupted) {
                if (NotificationHookConsumer.LOG.isDebugEnabled()) {
                    final String failureName = exc.getClass().getName();
                    NotificationHookConsumer.LOG.debug("{} in NotificationHookConsumer. Waiting for recovery interrupted.", failureName, interrupted);
                }
            }
        }

        /** Updates {@code lastWaitAt} and derives the next {@code waitDuration} from the time since the last pause. */
        private void setWaitDurations() {
            long sinceLastWait = 0L;
            if (this.lastWaitAt != 0) {
                sinceLastWait = System.currentTimeMillis() - this.lastWaitAt;
            }
            this.lastWaitAt = System.currentTimeMillis();

            if (sinceLastWait <= this.resetInterval) {
                // Still within the back-off window: lengthen the wait, but never past the maximum.
                this.waitDuration = Math.min(this.waitDuration + this.increment, this.maxDuration);
            } else {
                this.waitDuration = this.minDuration;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/atlas/notification/NotificationHookConsumer$FailedCommitOffsetRecorder.class */
    /**
     * Remembers the offset of the most recent message whose commit did not succeed, so that the
     * same message can be recognised when it is delivered again.
     */
    public static class FailedCommitOffsetRecorder {
        /** Offset of the last failed commit, or {@code null} when the last commit succeeded. */
        private Long currentOffset;

        FailedCommitOffsetRecorder() {
        }

        /**
         * Records the outcome of a commit.
         *
         * @param commitSucceeded {@code true} clears the remembered offset, {@code false} stores {@code offset}
         * @param offset          offset of the message whose commit was attempted
         */
        public void recordIfFailed(boolean commitSucceeded, long offset) {
            this.currentOffset = commitSucceeded ? null : Long.valueOf(offset);
        }

        /**
         * @param offset offset of an incoming message
         * @return {@code true} if a failed commit is remembered for exactly this offset
         */
        public boolean isMessageReplayed(long offset) {
            if (this.currentOffset == null) {
                return false;
            }
            return this.currentOffset.longValue() == offset;
        }

        /** @return the remembered offset, or {@code null} if none is recorded */
        public Long getCurrentOffset() {
            return this.currentOffset;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/atlas/notification/NotificationHookConsumer$HookConsumer.class */
    public class HookConsumer extends ShutdownableThread {
        private final NotificationConsumer<HookNotification.HookNotificationMessage> consumer;
        private final AtomicBoolean shouldRun;
        private final List<String> failedMessages;
        private final AdaptiveWaiter adaptiveWaiter;

        @VisibleForTesting
        final FailedCommitOffsetRecorder failedCommitOffsetRecorder;

        public HookConsumer(NotificationConsumer<HookNotification.HookNotificationMessage> notificationConsumer) {
            super("atlas-hook-consumer-thread", false);
            this.shouldRun = new AtomicBoolean(false);
            this.failedMessages = new ArrayList();
            this.adaptiveWaiter = new AdaptiveWaiter(NotificationHookConsumer.this.minWaitDuration, NotificationHookConsumer.this.maxWaitDuration, NotificationHookConsumer.this.minWaitDuration);
            this.consumer = notificationConsumer;
            this.failedCommitOffsetRecorder = new FailedCommitOffsetRecorder();
        }

        /**
         * Main loop of the consumer thread: waits until the server is active, then repeatedly
         * receives a batch of messages and handles each one until {@link #shutdown()} clears
         * the run flag.
         *
         * <p>The notification consumer is closed exactly once, after the loop has ended. Closing
         * it inside the loop would leave every later {@code receive()} call working on a closed
         * consumer.
         *
         * <p>Failure handling inside the loop:
         * <ul>
         *   <li>{@link IllegalStateException}: back off via the adaptive waiter and try again;</li>
         *   <li>any other exception: leave the loop if shutdown was requested, otherwise log a
         *       warning, back off and try again.</li>
         * </ul>
         */
        public void doWork() {
            NotificationHookConsumer.LOG.info("==> HookConsumer doWork()");
            this.shouldRun.set(true);
            if (!serverAvailable(new Timer())) {
                return;
            }
            try {
                while (this.shouldRun.get()) {
                    try {
                        Iterator it = this.consumer.receive().iterator();
                        while (it.hasNext()) {
                            handleMessage((AtlasKafkaMessage) it.next());
                        }
                    } catch (IllegalStateException e) {
                        // Must precede the general handler, otherwise it can never be selected.
                        this.adaptiveWaiter.pause(e);
                    } catch (Exception e) {
                        if (!this.shouldRun.get()) {
                            // An exception during shutdown is expected; just leave the loop.
                            break;
                        }
                        NotificationHookConsumer.LOG.warn("Exception in NotificationHookConsumer", e);
                        this.adaptiveWaiter.pause(e);
                    }
                }
            } finally {
                if (this.consumer != null) {
                    NotificationHookConsumer.LOG.info("closing NotificationConsumer");
                    this.consumer.close();
                }
                NotificationHookConsumer.LOG.info("<== HookConsumer doWork()");
            }
        }

        /**
         * Applies one hook notification to the entity store and commits its offset.
         *
         * <p>Flow:
         * <ol>
         *   <li>If the message's offset matches a previously failed commit, the message is treated
         *       as a replay: only the commit is retried.</li>
         *   <li>Otherwise the message is pre-processed and applied, with up to {@code maxRetries}
         *       attempts separated by {@code consumerRetryInterval} milliseconds. The retry loop
         *       ends as soon as one attempt succeeds.</li>
         *   <li>If the last attempt fails, the message JSON is queued for the failed-message log
         *       and the method returns without committing.</li>
         *   <li>After a successful attempt the offset is committed.</li>
         * </ol>
         *
         * <p>The request contexts are cleared after every attempt and the perf tracer is logged on
         * every exit path.
         *
         * @param atlasKafkaMessage the received message together with its partition and offset
         * @throws AtlasServiceException kept for interface compatibility
         * @throws AtlasException kept for interface compatibility
         */
        @VisibleForTesting
        void handleMessage(AtlasKafkaMessage<HookNotification.HookNotificationMessage> atlasKafkaMessage) throws AtlasServiceException, AtlasException {
            final HookNotification.HookNotificationMessage message = (HookNotification.HookNotificationMessage) atlasKafkaMessage.getMessage();
            final String user = message.getUser();
            final AtlasPerfTracer perfTracer = AtlasPerfTracer.isPerfTraceEnabled(NotificationHookConsumer.PERF_LOG) ? AtlasPerfTracer.getPerfTracer(NotificationHookConsumer.PERF_LOG, message.getType().name()) : null;
            try {
                if (this.failedCommitOffsetRecorder.isMessageReplayed(atlasKafkaMessage.getOffset())) {
                    // Already processed; only the commit failed last time.
                    commit(atlasKafkaMessage);
                    return;
                }
                NotificationHookConsumer.this.preProcessNotificationMessage(atlasKafkaMessage);
                for (int attempt = 0; attempt < NotificationHookConsumer.this.maxRetries; attempt++) {
                    if (NotificationHookConsumer.LOG.isDebugEnabled()) {
                        NotificationHookConsumer.LOG.debug("handleMessage({}): attempt {}", message.getType().name(), Integer.valueOf(attempt));
                    }
                    try {
                        RequestContext requestContext = RequestContext.createContext();
                        requestContext.setAttemptCount(attempt + 1);
                        requestContext.setMaxAttempts(NotificationHookConsumer.this.maxRetries);
                        requestContext.setUser(user);
                        applyNotification(message, user, attempt == 0);
                        // Success: stop retrying, otherwise the same message would be applied again.
                        break;
                    } catch (Throwable failure) {
                        RequestContextV1.get().resetEntityGuidUpdates();
                        if (attempt == NotificationHookConsumer.this.maxRetries - 1) {
                            String messageJson = AbstractNotification.getMessageJson(message);
                            NotificationHookConsumer.LOG.warn("Max retries exceeded for message {}", messageJson, failure);
                            this.failedMessages.add(messageJson);
                            if (this.failedMessages.size() >= NotificationHookConsumer.this.failedMsgCacheSize) {
                                recordFailedMessages();
                            }
                            // Give up on this message without committing its offset.
                            return;
                        }
                        NotificationHookConsumer.LOG.warn("Error handling message", failure);
                        try {
                            NotificationHookConsumer.LOG.info("Sleeping for {} ms before retry", Integer.valueOf(NotificationHookConsumer.this.consumerRetryInterval));
                            Thread.sleep(NotificationHookConsumer.this.consumerRetryInterval);
                        } catch (InterruptedException interrupted) {
                            NotificationHookConsumer.LOG.error("Notification consumer thread sleep interrupted");
                        }
                    } finally {
                        RequestContext.clear();
                        RequestContextV1.clear();
                    }
                }
                commit(atlasKafkaMessage);
            } finally {
                AtlasPerfTracer.log(perfTracer);
            }
        }

        /**
         * Performs the store operation that corresponds to the notification type. The audit
         * entry is written only on the first attempt so that retries do not duplicate it.
         *
         * @param message      the notification to apply
         * @param user         user recorded in the audit entry
         * @param firstAttempt {@code true} on the first attempt for this message
         * @throws Exception whatever the converter or the entity store raises; an unknown
         *                   notification type raises {@link IllegalStateException}
         */
        private void applyNotification(HookNotification.HookNotificationMessage message, String user, boolean firstAttempt) throws Exception {
            switch (AnonymousClass1.$SwitchMap$org$apache$atlas$notification$hook$HookNotification$HookNotificationType[message.getType().ordinal()]) {
                case 1: { // ENTITY_CREATE
                    HookNotification.EntityCreateRequest createRequest = (HookNotification.EntityCreateRequest) message;
                    AtlasEntity.AtlasEntitiesWithExtInfo entities = NotificationHookConsumer.this.instanceConverter.toAtlasEntities(createRequest.getEntities());
                    if (firstAttempt) {
                        AtlasClient.API_V1 api = AtlasClient.API_V1.CREATE_ENTITY;
                        NotificationHookConsumer.this.audit(user, api.getMethod(), api.getNormalizedPath());
                    }
                    NotificationHookConsumer.this.atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
                    break;
                }
                case 2: { // ENTITY_PARTIAL_UPDATE
                    HookNotification.EntityPartialUpdateRequest partialUpdateRequest = (HookNotification.EntityPartialUpdateRequest) message;
                    AtlasEntity.AtlasEntitiesWithExtInfo entities = NotificationHookConsumer.this.instanceConverter.toAtlasEntity(partialUpdateRequest.getEntity());
                    if (firstAttempt) {
                        AtlasClientV2.API_V2 api = AtlasClientV2.API_V2.UPDATE_ENTITY_BY_ATTRIBUTE;
                        NotificationHookConsumer.this.audit(user, api.getMethod(), String.format(api.getNormalizedPath(), partialUpdateRequest.getTypeName()));
                    }
                    // Resolve the target entity's guid from the unique attribute carried by the request.
                    AtlasEntity target = (AtlasEntity) entities.getEntities().get(0);
                    HashMap<String, Object> uniqueAttributes = new HashMap<String, Object>();
                    uniqueAttributes.put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue());
                    target.setGuid(AtlasGraphUtilsV1.getGuidByUniqueAttributes(NotificationHookConsumer.this.typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName()), uniqueAttributes));
                    NotificationHookConsumer.this.atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), true);
                    break;
                }
                case 3: { // ENTITY_DELETE
                    HookNotification.EntityDeleteRequest deleteRequest = (HookNotification.EntityDeleteRequest) message;
                    if (firstAttempt) {
                        AtlasClientV2.API_V2 api = AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE;
                        NotificationHookConsumer.this.audit(user, api.getMethod(), String.format(api.getNormalizedPath(), deleteRequest.getTypeName()));
                    }
                    try {
                        HashMap<String, Object> uniqueAttributes = new HashMap<String, Object>();
                        uniqueAttributes.put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue());
                        NotificationHookConsumer.this.atlasEntityStore.deleteByUniqueAttributes(NotificationHookConsumer.this.typeRegistry.getType(deleteRequest.getTypeName()), uniqueAttributes);
                    } catch (ClassCastException e) {
                        // Not retried; keep the cause in the log so the failure can be diagnosed.
                        NotificationHookConsumer.LOG.error("Failed to delete entity {}", deleteRequest, e);
                    }
                    break;
                }
                case 4: { // ENTITY_FULL_UPDATE
                    AtlasEntity.AtlasEntitiesWithExtInfo entities = NotificationHookConsumer.this.instanceConverter.toAtlasEntities(((HookNotification.EntityUpdateRequest) message).getEntities());
                    if (firstAttempt) {
                        AtlasClientV2.API_V2 api = AtlasClientV2.API_V2.UPDATE_ENTITY;
                        NotificationHookConsumer.this.audit(user, api.getMethod(), api.getNormalizedPath());
                    }
                    NotificationHookConsumer.this.atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
                    break;
                }
                case 5: { // ENTITY_CREATE_V2
                    AtlasEntity.AtlasEntitiesWithExtInfo entities = ((HookNotification.EntityCreateRequestV2) message).getEntities();
                    if (firstAttempt) {
                        AtlasClientV2.API_V2 api = AtlasClientV2.API_V2.CREATE_ENTITY;
                        NotificationHookConsumer.this.audit(user, api.getMethod(), api.getNormalizedPath());
                    }
                    NotificationHookConsumer.this.atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
                    break;
                }
                case 6: { // ENTITY_PARTIAL_UPDATE_V2
                    HookNotification.EntityPartialUpdateRequestV2 partialUpdateRequest = (HookNotification.EntityPartialUpdateRequestV2) message;
                    AtlasObjectId entityId = partialUpdateRequest.getEntityId();
                    AtlasEntity.AtlasEntityWithExtInfo entity = partialUpdateRequest.getEntity();
                    if (firstAttempt) {
                        AtlasClientV2.API_V2 api = AtlasClientV2.API_V2.UPDATE_ENTITY;
                        NotificationHookConsumer.this.audit(user, api.getMethod(), api.getNormalizedPath());
                    }
                    NotificationHookConsumer.this.atlasEntityStore.updateEntity(entityId, entity, true);
                    break;
                }
                case 7: { // ENTITY_FULL_UPDATE_V2
                    AtlasEntity.AtlasEntitiesWithExtInfo entities = ((HookNotification.EntityUpdateRequestV2) message).getEntities();
                    if (firstAttempt) {
                        AtlasClientV2.API_V2 api = AtlasClientV2.API_V2.UPDATE_ENTITY;
                        NotificationHookConsumer.this.audit(user, api.getMethod(), api.getNormalizedPath());
                    }
                    NotificationHookConsumer.this.atlasEntityStore.createOrUpdate(new AtlasEntityStream(entities), false);
                    break;
                }
                case 8: { // ENTITY_DELETE_V2
                    List<AtlasObjectId> entityIds = ((HookNotification.EntityDeleteRequestV2) message).getEntities();
                    try {
                        for (AtlasObjectId entityId : entityIds) {
                            if (firstAttempt) {
                                AtlasClientV2.API_V2 api = AtlasClientV2.API_V2.DELETE_ENTITY_BY_ATTRIBUTE;
                                NotificationHookConsumer.this.audit(user, api.getMethod(), String.format(api.getNormalizedPath(), entityId.getTypeName()));
                            }
                            NotificationHookConsumer.this.atlasEntityStore.deleteByUniqueAttributes(NotificationHookConsumer.this.typeRegistry.getType(entityId.getTypeName()), entityId.getUniqueAttributes());
                        }
                    } catch (ClassCastException e) {
                        // Not retried; keep the cause in the log so the failure can be diagnosed.
                        NotificationHookConsumer.LOG.error("Failed to do a delete entities {}", entityIds, e);
                    }
                    break;
                }
                default:
                    throw new IllegalStateException("Unknown notification type: " + message.getType().name());
            }
        }

        /** Writes every queued failed message to the FAILED logger and then empties the queue. */
        private void recordFailedMessages() {
            for (String droppedMessage : this.failedMessages) {
                NotificationHookConsumer.FAILED_LOG.error("[DROPPED_NOTIFICATION] {}", droppedMessage);
            }
            this.failedMessages.clear();
        }

        /**
         * Flushes the failed-message queue and commits the offset following the given message on
         * the "ATLAS_HOOK" topic. The outcome is always reported to the failed-commit recorder:
         * success clears it, any failure stores this message's offset before the failure propagates.
         *
         * @param atlasKafkaMessage the message whose processing is complete
         */
        private void commit(AtlasKafkaMessage<HookNotification.HookNotificationMessage> atlasKafkaMessage) {
            boolean committed = false;
            try {
                recordFailedMessages();
                TopicPartition partition = new TopicPartition("ATLAS_HOOK", atlasKafkaMessage.getPartition());
                this.consumer.commit(partition, atlasKafkaMessage.getOffset() + 1);
                committed = true;
            } finally {
                this.failedCommitOffsetRecorder.recordIfFailed(committed, atlasKafkaMessage.getOffset());
            }
        }

        /**
         * Blocks until the service state is ACTIVE, sleeping {@code SERVER_READY_WAIT_TIME_MS}
         * milliseconds between checks.
         *
         * @param timer sleep provider
         * @return {@code true} once the server is active; {@code false} if the wait was
         *         interrupted or any other failure occurred while waiting
         */
        boolean serverAvailable(Timer timer) {
            for (;;) {
                if (NotificationHookConsumer.this.serviceState.getState() == ServiceState.ServiceStateValue.ACTIVE) {
                    NotificationHookConsumer.LOG.info("Atlas Server is ready, can start reading Kafka events.");
                    return true;
                }
                try {
                    try {
                        NotificationHookConsumer.LOG.info("Atlas Server is not ready. Waiting for {} milliseconds to retry...", Integer.valueOf(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS));
                        timer.sleep(NotificationHookConsumer.SERVER_READY_WAIT_TIME_MS);
                    } catch (InterruptedException interrupted) {
                        NotificationHookConsumer.LOG.info("Interrupted while waiting for Atlas Server to become ready, exiting consumer thread.", interrupted);
                        return false;
                    }
                } catch (Throwable unexpected) {
                    NotificationHookConsumer.LOG.info("Handled AtlasServiceException while waiting for Atlas Server to become ready, exiting consumer thread.", unexpected);
                    return false;
                }
            }
        }

        /**
         * Stops the consumer thread and waits for it to finish. Does nothing beyond the entry
         * log line when the run flag was never set, i.e. the thread never entered
         * {@link #doWork()}.
         */
        public void shutdown() {
            NotificationHookConsumer.LOG.info("==> HookConsumer shutdown()");
            if (!this.shouldRun.get()) {
                return;
            }
            super.initiateShutdown();
            this.shouldRun.set(false);
            // Wake the consumer so a blocked receive() returns and the loop can observe the flag.
            if (this.consumer != null) {
                this.consumer.wakeup();
            }
            super.awaitShutdown();
            NotificationHookConsumer.LOG.info("<== HookConsumer shutdown()");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/atlas/notification/NotificationHookConsumer$Timer.class */
    /** Thin wrapper around {@link Thread#sleep(long)} so that waiting can be substituted in tests. */
    public static class Timer {
        Timer() {
        }

        /**
         * Suspends the calling thread.
         *
         * @param millis time to sleep, in milliseconds
         * @throws InterruptedException if the thread is interrupted while sleeping
         */
        public void sleep(int millis) throws InterruptedException {
            final long duration = millis;
            Thread.sleep(duration);
        }
    }

    @Inject
    public NotificationHookConsumer(NotificationInterface notificationInterface, AtlasEntityStore atlasEntityStore, ServiceState serviceState, AtlasInstanceConverter atlasInstanceConverter, AtlasTypeRegistry atlasTypeRegistry) throws AtlasException {
        this.notificationInterface = notificationInterface;
        this.atlasEntityStore = atlasEntityStore;
        this.serviceState = serviceState;
        this.instanceConverter = atlasInstanceConverter;
        this.typeRegistry = atlasTypeRegistry;
        LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633, Boolean.valueOf(this.skipHiveColumnLineageHive20633));
        LOG.info("{}={}", CONSUMER_SKIP_HIVE_COLUMN_LINEAGE_HIVE_20633_INPUTS_THRESHOLD, Integer.valueOf(this.skipHiveColumnLineageHive20633InputsThreshold));
    }

    /**
     * Starts the service with the application configuration and no externally supplied
     * executor, so {@link #startInternal} creates one if consumers are started.
     *
     * @throws AtlasException declared for interface compatibility
     */
    public void start() throws AtlasException {
        final ExecutorService noExternalExecutor = null;
        startInternal(this.applicationProperties, noExternalExecutor);
    }

    /**
     * Prepares the consumer list and, when HA is disabled, starts the consumers immediately.
     * With HA enabled nothing is started here; consumers are started from
     * {@link #instanceIsActive()}.
     *
     * @param configuration   configuration consulted for the HA setting
     * @param executorService executor to run the consumers on, or {@code null} to create one
     */
    void startInternal(Configuration configuration, ExecutorService executorService) {
        if (executorService != null) {
            this.executors = executorService;
        }
        if (this.consumers == null) {
            this.consumers = new ArrayList<HookConsumer>();
        }
        if (!HAConfiguration.isHAEnabled(configuration)) {
            LOG.info("HA is disabled, starting consumers inline.");
            startConsumers(executorService);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Creates the configured number of hook notification consumers, wraps each one in a
     * {@link HookConsumer} and submits it to the executor.
     *
     * <p>When no executor is supplied, a fixed thread pool sized to the number of consumers is
     * created, with thread names derived from this class's simple name.
     *
     * @param executorService executor to use, or {@code null} to create a new fixed pool
     */
    @SuppressWarnings("unchecked") // the notification interface hands back consumers for the HOOK message type
    private void startConsumers(ExecutorService executorService) {
        int numThreads = this.applicationProperties.getInt(CONSUMER_THREADS_PROPERTY, 1);
        List<?> notificationConsumers = this.notificationInterface.createConsumers(NotificationInterface.NotificationType.HOOK, numThreads);
        if (executorService == null) {
            executorService = Executors.newFixedThreadPool(notificationConsumers.size(), new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d").build());
        }
        this.executors = executorService;
        if (this.consumers == null) {
            // Guards against a state change arriving before startInternal() has created the list.
            this.consumers = new ArrayList<HookConsumer>();
        }
        for (Object notificationConsumer : notificationConsumers) {
            // Declared as HookConsumer so it can be stored in the typed consumer list.
            HookConsumer hookConsumer = new HookConsumer((NotificationConsumer<HookNotification.HookNotificationMessage>) notificationConsumer);
            this.consumers.add(hookConsumer);
            this.executors.submit(hookConsumer);
        }
    }

    /**
     * Shuts down the consumer threads, then the executor (waiting up to 5 seconds for it to
     * terminate), and finally closes the notification interface.
     *
     * <p>If the calling thread is interrupted while waiting, the failure is logged together
     * with its cause and the thread's interrupt status is restored so that callers can still
     * observe the interruption. In that case the remaining steps are skipped.
     */
    public void stop() {
        try {
            stopConsumerThreads();
            if (this.executors != null) {
                this.executors.shutdown();
                if (!this.executors.awaitTermination(5000L, TimeUnit.MILLISECONDS)) {
                    LOG.error("Timed out waiting for consumer threads to shut down, exiting uncleanly");
                }
                this.executors = null;
            }
            this.notificationInterface.close();
        } catch (InterruptedException e) {
            LOG.error("Failure in shutting down consumers", e);
            // Catching InterruptedException clears the flag; set it again for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /** Shuts down every registered consumer in order and empties the consumer list. */
    private void stopConsumerThreads() {
        LOG.info("==> stopConsumerThreads()");
        final List<HookConsumer> registered = this.consumers;
        if (registered != null) {
            for (HookConsumer hookConsumer : registered) {
                hookConsumer.shutdown();
            }
            registered.clear();
        }
        LOG.info("<== stopConsumerThreads()");
    }

    /**
     * Reacts to this instance becoming active by starting the consumers on the
     * currently stored executor (which may be {@code null}; {@code startConsumers}
     * then creates its own pool).
     */
    public void instanceIsActive() {
        LOG.info("Reacting to active state: initializing Kafka consumers");
        startConsumers(executors);
    }

    /**
     * Reacts to this instance becoming passive by delegating to {@link #stop()}.
     */
    public void instanceIsPassive() {
        LOG.info("Reacting to passive state: shutting down Kafka consumers.");
        this.stop();
    }

    /**
     * @return the order value of {@code HandlerOrder.NOTIFICATION_HOOK_CONSUMER}
     */
    public int getHandlerOrder() {
        ActiveStateChangeHandler.HandlerOrder handlerOrder = ActiveStateChangeHandler.HandlerOrder.NOTIFICATION_HOOK_CONSUMER;
        return handlerOrder.getOrder();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Applies pre-processing to a received hook message before it is handled.
     * Currently the only step is {@code skipHiveColumnLineage}, which may remove
     * entities from the message in place.
     *
     * @param atlasKafkaMessage the received message wrapper
     */
    public void preProcessNotificationMessage(AtlasKafkaMessage<HookNotification.HookNotificationMessage> atlasKafkaMessage) {
        this.skipHiveColumnLineage(atlasKafkaMessage);
    }

    /**
     * Removes column-lineage entities from a message in place when their average
     * number of inputs exceeds the configured threshold.
     *
     * <p>Only runs when {@code skipHiveColumnLineageHive20633} is set, and only for
     * messages dispatched to the {@code EntityCreateRequestV2} or
     * {@code EntityUpdateRequestV2} branches below; any other message is left untouched.
     * Entities are matched by type name against {@code TYPE_HIVE_COLUMN_LINEAGE}; the
     * average is the total size of their {@code "inputs"} collections divided by the
     * number of such entities.
     *
     * @param atlasKafkaMessage message wrapper; its offset and partition are only used
     *                          for the warning that is logged when entities are removed
     */
    private void skipHiveColumnLineage(AtlasKafkaMessage<HookNotification.HookNotificationMessage> atlasKafkaMessage) {
        if (!this.skipHiveColumnLineageHive20633) {
            return;
        }

        HookNotification.HookNotificationMessage message = atlasKafkaMessage.getMessage();
        AtlasEntity.AtlasEntitiesWithExtInfo entitiesWithExtInfo;
        // NOTE(review): 5 and 7 are indices into the compiler-generated switch map for
        // HookNotificationType; the enum constants they stand for are not visible here.
        switch (AnonymousClass1.$SwitchMap$org$apache$atlas$notification$hook$HookNotification$HookNotificationType[message.getType().ordinal()]) {
            case 5:
                entitiesWithExtInfo = ((HookNotification.EntityCreateRequestV2) message).getEntities();
                break;
            case 7:
                entitiesWithExtInfo = ((HookNotification.EntityUpdateRequestV2) message).getEntities();
                break;
            default:
                entitiesWithExtInfo = null;
                break;
        }
        if (entitiesWithExtInfo == null || entitiesWithExtInfo.getEntities() == null) {
            return;
        }

        List<AtlasEntity> entityList = entitiesWithExtInfo.getEntities();

        // First pass: count lineage entities and the total number of their inputs.
        int lineageCount = 0;
        int totalInputs = 0;
        for (AtlasEntity entity : entityList) {
            if (StringUtils.equals(entity.getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
                lineageCount++;
                Object inputs = entity.getAttribute("inputs");
                if (inputs instanceof Collection) {
                    totalInputs += ((Collection<?>) inputs).size();
                }
            }
        }

        // Divide in floating point: integer division would truncate the average and
        // let values just above the threshold slip through.
        float averageInputs = lineageCount > 0 ? ((float) totalInputs) / lineageCount : 0.0f;
        if (averageInputs <= this.skipHiveColumnLineageHive20633InputsThreshold) {
            return;
        }

        // Second pass: drop every lineage entity through the iterator so removal is safe.
        int removedCount = 0;
        Iterator<AtlasEntity> cursor = entityList.iterator();
        while (cursor.hasNext()) {
            if (StringUtils.equals(cursor.next().getTypeName(), TYPE_HIVE_COLUMN_LINEAGE)) {
                cursor.remove();
                removedCount++;
            }
        }
        if (removedCount > 0) {
            LOG.warn("removed {} hive_column_lineage entities. Average # of inputs={}, threshold={}, total # of inputs={}. topic-offset={}, partition={}", new Object[]{removedCount, averageInputs, this.skipHiveColumnLineageHive20633InputsThreshold, totalInputs, atlasKafkaMessage.getOffset(), atlasKafkaMessage.getPartition()});
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Writes an audit record through {@code AuditFilter.audit}, stamped with the
     * current time formatted as a UTC date string.
     *
     * <p>{@code THREADNAME_PREFIX} and {@code LOCALHOST} fill the remaining arguments.
     * NOTE(review): the parameters look like user, request method and request path,
     * in that order — confirm against {@code AuditFilter.audit}.
     *
     * @param str  first audit argument
     * @param str2 third audit argument
     * @param str3 fifth audit argument
     */
    public void audit(String str, String str2, String str3) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> audit({},{}, {})", new Object[]{str, str2, str3});
        }
        String timestamp = DateTimeHelper.formatDateUTC(new Date());
        AuditFilter.audit(str, THREADNAME_PREFIX, str2, LOCALHOST, str3, LOCALHOST, timestamp);
    }
}
