/*
 * Decompiled with CFR 0.152.
 */
package com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.runtime.errors;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.producer.RecordMetadata;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.header.internals.RecordHeaders;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.header.Header;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.runtime.InternalSinkRecord;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.runtime.errors.Stage;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.sink.ErrantRecordReporter;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.sink.SinkRecord;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.sink.SinkTask;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.Converter;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.HeaderConverter;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkerErrantRecordReporter
implements ErrantRecordReporter {
    private static final Logger log = LoggerFactory.getLogger(WorkerErrantRecordReporter.class);
    private final RetryWithToleranceOperator retryWithToleranceOperator;
    private final Converter keyConverter;
    private final Converter valueConverter;
    private final HeaderConverter headerConverter;
    protected final LinkedList<Future<Void>> futures;

    public WorkerErrantRecordReporter(RetryWithToleranceOperator retryWithToleranceOperator, Converter keyConverter, Converter valueConverter, HeaderConverter headerConverter) {
        this.retryWithToleranceOperator = retryWithToleranceOperator;
        this.keyConverter = keyConverter;
        this.valueConverter = valueConverter;
        this.headerConverter = headerConverter;
        this.futures = new LinkedList();
    }

    @Override
    public Future<Void> report(SinkRecord record, Throwable error) {
        ConsumerRecord<byte[], byte[]> consumerRecord;
        if (record instanceof InternalSinkRecord) {
            consumerRecord = ((InternalSinkRecord)record).originalRecord();
        } else {
            String topic = record.topic();
            byte[] key = this.keyConverter.fromConnectData(topic, record.keySchema(), record.key());
            byte[] value = this.valueConverter.fromConnectData(topic, record.valueSchema(), record.value());
            RecordHeaders headers = new RecordHeaders();
            if (record.headers() != null) {
                for (Header header : record.headers()) {
                    String headerKey = header.key();
                    byte[] rawHeader = this.headerConverter.fromConnectHeader(topic, headerKey, header.schema(), header.value());
                    headers.add(headerKey, rawHeader);
                }
            }
            int keyLength = key != null ? key.length : -1;
            int valLength = value != null ? value.length : -1;
            consumerRecord = new ConsumerRecord<byte[], byte[]>(record.topic(), record.kafkaPartition(), record.kafkaOffset(), record.timestamp(), record.timestampType(), -1L, keyLength, valLength, key, value, headers);
        }
        Future<Void> future = this.retryWithToleranceOperator.executeFailed(Stage.TASK_PUT, SinkTask.class, consumerRecord, error);
        if (!future.isDone()) {
            this.futures.add(future);
        }
        return future;
    }

    public void awaitAllFutures() {
        Future<Void> future;
        while ((future = this.futures.poll()) != null) {
            try {
                future.get();
            }
            catch (InterruptedException | ExecutionException e) {
                log.error("Encountered an error while awaiting an errant record future's completion.");
                throw new ConnectException(e);
            }
        }
    }

    public static class ErrantRecordFuture
    implements Future<Void> {
        private final List<Future<RecordMetadata>> futures;

        public ErrantRecordFuture(List<Future<RecordMetadata>> producerFutures) {
            this.futures = producerFutures;
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            throw new UnsupportedOperationException("Reporting an errant record cannot be cancelled.");
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public boolean isDone() {
            return this.futures.stream().allMatch(Future::isDone);
        }

        @Override
        public Void get() throws InterruptedException, ExecutionException {
            for (Future<RecordMetadata> future : this.futures) {
                future.get();
            }
            return null;
        }

        @Override
        public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            for (Future<RecordMetadata> future : this.futures) {
                future.get(timeout, unit);
            }
            return null;
        }
    }
}

