package org.apache.flink.table.store.connector.sink;

import java.util.List;
import java.util.stream.Stream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.store.connector.FileStoreITCase;
import org.apache.flink.table.store.connector.TableStore;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.kafka.KafkaLogSinkProvider;
import org.apache.flink.table.store.kafka.KafkaLogSourceProvider;
import org.apache.flink.table.store.kafka.KafkaLogStoreFactory;
import org.apache.flink.table.store.kafka.KafkaLogTestUtils;
import org.apache.flink.table.store.kafka.KafkaTableTestBase;
import org.apache.flink.table.store.log.LogOptions;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.assertj.core.api.Assertions;
import org.junit.Ignore;
import org.junit.Test;

@Ignore
/* loaded from: input_file:org/apache/flink/table/store/connector/sink/LogStoreSinkITCase.class */
public class LogStoreSinkITCase extends KafkaTableTestBase {
    @Test
    public void testStreamingPartitioned() throws Exception {
        innerTest("testStreamingPartitioned", false, true, true, true);
    }

    @Test
    public void testStreamingNonPartitioned() throws Exception {
        innerTest("testStreamingNonPartitioned", false, false, true, true);
    }

    @Test
    public void testBatchPartitioned() throws Exception {
        innerTest("testBatchPartitioned", true, true, true, true);
    }

    @Test
    public void testStreamingEventual() throws Exception {
        innerTest("testStreamingEventual", false, true, false, true);
    }

    @Test
    public void testStreamingPartitionedNonKey() throws Exception {
        innerTest("testStreamingPartitionedNonKey", false, true, true, false);
    }

    @Test
    public void testBatchPartitionedNonKey() throws Exception {
        innerTest("testBatchPartitionedNonKey", true, true, true, false);
    }

    private void innerTest(String str, boolean z, boolean z2, boolean z3, boolean z4) throws Exception {
        Row[] rowArr;
        StreamExecutionEnvironment buildBatchEnv = z ? FileStoreITCase.buildBatchEnv() : FileStoreITCase.buildStreamEnv();
        TableStore buildTableStore = FileStoreITCase.buildTableStore(z || !z3, TEMPORARY_FOLDER);
        if (z2) {
            if (z4) {
                buildTableStore.withPrimaryKeys(new int[]{1, 2});
            } else {
                buildTableStore.withPrimaryKeys(new int[0]);
            }
            buildTableStore.withPartitions(new int[]{1});
        } else {
            buildTableStore.withPartitions(new int[0]);
        }
        if (!z4) {
            buildTableStore.withPrimaryKeys(new int[0]);
        }
        DynamicTableFactory.Context testContext = KafkaLogTestUtils.testContext(str, getBootstrapServers(), LogOptions.LogChangelogMode.AUTO, z3 ? LogOptions.LogConsistency.TRANSACTIONAL : LogOptions.LogConsistency.EVENTUAL, FileStoreITCase.TABLE_TYPE, z4 ? new int[]{2} : new int[0]);
        KafkaLogStoreFactory discoverKafkaLogFactory = KafkaLogTestUtils.discoverKafkaLogFactory();
        KafkaLogSinkProvider createSinkProvider = discoverKafkaLogFactory.createSinkProvider(testContext, KafkaLogTestUtils.SINK_CONTEXT);
        KafkaLogSourceProvider createSourceProvider = discoverKafkaLogFactory.createSourceProvider(testContext, KafkaLogTestUtils.SOURCE_CONTEXT, (int[][]) null);
        discoverKafkaLogFactory.onCreateTable(testContext, 3, true);
        try {
            buildTableStore.sinkBuilder().withInput(FileStoreITCase.buildTestSource(buildBatchEnv, z)).withLogSinkProvider(createSinkProvider).build();
            buildBatchEnv.execute();
            List<Row> executeAndCollect = FileStoreITCase.executeAndCollect(buildTableStore.sourceBuilder().withEnv(buildBatchEnv).build());
            if (z4) {
                rowArr = z2 ? new Row[]{Row.of(new Object[]{5, "p2", 1}), Row.of(new Object[]{3, "p2", 5}), Row.of(new Object[]{5, "p1", 1}), Row.of(new Object[]{0, "p1", 2})} : new Row[]{Row.of(new Object[]{5, "p2", 1}), Row.of(new Object[]{0, "p1", 2}), Row.of(new Object[]{3, "p2", 5})};
            } else {
                Stream<RowData> stream = z ? FileStoreITCase.SOURCE_DATA.stream() : Stream.concat(FileStoreITCase.SOURCE_DATA.stream(), FileStoreITCase.SOURCE_DATA.stream());
                DataStructureConverter<RowData, Row> dataStructureConverter = FileStoreITCase.CONVERTER;
                dataStructureConverter.getClass();
                rowArr = (Row[]) stream.map((v1) -> {
                    return r1.toExternal(v1);
                }).toArray(i -> {
                    return new Row[i];
                });
            }
            Assertions.assertThat(executeAndCollect).containsExactlyInAnyOrder(rowArr);
            CloseableIterator executeAndCollect2 = buildTableStore.sourceBuilder().withContinuousMode(true).withLogSourceProvider(createSourceProvider).withEnv(FileStoreITCase.buildStreamEnv()).build().executeAndCollect();
            DataStructureConverter<RowData, Row> dataStructureConverter2 = FileStoreITCase.CONVERTER;
            dataStructureConverter2.getClass();
            Assertions.assertThat(BlockingIterator.of(executeAndCollect2, (v1) -> {
                return r1.toExternal(v1);
            }).collectAndClose(rowArr.length)).containsExactlyInAnyOrder(rowArr);
            discoverKafkaLogFactory.onDropTable(testContext, true);
        } catch (Throwable th) {
            discoverKafkaLogFactory.onDropTable(testContext, true);
            throw th;
        }
    }
}
