package org.apache.iceberg.connect.channel;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.connect.data.Offset;
import org.apache.iceberg.connect.data.SinkWriter;
import org.apache.iceberg.connect.data.SinkWriterResult;
import org.apache.iceberg.connect.events.DataComplete;
import org.apache.iceberg.connect.events.DataWritten;
import org.apache.iceberg.connect.events.Event;
import org.apache.iceberg.connect.events.PayloadType;
import org.apache.iceberg.connect.events.TableReference;
import org.apache.iceberg.connect.events.TopicPartitionOffset;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/iceberg/connect/channel/Worker.class */
public class Worker extends Channel {
    private final IcebergSinkConfig config;
    private final SinkTaskContext context;
    private final SinkWriter sinkWriter;

    /* JADX INFO: Access modifiers changed from: package-private */
    public Worker(IcebergSinkConfig icebergSinkConfig, KafkaClientFactory kafkaClientFactory, SinkWriter sinkWriter, SinkTaskContext sinkTaskContext) {
        super("worker", "cg-control-" + UUID.randomUUID(), icebergSinkConfig, kafkaClientFactory, sinkTaskContext);
        this.config = icebergSinkConfig;
        this.context = sinkTaskContext;
        this.sinkWriter = sinkWriter;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void process() {
        consumeAvailable(Duration.ZERO);
    }

    @Override // org.apache.iceberg.connect.channel.Channel
    protected boolean receive(Envelope envelope) {
        Event event = envelope.event();
        if (event.payload().type() != PayloadType.START_COMMIT) {
            return false;
        }
        SinkWriterResult completeWrite = this.sinkWriter.completeWrite();
        List list = (List) this.context.assignment().stream().map(topicPartition -> {
            Offset offset = completeWrite.sourceOffsets().get(topicPartition);
            if (offset == null) {
                offset = Offset.NULL_OFFSET;
            }
            return new TopicPartitionOffset(topicPartition.topic(), topicPartition.partition(), offset.offset(), offset.timestamp());
        }).collect(Collectors.toList());
        UUID commitId = event.payload().commitId();
        List<Event> list2 = (List) completeWrite.writerResults().stream().map(icebergWriterResult -> {
            return new Event(this.config.connectGroupId(), new DataWritten(icebergWriterResult.partitionStruct(), commitId, TableReference.of(this.config.catalogName(), icebergWriterResult.tableIdentifier()), icebergWriterResult.dataFiles(), icebergWriterResult.deleteFiles()));
        }).collect(Collectors.toList());
        list2.add(new Event(this.config.connectGroupId(), new DataComplete(commitId, list)));
        send(list2, completeWrite.sourceOffsets());
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.iceberg.connect.channel.Channel
    public void stop() {
        super.stop();
        this.sinkWriter.close();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void save(Collection<SinkRecord> collection) {
        this.sinkWriter.save(collection);
    }
}
