package org.apache.iceberg.flink.sink.shuffle;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortKey;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataWrapper;
import org.apache.iceberg.flink.sink.shuffle.DataStatistics;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

@Internal
/* loaded from: input_file:org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.class */
class DataStatisticsOperator<D extends DataStatistics<D, S>, S> extends AbstractStreamOperator<DataStatisticsOrRecord<D, S>> implements OneInputStreamOperator<RowData, DataStatisticsOrRecord<D, S>>, OperatorEventHandler {
    private static final long serialVersionUID = 1;
    private final String operatorName;
    private final RowDataWrapper rowDataWrapper;
    private final SortKey sortKey;
    private final OperatorEventGateway operatorEventGateway;
    private final TypeSerializer<DataStatistics<D, S>> statisticsSerializer;
    private volatile transient DataStatistics<D, S> localStatistics;
    private volatile transient DataStatistics<D, S> globalStatistics;
    private transient ListState<DataStatistics<D, S>> globalStatisticsState;

    DataStatisticsOperator(String str, Schema schema, SortOrder sortOrder, OperatorEventGateway operatorEventGateway, TypeSerializer<DataStatistics<D, S>> typeSerializer) {
        this.operatorName = str;
        this.rowDataWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), schema.asStruct());
        this.sortKey = new SortKey(schema, sortOrder);
        this.operatorEventGateway = operatorEventGateway;
        this.statisticsSerializer = typeSerializer;
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        this.localStatistics = (DataStatistics) this.statisticsSerializer.createInstance();
        this.globalStatisticsState = stateInitializationContext.getOperatorStateStore().getUnionListState(new ListStateDescriptor("globalStatisticsState", this.statisticsSerializer));
        if (!stateInitializationContext.isRestored()) {
            this.globalStatistics = (DataStatistics) this.statisticsSerializer.createInstance();
            return;
        }
        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        if (this.globalStatisticsState.get() == null || !((Iterable) this.globalStatisticsState.get()).iterator().hasNext()) {
            LOG.warn("Operator {} subtask {} doesn't have global statistics state to restore", this.operatorName, Integer.valueOf(indexOfThisSubtask));
            this.globalStatistics = (DataStatistics) this.statisticsSerializer.createInstance();
        } else {
            LOG.info("Restoring operator {} global statistics state for subtask {}", this.operatorName, Integer.valueOf(indexOfThisSubtask));
            this.globalStatistics = (DataStatistics) ((Iterable) this.globalStatisticsState.get()).iterator().next();
        }
    }

    public void open() throws Exception {
        if (this.globalStatistics.isEmpty()) {
            return;
        }
        this.output.collect(new StreamRecord(DataStatisticsOrRecord.fromDataStatistics(this.globalStatistics)));
    }

    public void handleOperatorEvent(OperatorEvent operatorEvent) {
        Preconditions.checkArgument(operatorEvent instanceof DataStatisticsEvent, String.format("Operator %s subtask %s received unexpected operator event %s", this.operatorName, Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), operatorEvent.getClass()));
        DataStatisticsEvent dataStatisticsEvent = (DataStatisticsEvent) operatorEvent;
        LOG.info("Operator {} received global data event from coordinator checkpoint {}", this.operatorName, Long.valueOf(dataStatisticsEvent.checkpointId()));
        this.globalStatistics = DataStatisticsUtil.deserializeDataStatistics(dataStatisticsEvent.statisticsBytes(), this.statisticsSerializer);
        this.output.collect(new StreamRecord(DataStatisticsOrRecord.fromDataStatistics(this.globalStatistics)));
    }

    public void processElement(StreamRecord<RowData> streamRecord) {
        RowData rowData = (RowData) streamRecord.getValue();
        this.sortKey.wrap(this.rowDataWrapper.wrap(rowData));
        this.localStatistics.add(this.sortKey);
        this.output.collect(new StreamRecord(DataStatisticsOrRecord.fromRecord(rowData)));
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        long checkpointId = stateSnapshotContext.getCheckpointId();
        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        LOG.info("Snapshotting data statistics operator {} for checkpoint {} in subtask {}", new Object[]{this.operatorName, Long.valueOf(checkpointId), Integer.valueOf(indexOfThisSubtask)});
        if (!this.globalStatistics.isEmpty()) {
            this.output.collect(new StreamRecord(DataStatisticsOrRecord.fromDataStatistics(this.globalStatistics)));
        }
        if (!this.globalStatistics.isEmpty() && getRuntimeContext().getIndexOfThisSubtask() == 0) {
            this.globalStatisticsState.clear();
            LOG.info("Saving operator {} global statistics {} to state in subtask {}", new Object[]{this.operatorName, this.globalStatistics, Integer.valueOf(indexOfThisSubtask)});
            this.globalStatisticsState.add(this.globalStatistics);
        }
        this.operatorEventGateway.sendEventToCoordinator(DataStatisticsEvent.create(checkpointId, this.localStatistics, this.statisticsSerializer));
        LOG.debug("Subtask {} of operator {} sent local statistics to coordinator at checkpoint{}: {}", new Object[]{Integer.valueOf(indexOfThisSubtask), this.operatorName, Long.valueOf(checkpointId), this.localStatistics});
        this.localStatistics = (DataStatistics) this.statisticsSerializer.createInstance();
    }

    @VisibleForTesting
    DataStatistics<D, S> localDataStatistics() {
        return this.localStatistics;
    }

    @VisibleForTesting
    DataStatistics<D, S> globalDataStatistics() {
        return this.globalStatistics;
    }
}
