package org.apache.flink.table.planner.factories;

import java.io.File;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.catalog.StagedTable;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsStaging;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.util.FileUtils;

/* loaded from: input_file:org/apache/flink/table/planner/factories/TestSupportsStagingTableFactory.class */
public class TestSupportsStagingTableFactory implements DynamicTableSinkFactory {
    public static final String IDENTIFIER = "test-staging";
    public static final List<String> JOB_STATUS_CHANGE_PROCESS = new LinkedList();
    public static final List<SupportsStaging.StagingPurpose> STAGING_PURPOSE_LIST = new LinkedList();
    private static final ConfigOption<String> DATA_DIR = ConfigOptions.key("data-dir").stringType().noDefaultValue().withDescription("The data id used to write the rows.");
    private static final ConfigOption<Boolean> SINK_FAIL = ConfigOptions.key("sink-fail").booleanType().defaultValue(false).withDescription("If set to true, then sink will throw an exception causing the job to fail, used to verify the TestStagedTable#abort.");

    /* loaded from: input_file:org/apache/flink/table/planner/factories/TestSupportsStagingTableFactory$StagedSinkFunction.class */
    private static class StagedSinkFunction extends RichSinkFunction<RowData> {
        private final String dataDir;
        private final boolean sinkFail;

        public StagedSinkFunction(String str, boolean z) {
            this.dataDir = str;
            this.sinkFail = z;
        }

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            File file = new File(this.dataDir);
            if (file.exists()) {
                file.delete();
            }
            file.mkdirs();
            new File(this.dataDir, "_data").createNewFile();
        }

        public void invoke(RowData rowData, SinkFunction.Context context) throws Exception {
            if (this.sinkFail) {
                throw new RuntimeException("Test StagedTable abort method.");
            }
            FileUtils.writeFileUtf8(new File(this.dataDir, "_data"), rowData.getInt(0) + "," + rowData.getString(1));
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/factories/TestSupportsStagingTableFactory$SupportsStagingTableSink.class */
    private static class SupportsStagingTableSink implements DynamicTableSink, SupportsStaging {
        private final String dataDir;
        private final boolean sinkFail;
        private TestStagedTable stagedTable;

        public SupportsStagingTableSink(String str, boolean z) {
            this(str, z, null);
        }

        public SupportsStagingTableSink(String str, boolean z, TestStagedTable testStagedTable) {
            this.dataDir = str;
            this.sinkFail = z;
            this.stagedTable = testStagedTable;
        }

        public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
            return ChangelogMode.insertOnly();
        }

        public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
            return new DataStreamSinkProvider() { // from class: org.apache.flink.table.planner.factories.TestSupportsStagingTableFactory.SupportsStagingTableSink.1
                public DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
                    return dataStream.addSink(new StagedSinkFunction(SupportsStagingTableSink.this.dataDir, SupportsStagingTableSink.this.sinkFail)).setParallelism(1);
                }
            };
        }

        public DynamicTableSink copy() {
            return new SupportsStagingTableSink(this.dataDir, this.sinkFail, this.stagedTable);
        }

        public String asSummaryString() {
            return "SupportsStagingTableSink";
        }

        public StagedTable applyStaging(SupportsStaging.StagingContext stagingContext) {
            TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS.clear();
            TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST.clear();
            this.stagedTable = new TestStagedTable(this.dataDir);
            TestSupportsStagingTableFactory.STAGING_PURPOSE_LIST.add(stagingContext.getStagingPurpose());
            return this.stagedTable;
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/factories/TestSupportsStagingTableFactory$TestStagedTable.class */
    private static class TestStagedTable implements StagedTable {
        private final String dataDir;

        public TestStagedTable(String str) {
            this.dataDir = str;
        }

        public void begin() {
            TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS.add("begin");
        }

        public void commit() {
            TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS.add("commit");
            new File(this.dataDir, "_data").renameTo(new File(this.dataDir, "data"));
        }

        public void abort() {
            TestSupportsStagingTableFactory.JOB_STATUS_CHANGE_PROCESS.add("abort");
        }
    }

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper createTableFactoryHelper = FactoryUtil.createTableFactoryHelper(this, context);
        createTableFactoryHelper.validate();
        return new SupportsStagingTableSink((String) createTableFactoryHelper.getOptions().get(DATA_DIR), ((Boolean) createTableFactoryHelper.getOptions().get(SINK_FAIL)).booleanValue());
    }

    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.singleton(DATA_DIR);
    }

    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.singleton(SINK_FAIL);
    }
}
