/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.sink;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer;
import org.apache.flink.connector.kafka.sink.KafkaCommittable;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.Recyclable;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.RetriableException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class KafkaCommitter
implements Committer<KafkaCommittable>,
Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaCommitter.class);
    public static final String UNKNOWN_PRODUCER_ID_ERROR_MESSAGE = "because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.\nTo avoid data loss, the application will restart.";
    private final Properties kafkaProducerConfig;
    @Nullable
    private FlinkKafkaInternalProducer<?, ?> recoveryProducer;

    KafkaCommitter(Properties kafkaProducerConfig) {
        this.kafkaProducerConfig = kafkaProducerConfig;
    }

    public List<KafkaCommittable> commit(List<KafkaCommittable> committables) throws IOException {
        ArrayList<KafkaCommittable> retryableCommittables = new ArrayList<KafkaCommittable>();
        Exception collected = null;
        for (KafkaCommittable committable : committables) {
            String transactionalId = committable.getTransactionalId();
            LOG.debug("Committing Kafka transaction {}", (Object)transactionalId);
            Optional<Recyclable<FlinkKafkaInternalProducer<?, ?>>> recyclable = committable.getProducer();
            try {
                FlinkKafkaInternalProducer producer = recyclable.map(Recyclable::getObject).orElseGet(() -> this.getRecoveryProducer(committable));
                producer.commitTransaction();
                producer.flush();
            }
            catch (RetriableException e) {
                LOG.warn("Encountered retriable exception while committing {}.", (Object)transactionalId, (Object)e);
                retryableCommittables.add(committable);
                continue;
            }
            catch (ProducerFencedException e) {
                LOG.error("Unable to commit transaction ({}) because its producer is already fenced. This means that you either have a different producer with the same '{}' (this is unlikely with the '{}' as all generated ids are unique and shouldn't be reused) or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss, please consult the Flink documentation for more details.", new Object[]{committable, "transactional.id", KafkaSink.class.getSimpleName(), "transaction.timeout.ms", this.kafkaProducerConfig.getProperty("transaction.timeout.ms"), e});
            }
            catch (InvalidTxnStateException e) {
                LOG.error("Unable to commit transaction ({}) because it's in an invalid state. Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.", (Object)committable, (Object)e);
            }
            catch (UnknownProducerIdException e) {
                LOG.error("Unable to commit transaction ({}) because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.\nTo avoid data loss, the application will restart.", (Object)committable, (Object)e);
            }
            catch (Exception e) {
                LOG.error("Transaction ({}) encountered error and data has been potentially lost.", (Object)committable, (Object)e);
                collected = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, collected);
            }
            recyclable.ifPresent(Recyclable::close);
            if (collected == null) continue;
            throw new FlinkRuntimeException("Some committables were not committed and committing failed with:", (Throwable)collected);
        }
        return retryableCommittables;
    }

    @Override
    public void close() {
        if (this.recoveryProducer != null) {
            this.recoveryProducer.close();
        }
    }

    private FlinkKafkaInternalProducer<?, ?> getRecoveryProducer(KafkaCommittable committable) {
        if (this.recoveryProducer == null) {
            this.recoveryProducer = new FlinkKafkaInternalProducer(this.kafkaProducerConfig, committable.getTransactionalId());
        } else {
            this.recoveryProducer.setTransactionId(committable.getTransactionalId());
        }
        this.recoveryProducer.resumeTransaction(committable.getProducerId(), committable.getEpoch());
        return this.recoveryProducer;
    }
}

