package org.apache.iceberg.connect.channel;

import java.time.OffsetDateTime;
import java.util.List;
import java.util.UUID;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.connect.events.AvroUtil;
import org.apache.iceberg.connect.events.CommitComplete;
import org.apache.iceberg.connect.events.CommitToTable;
import org.apache.iceberg.connect.events.DataComplete;
import org.apache.iceberg.connect.events.DataWritten;
import org.apache.iceberg.connect.events.Event;
import org.apache.iceberg.connect.events.PayloadType;
import org.apache.iceberg.connect.events.StartCommit;
import org.apache.iceberg.connect.events.TableReference;
import org.apache.iceberg.connect.events.TopicPartitionOffset;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/iceberg/connect/channel/CoordinatorTest.class */
public class CoordinatorTest extends ChannelTestBase {
    @Test
    public void testCommitAppend() {
        Assertions.assertThat(this.table.snapshots()).isEmpty();
        OffsetDateTime now = EventTestUtil.now();
        UUID coordinatorTest = coordinatorTest(ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), now);
        this.table.refresh();
        Assertions.assertThat(this.producer.history()).hasSize(3);
        assertCommitTable(1, coordinatorTest, now);
        assertCommitComplete(2, coordinatorTest, now);
        ImmutableList copyOf = ImmutableList.copyOf(this.table.snapshots());
        Assertions.assertThat(copyOf).hasSize(1);
        Snapshot snapshot = (Snapshot) copyOf.get(0);
        Assertions.assertThat(snapshot.operation()).isEqualTo("append");
        Assertions.assertThat(snapshot.addedDataFiles(this.table.io())).hasSize(1);
        Assertions.assertThat(snapshot.addedDeleteFiles(this.table.io())).isEmpty();
        Assertions.assertThat(snapshot.summary()).containsEntry("kafka.connect.commit-id", coordinatorTest.toString()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":3}").containsEntry("kafka.connect.valid-through-ts", now.toString());
    }

    @Test
    public void testCommitDelta() {
        OffsetDateTime now = EventTestUtil.now();
        UUID coordinatorTest = coordinatorTest(ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(EventTestUtil.createDeleteFile()), now);
        Assertions.assertThat(this.producer.history()).hasSize(3);
        assertCommitTable(1, coordinatorTest, now);
        assertCommitComplete(2, coordinatorTest, now);
        ImmutableList copyOf = ImmutableList.copyOf(this.table.snapshots());
        Assertions.assertThat(copyOf).hasSize(1);
        Snapshot snapshot = (Snapshot) copyOf.get(0);
        Assertions.assertThat(snapshot.operation()).isEqualTo("overwrite");
        Assertions.assertThat(snapshot.addedDataFiles(this.table.io())).hasSize(1);
        Assertions.assertThat(snapshot.addedDeleteFiles(this.table.io())).hasSize(1);
        Assertions.assertThat(snapshot.summary()).containsEntry("kafka.connect.commit-id", coordinatorTest.toString()).containsEntry(OFFSETS_SNAPSHOT_PROP, "{\"0\":3}").containsEntry("kafka.connect.valid-through-ts", now.toString());
    }

    @Test
    public void testCommitNoFiles() {
        OffsetDateTime now = EventTestUtil.now();
        UUID coordinatorTest = coordinatorTest(ImmutableList.of(), ImmutableList.of(), now);
        Assertions.assertThat(this.producer.history()).hasSize(2);
        assertCommitComplete(1, coordinatorTest, now);
        Assertions.assertThat(this.table.snapshots()).isEmpty();
    }

    @Test
    public void testCommitError() {
        coordinatorTest(ImmutableList.of(DataFiles.builder(PartitionSpec.builderFor(SCHEMA).withSpecId(1).identity("id").build()).withPath(UUID.randomUUID() + ".parquet").withFormat(FileFormat.PARQUET).withFileSizeInBytes(100L).withRecordCount(5L).build()), ImmutableList.of(), null);
        Assertions.assertThat(this.producer.history()).hasSize(1);
        Assertions.assertThat(this.table.snapshots()).isEmpty();
    }

    private void assertCommitTable(int i, UUID uuid, OffsetDateTime offsetDateTime) {
        Event decode = AvroUtil.decode((byte[]) ((ProducerRecord) this.producer.history().get(i)).value());
        Assertions.assertThat(decode.type()).isEqualTo(PayloadType.COMMIT_TO_TABLE);
        CommitToTable payload = decode.payload();
        Assertions.assertThat(payload.commitId()).isEqualTo(uuid);
        Assertions.assertThat(payload.tableReference().identifier().toString()).isEqualTo(TABLE_IDENTIFIER.toString());
        Assertions.assertThat(payload.validThroughTs()).isEqualTo(offsetDateTime);
    }

    private void assertCommitComplete(int i, UUID uuid, OffsetDateTime offsetDateTime) {
        Event decode = AvroUtil.decode((byte[]) ((ProducerRecord) this.producer.history().get(i)).value());
        Assertions.assertThat(decode.type()).isEqualTo(PayloadType.COMMIT_COMPLETE);
        CommitComplete payload = decode.payload();
        Assertions.assertThat(payload.commitId()).isEqualTo(uuid);
        Assertions.assertThat(payload.validThroughTs()).isEqualTo(offsetDateTime);
    }

    private UUID coordinatorTest(List<DataFile> list, List<DeleteFile> list2, OffsetDateTime offsetDateTime) {
        Mockito.when(Integer.valueOf(this.config.commitIntervalMs())).thenReturn(0);
        Mockito.when(Integer.valueOf(this.config.commitTimeoutMs())).thenReturn(Integer.MAX_VALUE);
        Coordinator coordinator = new Coordinator(this.catalog, this.config, ImmutableList.of(), this.clientFactory, (SinkTaskContext) Mockito.mock(SinkTaskContext.class));
        coordinator.start();
        initConsumer();
        coordinator.process();
        Assertions.assertThat(this.producer.transactionCommitted()).isTrue();
        Assertions.assertThat(this.producer.history()).hasSize(1);
        Event decode = AvroUtil.decode((byte[]) ((ProducerRecord) this.producer.history().get(0)).value());
        Assertions.assertThat(decode.type()).isEqualTo(PayloadType.START_COMMIT);
        UUID commitId = decode.payload().commitId();
        this.consumer.addRecord(new ConsumerRecord("ctl-topic", 0, 1L, "key", AvroUtil.encode(new Event(this.config.connectGroupId(), new DataWritten(Types.StructType.of(new Types.NestedField[0]), commitId, new TableReference("catalog", ImmutableList.of("db"), "tbl"), list, list2)))));
        this.consumer.addRecord(new ConsumerRecord("ctl-topic", 0, 2L, "key", AvroUtil.encode(new Event(this.config.connectGroupId(), new DataComplete(commitId, ImmutableList.of(new TopicPartitionOffset("topic", 1, 1L, offsetDateTime)))))));
        Mockito.when(Integer.valueOf(this.config.commitIntervalMs())).thenReturn(0);
        coordinator.process();
        return commitId;
    }

    @Test
    public void testCoordinatorRunning() {
        TopicPartition topicPartition = new TopicPartition("src-topic", 0);
        TopicPartition topicPartition2 = new TopicPartition("src-topic", 1);
        this.sourceConsumer.rebalance(Lists.newArrayList(new TopicPartition[]{topicPartition, topicPartition2, new TopicPartition("src-topic", 2)}));
        Assertions.assertThat(this.mockIcebergSinkTask.isCoordinatorRunning()).isTrue();
        this.sourceConsumer.rebalance(Lists.newArrayList(new TopicPartition[]{topicPartition, topicPartition2}));
        Assertions.assertThat(this.mockIcebergSinkTask.isCoordinatorRunning()).isTrue();
        this.sourceConsumer.rebalance(ImmutableList.of(topicPartition2));
        Assertions.assertThat(this.mockIcebergSinkTask.isCoordinatorRunning()).isFalse();
    }
}
