package co.cask.cdap.report.main;

import co.cask.cdap.api.Admin;
import co.cask.cdap.api.Transactionals;
import co.cask.cdap.api.data.DatasetInstantiationException;
import co.cask.cdap.api.dataset.lib.FileSet;
import co.cask.cdap.api.dataset.lib.FileSetProperties;
import co.cask.cdap.api.spark.JavaSparkExecutionContext;
import co.cask.cdap.api.spark.JavaSparkMain;
import co.cask.cdap.report.ReportGenerationApp;
import co.cask.cdap.report.util.Constants;
import java.io.IOException;
import java.io.OutputStream;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeUnit;
import javax.crypto.KeyGenerator;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.twill.filesystem.Location;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/report/main/SparkPersistRunRecordMain.class */
public class SparkPersistRunRecordMain implements JavaSparkMain {
    private TMSSubscriber tmsSubscriber;
    private static final Logger LOG = LoggerFactory.getLogger(SparkPersistRunRecordMain.class);
    private static final SampledLogging SAMPLED_LOGGING = new SampledLogging(LOG, 100);

    public void run(JavaSparkExecutionContext javaSparkExecutionContext) throws Exception {
        new JavaSparkContext();
        Admin admin = javaSparkExecutionContext.getAdmin();
        if (!admin.datasetExists(ReportGenerationApp.RUN_META_FILESET)) {
            admin.createDataset(ReportGenerationApp.RUN_META_FILESET, FileSet.class.getName(), FileSetProperties.builder().build());
        }
        createSecurityKeyFile(getDatasetBaseLocationWithRetry(javaSparkExecutionContext, ReportGenerationApp.REPORT_FILESET));
        if (Boolean.parseBoolean((String) javaSparkExecutionContext.getRuntimeArguments().getOrDefault(Constants.DISABLE_TMS_SUBSCRIBER_THREAD, "false"))) {
            return;
        }
        this.tmsSubscriber = new TMSSubscriber(javaSparkExecutionContext.getMessagingContext().getMessageFetcher(), getDatasetBaseLocationWithRetry(javaSparkExecutionContext, ReportGenerationApp.RUN_META_FILESET), javaSparkExecutionContext.getRuntimeArguments(), javaSparkExecutionContext.getMetrics());
        this.tmsSubscriber.start();
        try {
            this.tmsSubscriber.join();
        } catch (InterruptedException e) {
            this.tmsSubscriber.requestStop();
            this.tmsSubscriber.interrupt();
        }
    }

    private void createSecurityKeyFile(Location location) throws IOException, NoSuchAlgorithmException {
        Location append = location.append(Constants.Security.KEY_FILE_NAME);
        if (append.exists()) {
            return;
        }
        KeyGenerator keyGenerator = KeyGenerator.getInstance(Constants.Security.ENCRYPTION_ALGORITHM);
        keyGenerator.init(128);
        writeKeyBytes(append, keyGenerator.generateKey().getEncoded());
    }

    private void writeKeyBytes(Location location, byte[] bArr) throws IOException {
        OutputStream outputStream = location.getOutputStream(Constants.Security.KEY_FILE_PERMISSION);
        Throwable th = null;
        try {
            try {
                outputStream.write(bArr);
                if (outputStream != null) {
                    if (0 == 0) {
                        outputStream.close();
                        return;
                    }
                    try {
                        outputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (outputStream != null) {
                if (th != null) {
                    try {
                        outputStream.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    outputStream.close();
                }
            }
            throw th4;
        }
    }

    private Location getDatasetBaseLocationWithRetry(JavaSparkExecutionContext javaSparkExecutionContext, String str) throws InterruptedException {
        while (true) {
            try {
                return (Location) Transactionals.execute(javaSparkExecutionContext, datasetContext -> {
                    return datasetContext.getDataset(str).getBaseLocation();
                });
            } catch (RuntimeException e) {
                if (!(e instanceof DatasetInstantiationException)) {
                    throw e;
                }
                SAMPLED_LOGGING.logWarning(String.format("Exception while trying to get dataset %s", str), e);
                TimeUnit.MILLISECONDS.sleep(100L);
            }
        }
    }
}
