package org.apache.gobblin.compaction.action;

import com.google.common.base.Optional;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Locale;
import java.util.Objects;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.compaction.mapreduce.test.TestCompactionTaskUtils;
import org.apache.gobblin.compaction.parser.CompactionPathParser;
import org.apache.gobblin.compaction.verify.CompactionWatermarkChecker;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
import org.apache.gobblin.dataset.FileSystemDataset;
import org.apache.gobblin.hive.HiveRegister;
import org.apache.gobblin.hive.HiveTable;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.time.TimeIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/compaction/action/CompactionWatermarkAction.class */
/**
 * A {@link CompactionCompleteAction} that, after a compaction job completes, copies the
 * compaction watermarks found in the job {@link State} into the properties of the dataset's
 * Hive table and alters the table when at least one watermark changed.
 *
 * <p>A watermark is only allowed to move forward by exactly one {@link #GRANULARITY} step
 * (computed in the configured compaction time zone); any larger jump is rejected with a
 * {@link RuntimeException} so that the gap can be filled manually.
 */
public class CompactionWatermarkAction implements CompactionCompleteAction<FileSystemDataset> {
    private static final Logger log = LoggerFactory.getLogger(CompactionWatermarkAction.class);

    /** Common prefix of the configuration keys read by this action. */
    public static final String CONF_PREFIX = "compactionWatermarkAction";
    /** Required key: name of a {@link TimeIterator.Granularity} constant (case-insensitive). */
    public static final String GRANULARITY = "compactionWatermarkAction.granularity";
    /** Key of the Hive database used when the dataset name carries no database part. */
    public static final String DEFAULT_HIVE_DB = "compactionWatermarkAction.defaultHiveDb";

    private EventSubmitter submitter;
    private final State state;
    private final String defaultHiveDb;
    private final TimeIterator.Granularity granularity;
    private final ZoneId zone;

    /**
     * Builds the action from job configuration.
     *
     * @param state job state; must contain {@link #GRANULARITY}
     * @throws NullPointerException if {@link #GRANULARITY} is not set
     * @throws IllegalArgumentException if the granularity value names no
     *     {@link TimeIterator.Granularity} constant
     */
    public CompactionWatermarkAction(State state) {
        this.state = state;
        this.defaultHiveDb = state.getProp(DEFAULT_HIVE_DB);
        String granularityName =
                Objects.requireNonNull(state.getProp(GRANULARITY), "Missing required property " + GRANULARITY);
        // Locale.ROOT keeps the enum lookup independent of the JVM default locale
        // (e.g. the Turkish dotted/dotless 'i' would otherwise break names containing 'i').
        this.granularity = TimeIterator.Granularity.valueOf(granularityName.toUpperCase(Locale.ROOT));
        this.zone = ZoneId.of(state.getProp(MRCompactor.COMPACTION_TIMEZONE, MRCompactor.DEFAULT_COMPACTION_TIMEZONE));
    }

    /**
     * Publishes the compaction watermarks from the job state to the dataset's Hive table.
     *
     * <p>Does nothing when neither watermark is present in the job state, or when the Hive
     * table cannot be found. The table is altered only if at least one watermark was updated.
     *
     * @param fileSystemDataset the dataset whose compaction just completed
     * @throws IOException if the Hive register fails to look up, alter, or close
     * @throws RuntimeException if a watermark would skip ahead of the expected next value,
     *     or the dataset name cannot be mapped to a Hive table
     */
    @Override
    public void onCompactionJobComplete(FileSystemDataset fileSystemDataset) throws IOException {
        String compactionWatermark = this.state.getProp(CompactionWatermarkChecker.COMPACTION_WATERMARK);
        String completionWatermark = this.state.getProp(CompactionWatermarkChecker.COMPLETION_COMPACTION_WATERMARK);
        if (StringUtils.isEmpty(compactionWatermark) && StringUtils.isEmpty(completionWatermark)) {
            return;
        }

        String datasetName = new CompactionPathParser(this.state).parse(fileSystemDataset).getDatasetName();
        HiveDatasetFinder.DbAndTable dbAndTable = extractDbTable(datasetName);
        String db = dbAndTable.getDb();
        String table = dbAndTable.getTable();

        // try-with-resources guarantees the register is released on every exit path,
        // including the early return and the gap-detection exception below.
        try (HiveRegister hiveRegister = HiveRegister.get(this.state)) {
            Optional<HiveTable> tableOptional = hiveRegister.getTable(db, table);
            if (!tableOptional.isPresent()) {
                log.info("Table {}.{} not found. Skip publishing compaction watermarks", db, table);
                return;
            }

            HiveTable hiveTable = tableOptional.get();
            State tableProps = hiveTable.getProps();
            // Both checks must run (no short-circuit): each one may mutate tableProps.
            boolean compactionUpdated = mayUpdateWatermark(
                    fileSystemDataset, tableProps, CompactionWatermarkChecker.COMPACTION_WATERMARK, compactionWatermark);
            boolean completionUpdated = mayUpdateWatermark(
                    fileSystemDataset, tableProps, CompactionWatermarkChecker.COMPLETION_COMPACTION_WATERMARK, completionWatermark);

            if (compactionUpdated || completionUpdated) {
                log.info("Alter table {}.{} to publish watermarks {}", db, table, tableProps);
                hiveRegister.alterTable(hiveTable);
            }
        }
    }

    /**
     * Writes {@code newWatermark} under {@code watermarkKey} in {@code tableProps} when the
     * update is permitted.
     *
     * <ul>
     *   <li>empty {@code newWatermark}: no update;</li>
     *   <li>no current watermark (absent or {@code 0}): the new value is stored as-is;</li>
     *   <li>new value not greater than the current one: no update;</li>
     *   <li>new value equal to the current one advanced by one granularity step: stored;</li>
     *   <li>anything else: an error is logged and a {@link RuntimeException} is thrown.</li>
     * </ul>
     *
     * @return {@code true} if {@code tableProps} was modified
     * @throws NumberFormatException if a current watermark exists and {@code newWatermark}
     *     is not a parsable {@code long}
     */
    private boolean mayUpdateWatermark(FileSystemDataset dataset, State tableProps, String watermarkKey,
            String newWatermark) {
        if (StringUtils.isEmpty(newWatermark)) {
            return false;
        }

        long currentValue = tableProps.getPropAsLong(watermarkKey, 0L);
        if (currentValue == 0) {
            // First publication: nothing to compare against.
            tableProps.setProp(watermarkKey, newWatermark);
            return true;
        }

        long newValue = Long.parseLong(newWatermark);
        if (newValue <= currentValue) {
            return false;
        }

        long expectedValue = getExpectedNextWatermark(currentValue);
        if (newValue == expectedValue) {
            tableProps.setProp(watermarkKey, newWatermark);
            return true;
        }

        String message = String.format("Fail to advance %s of dataset %s: expect %s but got %s, please manually fill the gap and rerun.", watermarkKey, dataset.datasetRoot(), expectedValue, newValue);
        log.error(message);
        throw new RuntimeException(message);
    }

    /**
     * Returns the epoch-millisecond watermark that is exactly one granularity step after
     * {@code currentWatermark}, with the step evaluated in the configured time zone.
     */
    private long getExpectedNextWatermark(long currentWatermark) {
        ZonedDateTime current = ZonedDateTime.ofInstant(Instant.ofEpochMilli(currentWatermark), this.zone);
        return TimeIterator.inc(current, this.granularity, 1L).toInstant().toEpochMilli();
    }

    /**
     * Stores the event submitter supplied by the framework.
     *
     * @param eventSubmitter the submitter to keep
     */
    @Override
    public void addEventSubmitter(EventSubmitter eventSubmitter) {
        this.submitter = eventSubmitter;
    }

    /**
     * Maps a dataset name to a Hive database and table. A single-part name is treated as a
     * table in the default Hive database; a two-part name is treated as {@code db}
     * followed by {@code table}.
     *
     * @throws RuntimeException if the name splits into zero or more than two parts
     */
    private HiveDatasetFinder.DbAndTable extractDbTable(String datasetName) {
        // NOTE(review): this production class takes its separator from a test utility class;
        // consider a local constant once the intended value is confirmed.
        String[] parts = datasetName.split(TestCompactionTaskUtils.PATH_SEPARATOR);
        if (parts.length == 0 || parts.length > 2) {
            throw new RuntimeException(String.format("Unsupported dataset %s", datasetName));
        }

        if (parts.length == 2) {
            return new HiveDatasetFinder.DbAndTable(parts[0], parts[1]);
        }
        // NOTE(review): defaultHiveDb is null when DEFAULT_HIVE_DB is not configured — confirm
        // that downstream lookup tolerates a null database name.
        return new HiveDatasetFinder.DbAndTable(this.defaultHiveDb, parts[0]);
    }
}
