package org.apache.iceberg.flink.sink.shuffle;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortKey;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinatorProvider.class */
/**
 * Verifies that a coordinator created through {@link DataStatisticsCoordinatorProvider} can be
 * checkpointed and then reset to an earlier checkpoint, restoring the statistics captured in it.
 */
public class TestDataStatisticsCoordinatorProvider {
    private static final OperatorID OPERATOR_ID = new OperatorID();
    private static final int NUM_SUBTASKS = 1;

    // Single optional string column. The field id is the literal 1 and is unrelated to NUM_SUBTASKS.
    private final Schema schema = new Schema(Types.NestedField.optional(1, "str", Types.StringType.get()));
    private final SortOrder sortOrder = SortOrder.builderFor(schema).asc("str").build();
    // Template key; every test key is a copy of this instance with its value set.
    private final SortKey sortKey = new SortKey(schema, sortOrder);
    private final MapDataStatisticsSerializer statisticsSerializer =
            MapDataStatisticsSerializer.fromSortKeySerializer(new SortKeySerializer(schema, sortOrder));

    private DataStatisticsCoordinatorProvider<MapDataStatistics, Map<SortKey, Long>> provider;
    private EventReceivingTasks receivingTasks;

    /** Creates a fresh provider and task receiver before each test. */
    @Before
    public void before() {
        provider = new DataStatisticsCoordinatorProvider<>(
                "DataStatisticsCoordinatorProvider", OPERATOR_ID, statisticsSerializer);
        receivingTasks = EventReceivingTasks.createForRunningTasks();
    }

    /**
     * Sends statistics for checkpoint 1 and checkpoint 2, takes a checkpoint after each, then resets
     * the coordinator to checkpoint 1 and asserts that the recreated internal coordinator differs
     * from the original one and reports the checkpoint-1 statistics.
     *
     * @throws Exception if the coordinator fails to start, process events, checkpoint, reset or close
     */
    @Test
    public void testCheckpointAndReset() throws Exception {
        SortKey keyA = sortKeyOf("a");
        SortKey keyB = sortKeyOf("b");
        SortKey keyC = sortKeyOf("c");
        // Note: keyD and keyE carry the same value ("c") as keyC.
        SortKey keyD = sortKeyOf("c");
        SortKey keyE = sortKeyOf("c");

        // The explicit cast is harmless if create(...) already returns the concrete coordinator type.
        try (RecreateOnResetOperatorCoordinator coordinator =
                (RecreateOnResetOperatorCoordinator)
                        provider.create(new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SUBTASKS))) {
            // Unchecked: the provider under test was built for exactly these type arguments.
            @SuppressWarnings("unchecked")
            DataStatisticsCoordinator<MapDataStatistics, Map<SortKey, Long>> dataStatisticsCoordinator =
                    (DataStatisticsCoordinator<MapDataStatistics, Map<SortKey, Long>>)
                            coordinator.getInternalCoordinator();
            coordinator.start();
            TestDataStatisticsCoordinator.setAllTasksReady(
                    NUM_SUBTASKS, dataStatisticsCoordinator, receivingTasks);

            // Checkpoint 1: statistics over keys "a", "b" and "c".
            MapDataStatistics checkpoint1Statistics = new MapDataStatistics();
            checkpoint1Statistics.add(keyA);
            checkpoint1Statistics.add(keyB);
            checkpoint1Statistics.add(keyC);
            coordinator.handleEventFromOperator(
                    0, 0, DataStatisticsEvent.create(1L, checkpoint1Statistics, statisticsSerializer));
            TestDataStatisticsCoordinator.waitForCoordinatorToProcessActions(dataStatisticsCoordinator);

            MapDataStatistics checkpoint1Completed =
                    dataStatisticsCoordinator.completedStatistics().dataStatistics();
            Assertions.assertThat(checkpoint1Completed.statistics())
                    .isEqualTo(checkpoint1Statistics.statistics());
            byte[] checkpoint1Bytes = waitForCheckpoint(1L, dataStatisticsCoordinator);

            // Checkpoint 2: three additions, all of keys whose value is "c".
            MapDataStatistics checkpoint2Statistics = new MapDataStatistics();
            checkpoint2Statistics.add(keyD);
            checkpoint2Statistics.add(keyE);
            checkpoint2Statistics.add(keyE);
            coordinator.handleEventFromOperator(
                    0, 0, DataStatisticsEvent.create(2L, checkpoint2Statistics, statisticsSerializer));
            TestDataStatisticsCoordinator.waitForCoordinatorToProcessActions(dataStatisticsCoordinator);

            Assertions.assertThat(dataStatisticsCoordinator.completedStatistics().dataStatistics().statistics())
                    .isEqualTo(checkpoint2Statistics.statistics());
            waitForCheckpoint(2L, dataStatisticsCoordinator);

            // Roll back to checkpoint 1 and inspect the internal coordinator exposed after the reset.
            coordinator.resetToCheckpoint(1L, checkpoint1Bytes);
            // Unchecked: same provider, so the recreated coordinator has the same type arguments.
            @SuppressWarnings("unchecked")
            DataStatisticsCoordinator<MapDataStatistics, Map<SortKey, Long>> restoredCoordinator =
                    (DataStatisticsCoordinator<MapDataStatistics, Map<SortKey, Long>>)
                            coordinator.getInternalCoordinator();
            Assertions.assertThat(dataStatisticsCoordinator).isNotEqualTo(restoredCoordinator);
            Assertions.assertThat(restoredCoordinator.completedStatistics().dataStatistics().statistics())
                    .isEqualTo(checkpoint1Completed.statistics());
        }
    }

    /** Returns a copy of the template sort key with position 0 set to {@code value}. */
    private SortKey sortKeyOf(String value) {
        SortKey key = sortKey.copy();
        key.set(0, value);
        return key;
    }

    /**
     * Asks the given coordinator to checkpoint and blocks until the result future completes.
     *
     * @param checkpointId id passed to the coordinator's checkpoint call
     * @param dataStatisticsCoordinator coordinator to checkpoint
     * @return the bytes the coordinator completed the checkpoint future with
     * @throws InterruptedException if the thread is interrupted while waiting on the future
     * @throws ExecutionException if the checkpoint future completes exceptionally
     */
    private byte[] waitForCheckpoint(
            long checkpointId,
            DataStatisticsCoordinator<MapDataStatistics, Map<SortKey, Long>> dataStatisticsCoordinator)
            throws InterruptedException, ExecutionException {
        CompletableFuture<byte[]> future = new CompletableFuture<>();
        dataStatisticsCoordinator.checkpointCoordinator(checkpointId, future);
        return future.get();
    }
}
