package co.cask.cdap.messaging.service;

import co.cask.cdap.api.messaging.TopicAlreadyExistsException;
import co.cask.cdap.api.messaging.TopicNotFoundException;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.utils.TimeProvider;
import co.cask.cdap.messaging.MessageFetcher;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.MessagingUtils;
import co.cask.cdap.messaging.RollbackDetail;
import co.cask.cdap.messaging.StoreRequest;
import co.cask.cdap.messaging.TopicMetadata;
import co.cask.cdap.messaging.store.MessageTable;
import co.cask.cdap.messaging.store.MetadataTable;
import co.cask.cdap.messaging.store.PayloadTable;
import co.cask.cdap.messaging.store.TableFactory;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.TopicId;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:co/cask/cdap/messaging/service/CoreMessagingService.class */
public class CoreMessagingService extends AbstractIdleService implements MessagingService {
    private static final Logger LOG = LoggerFactory.getLogger(CoreMessagingService.class);
    // Configuration used to look up table names, default topic properties and system topics.
    private final CConfiguration cConf;
    // Factory through which metadata, message and payload tables are opened.
    private final TableFactory tableFactory;
    // Topic metadata by topic id; entries are loaded from the metadata table on demand.
    private final LoadingCache<TopicId, TopicMetadata> topicCache;
    // Per-topic writers backed by the message table; each writer is closed when its entry is removed.
    private final LoadingCache<TopicId, ConcurrentMessageWriter> messageTableWriterCache;
    // Per-topic writers backed by the payload table; each writer is closed when its entry is removed.
    private final LoadingCache<TopicId, ConcurrentMessageWriter> payloadTableWriterCache;
    // Time source handed to the store request writers.
    private final TimeProvider timeProvider;
    // Source of the metrics contexts handed to the concurrent message writers.
    private final MetricsCollectionService metricsCollectionService;

    /**
     * Creates the service using {@link TimeProvider#SYSTEM_TIME} as the time source.
     *
     * @param cConfiguration configuration consulted for table names and messaging settings
     * @param tableFactory factory used to open the messaging tables
     * @param metricsCollectionService provider of metrics contexts for the message writers
     */
    @Inject
    CoreMessagingService(CConfiguration cConfiguration, TableFactory tableFactory, MetricsCollectionService metricsCollectionService) {
        this(cConfiguration, tableFactory, TimeProvider.SYSTEM_TIME, metricsCollectionService);
    }

    /**
     * Creates the service with an explicit time source.
     *
     * @param cConfiguration configuration consulted for table names and messaging settings
     * @param tableFactory factory used to open the messaging tables
     * @param timeProvider time source passed to the store request writers
     * @param metricsCollectionService provider of metrics contexts for the message writers
     */
    @VisibleForTesting
    CoreMessagingService(CConfiguration cConfiguration, TableFactory tableFactory, TimeProvider timeProvider, MetricsCollectionService metricsCollectionService) {
        // Plain collaborators first; the caches below only consult them lazily, when an entry is loaded.
        this.cConf = cConfiguration;
        this.tableFactory = tableFactory;
        this.timeProvider = timeProvider;
        this.metricsCollectionService = metricsCollectionService;

        this.topicCache = createTopicCache();
        // 'true' selects message-table writers, 'false' selects payload-table writers.
        this.messageTableWriterCache = createTableWriterCache(true, cConfiguration);
        this.payloadTableWriterCache = createTableWriterCache(false, cConfiguration);
    }

    /**
     * Creates a topic whose properties are the service defaults overlaid with the
     * properties supplied by the caller.
     *
     * @param topicMetadata id and caller-supplied properties of the topic to create
     * @throws TopicAlreadyExistsException declared by the underlying metadata table call
     * @throws IOException if the metadata table cannot be opened, written or closed
     */
    @Override // co.cask.cdap.messaging.MessagingService
    public void createTopic(TopicMetadata topicMetadata) throws TopicAlreadyExistsException, IOException {
        // try-with-resources closes the table on every path and records a close failure
        // as a suppressed exception instead of losing the primary one.
        try (MetadataTable metadataTable = createMetadataTable()) {
            Map<String, String> properties = createDefaultProperties();
            // Caller-supplied values win over the defaults.
            properties.putAll(topicMetadata.getProperties());
            metadataTable.createTopic(new TopicMetadata(topicMetadata.getTopicId(), properties, true));
        }
    }

    /**
     * Updates a topic with the service defaults overlaid by the caller-supplied
     * properties, then drops the cached metadata for that topic.
     *
     * @param topicMetadata id and new properties of the topic
     * @throws TopicNotFoundException declared by the underlying metadata table call
     * @throws IOException if the metadata table cannot be opened, written or closed
     */
    @Override // co.cask.cdap.messaging.MessagingService
    public void updateTopic(TopicMetadata topicMetadata) throws TopicNotFoundException, IOException {
        // try-with-resources closes the table on every path and records a close failure
        // as a suppressed exception instead of losing the primary one.
        try (MetadataTable metadataTable = createMetadataTable()) {
            Map<String, String> properties = createDefaultProperties();
            // Caller-supplied values win over the defaults.
            properties.putAll(topicMetadata.getProperties());
            metadataTable.updateTopic(new TopicMetadata(topicMetadata.getTopicId(), properties, true));
            // Force the next lookup to reload the updated metadata.
            topicCache.invalidate(topicMetadata.getTopicId());
        }
    }

    /**
     * Deletes a topic from the metadata table and evicts its cached metadata and writers.
     *
     * @param topicId id of the topic to delete
     * @throws TopicNotFoundException declared by the underlying metadata table call
     * @throws IOException if the metadata table cannot be opened, written or closed
     */
    @Override // co.cask.cdap.messaging.MessagingService
    public void deleteTopic(TopicId topicId) throws TopicNotFoundException, IOException {
        // try-with-resources closes the table on every path and records a close failure
        // as a suppressed exception instead of losing the primary one.
        try (MetadataTable metadataTable = createMetadataTable()) {
            metadataTable.deleteTopic(topicId);
            topicCache.invalidate(topicId);
            // Evicting the writers triggers their removal listeners, which close them.
            messageTableWriterCache.invalidate(topicId);
            payloadTableWriterCache.invalidate(topicId);
        }
    }

    /**
     * Returns the metadata of a topic, loading it through the topic cache.
     *
     * @param topicId id of the topic to look up
     * @return the cached or freshly loaded metadata
     * @throws TopicNotFoundException if the cache loader failed with that exception
     * @throws IOException if the cache loader failed with that exception
     */
    @Override // co.cask.cdap.messaging.MessagingService
    public TopicMetadata getTopic(TopicId topicId) throws TopicNotFoundException, IOException {
        try {
            return topicCache.get(topicId);
        } catch (ExecutionException e) {
            // Resolve the cause once; fall back to the wrapper itself when there is none so
            // the final propagate call can never be handed null.
            Throwable cause = Objects.firstNonNull(e.getCause(), e);
            Throwables.propagateIfPossible(cause, TopicNotFoundException.class, IOException.class);
            throw Throwables.propagate(cause);
        }
    }

    /**
     * Lists the topics of a namespace as reported by the metadata table.
     *
     * @param namespaceId namespace whose topics are listed
     * @return the topic ids returned by the metadata table
     * @throws IOException if the metadata table cannot be opened, read or closed
     */
    @Override // co.cask.cdap.messaging.MessagingService
    public List<TopicId> listTopics(NamespaceId namespaceId) throws IOException {
        // try-with-resources keeps the primary exception and attaches any close failure as
        // suppressed, rather than letting the close failure replace it.
        try (MetadataTable metadataTable = createMetadataTable()) {
            return metadataTable.listTopics(namespaceId);
        }
    }

    /**
     * Builds a fetcher for a topic. The fetcher is given providers that open the message
     * and payload tables when asked, rather than already-open tables.
     *
     * @param topicId id of the topic to fetch from
     * @return a new {@link CoreMessageFetcher} bound to the topic's metadata
     * @throws TopicNotFoundException propagated from the topic lookup
     * @throws IOException propagated from the topic lookup
     */
    @Override // co.cask.cdap.messaging.MessagingService
    public MessageFetcher prepareFetch(TopicId topicId) throws TopicNotFoundException, IOException {
        final TopicMetadata metadata = getTopic(topicId);

        TableProvider<MessageTable> messageTables = new TableProvider<MessageTable>() {
            @Override // co.cask.cdap.messaging.service.TableProvider
            public MessageTable get() throws IOException {
                return CoreMessagingService.this.createMessageTable(metadata);
            }
        };
        TableProvider<PayloadTable> payloadTables = new TableProvider<PayloadTable>() {
            @Override // co.cask.cdap.messaging.service.TableProvider
            public PayloadTable get() throws IOException {
                return CoreMessagingService.this.createPayloadTable(metadata);
            }
        };

        return new CoreMessageFetcher(metadata, messageTables, payloadTables);
    }

    /**
     * Persists a store request through the topic's message-table writer.
     *
     * @param storeRequest the request to persist; its topic id selects the writer and metadata
     * @return whatever the writer's {@code persist} call returns, which may be {@code null}
     * @throws TopicNotFoundException if a cache load failed with that exception
     * @throws IOException if a cache load failed with that exception
     */
    @Override // co.cask.cdap.messaging.MessagingService
    @Nullable
    public RollbackDetail publish(StoreRequest storeRequest) throws TopicNotFoundException, IOException {
        TopicId topicId = storeRequest.getTopicId();
        try {
            // The writer is resolved before the metadata, so a writer-load failure surfaces first.
            ConcurrentMessageWriter writer = messageTableWriterCache.get(topicId);
            TopicMetadata metadata = topicCache.get(topicId);
            return writer.persist(storeRequest, metadata);
        } catch (ExecutionException e) {
            Throwable cause = Objects.firstNonNull(e.getCause(), e);
            Throwables.propagateIfPossible(cause, TopicNotFoundException.class, IOException.class);
            throw Throwables.propagate(e);
        }
    }

    /**
     * Persists a store request through the topic's payload-table writer.
     *
     * @param storeRequest the request to persist; its topic id selects the writer and metadata
     * @throws TopicNotFoundException if a cache load failed with that exception
     * @throws IOException if a cache load failed with that exception
     */
    @Override // co.cask.cdap.messaging.MessagingService
    public void storePayload(StoreRequest storeRequest) throws TopicNotFoundException, IOException {
        TopicId topicId = storeRequest.getTopicId();
        try {
            // The writer is resolved before the metadata, so a writer-load failure surfaces first.
            ConcurrentMessageWriter writer = payloadTableWriterCache.get(topicId);
            TopicMetadata metadata = topicCache.get(topicId);
            writer.persist(storeRequest, metadata);
        } catch (ExecutionException e) {
            Throwable cause = Objects.firstNonNull(e.getCause(), e);
            Throwables.propagateIfPossible(cause, TopicNotFoundException.class, IOException.class);
            throw Throwables.propagate(e);
        }
    }

    /**
     * Rolls back previously published messages of a topic in the message table.
     *
     * @param topicId id of the topic the messages were published to
     * @param rollbackDetail description of what to roll back, passed through to the table
     * @throws TopicNotFoundException if the topic lookup or the rollback fails with that exception
     * @throws IOException if the table cannot be opened, the rollback fails, or the table cannot be closed
     */
    @Override // co.cask.cdap.messaging.MessagingService
    public void rollback(TopicId topicId, RollbackDetail rollbackDetail) throws TopicNotFoundException, IOException {
        TopicMetadata topic = getTopic(topicId);
        // A failure to open the table must reach the caller instead of being swallowed, and
        // the table must be closed on every path.
        try (MessageTable messageTable = createMessageTable(topic)) {
            messageTable.rollback(topic, rollbackDetail);
        } catch (Exception e) {
            // Rethrow the declared and unchecked types as-is; wrap anything else.
            Throwables.propagateIfPossible(e, TopicNotFoundException.class, IOException.class);
            throw Throwables.propagate(e);
        }
    }

    /**
     * Creates the configured system topics. Topics whose creation fails are handed to a
     * background task that retries every five seconds; invalid topic names are skipped.
     *
     * @throws Exception declared by the service lifecycle contract
     */
    @Override
    protected void startUp() throws Exception {
        Queue<TopicId> pendingTopics = new LinkedList<>();
        // Copy into a set so a topic listed twice in the configuration is attempted once.
        for (String topicName : new HashSet<String>(this.cConf.getTrimmedStringCollection("messaging.system.topics"))) {
            TopicId topicId;
            try {
                topicId = NamespaceId.SYSTEM.topic(topicName);
            } catch (IllegalArgumentException e) {
                LOG.warn("Ignore creation of invalid topic '{}'.", topicName);
                continue;
            }
            try {
                createTopicIfNotExists(topicId);
            } catch (Exception e) {
                LOG.warn("Topic {} creation failed with exception {}. Will retry.", topicId, e.getMessage());
                LOG.debug("Topic {} creation failure stacktrace", topicId, e);
                pendingTopics.add(topicId);
            }
        }
        if (!pendingTopics.isEmpty()) {
            startAsyncTopicCreation(pendingTopics, 5L, TimeUnit.SECONDS);
        }
        LOG.info("Core Messaging Service started");
    }

    /**
     * Evicts all cached writers; each writer is closed by its cache's removal listener.
     *
     * @throws Exception declared by the service lifecycle contract
     */
    @Override
    protected void shutDown() throws Exception {
        // One invalidation per writer cache is enough; a second pass over the same cache is a no-op.
        this.messageTableWriterCache.invalidateAll();
        this.payloadTableWriterCache.invalidateAll();
        LOG.info("Core Messaging Service stopped");
    }

    /**
     * Retries topic creation on a dedicated daemon thread until every queued topic has been
     * created, then releases that thread.
     *
     * @param queue topics still to be created; entries are removed as they succeed
     * @param j delay before the first attempt and between subsequent attempts
     * @param timeUnit unit of {@code j}
     */
    private void startAsyncTopicCreation(final Queue<TopicId> queue, final long j, final TimeUnit timeUnit) {
        final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(Threads.createDaemonThreadFactory("async-topic-creation"));
        executor.schedule(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                Iterator<TopicId> pending = queue.iterator();
                while (pending.hasNext()) {
                    TopicId topicId = pending.next();
                    try {
                        createTopicIfNotExists(topicId);
                        pending.remove();
                    } catch (Exception e) {
                        LOG.warn("Topic {} creation failed with exception {}. Will retry.", topicId, e.getMessage());
                        LOG.debug("Topic {} creation failure stacktrace", topicId, e);
                    }
                }
                if (queue.isEmpty()) {
                    // Nothing left to retry: shut the executor down so its thread does not linger.
                    executor.shutdown();
                    return;
                }
                executor.schedule(this, j, timeUnit);
            }
        }, j, timeUnit);
    }

    /**
     * Creates a topic with no caller-supplied properties, treating "already exists" as success.
     *
     * @param topicId id of the topic to create
     * @throws IOException propagated from {@link #createTopic(TopicMetadata)}
     */
    public void createTopicIfNotExists(TopicId topicId) throws IOException {
        TopicMetadata emptyMetadata = new TopicMetadata(topicId, new Object[0]);
        try {
            createTopic(emptyMetadata);
        } catch (TopicAlreadyExistsException e) {
            // An existing topic is the desired end state; only note it at debug level.
            LOG.debug("System topic already exists: {}", topicId);
            return;
        }
        LOG.info("System topic created: {}", topicId);
    }

    /**
     * Builds the cache that maps topic ids to their metadata. Each miss opens the metadata
     * table, reads the entry and closes the table again.
     *
     * @return a loading cache with no size or time bound configured
     */
    private LoadingCache<TopicId, TopicMetadata> createTopicCache() {
        return CacheBuilder.newBuilder().build(new CacheLoader<TopicId, TopicMetadata>() {
            @Override
            public TopicMetadata load(TopicId topicId) throws Exception {
                // try-with-resources keeps a lookup failure as the primary exception and
                // attaches any close failure as suppressed instead of replacing it.
                try (MetadataTable metadataTable = createMetadataTable()) {
                    return metadataTable.getMetadata(topicId);
                }
            }
        });
    }

    /**
     * Builds a cache of per-topic writers. Entries expire after the configured number of
     * seconds without access, and a removed writer is closed by the removal listener.
     *
     * @param z {@code true} to create writers over the message table, {@code false} for the payload table
     * @param cConfiguration configuration supplying the expiration time and the instance id metric tag
     * @return a loading cache that creates a writer on first use of a topic
     */
    private LoadingCache<TopicId, ConcurrentMessageWriter> createTableWriterCache(final boolean z, final CConfiguration cConfiguration) {
        long expirationSeconds = cConfiguration.getLong("messaging.table.expiration.seconds");

        // Closes a writer when its entry leaves the cache; a close failure is only logged.
        RemovalListener<TopicId, ConcurrentMessageWriter> closeOnRemoval = new RemovalListener<TopicId, ConcurrentMessageWriter>() {
            @Override
            public void onRemoval(RemovalNotification<TopicId, ConcurrentMessageWriter> removalNotification) {
                ConcurrentMessageWriter writer = removalNotification.getValue();
                if (writer == null) {
                    return;
                }
                try {
                    writer.close();
                } catch (IOException e) {
                    LOG.warn("Exception raised when closing message writer for topic {}", removalNotification.getKey(), e);
                }
            }
        };

        CacheLoader<TopicId, ConcurrentMessageWriter> writerLoader = new CacheLoader<TopicId, ConcurrentMessageWriter>() {
            @Override
            public ConcurrentMessageWriter load(TopicId topicId) throws Exception {
                TopicMetadata metadata = getTopic(topicId);
                // In both branches the table-backed writer is created before the metrics context.
                if (z) {
                    return new ConcurrentMessageWriter(
                        new MessageTableStoreRequestWriter(createMessageTable(metadata), timeProvider),
                        metricsCollectionService.getContext(metricsTags(topicId, "message")));
                }
                return new ConcurrentMessageWriter(
                    new PayloadTableStoreRequestWriter(createPayloadTable(metadata), timeProvider),
                    metricsCollectionService.getContext(metricsTags(topicId, "payload")));
            }

            // Tags identifying the component, container instance, namespace and table kind.
            private Map<String, String> metricsTags(TopicId topicId, String tableKind) {
                return ImmutableMap.of(
                    "cmp", "messaging.service",
                    "ins", cConfiguration.get("messaging.container.instance.id", "0"),
                    "ns", topicId.getNamespace(),
                    "tbl", tableKind);
            }
        };

        return CacheBuilder.newBuilder()
            .expireAfterAccess(expirationSeconds, TimeUnit.SECONDS)
            .removalListener(closeOnRemoval)
            .build(writerLoader);
    }

    /**
     * Opens the metadata table whose name is configured under
     * {@code messaging.metadata.table.name}.
     *
     * @return the table returned by the table factory
     * @throws IOException if the factory fails to create the table
     */
    public MetadataTable createMetadataTable() throws IOException {
        String tableName = cConf.get("messaging.metadata.table.name");
        return tableFactory.createMetadataTable(tableName);
    }

    /**
     * Opens the message table whose name is configured under
     * {@code messaging.message.table.name}.
     *
     * @param topicMetadata metadata of the topic being served; not consulted by this implementation
     * @return the table returned by the table factory
     * @throws IOException if the factory fails to create the table
     */
    public MessageTable createMessageTable(TopicMetadata topicMetadata) throws IOException {
        String tableName = cConf.get("messaging.message.table.name");
        return tableFactory.createMessageTable(tableName);
    }

    /**
     * Opens the payload table whose name is configured under
     * {@code messaging.payload.table.name}.
     *
     * @param topicMetadata metadata of the topic being served; not consulted by this implementation
     * @return the table returned by the table factory
     * @throws IOException if the factory fails to create the table
     */
    public PayloadTable createPayloadTable(TopicMetadata topicMetadata) throws IOException {
        String tableName = cConf.get("messaging.payload.table.name");
        return tableFactory.createPayloadTable(tableName);
    }

    /**
     * Builds the default topic properties: the configured default TTL and the default generation.
     *
     * @return a new mutable map that callers may overlay with topic-specific properties
     */
    private Map<String, String> createDefaultProperties() {
        // Parameterized so the declared return type is honoured without an unchecked conversion.
        Map<String, String> properties = new HashMap<>();
        properties.put(TopicMetadata.TTL_KEY, this.cConf.get("messaging.topic.default.ttl.seconds"));
        properties.put(TopicMetadata.GENERATION_KEY, MessagingUtils.Constants.DEFAULT_GENERATION);
        return properties;
    }
}
