package gobblin.runtime;

import com.google.common.base.Throwables;
import gobblin.annotation.Alias;
import gobblin.runtime.cli.CliApplication;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line application, registered under the {@code watermarks} alias, that prints the
 * committed watermarks held by a {@link StateStoreBasedWatermarkStorage}.
 *
 * <p>Supported options:
 * <ul>
 *   <li>{@code -h, --help}: print usage and exit.</li>
 *   <li>{@code -j, --jobName <name>}: job name (required; a {@link RuntimeException} is thrown
 *       when it is missing).</li>
 *   <li>{@code -r, --rootDir <dir>}: state store root directory (required; usage is printed when
 *       it is missing).</li>
 *   <li>{@code -z <connect string>}: ZooKeeper connect string, defaults to
 *       {@value #DEFAULT_ZK_ADDRESS}.</li>
 *   <li>{@code -w, --watch}: keep re-printing the watermarks until the JVM shuts down.</li>
 * </ul>
 */
@Alias(value = "watermarks", description = "Inspect streaming watermarks")
public class StateStoreBasedWatermarkStorageCli implements CliApplication {
    private static final Logger log = LoggerFactory.getLogger(StateStoreBasedWatermarkStorageCli.class);

    /** ZooKeeper connect string used when {@code -z} is not supplied. */
    private static final String DEFAULT_ZK_ADDRESS = "localhost:2181";

    /** Pause between two consecutive dumps in watch mode, in milliseconds. */
    private static final long WATCH_INTERVAL_MILLIS = 1000L;

    private static final Option HELP = Option.builder("h").longOpt("help").build();
    // NOTE(review): the long option name reuses the storage-type default constant; presumably its
    // value is "zk" -- confirm against StateStoreBasedWatermarkStorage.
    private static final Option ZK = Option.builder("z").longOpt(StateStoreBasedWatermarkStorage.WATERMARK_STORAGE_TYPE_DEFAULT).desc("Zk connect string").hasArg().build();
    private static final Option JOB_NAME = Option.builder("j").longOpt("jobName").desc("The Job name").hasArg().build();
    private static final Option ROOT_DIR = Option.builder("r").longOpt("rootDir").desc("The State Store Root Directory").hasArg().build();
    private static final Option WATCH = Option.builder("w").longOpt("watch").desc("Watch the watermarks").build();

    /**
     * Parses the command line, configures a {@link StateStoreBasedWatermarkStorage} from it and
     * prints every committed watermark to standard output.
     *
     * @param args raw arguments; the first element is skipped before option parsing (presumably it
     *     is the command alias -- confirm against the CLI dispatcher). A {@code null} or empty
     *     array is treated as "no options".
     * @throws RuntimeException if the job name option is missing, if reading the watermarks fails,
     *     or if the watch loop is interrupted
     */
    @Override // gobblin.runtime.cli.CliApplication
    public void run(String[] args) {
        Options options = buildOptions();

        CommandLine cli;
        try {
            cli = new DefaultParser().parse(options, optionArguments(args));
        } catch (ParseException e) {
            System.out.println("Command line parse exception: " + e.getMessage());
            return;
        }

        if (cli.hasOption(HELP.getOpt())) {
            printUsage(options);
            return;
        }

        TaskState taskState = new TaskState();

        if (!cli.hasOption(JOB_NAME.getOpt())) {
            log.error("Need Job Name to be specified --{}", JOB_NAME.getLongOpt());
            throw new RuntimeException("Need Job Name to be specified");
        }
        String jobName = cli.getOptionValue(JOB_NAME.getOpt());
        log.info("Using job name: {}", jobName);
        taskState.setProp("job.name", jobName);

        String zkAddress = cli.hasOption(ZK.getOpt()) ? cli.getOptionValue(ZK.getOpt()) : DEFAULT_ZK_ADDRESS;
        log.info("Using zk address : {}", zkAddress);
        taskState.setProp(StateStoreBasedWatermarkStorage.WATERMARK_STORAGE_TYPE_KEY, StateStoreBasedWatermarkStorage.WATERMARK_STORAGE_TYPE_DEFAULT);
        taskState.setProp("state.store.zk.connectString", zkAddress);

        if (!cli.hasOption(ROOT_DIR.getOpt())) {
            log.error("Need root directory specified");
            printUsage(options);
            return;
        }
        String rootDir = cli.getOptionValue(ROOT_DIR.getOpt());
        taskState.setProp("streaming.watermarkStateStore.config.state.store.dir", rootDir);
        log.info("Setting root dir to {}", rootDir);

        StateStoreBasedWatermarkStorage watermarkStorage = new StateStoreBasedWatermarkStorage(taskState);
        dumpWatermarks(watermarkStorage, cli.hasOption(WATCH.getOpt()));
    }

    /** Builds the set of options understood by this command. */
    private static Options buildOptions() {
        Options options = new Options();
        options.addOption(HELP);
        options.addOption(ZK);
        options.addOption(JOB_NAME);
        options.addOption(ROOT_DIR);
        options.addOption(WATCH);
        return options;
    }

    /**
     * Returns the arguments that follow the first element, or an empty array when there are none.
     * Guarding here avoids the exception {@link Arrays#copyOfRange} raises for an empty input.
     */
    private static String[] optionArguments(String[] args) {
        if (args == null || args.length <= 1) {
            return new String[0];
        }
        return Arrays.copyOfRange(args, 1, args.length);
    }

    /**
     * Prints the committed watermarks once, or repeatedly every {@link #WATCH_INTERVAL_MILLIS}
     * milliseconds when {@code watch} is set, until a JVM shutdown hook requests a stop.
     */
    private void dumpWatermarks(StateStoreBasedWatermarkStorage watermarkStorage, boolean watch) {
        // Starts out "stopped" for a one-shot dump so the loop below runs exactly once.
        final AtomicBoolean stop = new AtomicBoolean(!watch);
        if (watch) {
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    stop.set(true);
                }
            });
        }

        try {
            do {
                printCommittedWatermarks(watermarkStorage);
                if (!stop.get()) {
                    Thread.sleep(WATCH_INTERVAL_MILLIS);
                }
            } while (!stop.get());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before surfacing the failure to the caller.
            Thread.currentThread().interrupt();
            throw Throwables.propagate(e);
        }
    }

    /**
     * Writes the properties of every committed watermark to standard output, or a placeholder
     * line when the storage returns none.
     */
    private void printCommittedWatermarks(StateStoreBasedWatermarkStorage watermarkStorage) {
        boolean foundWatermark = false;
        try {
            Iterator<CheckpointableWatermarkState> watermarks = watermarkStorage.getAllCommittedWatermarks().iterator();
            while (watermarks.hasNext()) {
                foundWatermark = true;
                System.out.println(watermarks.next().getProperties());
            }
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
        if (!foundWatermark) {
            System.out.println("No watermarks found.");
        }
    }

    /** Prints the usage text, listing required options first and then ordering by short name. */
    private void printUsage(Options options) {
        HelpFormatter helpFormatter = new HelpFormatter();
        helpFormatter.setOptionComparator(new Comparator<Option>() {
            @Override // java.util.Comparator
            public int compare(Option first, Option second) {
                if (first.isRequired() != second.isRequired()) {
                    return first.isRequired() ? -1 : 1;
                }
                return first.getOpt().compareTo(second.getOpt());
            }
        });
        helpFormatter.printHelp("gobblin watermarks ", options);
    }
}
