package org.apache.iceberg.flink.sink.shuffle;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.EventReceivingTasks;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.util.ExceptionUtils;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortKey;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinator.class */
public class TestDataStatisticsCoordinator {
    private static final String OPERATOR_NAME = "TestCoordinator";
    private static final OperatorID TEST_OPERATOR_ID = new OperatorID(1234, 5678);
    private static final int NUM_SUBTASKS = 2;
    private final Schema schema = new Schema(new Types.NestedField[]{Types.NestedField.optional(1, "str", Types.StringType.get())});
    private final SortOrder sortOrder = ((SortOrder.Builder) SortOrder.builderFor(this.schema).asc("str")).build();
    private final SortKey sortKey = new SortKey(this.schema, this.sortOrder);
    private final MapDataStatisticsSerializer statisticsSerializer = MapDataStatisticsSerializer.fromSortKeySerializer(new SortKeySerializer(this.schema, this.sortOrder));
    private EventReceivingTasks receivingTasks;
    private DataStatisticsCoordinator<MapDataStatistics, Map<SortKey, Long>> dataStatisticsCoordinator;

    @Before
    public void before() throws Exception {
        this.receivingTasks = EventReceivingTasks.createForRunningTasks();
        this.dataStatisticsCoordinator = new DataStatisticsCoordinator<>(OPERATOR_NAME, new MockOperatorCoordinatorContext(TEST_OPERATOR_ID, NUM_SUBTASKS), this.statisticsSerializer);
    }

    private void tasksReady() throws Exception {
        this.dataStatisticsCoordinator.start();
        setAllTasksReady(NUM_SUBTASKS, this.dataStatisticsCoordinator, this.receivingTasks);
    }

    @Test
    public void testThrowExceptionWhenNotStarted() {
        Assertions.assertThatThrownBy(() -> {
            this.dataStatisticsCoordinator.handleEventFromOperator(0, 0, DataStatisticsEvent.create(0L, new MapDataStatistics(), this.statisticsSerializer));
        }).isInstanceOf(IllegalStateException.class).hasMessage("The coordinator of TestCoordinator has not started yet.");
        Assertions.assertThatThrownBy(() -> {
            this.dataStatisticsCoordinator.executionAttemptFailed(0, 0, (Throwable) null);
        }).isInstanceOf(IllegalStateException.class).hasMessage("The coordinator of TestCoordinator has not started yet.");
        Assertions.assertThatThrownBy(() -> {
            this.dataStatisticsCoordinator.checkpointCoordinator(0L, (CompletableFuture) null);
        }).isInstanceOf(IllegalStateException.class).hasMessage("The coordinator of TestCoordinator has not started yet.");
    }

    @Test
    public void testDataStatisticsEventHandling() throws Exception {
        tasksReady();
        SortKey copy = this.sortKey.copy();
        MapDataStatistics mapDataStatistics = new MapDataStatistics();
        copy.set(0, "a");
        mapDataStatistics.add(copy);
        copy.set(0, "b");
        mapDataStatistics.add(copy);
        copy.set(0, "b");
        mapDataStatistics.add(copy);
        copy.set(0, "c");
        mapDataStatistics.add(copy);
        copy.set(0, "c");
        mapDataStatistics.add(copy);
        copy.set(0, "c");
        mapDataStatistics.add(copy);
        DataStatisticsEvent create = DataStatisticsEvent.create(1L, mapDataStatistics, this.statisticsSerializer);
        MapDataStatistics mapDataStatistics2 = new MapDataStatistics();
        copy.set(0, "a");
        mapDataStatistics2.add(copy);
        copy.set(0, "b");
        mapDataStatistics2.add(copy);
        copy.set(0, "c");
        mapDataStatistics2.add(copy);
        copy.set(0, "c");
        mapDataStatistics2.add(copy);
        DataStatisticsEvent create2 = DataStatisticsEvent.create(1L, mapDataStatistics2, this.statisticsSerializer);
        this.dataStatisticsCoordinator.handleEventFromOperator(0, 0, create);
        this.dataStatisticsCoordinator.handleEventFromOperator(1, 0, create2);
        waitForCoordinatorToProcessActions(this.dataStatisticsCoordinator);
        SortKey copy2 = this.sortKey.copy();
        copy2.set(0, "a");
        SortKey copy3 = this.sortKey.copy();
        copy3.set(0, "b");
        SortKey copy4 = this.sortKey.copy();
        copy4.set(0, "c");
        Assertions.assertThat(this.dataStatisticsCoordinator.completedStatistics().dataStatistics().statistics()).containsExactlyInAnyOrderEntriesOf(ImmutableMap.of(copy2, 2L, copy3, 3L, copy4, 5L));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void setAllTasksReady(int i, DataStatisticsCoordinator<MapDataStatistics, Map<SortKey, Long>> dataStatisticsCoordinator, EventReceivingTasks eventReceivingTasks) {
        for (int i2 = 0; i2 < i; i2++) {
            dataStatisticsCoordinator.executionAttemptReady(i2, 0, eventReceivingTasks.createGatewayForSubtask(i2, 0));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void waitForCoordinatorToProcessActions(DataStatisticsCoordinator<MapDataStatistics, Map<SortKey, Long>> dataStatisticsCoordinator) {
        CompletableFuture completableFuture = new CompletableFuture();
        dataStatisticsCoordinator.callInCoordinatorThread(() -> {
            completableFuture.complete(null);
            return null;
        }, "Coordinator fails to process action");
        try {
            completableFuture.get();
        } catch (InterruptedException e) {
            throw new AssertionError("test interrupted");
        } catch (ExecutionException e2) {
            ExceptionUtils.rethrow(ExceptionUtils.stripExecutionException(e2));
        }
    }
}
