package org.apache.flink.table.filesystem.stream;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.filesystem.FileSystemOptions;
import org.apache.flink.table.filesystem.PartitionTimeExtractor;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.flink.util.StringUtils;

/* loaded from: input_file:org/apache/flink/table/filesystem/stream/PartitionTimeCommitTrigger.class */
public class PartitionTimeCommitTrigger implements PartitionCommitTrigger {
    private static final ListStateDescriptor<List<String>> PENDING_PARTITIONS_STATE_DESC = new ListStateDescriptor<>("pending-partitions", new ListSerializer(StringSerializer.INSTANCE));
    private static final ListStateDescriptor<Map<Long, Long>> WATERMARKS_STATE_DESC = new ListStateDescriptor<>("checkpoint-id-to-watermark", new MapSerializer(LongSerializer.INSTANCE, LongSerializer.INSTANCE));
    private final ListState<List<String>> pendingPartitionsState;
    private final Set<String> pendingPartitions = new HashSet();
    private final ListState<Map<Long, Long>> watermarksState;
    private final TreeMap<Long, Long> watermarks;
    private final PartitionTimeExtractor extractor;
    private final long commitDelay;
    private final List<String> partitionKeys;
    private final ZoneId watermarkTimeZone;

    public PartitionTimeCommitTrigger(boolean z, OperatorStateStore operatorStateStore, Configuration configuration, ClassLoader classLoader, List<String> list) throws Exception {
        this.pendingPartitionsState = operatorStateStore.getListState(PENDING_PARTITIONS_STATE_DESC);
        if (z) {
            this.pendingPartitions.addAll((Collection) ((Iterable) this.pendingPartitionsState.get()).iterator().next());
        }
        this.partitionKeys = list;
        this.commitDelay = ((Duration) configuration.get(FileSystemOptions.SINK_PARTITION_COMMIT_DELAY)).toMillis();
        this.extractor = PartitionTimeExtractor.create(classLoader, (String) configuration.get(FileSystemOptions.PARTITION_TIME_EXTRACTOR_KIND), (String) configuration.get(FileSystemOptions.PARTITION_TIME_EXTRACTOR_CLASS), (String) configuration.get(FileSystemOptions.PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN));
        this.watermarksState = operatorStateStore.getListState(WATERMARKS_STATE_DESC);
        this.watermarks = new TreeMap<>();
        this.watermarkTimeZone = ZoneId.of(configuration.getString(FileSystemOptions.SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE));
        if (z) {
            this.watermarks.putAll((Map) ((Iterable) this.watermarksState.get()).iterator().next());
        }
    }

    @Override // org.apache.flink.table.filesystem.stream.PartitionCommitTrigger
    public void addPartition(String str) {
        if (StringUtils.isNullOrWhitespaceOnly(str)) {
            return;
        }
        this.pendingPartitions.add(str);
    }

    @Override // org.apache.flink.table.filesystem.stream.PartitionCommitTrigger
    public List<String> committablePartitions(long j) {
        if (!this.watermarks.containsKey(Long.valueOf(j))) {
            throw new IllegalArgumentException(String.format("Checkpoint(%d) has not been snapshot. The watermark information is: %s.", Long.valueOf(j), this.watermarks));
        }
        long longValue = this.watermarks.get(Long.valueOf(j)).longValue();
        this.watermarks.headMap(Long.valueOf(j), true).clear();
        ArrayList arrayList = new ArrayList();
        Iterator<String> it = this.pendingPartitions.iterator();
        while (it.hasNext()) {
            String next = it.next();
            if (watermarkHasPassedWithDelay(longValue, this.extractor.extract(this.partitionKeys, PartitionPathUtils.extractPartitionValues(new Path(next))), this.commitDelay)) {
                arrayList.add(next);
                it.remove();
            }
        }
        return arrayList;
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [java.time.ZonedDateTime] */
    private boolean watermarkHasPassedWithDelay(long j, LocalDateTime localDateTime, long j2) {
        return j > localDateTime.atZone(this.watermarkTimeZone).toInstant().toEpochMilli() + j2;
    }

    @Override // org.apache.flink.table.filesystem.stream.PartitionCommitTrigger
    public void snapshotState(long j, long j2) throws Exception {
        this.pendingPartitionsState.clear();
        this.pendingPartitionsState.add(new ArrayList(this.pendingPartitions));
        this.watermarks.put(Long.valueOf(j), Long.valueOf(j2));
        this.watermarksState.clear();
        this.watermarksState.add(new HashMap(this.watermarks));
    }

    @Override // org.apache.flink.table.filesystem.stream.PartitionCommitTrigger
    public List<String> endInput() {
        ArrayList arrayList = new ArrayList(this.pendingPartitions);
        this.pendingPartitions.clear();
        return arrayList;
    }
}
