package org.apache.flink.connectors.hive;

import java.time.Duration;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SlowTaskDetectorOptions;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/connectors/hive/HiveTableSpeculativeSinkITCase.class */
/**
 * Integration test that writes into a Hive table in batch mode while speculative execution is
 * enabled.
 *
 * <p>The pipeline contains a map function whose first execution attempt (attempt number 0) sleeps
 * for {@link Integer#MAX_VALUE} milliseconds, so the job can only finish if a later attempt of the
 * same task produces the data. The test then checks that the single row stored in the Hive table
 * carries attempt number {@code 1}.
 */
class HiveTableSpeculativeSinkITCase {

    /** Slots offered by the mini cluster and parallelism of the slow map operator. */
    private static final int PARALLELISM = 3;

    /** Operator name of the slow map function; also used to locate its job vertex. */
    private static final String SLOW_MAP_NAME = "slowMap";

    /** Statement that is first inspected as a job graph and then actually executed. */
    private static final String INSERT_SQL = "insert into append_table select * from mappedTable";

    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER =
            new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(configure(new Configuration()))
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(PARALLELISM)
                            .build());

    private HiveCatalog hiveCatalog;

    /** Creates and opens a fresh Hive catalog before every test. */
    @BeforeEach
    void createCatalog() {
        hiveCatalog = HiveTestUtils.createHiveCatalog();
        hiveCatalog.open();
    }

    /** Closes the Hive catalog after every test; tolerates a failed {@link #createCatalog()}. */
    @AfterEach
    void closeCatalog() {
        if (hiveCatalog != null) {
            hiveCatalog.close();
        }
    }

    /**
     * Inserts one row through the slow map function into a Hive table and verifies that the row
     * which ends up in the table was produced by execution attempt {@code 1}.
     *
     * @throws Exception if setting up, translating or executing the job fails
     */
    @Test
    void testBatchWritingWithoutCompactionWithSpeculativeSink() throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(configure(new Configuration()));
        final StreamTableEnvironment tEnv =
                StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode());

        tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
        tEnv.useCatalog(hiveCatalog.getName());

        TableEnvExecutorUtil.executeInSeparateDatabase(
                tEnv,
                true,
                () -> {
                    tEnv.executeSql(
                            "create table append_table(i int, j int) TBLPROPERTIES ('sink.parallelism' ='3')");

                    // Source row (0, 0); the map overwrites field 1 with the attempt number.
                    final Table mappedTable =
                            tEnv.fromChangelogStream(
                                    tEnv.toChangelogStream(tEnv.sqlQuery("select 0, 0"))
                                            .map(new SlowFirstAttemptMapFunction())
                                            .name(SLOW_MAP_NAME)
                                            .returns(
                                                    Types.ROW_NAMED(
                                                            new String[] {"i", "j"},
                                                            Types.INT,
                                                            Types.INT))
                                            .setParallelism(PARALLELISM),
                                    Schema.newBuilder()
                                            .column("i", DataTypes.INT())
                                            .column("j", DataTypes.INT())
                                            .build(),
                                    ChangelogMode.insertOnly());

                    tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
                    tEnv.createTemporaryView("mappedTable", mappedTable);

                    // Translate the INSERT without running it, to inspect the job graph.
                    final StreamTableEnvironmentImpl tEnvImpl = (StreamTableEnvironmentImpl) tEnv;
                    final ModifyOperation insertOperation =
                            (ModifyOperation) tEnvImpl.getParser().parse(INSERT_SQL).get(0);
                    final Iterable<JobVertex> jobVertices =
                            tEnvImpl.execEnv()
                                    .generateStreamGraph(
                                            tEnvImpl.getPlanner()
                                                    .translate(
                                                            Collections.singletonList(
                                                                    insertOperation)))
                                    .getJobGraph()
                                    .getVertices();

                    // The vertex holding the slow map must also name the sink; presumably this
                    // means both operators are chained into one vertex.
                    for (JobVertex jobVertex : jobVertices) {
                        if (jobVertex.getName().contains(SLOW_MAP_NAME)) {
                            Assertions.assertThat(jobVertex.getName()).contains("Sink");
                        }
                    }

                    tEnv.executeSql(INSERT_SQL).await();

                    final List<Row> rows =
                            CollectionUtil.iteratorToList(
                                    tEnv.executeSql("select * from append_table").collect());
                    rows.sort(Comparator.comparingInt(row -> (Integer) row.getField(0)));

                    // Exactly one row, written by attempt 1 (attempt 0 is still sleeping).
                    Assertions.assertThat(rows).containsExactly(Row.of(0, 1));
                });
    }

    /**
     * Enables speculative execution on the given configuration and sets the slow task detector
     * options used by this test (baseline multiplier {@code 1.0}, baseline ratio {@code 0.2},
     * lower bound of zero milliseconds, and a zero block duration for slow nodes).
     *
     * @param configuration the configuration to modify in place
     * @return the same {@code configuration} instance, for call chaining
     */
    private static Configuration configure(Configuration configuration) {
        configuration.set(BatchExecutionOptions.SPECULATIVE_ENABLED, true);
        configuration.set(BatchExecutionOptions.BLOCK_SLOW_NODE_DURATION, Duration.ZERO);
        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_MULTIPLIER, 1.0);
        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_RATIO, 0.2);
        configuration.set(
                SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_LOWER_BOUND, Duration.ofMillis(0L));
        return configuration;
    }

    /**
     * Map function that blocks on its first execution attempt and otherwise stores the attempt
     * number in field {@code 1} of the row.
     *
     * <p>Declared as a static nested class so that it holds no reference to the enclosing test
     * instance.
     */
    private static final class SlowFirstAttemptMapFunction extends RichMapFunction<Row, Row> {

        private static final long serialVersionUID = 1L;

        @Override
        public Row map(Row row) throws Exception {
            final int attemptNumber = getRuntimeContext().getTaskInfo().getAttemptNumber();
            if (attemptNumber <= 0) {
                // Stall the first attempt so that only a later attempt can deliver the record.
                Thread.sleep(Integer.MAX_VALUE);
            }

            assert attemptNumber > 0;

            row.setField(1, attemptNumber);
            return row;
        }
    }
}
