package org.apache.iceberg.connect.channel;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.connect.events.CommitComplete;
import org.apache.iceberg.connect.events.CommitToTable;
import org.apache.iceberg.connect.events.DataWritten;
import org.apache.iceberg.connect.events.Event;
import org.apache.iceberg.connect.events.PayloadType;
import org.apache.iceberg.connect.events.StartCommit;
import org.apache.iceberg.connect.events.TableReference;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.Tasks;
import org.apache.iceberg.util.ThreadPools;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/iceberg/connect/channel/Coordinator.class */
public class Coordinator extends Channel {
    // Snapshot property key under which commitToTable records the current commit ID.
    private static final String COMMIT_ID_SNAPSHOT_PROP = "kafka.connect.commit-id";
    // Snapshot property key for the valid-through timestamp; only set when that timestamp is non-null.
    private static final String VALID_THROUGH_TS_SNAPSHOT_PROP = "kafka.connect.valid-through-ts";
    // Catalog used to load each table that is committed to.
    private final Catalog catalog;
    // Sink configuration (group ID, control topic, per-table commit branch, commit thread count).
    private final IcebergSinkConfig config;
    // Sum of the topic partitions assigned to all group members passed to the constructor.
    private final int totalPartitionCount;
    // Snapshot property key, derived from the control topic and connect group ID, that holds the offsets JSON.
    private final String snapshotOffsetsProp;
    // Fixed-size "iceberg-committer" pool on which the per-table commits of one commit cycle run.
    private final ExecutorService exec;
    // Tracks the in-flight commit: collected responses, readiness, interval and timeout.
    private final CommitState commitState;
    // Set by terminate(); commitToTable checks it and aborts instead of committing.
    private volatile boolean terminated;
    private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
    // Shared JSON mapper used to write and read the offsets stored in snapshot summaries.
    private static final ObjectMapper MAPPER = new ObjectMapper();
    // Duration handed to consumeAvailable on every process() call.
    private static final Duration POLL_DURATION = Duration.ofSeconds(1);

    /**
     * Holder for the ordinal-indexed lookup table used by {@code receive} to switch on
     * {@link PayloadType}. Decompiled from {@code Coordinator$2.class}; the {@code synthetic}
     * markers suggest it was generated by the compiler rather than written by hand.
     *
     * <p>Table contents: {@code DATA_WRITTEN -> 1}, {@code DATA_COMPLETE -> 2}; every other
     * constant keeps the array default of {@code 0}.
     */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$iceberg$connect$events$PayloadType;

        static {
            // One slot per enum constant, indexed by ordinal.
            int[] caseIndexes = new int[PayloadType.values().length];
            try {
                caseIndexes[PayloadType.DATA_WRITTEN.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant not present in the loaded PayloadType; leave its slot unmapped.
            }
            try {
                caseIndexes[PayloadType.DATA_COMPLETE.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant not present in the loaded PayloadType; leave its slot unmapped.
            }
            $SwitchMap$org$apache$iceberg$connect$events$PayloadType = caseIndexes;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Coordinator(Catalog catalog, IcebergSinkConfig icebergSinkConfig, Collection<MemberDescription> collection, KafkaClientFactory kafkaClientFactory, SinkTaskContext sinkTaskContext) {
        super("coordinator", icebergSinkConfig.connectGroupId() + "-coord", icebergSinkConfig, kafkaClientFactory, sinkTaskContext);
        this.catalog = catalog;
        this.config = icebergSinkConfig;
        this.totalPartitionCount = collection.stream().mapToInt(memberDescription -> {
            return memberDescription.assignment().topicPartitions().size();
        }).sum();
        this.snapshotOffsetsProp = String.format("kafka.connect.offsets.%s.%s", icebergSinkConfig.controlTopic(), icebergSinkConfig.connectGroupId());
        this.exec = ThreadPools.newFixedThreadPool("iceberg-committer", icebergSinkConfig.commitThreads());
        this.commitState = new CommitState(icebergSinkConfig);
    }

    /**
     * Runs one coordinator cycle: starts a new commit when the commit interval has been reached,
     * consumes whatever is available on the channel, and forces a commit if the current one has
     * timed out.
     *
     * <p>Decompiler note: this method was package-private in the original class file.
     */
    public void process() {
        if (this.commitState.isCommitIntervalReached()) {
            // Open a new commit and broadcast its ID.
            this.commitState.startNewCommit();
            Event startEvent = new Event(this.config.connectGroupId(), new StartCommit(this.commitState.currentCommitId()));
            send(startEvent);
            LOG.info("Commit {} initiated", this.commitState.currentCommitId());
        }

        consumeAvailable(POLL_DURATION);

        // A timed-out commit is committed with the flag set to true.
        if (this.commitState.isCommitTimedOut()) {
            commit(true);
        }
    }

    /**
     * Handles one envelope read from the channel.
     *
     * <p>{@code DATA_WRITTEN} payloads are recorded as commit responses. {@code DATA_COMPLETE}
     * payloads are recorded as ready signals, and once the commit state reports readiness for
     * all {@code totalPartitionCount} partitions the commit is performed. Switching directly on
     * the enum avoids the ordinal-indexed lookup table and its magic case numbers.
     *
     * @param envelope the received envelope
     * @return {@code true} if the payload type was handled here, {@code false} otherwise
     */
    @Override // org.apache.iceberg.connect.channel.Channel
    protected boolean receive(Envelope envelope) {
        switch (envelope.event().payload().type()) {
            case DATA_WRITTEN:
                this.commitState.addResponse(envelope);
                return true;
            case DATA_COMPLETE:
                this.commitState.addReady(envelope);
                if (this.commitState.isCommitReady(this.totalPartitionCount)) {
                    commit(false);
                }
                return true;
            default:
                return false;
        }
    }

    /**
     * Performs a commit and always ends the current commit afterwards, whether it succeeded or
     * not. Failures are logged rather than propagated so a later cycle can retry.
     *
     * @param z forwarded to {@code doCommit}; {@code true} when called because the commit timed
     *     out, {@code false} when called because the commit became ready
     */
    private void commit(boolean z) {
        try {
            doCommit(z);
        } catch (final Exception failure) {
            // Swallowed on purpose: the commit is retried on the next cycle.
            LOG.warn("Commit failed, will try again next cycle", failure);
        } finally {
            this.commitState.endCurrentCommit();
        }
    }

    /**
     * Commits the collected responses to every table on the committer pool, then commits the
     * consumer offsets, clears the responses and announces completion.
     *
     * @param z passed to {@code CommitState.validThroughTs} to compute the valid-through timestamp
     */
    private void doCommit(boolean z) {
        Map<TableReference, List<Envelope>> envelopesByTable = this.commitState.tableCommitMap();
        String offsets = offsetsJson();
        OffsetDateTime validThrough = this.commitState.validThroughTs(z);

        // One task per table; stopOnFailure() is requested for the batch.
        Tasks.foreach(envelopesByTable.entrySet())
                .executeWith(this.exec)
                .stopOnFailure()
                .run(entry -> commitToTable(entry.getKey(), entry.getValue(), offsets, validThrough));

        commitConsumerOffsets();
        this.commitState.clearResponses();

        Event completeEvent = new Event(this.config.connectGroupId(), new CommitComplete(this.commitState.currentCommitId(), validThrough));
        send(completeEvent);
        LOG.info("Commit {} complete, committed to {} table(s), valid-through {}", this.commitState.currentCommitId(), envelopesByTable.size(), validThrough);
    }

    /**
     * Serializes the value of {@code controlTopicOffsets()} to a JSON string.
     *
     * @return the offsets as JSON
     * @throws UncheckedIOException if serialization fails
     */
    private String offsetsJson() {
        final String json;
        try {
            json = MAPPER.writeValueAsString(controlTopicOffsets());
        } catch (IOException ioe) {
            throw new UncheckedIOException(ioe);
        }
        return json;
    }

    /**
     * Commits the data and delete files reported for one table as a single snapshot.
     *
     * <p>Envelopes whose offset is below the offset already recorded for their partition in the
     * table's snapshot history are skipped. Files with no records are dropped and files are
     * de-duplicated by path. An append is used when there are no delete files, otherwise a row
     * delta. The snapshot is tagged with the offsets JSON, the commit ID and, when present, the
     * valid-through timestamp. A missing table is logged and skipped.
     *
     * @param tableReference the table to commit to
     * @param list envelopes collected for this table
     * @param str offsets JSON stored under the snapshot offsets property
     * @param offsetDateTime valid-through timestamp; may be {@code null}, in which case the
     *     snapshot property is not set
     * @throws ConnectException if the coordinator has been terminated
     */
    private void commitToTable(TableReference tableReference, List<Envelope> list, String str, OffsetDateTime offsetDateTime) {
        TableIdentifier identifier = tableReference.identifier();
        try {
            Table table = this.catalog.loadTable(identifier);
            String branch = this.config.tableConfig(identifier.toString()).commitBranch();
            Map<Integer, Long> committedOffsets = lastCommittedOffsetsForTable(table, branch);

            // Keep only envelopes at or beyond the offset recorded by the last commit for their partition.
            List<DataWritten> payloads = list.stream()
                    .filter(envelope -> {
                        Long minOffset = committedOffsets.get(envelope.partition());
                        return minOffset == null || envelope.offset() >= minOffset;
                    })
                    .map(envelope -> (DataWritten) envelope.event().payload())
                    .collect(Collectors.toList());

            List<DataFile> dataFiles = payloads.stream()
                    .filter(payload -> payload.dataFiles() != null)
                    .flatMap(payload -> payload.dataFiles().stream())
                    .filter(dataFile -> dataFile.recordCount() > 0)
                    .filter(distinctByKey(dataFile -> dataFile.path().toString()))
                    .collect(Collectors.toList());

            List<DeleteFile> deleteFiles = payloads.stream()
                    .filter(payload -> payload.deleteFiles() != null)
                    .flatMap(payload -> payload.deleteFiles().stream())
                    .filter(deleteFile -> deleteFile.recordCount() > 0)
                    .filter(distinctByKey(deleteFile -> deleteFile.path().toString()))
                    .collect(Collectors.toList());

            // Checked immediately before committing so a terminated coordinator writes nothing.
            if (this.terminated) {
                throw new ConnectException("Coordinator is terminated, commit aborted");
            }

            if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
                LOG.info("Nothing to commit to table {}, skipping", identifier);
                return;
            }

            if (deleteFiles.isEmpty()) {
                AppendFiles append = table.newAppend();
                if (branch != null) {
                    append.toBranch(branch);
                }
                append.set(this.snapshotOffsetsProp, str);
                append.set(COMMIT_ID_SNAPSHOT_PROP, this.commitState.currentCommitId().toString());
                if (offsetDateTime != null) {
                    append.set(VALID_THROUGH_TS_SNAPSHOT_PROP, offsetDateTime.toString());
                }
                dataFiles.forEach(append::appendFile);
                append.commit();
            } else {
                RowDelta rowDelta = table.newRowDelta();
                if (branch != null) {
                    rowDelta.toBranch(branch);
                }
                rowDelta.set(this.snapshotOffsetsProp, str);
                rowDelta.set(COMMIT_ID_SNAPSHOT_PROP, this.commitState.currentCommitId().toString());
                if (offsetDateTime != null) {
                    rowDelta.set(VALID_THROUGH_TS_SNAPSHOT_PROP, offsetDateTime.toString());
                }
                dataFiles.forEach(rowDelta::addRows);
                deleteFiles.forEach(rowDelta::addDeletes);
                rowDelta.commit();
            }

            Long snapshotId = latestSnapshot(table, branch).snapshotId();
            send(new Event(this.config.connectGroupId(), new CommitToTable(this.commitState.currentCommitId(), tableReference, snapshotId, offsetDateTime)));
            LOG.info("Commit complete to table {}, snapshot {}, commit ID {}, valid-through {}", identifier, snapshotId, this.commitState.currentCommitId(), offsetDateTime);
        } catch (NoSuchTableException e) {
            LOG.warn("Table not found, skipping commit: {}", identifier, e);
        }
    }

    /**
     * Builds a stateful predicate that accepts an element only the first time its key is seen.
     * Each call returns a predicate with its own, initially empty, set of seen keys.
     *
     * @param function extracts the de-duplication key from an element
     * @param <T> element type
     * @return a predicate that is {@code true} for the first element with a given key and
     *     {@code false} for later elements with that key
     */
    private <T> Predicate<T> distinctByKey(Function<? super T, ?> function) {
        ConcurrentMap<Object, Boolean> seenKeys = Maps.newConcurrentMap();
        // putIfAbsent returns null only for the first insertion of a key.
        return element -> seenKeys.putIfAbsent(function.apply(element), Boolean.TRUE) == null;
    }

    /**
     * Returns the snapshot to treat as the head for a commit.
     *
     * @param table the table to inspect
     * @param str reference name passed to {@code Table.snapshot(String)}, or {@code null} to use
     *     the table's current snapshot
     * @return whatever the corresponding {@code Table} lookup returns
     */
    private Snapshot latestSnapshot(Table table, String str) {
        if (str == null) {
            return table.currentSnapshot();
        }
        return table.snapshot(str);
    }

    /**
     * Finds the offsets recorded by the most recent snapshot that carries the snapshot offsets
     * property, walking from the latest snapshot back through its parents.
     *
     * @param table the table whose history is searched
     * @param str reference name handed to {@code latestSnapshot}; may be {@code null}
     * @return the parsed offsets map, or an empty map if no snapshot in the chain has the property
     * @throws UncheckedIOException if the stored JSON cannot be parsed
     */
    private Map<Integer, Long> lastCommittedOffsetsForTable(Table table, String str) {
        Snapshot current = latestSnapshot(table, str);
        while (current != null) {
            String storedOffsets = current.summary().get(this.snapshotOffsetsProp);
            if (storedOffsets != null) {
                try {
                    return MAPPER.readValue(storedOffsets, new TypeReference<Map<Integer, Long>>() {
                    });
                } catch (IOException ioe) {
                    throw new UncheckedIOException(ioe);
                }
            }
            // This snapshot has no offsets; step to its parent, or stop at the root.
            Long parentId = current.parentId();
            current = parentId == null ? null : table.snapshot(parentId.longValue());
        }
        return ImmutableMap.of();
    }

    /**
     * Marks the coordinator as terminated, shuts down the committer pool and waits up to one
     * minute for it to finish.
     *
     * <p>Decompiler note: this method was package-private in the original class file.
     *
     * @throws ConnectException if the pool does not terminate within one minute, or if the
     *     calling thread is interrupted while waiting (the interrupt flag is restored first)
     */
    public void terminate() {
        // Set before shutdown so in-flight commitToTable calls abort instead of committing.
        this.terminated = true;
        this.exec.shutdownNow();
        try {
            if (!this.exec.awaitTermination(1L, TimeUnit.MINUTES)) {
                throw new ConnectException("Timed out waiting for coordinator shutdown");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new ConnectException("Interrupted while waiting for coordinator shutdown", e);
        }
    }
}
