/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.utils.factory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class CollectSinkTableFactory
implements DynamicTableSinkFactory {
    public static final String FACTORY_ID = "collect";
    public static final Map<Integer, List<Row>> RESULT = new HashMap<Integer, List<Row>>();

    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper((DynamicTableFactory)this, (DynamicTableFactory.Context)context);
        helper.validate();
        TableSchema schema = context.getCatalogTable().getSchema();
        RESULT.clear();
        return new CollectTableSink(schema, context.getObjectIdentifier().getObjectName());
    }

    public String factoryIdentifier() {
        return FACTORY_ID;
    }

    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }

    static class CollectSinkFunction
    extends RichSinkFunction<RowData>
    implements CheckpointedFunction {
        private static final long serialVersionUID = 1L;
        private final DynamicTableSink.DataStructureConverter converter;
        private final RowTypeInfo rowTypeInfo;
        protected transient ListState<Row> resultState;
        protected transient List<Row> localResult;
        private int taskID;

        protected CollectSinkFunction(DynamicTableSink.DataStructureConverter converter, RowTypeInfo rowTypeInfo) {
            this.converter = converter;
            this.rowTypeInfo = rowTypeInfo;
        }

        public void invoke(RowData value, SinkFunction.Context context) {
            Row row = (Row)this.converter.toExternal((Object)value);
            assert (row != null);
            row.setKind(value.getRowKind());
            RESULT.get(this.taskID).add(row);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.resultState = context.getOperatorStateStore().getListState(new ListStateDescriptor("sink-results", (TypeInformation)this.rowTypeInfo));
            this.localResult = new ArrayList<Row>();
            if (context.isRestored()) {
                for (Row value : (Iterable)this.resultState.get()) {
                    this.localResult.add(value);
                }
            }
            this.taskID = this.getRuntimeContext().getIndexOfThisSubtask();
            Class<CollectSinkTableFactory> clazz = CollectSinkTableFactory.class;
            synchronized (CollectSinkTableFactory.class) {
                RESULT.put(this.taskID, this.localResult);
                // ** MonitorExit[var2_2] (shouldn't be in output)
                return;
            }
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.resultState.clear();
            this.resultState.addAll(RESULT.get(this.taskID));
        }
    }

    private static class CollectTableSink
    implements DynamicTableSink {
        private final TableSchema schema;
        private final String tableName;

        private CollectTableSink(TableSchema schema, String tableName) {
            this.schema = schema;
            this.tableName = tableName;
        }

        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            return ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).addContainedKind(RowKind.DELETE).addContainedKind(RowKind.UPDATE_AFTER).build();
        }

        public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
            DataType rowType = this.schema.toPhysicalRowDataType();
            RowTypeInfo rowTypeInfo = (RowTypeInfo)TypeConversions.fromDataTypeToLegacyInfo((DataType)rowType);
            DynamicTableSink.DataStructureConverter converter = context.createDataStructureConverter(this.schema.toPhysicalRowDataType());
            return SinkFunctionProvider.of((SinkFunction)new CollectSinkFunction(converter, rowTypeInfo));
        }

        public DynamicTableSink copy() {
            return new CollectTableSink(this.schema, this.tableName);
        }

        public String asSummaryString() {
            return "CollectSink";
        }
    }
}

