/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.utils;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.hudi.sink.compact.CompactOperator;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionPlanEvent;
import org.apache.hudi.sink.compact.CompactionPlanOperator;
import org.apache.hudi.sink.utils.CollectorOutput;
import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
import org.apache.hudi.sink.utils.MockStreamingRuntimeContext;
import org.apache.hudi.sink.utils.NonThrownExecutor;

public class CompactFunctionWrapper {
    private final Configuration conf;
    private final IOManager ioManager;
    private final StreamingRuntimeContext runtimeContext;
    private final StreamTask<?, ?> streamTask;
    private final StreamConfig streamConfig;
    private CompactionPlanOperator compactionPlanOperator;
    private CollectorOutput<CompactionCommitEvent> commitEventOutput;
    private CompactOperator compactOperator;
    private CompactionCommitSink commitSink;

    public CompactFunctionWrapper(Configuration conf, StreamTask<?, ?> streamTask, StreamConfig streamConfig) {
        this.conf = conf;
        this.ioManager = new IOManagerAsync();
        MockEnvironment environment = new MockEnvironmentBuilder().setTaskName("mockTask").setManagedMemorySize(131072L).setIOManager(this.ioManager).build();
        this.runtimeContext = new MockStreamingRuntimeContext(false, 1, 0, environment);
        this.streamTask = streamTask;
        this.streamConfig = streamConfig;
    }

    public void openFunction() throws Exception {
        this.compactionPlanOperator = new CompactionPlanOperator(this.conf);
        this.compactionPlanOperator.open();
        this.compactOperator = new CompactOperator(this.conf);
        this.compactOperator.setProcessingTimeService((ProcessingTimeService)new TestProcessingTimeService());
        this.commitEventOutput = new CollectorOutput();
        this.compactOperator.setup(this.streamTask, this.streamConfig, this.commitEventOutput);
        this.compactOperator.open();
        MockCoordinatorExecutor syncExecutor = new MockCoordinatorExecutor((OperatorCoordinator.Context)new MockOperatorCoordinatorContext(new OperatorID(), 1));
        this.compactOperator.setExecutor((NonThrownExecutor)syncExecutor);
        this.commitSink = new CompactionCommitSink(this.conf);
        this.commitSink.setRuntimeContext((RuntimeContext)this.runtimeContext);
        this.commitSink.open(this.conf);
    }

    public void compact(long checkpointID) throws Exception {
        CollectorOutput output = new CollectorOutput();
        this.compactionPlanOperator.setOutput(output);
        this.compactionPlanOperator.notifyCheckpointComplete(checkpointID);
        for (CompactionPlanEvent compactionPlanEvent : output.getRecords()) {
            this.compactOperator.processElement(new StreamRecord((Object)compactionPlanEvent));
        }
        for (CompactionCommitEvent compactionCommitEvent : this.commitEventOutput.getRecords()) {
            this.commitSink.invoke(compactionCommitEvent, null);
        }
    }

    public void close() throws Exception {
        this.ioManager.close();
    }
}

