package org.apache.iceberg.connect.channel;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.connect.data.Offset;
import org.apache.iceberg.connect.events.AvroUtil;
import org.apache.iceberg.connect.events.Event;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iceberg/connect/channel/Channel.class */
abstract class Channel {
    private static final Logger LOG = LoggerFactory.getLogger(Channel.class);
    private final String controlTopic;
    private final String connectGroupId;
    private final Producer<String, byte[]> producer;
    private final Consumer<String, byte[]> consumer;
    private final SinkTaskContext context;
    private final Admin admin;
    private final Map<Integer, Long> controlTopicOffsets = Maps.newHashMap();
    private final String producerId = UUID.randomUUID().toString();

    /* JADX INFO: Access modifiers changed from: package-private */
    public Channel(String str, String str2, IcebergSinkConfig icebergSinkConfig, KafkaClientFactory kafkaClientFactory, SinkTaskContext sinkTaskContext) {
        this.controlTopic = icebergSinkConfig.controlTopic();
        this.connectGroupId = icebergSinkConfig.connectGroupId();
        this.context = sinkTaskContext;
        this.producer = kafkaClientFactory.createProducer(str + icebergSinkConfig.transactionalSuffix());
        this.consumer = kafkaClientFactory.createConsumer(str2);
        this.admin = kafkaClientFactory.createAdmin();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void send(Event event) {
        send(ImmutableList.of(event), ImmutableMap.of());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void send(List<Event> list, Map<TopicPartition, Offset> map) {
        HashMap newHashMap = Maps.newHashMap();
        map.forEach((topicPartition, offset) -> {
        });
        List list2 = (List) list.stream().map(event -> {
            LOG.info("Sending event of type: {}", event.type().name());
            return new ProducerRecord(this.controlTopic, this.producerId, AvroUtil.encode(event));
        }).collect(Collectors.toList());
        synchronized (this.producer) {
            this.producer.beginTransaction();
            try {
                Producer<String, byte[]> producer = this.producer;
                Objects.requireNonNull(producer);
                list2.forEach(producer::send);
                if (!map.isEmpty()) {
                    this.producer.sendOffsetsToTransaction(newHashMap, KafkaUtils.consumerGroupMetadata(this.context));
                }
                this.producer.commitTransaction();
            } catch (Exception e) {
                try {
                    this.producer.abortTransaction();
                } catch (Exception e2) {
                    LOG.warn("Error aborting producer transaction", e2);
                }
                throw e;
            }
        }
    }

    protected abstract boolean receive(Envelope envelope);

    /* JADX INFO: Access modifiers changed from: protected */
    public void consumeAvailable(Duration duration) {
        ConsumerRecords poll = this.consumer.poll(duration);
        while (true) {
            ConsumerRecords consumerRecords = poll;
            if (consumerRecords.isEmpty()) {
                return;
            }
            consumerRecords.forEach(consumerRecord -> {
                this.controlTopicOffsets.put(Integer.valueOf(consumerRecord.partition()), Long.valueOf(consumerRecord.offset() + 1));
                Event decode = AvroUtil.decode((byte[]) consumerRecord.value());
                if (decode.groupId().equals(this.connectGroupId)) {
                    LOG.debug("Received event of type: {}", decode.type().name());
                    if (receive(new Envelope(decode, consumerRecord.partition(), consumerRecord.offset()))) {
                        LOG.info("Handled event of type: {}", decode.type().name());
                    }
                }
            });
            poll = this.consumer.poll(duration);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<Integer, Long> controlTopicOffsets() {
        return this.controlTopicOffsets;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void commitConsumerOffsets() {
        HashMap newHashMap = Maps.newHashMap();
        controlTopicOffsets().forEach((num, l) -> {
        });
        this.consumer.commitSync(newHashMap);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        this.consumer.subscribe(ImmutableList.of(this.controlTopic));
        consumeAvailable(Duration.ofSeconds(1L));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stop() {
        LOG.info("Channel stopping");
        this.producer.close();
        this.consumer.close();
        this.admin.close();
    }
}
