package co.cask.cdap.report.main;

import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.messaging.Message;
import co.cask.cdap.api.messaging.MessageFetcher;
import co.cask.cdap.api.messaging.TopicNotFoundException;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.report.util.Constants;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.twill.filesystem.Location;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Background thread that polls the {@value #TOPIC} topic of the {@value #NAMESPACE_SYSTEM}
 * namespace through a {@link MessageFetcher}, hands every compatible message to a
 * {@link RunMetaFileManager} and counts processed records of non-system namespaces.
 *
 * <p>The thread runs until {@link #requestStop()} is called or the thread is interrupted.
 */
public class TMSSubscriber extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(TMSSubscriber.class);
    private static final String TOPIC = "programstatusrecordevent";
    private static final String NAMESPACE_SYSTEM = "system";
    /** Key in the configuration map that overrides the number of messages fetched per poll. */
    private static final String FETCH_SIZE = "tms.fetch.size";
    private static final int DEFAULT_FETCH_SIZE = 100;
    private final MessageFetcher messageFetcher;
    private final RunMetaFileManager runMetaFileManager;
    private final Location baseLocation;
    private final int fetchSize;
    private final Metrics metrics;
    /** Set by {@link #requestStop()}; checked by the polling loop in {@link #run()}. */
    private volatile boolean isStopped;

    /**
     * Creates the subscriber thread; the thread is not started here.
     *
     * @param messageFetcher fetcher used to read messages from the topic
     * @param location base location passed to the {@link RunMetaFileManager} and used to look up
     *     the message id to resume from
     * @param map configuration; {@value #FETCH_SIZE} overrides the default fetch size of
     *     {@value #DEFAULT_FETCH_SIZE} when present
     * @param metrics sink for the records-processed counter
     * @throws NumberFormatException if {@value #FETCH_SIZE} is present but not a valid integer
     */
    public TMSSubscriber(MessageFetcher messageFetcher, Location location, Map<String, String> map, Metrics metrics) {
        super("TMS-RunrecordEvent-Subscriber-thread");
        this.messageFetcher = messageFetcher;
        this.isStopped = false;
        this.baseLocation = location;
        this.runMetaFileManager = new RunMetaFileManager(location, map, metrics);
        // Single lookup: a key mapped to null falls back to the default instead of
        // handing null to Integer.parseInt.
        String configuredFetchSize = map.get(FETCH_SIZE);
        this.fetchSize = configuredFetchSize != null ? Integer.parseInt(configuredFetchSize) : DEFAULT_FETCH_SIZE;
        this.metrics = metrics;
    }

    /**
     * Asks the polling loop to stop and cleans up the {@link RunMetaFileManager}.
     *
     * <p>This only raises the stop flag; it does not interrupt the thread, so the loop exits the
     * next time it checks the flag.
     */
    public void requestStop() {
        this.isStopped = true;
        this.runMetaFileManager.cleanup();
        LOG.info("Shutting down tms-subscriber thread");
    }

    /**
     * Polling loop: resumes from the message id returned by
     * {@code MessageUtil.findMessageId(baseLocation)}, then repeatedly sleeps 10 ms, syncs the
     * output streams if required, and fetches up to {@code fetchSize} messages after the last
     * processed id.
     *
     * <p>An {@link InterruptedException} raised anywhere in the loop ends the thread with the
     * interrupt status restored. I/O failures while fetching are logged (sampled) and retried on
     * the next iteration, keeping the id of the last successfully processed message.
     */
    @Override
    public void run() {
        SampledLogging sampledLogging = new SampledLogging(LOG, 100);
        try {
            String afterMessageId = MessageUtil.findMessageId(this.baseLocation);
            while (!this.isStopped) {
                TimeUnit.MILLISECONDS.sleep(10L);
                this.runMetaFileManager.syncOutputStreamsIfRequired();
                // try-with-resources closes the iterator on every exit path, including
                // exceptions thrown while processing a message.
                try (CloseableIterator<Message> messages =
                         this.messageFetcher.fetch(NAMESPACE_SYSTEM, TOPIC, this.fetchSize, afterMessageId)) {
                    while (!this.isStopped && messages.hasNext()) {
                        Message message = messages.next();
                        Notification notification = MessageUtil.messageToNotification(message);
                        if (MessageUtil.isCDAPVersionCompatible(notification)) {
                            ProgramRunInfo runInfo = MessageUtil.constructAndGetProgramRunInfo(message, notification);
                            this.runMetaFileManager.append(runInfo);
                            emitUserProgramMetrics(runInfo);
                        }
                        // Advance past the message even when it was skipped as incompatible.
                        afterMessageId = message.getId();
                    }
                } catch (TopicNotFoundException e) {
                    // NOTE(review): the message says "returning" but the loop keeps polling, as the
                    // original code did — confirm whether the thread should terminate here.
                    LOG.error("Unable to find topic {} in tms, returning, cant write to the Fileset, Please fix", TOPIC, e);
                } catch (IOException e) {
                    sampledLogging.logWarning("Exception while fetching from TMS, will be retried", e);
                }
            }
            LOG.info("Done reading from tms meta");
        } catch (InterruptedException e) {
            // Restore the interrupt status so code observing this thread can still see it.
            Thread.currentThread().interrupt();
            LOG.info("Interrupted from processing program status, returning");
        }
    }

    /**
     * Increments the records-processed counter by one unless the run belongs to the
     * {@value #NAMESPACE_SYSTEM} namespace.
     *
     * @param programRunInfo run record that was just appended
     */
    private void emitUserProgramMetrics(ProgramRunInfo programRunInfo) {
        // Constant-first comparison avoids a NullPointerException on a missing namespace.
        if (NAMESPACE_SYSTEM.equals(programRunInfo.getNamespace())) {
            return;
        }
        this.metrics.count(Constants.Metrics.RECORDS_PROCESSED_METRIC, 1);
    }
}
