/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.utils;

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorEventGateway;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.MockStreamTask;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.adapter.CollectOutputAdapter;
import org.apache.hudi.adapter.TestStreamConfigs;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
import org.apache.hudi.sink.bulk.BulkInsertWriteFunction;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
import org.apache.hudi.sink.bulk.sort.SortOperator;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
import org.apache.hudi.sink.utils.MockStreamingRuntimeContext;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.sink.utils.TestFunctionWrapper;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.StreamerUtil;

/**
 * {@link TestFunctionWrapper} that drives a {@link BulkInsertWriteFunction} together with a
 * {@link StreamWriteOperatorCoordinator} using mock Flink runtime components.
 *
 * <p>Every incoming record is first mapped through
 * {@code BucketBulkInsertWriterHelper.rowWithFileId}. When
 * {@code FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT} is enabled the mapped records are buffered in a
 * {@link SortOperator} and only handed to the write function in {@link #endInput()}; otherwise they
 * are passed to the write function immediately.
 *
 * <p>Expected call order: constructor, {@link #openFunction()}, any number of
 * {@link #invoke(Object)} calls, {@link #endInput()}, {@link #close()}.
 *
 * @param <I> declared input type; records are cast to {@link RowData} at runtime
 */
public class BulkInsertFunctionWrapper<I>
implements TestFunctionWrapper<I> {
    /** Managed memory, in bytes, of the mock environment behind the runtime context (128 KiB). */
    private static final long WRITE_MANAGED_MEMORY_BYTES = 131072L;
    /** Managed memory, in bytes, of the mock environment behind the sort operator (384 KiB). */
    private static final long SORT_MANAGED_MEMORY_BYTES = 393216L;
    /** Managed memory fraction configured for the OPERATOR use case of the sort operator. */
    private static final double SORT_MANAGED_MEMORY_FRACTION = 0.99;

    private final Configuration conf;
    private final RowType rowType;
    private final RowType rowTypeWithFileId;
    /** Shared by both mock environments; closed last in {@link #close()}. */
    private final IOManager ioManager = new IOManagerAsync();
    private final MockStreamingRuntimeContext runtimeContext;
    private final MockOperatorEventGateway gateway;
    private final MockOperatorCoordinatorContext coordinatorContext;
    private StreamWriteOperatorCoordinator coordinator;
    private final boolean needSortInput;
    private BulkInsertWriteFunction<RowData> writeFunction;
    private MapFunction<RowData, RowData> mapFunction;
    private Map<String, String> bucketIdToFileId;
    private SortOperator sortOperator;
    private CollectOutputAdapter<RowData> output;

    /**
     * Builds the mock runtime context, event gateway and coordinator. Nothing is started until
     * {@link #openFunction()} is called.
     *
     * @param tablePath not used by this wrapper
     * @param conf      configuration the row type is derived from and that is handed to the
     *                  coordinator, the write function and the sort operator
     * @throws Exception if any of the mock components cannot be created
     */
    public BulkInsertFunctionWrapper(String tablePath, Configuration conf) throws Exception {
        MockEnvironment environment = new MockEnvironmentBuilder()
            .setTaskName("mockTask")
            .setManagedMemorySize(WRITE_MANAGED_MEMORY_BYTES)
            .setIOManager(this.ioManager)
            .build();
        this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
        this.gateway = new MockOperatorEventGateway();
        this.conf = conf;
        this.rowType = (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf)).getLogicalType();
        this.rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(this.rowType);
        this.coordinatorContext = new MockOperatorCoordinatorContext(new OperatorID(), 1);
        this.coordinator = new StreamWriteOperatorCoordinator(conf, this.coordinatorContext);
        this.needSortInput = conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT);
    }

    /**
     * Starts the coordinator and sets up the write function, the file-id mapping function and,
     * when input sorting is enabled, the sort operator.
     *
     * @throws Exception if any component fails to start or open
     */
    @Override
    public void openFunction() throws Exception {
        this.startCoordinator();
        this.setupWriteFunction();
        this.setupMapFunction();
        if (this.needSortInput) {
            this.setupSortOperator();
        }
    }

    /**
     * Maps the record to its file-id-carrying form and forwards it either to the sort operator
     * (sorting enabled) or directly to the write function.
     *
     * @param record the input record; must be a {@link RowData} instance
     * @throws Exception if mapping, sorting or writing fails
     */
    @Override
    public void invoke(I record) throws Exception {
        RowData recordWithFileId = this.mapFunction.map((RowData) record);
        if (this.needSortInput) {
            // Buffered here; the write function only sees these records in endInput().
            this.sortOperator.processElement(new StreamRecord<>(recordWithFileId));
        } else {
            this.writeFunction.processElement(recordWithFileId, null, null);
        }
    }

    /** Returns the coordinator's current event buffer. */
    @Override
    public WriteMetadataEvent[] getEventBuffer() {
        return this.coordinator.getEventBuffer();
    }

    /** Returns the next event available on the mock operator event gateway. */
    @Override
    public OperatorEvent getNextEvent() {
        return this.gateway.getNextEvent();
    }

    /** No-op: this wrapper performs no work on checkpoint. */
    @Override
    public void checkpointFunction(long checkpointId) {
    }

    /**
     * Signals end of input. With sorting enabled, the sort operator is ended first and every
     * record it emitted to the collecting output is replayed into the write function. The write
     * function is then ended and the bucket-id to file-id mapping is cleared.
     *
     * @throws HoodieException wrapping any exception raised while flushing the sorted records
     */
    @Override
    public void endInput() {
        if (this.needSortInput) {
            try {
                this.sortOperator.endInput();
                List<RowData> sortedRecords = this.output.getRecords();
                for (RowData sorted : sortedRecords) {
                    this.writeFunction.processElement(sorted, null, null);
                }
            }
            catch (Exception e) {
                throw new HoodieException(e);
            }
        }
        this.writeFunction.endInput();
        if (this.bucketIdToFileId != null) {
            this.bucketIdToFileId.clear();
        }
    }

    /** Notifies the coordinator that the given checkpoint completed. */
    @Override
    public void checkpointComplete(long checkpointId) {
        this.coordinator.notifyCheckpointComplete(checkpointId);
    }

    /**
     * Closes and restarts the same coordinator instance.
     *
     * @throws Exception if closing or restarting the coordinator fails
     */
    @Override
    public void coordinatorFails() throws Exception {
        this.coordinator.close();
        this.startCoordinator();
    }

    /**
     * Closes the current coordinator and replaces it with a newly constructed, started one.
     *
     * @throws Exception if closing or starting a coordinator fails
     */
    @Override
    public void restartCoordinator() throws Exception {
        this.coordinator.close();
        this.coordinator = new StreamWriteOperatorCoordinator(this.conf, this.coordinatorContext);
        this.startCoordinator();
    }

    /** Notifies the coordinator that the given checkpoint was aborted. */
    @Override
    public void checkpointFails(long checkpointId) {
        this.coordinator.notifyCheckpointAborted(checkpointId);
    }

    /** Returns the coordinator currently in use. */
    @Override
    public StreamWriteOperatorCoordinator getCoordinator() {
        return this.coordinator;
    }

    /** Returns the mock coordinator context shared by every coordinator this wrapper creates. */
    @Override
    public MockOperatorCoordinatorContext getCoordinatorContext() {
        return this.coordinatorContext;
    }

    /**
     * Releases all resources held by this wrapper. Safe to call even if
     * {@link #openFunction()} was never invoked.
     *
     * @throws Exception if closing any component fails
     */
    @Override
    public void close() throws Exception {
        try {
            this.coordinator.close();
            if (this.writeFunction != null) {
                this.writeFunction.close();
            }
            if (this.sortOperator != null) {
                this.sortOperator.close();
            }
        } finally {
            if (this.bucketIdToFileId != null) {
                this.bucketIdToFileId.clear();
            }
            // The environments of the write function and the sort operator were built on this
            // IOManager, so it is released only after they are closed, and always, even when
            // one of the closes above throws.
            this.ioManager.close();
        }
    }

    /** Starts the current coordinator and installs a mock executor bound to the coordinator context. */
    private void startCoordinator() throws Exception {
        this.coordinator.start();
        this.coordinator.setExecutor(new MockCoordinatorExecutor(this.coordinatorContext));
    }

    /**
     * Creates and opens the write function, then hands the next gateway event to the coordinator
     * as coming from subtask 0.
     */
    private void setupWriteFunction() throws Exception {
        this.writeFunction = new BulkInsertWriteFunction<>(this.conf, this.rowType);
        this.writeFunction.setRuntimeContext(this.runtimeContext);
        this.writeFunction.setOperatorEventGateway(this.gateway);
        this.writeFunction.open(this.conf);
        // NOTE(review): presumably the event the write function sent while opening - confirm.
        this.coordinator.handleEventFromOperator(0, this.getNextEvent());
    }

    /** Builds the function that maps each record through {@code BucketBulkInsertWriterHelper.rowWithFileId}. */
    private void setupMapFunction() {
        RowDataKeyGen keyGen = RowDataKeyGen.instance(this.conf, this.rowType);
        String indexKeys = OptionsResolver.getIndexKeyField(this.conf);
        int numBuckets = this.conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
        boolean needFixedFileIdSuffix = OptionsResolver.isNonBlockingConcurrencyControl(this.conf);
        this.bucketIdToFileId = new HashMap<>();
        this.mapFunction = r -> BucketBulkInsertWriterHelper.rowWithFileId(
            this.bucketIdToFileId, keyGen, r, indexKeys, numBuckets, needFixedFileIdSuffix);
    }

    /**
     * Creates, configures and opens the sort operator on its own mock stream task; sorted records
     * are emitted into {@link #output}.
     */
    private void setupSortOperator() throws Exception {
        MockEnvironment environment = new MockEnvironmentBuilder()
            .setTaskName("mockTask")
            .setManagedMemorySize(SORT_MANAGED_MEMORY_BYTES)
            .setIOManager(this.ioManager)
            .build();
        StreamTask<?, ?> streamTask = new MockStreamTaskBuilder(environment)
            .setConfig(new StreamConfig(this.conf))
            .setExecutionConfig(new ExecutionConfig().enableObjectReuse())
            .build();
        SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(this.rowTypeWithFileId);
        this.sortOperator = (SortOperator) sortOperatorGen.createSortOperator(this.conf);
        this.sortOperator.setProcessingTimeService(new TestProcessingTimeService());
        this.output = new CollectOutputAdapter<>();
        StreamConfig streamConfig = new StreamConfig(this.conf);
        streamConfig.setOperatorID(new OperatorID());
        RowDataSerializer inputSerializer = new RowDataSerializer(this.rowTypeWithFileId);
        TestStreamConfigs.setupNetworkInputs(streamConfig, new TypeSerializer<?>[] {inputSerializer});
        streamConfig.setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.OPERATOR, SORT_MANAGED_MEMORY_FRACTION);
        this.sortOperator.setup(streamTask, streamConfig, this.output);
        this.sortOperator.open();
    }
}

