package org.apache.gobblin.data.management.conversion.hive.watermarker;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.conversion.hive.converter.AbstractAvroToOrcConverter;
import org.apache.gobblin.data.management.conversion.hive.provider.HiveUnitUpdateProvider;
import org.apache.gobblin.data.management.conversion.hive.provider.UpdateProviderFactory;
import org.apache.gobblin.data.management.conversion.hive.source.HiveSource;
import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
import org.apache.gobblin.data.management.source.LoopingDatasetFinderSource;
import org.apache.gobblin.hive.HiveMetastoreClientPool;
import org.apache.gobblin.source.extractor.WatermarkInterval;
import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.util.AutoReturnableObject;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.thrift.TException;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarker.class */
/**
 * A {@link HiveSourceWatermarker} that keeps a separate watermark for every partition of a table.
 *
 * <p>Partition watermarks are held in two {@link TableWatermarks} maps, keyed first by {@link #tableKey(Table)} and
 * then by {@link #partitionKey(Partition)}:
 * <ul>
 *   <li>{@link #previousWatermarks} - loaded by the constructor (when given a {@link SourceState}) from the previous
 *       run's watermark workunits;</li>
 *   <li>{@link #expectedHighWatermarks} - starts as a copy of the previous watermarks and is updated as partitions
 *       are processed.</li>
 * </ul>
 * When work-unit creation ends, {@link #onGetWorkunitsEnd(List)} appends one extra workunit per partitioned table,
 * flagged with {@link #IS_WATERMARK_WORKUNIT_KEY}, whose {@link MultiKeyValueLongWatermark} carries that table's
 * partition watermarks into the next run. Table-level previous watermarks are delegated to a
 * {@link TableLevelWatermarker}; table-level expected watermarks come from the configured
 * {@link HiveUnitUpdateProvider}.
 */
public class PartitionLevelWatermarker implements HiveSourceWatermarker {
    /** Property set on the synthetic workunit that carries all partition watermarks of one table. */
    public static final String IS_WATERMARK_WORKUNIT_KEY = "hive.source.watermark.isWatermarkWorkUnit";

    /**
     * Epoch-millis cutoff: partition watermarks older than this are left out of the watermark workunit, so only recent
     * partitions are carried over to the next run.
     */
    @VisibleForTesting
    protected long leastWatermarkToPersistInState;

    /** Days added on top of the maximum lookback when computing {@link #leastWatermarkToPersistInState}. */
    protected static final int BUFFER_WATERMARK_DAYS_TO_PERSIST = 2;

    /** Lookback (in days) used when {@link HiveSource#HIVE_SOURCE_MAXIMUM_LOOKBACK_DAYS_KEY} is not configured. */
    private static final int DEFAULT_MAXIMUM_LOOKBACK_DAYS = 3;

    protected final HiveMetastoreClientPool pool;
    protected final TableLevelWatermarker tableLevelWatermarker;
    protected final HiveUnitUpdateProvider updateProvider;

    private static final Logger log = LoggerFactory.getLogger(PartitionLevelWatermarker.class);

    /** Joins partition values (or drop-partition spec values) into a single partition key. */
    private static final Joiner PARTITION_VALUES_JOINER = Joiner.on(",");

    /** Matches workunit states that carry partition watermarks, i.e. those containing {@link #IS_WATERMARK_WORKUNIT_KEY}. */
    static final Predicate<WorkUnitState> WATERMARK_WORKUNIT_PREDICATE = new Predicate<WorkUnitState>() {
        @Override
        public boolean apply(@Nonnull WorkUnitState workUnitState) {
            return workUnitState.contains(IS_WATERMARK_WORKUNIT_KEY);
        }
    };

    /** Per-table partition watermarks for this run; used to build the watermark workunits in {@link #onGetWorkunitsEnd}. */
    @VisibleForTesting
    protected final TableWatermarks expectedHighWatermarks = new TableWatermarks();

    /** Per-table partition watermarks read back from the previous run's state. */
    @VisibleForTesting
    protected final TableWatermarks previousWatermarks = new TableWatermarks();

    /** Creates {@link PartitionLevelWatermarker}s from job {@link State}. */
    public static class Factory implements HiveSourceWatermarkerFactory {
        @Override
        public PartitionLevelWatermarker createFromState(State state) {
            return new PartitionLevelWatermarker(state);
        }
    }

    /**
     * Map from table key ({@link PartitionLevelWatermarker#tableKey(Table)}) to a map from partition key
     * ({@link PartitionLevelWatermarker#partitionKey(Partition)}) to watermark.
     *
     * <p>{@code removePartitionWatermarks}, {@code addPartitionWatermark} and {@code getPartitionWatermark} expect the
     * table entry to exist and throw {@link NullPointerException} otherwise.
     */
    @VisibleForTesting
    static class TableWatermarks extends ConcurrentHashMap<String, Map<String, Long>> {
        private static final long serialVersionUID = 1L;

        /** Sets (or replaces) all partition watermarks of a table. */
        void setPartitionWatermarks(String tableKey, Map<String, Long> partitionWatermarks) {
            put(tableKey, partitionWatermarks);
        }

        /** Returns whether the table has an entry (possibly empty). */
        boolean hasPartitionWatermarks(String tableKey) {
            return containsKey(tableKey);
        }

        /** Removes the given partitions' watermarks from an existing table entry. */
        void removePartitionWatermarks(String tableKey, Collection<String> partitionKeys) {
            get(tableKey).keySet().removeAll(partitionKeys);
        }

        /** Sets one partition's watermark in an existing table entry. */
        void addPartitionWatermark(String tableKey, String partitionKey, Long watermark) {
            get(tableKey).put(partitionKey, watermark);
        }

        /** Returns one partition's watermark from an existing table entry, or {@code null} if the partition is absent. */
        Long getPartitionWatermark(String tableKey, String partitionKey) {
            return get(tableKey).get(partitionKey);
        }

        /** Returns the table's partition watermarks, or {@code null} if the table has no entry. */
        Map<String, Long> getPartitionWatermarks(String tableKey) {
            return get(tableKey);
        }
    }

    /**
     * Creates a watermarker from job state.
     *
     * <p>When {@code state} is a {@link SourceState}, partition watermarks from the previous run are loaded into
     * {@link #previousWatermarks} and copied into {@link #expectedHighWatermarks}.
     *
     * @param state job state; {@link HiveDatasetFinder#HIVE_METASTORE_URI_KEY} and
     *     {@link HiveSource#HIVE_SOURCE_MAXIMUM_LOOKBACK_DAYS_KEY} are read from it
     * @throws RuntimeException if the metastore client pool cannot be created
     * @throws IllegalStateException if a table has more than one watermark workunit in the previous state
     */
    public PartitionLevelWatermarker(State state) {
        this.tableLevelWatermarker = new TableLevelWatermarker(state);
        this.updateProvider = UpdateProviderFactory.create(state);
        // Only pool creation can fail with IOException; keep the wrapping message accurate by scoping the try to it.
        try {
            this.pool = HiveMetastoreClientPool.get(state.getProperties(),
                Optional.fromNullable(state.getProp(HiveDatasetFinder.HIVE_METASTORE_URI_KEY)));
        } catch (IOException e) {
            throw new RuntimeException("Could not initialize metastore client pool", e);
        }

        int maximumLookbackDays =
            state.getPropAsInt(HiveSource.HIVE_SOURCE_MAXIMUM_LOOKBACK_DAYS_KEY, DEFAULT_MAXIMUM_LOOKBACK_DAYS);
        this.leastWatermarkToPersistInState =
            new DateTime().minusDays(maximumLookbackDays + BUFFER_WATERMARK_DAYS_TO_PERSIST).getMillis();

        if (state instanceof SourceState) {
            loadPreviousWatermarks((SourceState) state);
        }
    }

    /** Fills previousWatermarks from the previous run's watermark workunits and seeds expectedHighWatermarks with copies. */
    private void loadPreviousWatermarks(SourceState sourceState) {
        for (Map.Entry<String, ? extends Iterable<WorkUnitState>> datasetStates
            : sourceState.getPreviousWorkUnitStatesByDatasetUrns().entrySet()) {
            String datasetUrn = datasetStates.getKey();
            List<WorkUnitState> watermarkStates =
                Lists.newArrayList(Iterables.filter(datasetStates.getValue(), WATERMARK_WORKUNIT_PREDICATE));

            if (watermarkStates.isEmpty()) {
                log.info("No previous partition watermarks for table {}", datasetUrn);
                continue;
            }
            if (watermarkStates.size() > 1) {
                throw new IllegalStateException(String.format(
                    "Each table should have only 1 watermark workunit that contains watermarks for all its partitions. Found %s",
                    watermarkStates.size()));
            }

            MultiKeyValueLongWatermark watermark = (MultiKeyValueLongWatermark) watermarkStates.get(0)
                .getActualHighWatermark(MultiKeyValueLongWatermark.class);
            if (watermark != null) {
                this.previousWatermarks.setPartitionWatermarks(datasetUrn, watermark.getWatermarks());
            } else {
                log.warn("Previous workunit for {} has {} set but null MultiKeyValueLongWatermark found",
                    datasetUrn, IS_WATERMARK_WORKUNIT_KEY);
            }
        }
        log.debug("Loaded partition watermarks from previous state {}", this.previousWatermarks);

        // Copy each map so updates made during this run never leak into previousWatermarks.
        for (Map.Entry<String, Map<String, Long>> entry : this.previousWatermarks.entrySet()) {
            this.expectedHighWatermarks.setPartitionWatermarks(entry.getKey(), Maps.newHashMap(entry.getValue()));
        }
    }

    /**
     * Ensures the table has an entry in {@link #expectedHighWatermarks}; an existing entry (e.g. loaded from previous
     * state) is kept as is.
     */
    @Override
    public void onTableProcessBegin(Table table, long tableProcessTime) {
        Preconditions.checkNotNull(table);
        this.expectedHighWatermarks.putIfAbsent(tableKey(table), Maps.<String, Long>newHashMap());
    }

    /**
     * Records {@code partitionUpdateTime} as the partition's expected high watermark.
     *
     * <p>Before recording, removes the watermarks of the partitions returned by
     * {@link AbstractAvroToOrcConverter#getDropPartitionsDDLInfo(Partition)} (presumably partitions being dropped or
     * replaced - verify against the converter).
     *
     * @throws IllegalStateException if {@link #onTableProcessBegin} was not called for the partition's table first
     */
    @Override
    public void onPartitionProcessBegin(Partition partition, long partitionProcessTime, long partitionUpdateTime) {
        Preconditions.checkNotNull(partition);
        Preconditions.checkNotNull(partition.getTable());
        String tableName = tableKey(partition.getTable());
        if (!this.expectedHighWatermarks.hasPartitionWatermarks(tableName)) {
            throw new IllegalStateException(String.format(
                "onPartitionProcessBegin called before onTableProcessBegin for table: %s, partitions: %s",
                tableName, partitionKey(partition)));
        }

        Collection<String> droppedPartitionKeys = Collections2.transform(
            AbstractAvroToOrcConverter.getDropPartitionsDDLInfo(partition),
            new Function<Map<String, String>, String>() {
                @Override
                public String apply(Map<String, String> partitionSpec) {
                    return PARTITION_VALUES_JOINER.join(partitionSpec.values());
                }
            });
        this.expectedHighWatermarks.removePartitionWatermarks(tableName, droppedPartitionKeys);
        this.expectedHighWatermarks.addPartitionWatermark(tableName, partitionKey(partition), partitionUpdateTime);
    }

    /** Delegates to the {@link TableLevelWatermarker}. */
    @Override
    public LongWatermark getPreviousHighWatermark(Table table) {
        return this.tableLevelWatermarker.getPreviousHighWatermark(table);
    }

    /** Returns the partition's watermark from the previous run, or a watermark of {@code 0} if none was recorded. */
    @Override
    public LongWatermark getPreviousHighWatermark(Partition partition) {
        Map<String, Long> tableWatermarks = this.previousWatermarks.get(tableKey(partition.getTable()));
        Long watermark = tableWatermarks == null ? null : tableWatermarks.get(partitionKey(partition));
        return new LongWatermark(watermark == null ? 0L : watermark);
    }

    /**
     * Appends one watermark workunit per partitioned table in {@link #expectedHighWatermarks}.
     *
     * <p>Each workunit is flagged with {@link #IS_WATERMARK_WORKUNIT_KEY}, has its {@code dataset.urn} set to the table
     * key, and carries a {@link WatermarkInterval} from the table's previous partition watermarks to its expected ones.
     * Only expected watermarks at or above {@link #leastWatermarkToPersistInState} are kept. Non-partitioned tables
     * get no watermark workunit.
     *
     * @param workunits list the watermark workunits are appended to
     * @throws RuntimeException wrapping any {@link IOException} or {@link TException} from the metastore
     */
    @Override
    public void onGetWorkunitsEnd(List<WorkUnit> workunits) {
        final long minimumWatermark = this.leastWatermarkToPersistInState;
        Predicate<Map.Entry<String, Long>> isRecentEnough = new Predicate<Map.Entry<String, Long>>() {
            @Override
            public boolean apply(@Nonnull Map.Entry<String, Long> partitionWatermark) {
                return partitionWatermark.getValue() >= minimumWatermark;
            }
        };

        // try-with-resources returns the client to the pool even if a metastore call fails.
        try (AutoReturnableObject<IMetaStoreClient> client = this.pool.getClient()) {
            for (Map.Entry<String, Map<String, Long>> tableWatermarks : this.expectedHighWatermarks.entrySet()) {
                String tableName = tableWatermarks.getKey();
                // Table keys are split on the dataset/partition delimiter into database and table name.
                String[] dbAndTable = tableName.split(LoopingDatasetFinderSource.DATASET_PARTITION_DELIMITER);
                Table table = new Table(client.get().getTable(dbAndTable[0], dbAndTable[1]));
                if (!table.isPartitioned()) {
                    continue;
                }

                Map<String, Long> persistedWatermarks =
                    ImmutableMap.copyOf(Maps.filterEntries(tableWatermarks.getValue(), isRecentEnough));

                WorkUnit watermarkWorkunit = WorkUnit.createEmpty();
                watermarkWorkunit.setProp(IS_WATERMARK_WORKUNIT_KEY, true);
                watermarkWorkunit.setProp("dataset.urn", tableName);
                watermarkWorkunit.setWatermarkInterval(new WatermarkInterval(
                    new MultiKeyValueLongWatermark(this.previousWatermarks.get(tableName)),
                    new MultiKeyValueLongWatermark(persistedWatermarks)));
                workunits.add(watermarkWorkunit);
            }
        } catch (IOException | TException e) {
            throw new RuntimeException(e);
        }
    }

    /** Returns the table's update time as reported by the {@link HiveUnitUpdateProvider}. */
    @Override
    public LongWatermark getExpectedHighWatermark(Table table, long tableProcessTime) {
        return new LongWatermark(this.updateProvider.getUpdateTime(table));
    }

    /**
     * Returns the expected high watermark recorded for the partition.
     *
     * @throws IllegalStateException if no expected watermark exists for the partition
     */
    @Override
    public LongWatermark getExpectedHighWatermark(Partition partition, long tableProcessTime, long partitionProcessTime) {
        String tableName = tableKey(partition.getTable());
        String partitionName = partitionKey(partition);
        Map<String, Long> tableWatermarks = this.expectedHighWatermarks.getPartitionWatermarks(tableName);
        Long watermark = tableWatermarks == null ? null : tableWatermarks.get(partitionName);
        if (watermark == null) {
            throw new IllegalStateException(String.format(
                "No expected high watermark for table: %s, partition: %s. Was onPartitionProcessBegin called?",
                tableName, partitionName));
        }
        return new LongWatermark(watermark);
    }

    /**
     * Copies the workunit's expected high watermark to its actual high watermark, reading it as a
     * {@link MultiKeyValueLongWatermark} for watermark workunits and as a {@link LongWatermark} otherwise.
     */
    @Override
    public void setActualHighWatermark(WorkUnitState workUnitState) {
        if (workUnitState.getPropAsBoolean(IS_WATERMARK_WORKUNIT_KEY)) {
            workUnitState.setActualHighWatermark(
                workUnitState.getWorkunit().getExpectedHighWatermark(MultiKeyValueLongWatermark.class));
        } else {
            workUnitState.setActualHighWatermark(
                workUnitState.getWorkunit().getExpectedHighWatermark(LongWatermark.class));
        }
    }

    /** Key identifying a table in {@link TableWatermarks}: the table's complete name. */
    @VisibleForTesting
    public static String tableKey(Table table) {
        return table.getCompleteName();
    }

    /** Key identifying a partition within its table: the partition values joined with commas. */
    @VisibleForTesting
    public static String partitionKey(Partition partition) {
        return PARTITION_VALUES_JOINER.join(partition.getValues());
    }

    void setLeastWatermarkToPersistInState(long leastWatermarkToPersistInState) {
        this.leastWatermarkToPersistInState = leastWatermarkToPersistInState;
    }

    TableWatermarks getPreviousWatermarks() {
        return this.previousWatermarks;
    }

    TableWatermarks getExpectedHighWatermarks() {
        return this.expectedHighWatermarks;
    }
}
