package co.cask.cdap.messaging.distributed;

import co.cask.cdap.api.messaging.TopicAlreadyExistsException;
import co.cask.cdap.api.messaging.TopicNotFoundException;
import co.cask.cdap.common.ServiceUnavailableException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.messaging.MessageFetcher;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.RollbackDetail;
import co.cask.cdap.messaging.StoreRequest;
import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.messaging.server.MessagingHttpService;
import co.cask.cdap.messaging.service.CoreMessagingService;
import co.cask.cdap.messaging.store.ForwardingTableFactory;
import co.cask.cdap.messaging.store.TableFactory;
import co.cask.cdap.messaging.store.cache.MessageTableCacheProvider;
import co.cask.cdap.messaging.store.hbase.HBaseTableFactory;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.TopicId;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import com.google.inject.Injector;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicMarkableReference;
import javax.annotation.Nullable;
import org.apache.twill.api.ElectionHandler;
import org.apache.twill.common.Threads;
import org.apache.twill.internal.zookeeper.LeaderElection;
import org.apache.twill.zookeeper.ZKClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/messaging/distributed/LeaderElectionMessagingService.class */
/**
 * A {@link MessagingService} that participates in a leader election through the injected
 * {@link ZKClient} and only serves requests while this instance is the leader.
 *
 * <p>When elected leader, a new {@link DelegateService} (wrapping a {@link CoreMessagingService}
 * and a {@link MessagingHttpService}) is created and started, optionally after a configurable
 * fencing delay. When this instance becomes a follower the current delegate is stopped and
 * dropped. All {@link MessagingService} calls are forwarded to the current delegate; while there
 * is no started delegate they fail with a {@link ServiceUnavailableException}.
 *
 * <p>The delegate is held in an {@link AtomicMarkableReference}: the reference is the current
 * delegate (or {@code null}) and the mark is {@code true} only once that delegate has been started.
 */
public class LeaderElectionMessagingService extends AbstractIdleService implements MessagingService {
    private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionMessagingService.class);

    /** Used both as the leader election identifier and as the service name in unavailability errors. */
    private static final String SERVICE_NAME = "messaging.service";
    private static final String CONTAINER_INSTANCE_ID_KEY = "messaging.container.instance.id";
    private static final String HTTP_BIND_ADDRESS_KEY = "messaging.http.server.bind.address";
    private static final String MESSAGE_TABLE_NAME_KEY = "messaging.message.table.name";
    private static final String PAYLOAD_TABLE_NAME_KEY = "messaging.payload.table.name";
    private static final String FENCING_DELAY_SECONDS_KEY = "messaging.ha.fencing.delay.seconds";

    private final Injector injector;
    private final CConfiguration cConf;
    private final MessageTableCacheProvider cacheProvider;
    private final ZKClient zkClient;
    /** Current delegate (may be {@code null}); the mark is set once the delegate has been started. */
    private final AtomicMarkableReference<DelegateService> delegate = new AtomicMarkableReference<>(null, false);
    /** Set after the first table upgrade attempt so that later elections do not repeat it. */
    private boolean tableUpgraded;
    private LeaderElection leaderElection;
    private ScheduledExecutorService delayExecutor;

    /**
     * Bundles the core messaging service and its HTTP endpoint so that they are started and
     * stopped together as a single service.
     */
    public final class DelegateService extends AbstractIdleService {
        private final CoreMessagingService messagingService;
        private final MessagingHttpService httpService;

        DelegateService(CoreMessagingService coreMessagingService, MessagingHttpService messagingHttpService) {
            this.messagingService = coreMessagingService;
            this.httpService = messagingHttpService;
        }

        /** Starts the core messaging service first, then the HTTP service. */
        protected void startUp() throws Exception {
            this.messagingService.startAndWait();
            this.httpService.startAndWait();
        }

        /**
         * Stops the HTTP service, then the core messaging service, and finally clears the message
         * table cache. Each step runs even if the previous one failed; if several steps throw, the
         * exception from the last failing step is the one that propagates.
         */
        protected void shutDown() throws Exception {
            try {
                try {
                    this.httpService.stopAndWait();
                } finally {
                    // Always stop the core service, otherwise a failure while stopping the HTTP
                    // service would leave it running.
                    this.messagingService.stopAndWait();
                }
            } finally {
                cacheProvider.clear();
            }
        }

        /** Returns the wrapped core messaging service. */
        MessagingService getMessagingService() {
            return this.messagingService;
        }
    }

    @Inject
    LeaderElectionMessagingService(Injector injector, CConfiguration cConfiguration, MessageTableCacheProvider messageTableCacheProvider, ZKClient zKClient) {
        this.injector = injector;
        this.cConf = cConfiguration;
        this.cacheProvider = messageTableCacheProvider;
        this.zkClient = zKClient;
    }

    /**
     * Creates the executor used for the fencing delay, starts the leader election and blocks until
     * the election handler has been notified at least once (as leader or as follower).
     *
     * @throws Exception if starting the election fails or the wait is interrupted
     */
    protected void startUp() throws Exception {
        this.delayExecutor = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("fencing-delay"));
        final CountDownLatch electionDecided = new CountDownLatch(1);
        this.leaderElection = new LeaderElection(this.zkClient, SERVICE_NAME, new ElectionHandler() {
            public void leader() {
                // The table upgrade is attempted at most once per instance, on first leadership.
                if (!tableUpgraded) {
                    upgradeTable();
                    tableUpgraded = true;
                }
                DelegateService newDelegate = new DelegateService(
                    injector.getInstance(CoreMessagingService.class),
                    injector.getInstance(MessagingHttpService.class));
                // Publish the (not yet started, hence unmarked) delegate and stop any previous one.
                updateDelegate(newDelegate);
                LOG.info("Messaging service instance {} running at {} becomes leader", cConf.get(CONTAINER_INSTANCE_ID_KEY), cConf.get(HTTP_BIND_ADDRESS_KEY));
                fencingStart(newDelegate);
                electionDecided.countDown();
            }

            public void follower() {
                updateDelegate(null);
                LOG.info("Messaging service instance {} running at {} becomes follower", cConf.get(CONTAINER_INSTANCE_ID_KEY), cConf.get(HTTP_BIND_ADDRESS_KEY));
                electionDecided.countDown();
            }
        });
        this.leaderElection.startAndWait();
        electionDecided.await();
    }

    /**
     * Stops the leader election and then the fencing delay executor. A failure while stopping the
     * election is logged and not propagated; the executor is shut down in every case.
     */
    protected void shutDown() throws Exception {
        try {
            this.leaderElection.stopAndWait();
        } catch (Exception e) {
            LOG.warn("Exception during shutting down leader election", e);
        } finally {
            this.delayExecutor.shutdownNow();
        }
    }

    @Override // co.cask.cdap.messaging.MessagingService
    public void createTopic(TopicMetadata topicMetadata) throws TopicAlreadyExistsException, IOException {
        getMessagingService().createTopic(topicMetadata);
    }

    @Override // co.cask.cdap.messaging.MessagingService
    public void updateTopic(TopicMetadata topicMetadata) throws TopicNotFoundException, IOException {
        getMessagingService().updateTopic(topicMetadata);
    }

    @Override // co.cask.cdap.messaging.MessagingService
    public void deleteTopic(TopicId topicId) throws TopicNotFoundException, IOException {
        getMessagingService().deleteTopic(topicId);
    }

    @Override // co.cask.cdap.messaging.MessagingService
    public TopicMetadata getTopic(TopicId topicId) throws TopicNotFoundException, IOException {
        return getMessagingService().getTopic(topicId);
    }

    @Override // co.cask.cdap.messaging.MessagingService
    public List<TopicId> listTopics(NamespaceId namespaceId) throws IOException {
        return getMessagingService().listTopics(namespaceId);
    }

    @Override // co.cask.cdap.messaging.MessagingService
    public MessageFetcher prepareFetch(TopicId topicId) throws TopicNotFoundException, IOException {
        return getMessagingService().prepareFetch(topicId);
    }

    @Override // co.cask.cdap.messaging.MessagingService
    @Nullable
    public RollbackDetail publish(StoreRequest storeRequest) throws TopicNotFoundException, IOException {
        return getMessagingService().publish(storeRequest);
    }

    @Override // co.cask.cdap.messaging.MessagingService
    public void storePayload(StoreRequest storeRequest) throws TopicNotFoundException, IOException {
        getMessagingService().storePayload(storeRequest);
    }

    @Override // co.cask.cdap.messaging.MessagingService
    public void rollback(TopicId topicId, RollbackDetail rollbackDetail) throws TopicNotFoundException, IOException {
        getMessagingService().rollback(topicId, rollbackDetail);
    }

    /**
     * Upgrades the message table and the payload table when the configured {@link TableFactory}
     * is (or forwards to) an {@link HBaseTableFactory}; otherwise does nothing. The two upgrades
     * are independent: an {@link IOException} from either one is logged and not propagated.
     *
     * <p>NOTE(review): the decompiler reports this method was originally private; it is left
     * public here so that the class's visible interface does not change.
     */
    public void upgradeTable() {
        HBaseTableFactory hBaseTableFactory = getHBaseTableFactory(this.injector.getInstance(TableFactory.class));
        if (hBaseTableFactory == null) {
            return;
        }
        try {
            hBaseTableFactory.upgradeMessageTable(this.cConf.get(MESSAGE_TABLE_NAME_KEY));
        } catch (IOException e) {
            LOG.warn("Exception while trying to upgrade TMS MessageTable.", e);
        }
        try {
            hBaseTableFactory.upgradePayloadTable(this.cConf.get(PAYLOAD_TABLE_NAME_KEY));
        } catch (IOException e) {
            LOG.warn("Exception while trying to upgrade TMS PayloadTable.", e);
        }
    }

    /**
     * Follows the chain of {@link ForwardingTableFactory} delegates starting at the given factory
     * until an {@link HBaseTableFactory} or a non-forwarding factory is reached.
     *
     * @param tableFactory the factory to start from
     * @return the {@link HBaseTableFactory} found, or {@code null} if the chain ends elsewhere
     */
    @Nullable
    private HBaseTableFactory getHBaseTableFactory(TableFactory tableFactory) {
        TableFactory current = tableFactory;
        while (!(current instanceof HBaseTableFactory) && current instanceof ForwardingTableFactory) {
            current = ((ForwardingTableFactory) current).getDelegate();
        }
        return current instanceof HBaseTableFactory ? (HBaseTableFactory) current : null;
    }

    /**
     * Atomically replaces the current delegate with the given one, clearing the "started" mark,
     * and then stops the delegate that was replaced, if there was one.
     *
     * <p>NOTE(review): the decompiler reports this method was originally private; it is left
     * public here so that the class's visible interface does not change.
     *
     * @param delegateService the new delegate, or {@code null} to leave no delegate in place
     */
    public void updateDelegate(@Nullable DelegateService delegateService) {
        boolean[] markHolder = new boolean[1];
        DelegateService previous;
        do {
            // Read reference and mark as one consistent snapshot before attempting the swap.
            previous = this.delegate.get(markHolder);
        } while (!this.delegate.compareAndSet(previous, delegateService, markHolder[0], false));

        if (previous != null) {
            previous.stopAndWait();
        }
    }

    /**
     * Starts the given delegate and marks it as started, provided it is still the current delegate
     * once the start completes; otherwise the delegate is stopped again. The start runs inline
     * when the configured fencing delay is not positive, and on the delay executor after that many
     * seconds otherwise.
     *
     * <p>NOTE(review): the decompiler reports this method was originally private; it is left
     * public here so that the class's visible interface does not change.
     *
     * @param delegateService the delegate to start
     */
    public void fencingStart(final DelegateService delegateService) {
        final Runnable startTask = new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                delegateService.startAndWait();
                // attemptMark is documented as being allowed to fail spuriously, so a single
                // failure does not prove the delegate was replaced; retry while it is still current.
                while (delegate.getReference() == delegateService) {
                    if (delegate.attemptMark(delegateService, true)) {
                        return;
                    }
                }
                // The delegate was replaced while it was starting; it must not keep running.
                delegateService.stopAndWait();
            }
        };

        long delaySeconds = this.cConf.getLong(FENCING_DELAY_SECONDS_KEY);
        if (delaySeconds <= 0) {
            startTask.run();
            return;
        }
        this.delayExecutor.schedule(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                try {
                    startTask.run();
                } catch (Throwable t) {
                    // The returned future is discarded, so without this the failure would be lost.
                    LOG.error("Failed to start messaging service delegate after fencing delay", t);
                    throw t;
                }
            }
        }, delaySeconds, TimeUnit.SECONDS);
    }

    /**
     * Returns the messaging service of the current delegate.
     *
     * @throws ServiceUnavailableException if there is no delegate or it has not been marked as started
     */
    private MessagingService getMessagingService() {
        boolean[] markHolder = new boolean[1];
        // Single atomic read so the mark always belongs to the delegate that is returned.
        DelegateService current = this.delegate.get(markHolder);
        if (current == null || !markHolder[0]) {
            throw new ServiceUnavailableException(SERVICE_NAME, "Messaging service is temporarily unavailable due to leader transition");
        }
        return current.getMessagingService();
    }
}
