package org.apache.carbondata.spark.rdd;

import java.util.UUID;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.locks.CarbonLockFactory;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.events.OperationContext;
import org.apache.carbondata.events.OperationListenerBus;
import org.apache.carbondata.processing.loading.events.LoadEvents;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.util.CarbonLoaderUtil;
import org.apache.carbondata.spark.HandoffResultImpl;
import org.apache.log4j.Logger;
import org.apache.spark.sql.SparkSession;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Tuple2;
import scala.collection.mutable.ArrayOps;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ObjectRef;

/* compiled from: StreamHandoffRDD.scala */
/* loaded from: input_file:org/apache/carbondata/spark/rdd/StreamHandoffRDD$.class */
public final class StreamHandoffRDD$ implements Serializable {
    public static StreamHandoffRDD$ MODULE$;
    private final Logger LOGGER;

    static {
        new StreamHandoffRDD$();
    }

    private Logger LOGGER() {
        return this.LOGGER;
    }

    public void iterateStreamingHandoff(CarbonLoadModel carbonLoadModel, OperationContext operationContext, SparkSession sparkSession) {
        boolean z;
        CarbonTable carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
        AbsoluteTableIdentifier absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier();
        ICarbonLock carbonLockObj = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier, "handoff.lock");
        try {
            if (carbonLockObj.lockWithRetries()) {
                LOGGER().info(new StringBuilder(35).append("Acquired the handoff lock for table").append(new StringBuilder(2).append(" ").append(carbonTable.getDatabaseName()).append(".").append(carbonTable.getTableName()).toString()).toString());
                do {
                    LoadMetadataDetails[] loadMetadataDetailsArr = null;
                    ICarbonLock tableStatusLock = new SegmentStatusManager(absoluteTableIdentifier).getTableStatusLock();
                    try {
                        if (tableStatusLock.lockWithRetries()) {
                            loadMetadataDetailsArr = SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(absoluteTableIdentifier.getTablePath()));
                        }
                        if (tableStatusLock != null) {
                            tableStatusLock.unlock();
                        }
                        if (loadMetadataDetailsArr != null) {
                            LoadMetadataDetails[] loadMetadataDetailsArr2 = (LoadMetadataDetails[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(loadMetadataDetailsArr)).filter(loadMetadataDetails -> {
                                return BoxesRunTime.boxToBoolean($anonfun$iterateStreamingHandoff$1(loadMetadataDetails));
                            });
                            z = loadMetadataDetailsArr2.length > 0;
                            if (z) {
                                LoadMetadataDetails loadMetadataDetails2 = loadMetadataDetailsArr2[0];
                                carbonLoadModel.setSegmentId(loadMetadataDetails2.getLoadName());
                                executeStreamingHandoff(carbonLoadModel, sparkSession, operationContext, loadMetadataDetails2.getLoadName());
                            }
                        } else {
                            z = false;
                        }
                    } catch (Throwable th) {
                        if (tableStatusLock != null) {
                            tableStatusLock.unlock();
                        }
                        throw th;
                    }
                } while (z);
            }
        } finally {
            if (carbonLockObj != null) {
                carbonLockObj.unlock();
            }
        }
    }

    public void startStreamingHandoffThread(final CarbonLoadModel carbonLoadModel, final OperationContext operationContext, final SparkSession sparkSession, boolean z) {
        if (z) {
            iterateStreamingHandoff(carbonLoadModel, operationContext, sparkSession);
        } else {
            new Thread(carbonLoadModel, operationContext, sparkSession) { // from class: org.apache.carbondata.spark.rdd.StreamHandoffRDD$$anon$2
                private final CarbonLoadModel carbonLoadModel$1;
                private final OperationContext operationContext$1;
                private final SparkSession sparkSession$1;

                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    StreamHandoffRDD$.MODULE$.iterateStreamingHandoff(this.carbonLoadModel$1, this.operationContext$1, this.sparkSession$1);
                }

                {
                    this.carbonLoadModel$1 = carbonLoadModel;
                    this.operationContext$1 = operationContext;
                    this.sparkSession$1 = sparkSession;
                }
            }.start();
        }
    }

    public void executeStreamingHandoff(CarbonLoadModel carbonLoadModel, SparkSession sparkSession, OperationContext operationContext, String str) {
        ObjectRef create = ObjectRef.create(SegmentStatus.SUCCESS);
        String str2 = "Handoff failure";
        try {
            LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
            carbonLoadModel.setFactTimeStamp(System.currentTimeMillis());
            CarbonLoaderUtil.populateNewLoadMetaEntry(loadMetadataDetails, SegmentStatus.INSERT_IN_PROGRESS, carbonLoadModel.getFactTimeStamp(), false);
            CarbonLoaderUtil.recordNewLoadMetadata(loadMetadataDetails, carbonLoadModel, true, false);
            new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Tuple2[]) new StreamHandoffRDD(sparkSession, new HandoffResultImpl(), carbonLoadModel, str).collect())).foreach(tuple2 -> {
                $anonfun$executeStreamingHandoff$1(create, tuple2);
                return BoxedUnit.UNIT;
            });
        } catch (Exception e) {
            create.elem = SegmentStatus.LOAD_FAILURE;
            LOGGER().error(new StringBuilder(36).append("Handoff failed on streaming segment ").append(str).toString(), e);
            str2 = new StringBuilder(2).append(str2).append(": ").append(e.getCause().getMessage()).toString();
            LOGGER().error(str2);
        }
        SegmentStatus segmentStatus = (SegmentStatus) create.elem;
        SegmentStatus segmentStatus2 = SegmentStatus.LOAD_FAILURE;
        if (segmentStatus != null ? segmentStatus.equals(segmentStatus2) : segmentStatus2 == null) {
            CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel);
            LOGGER().info("********starting clean up**********");
            CarbonLoaderUtil.deleteSegmentForFailure(carbonLoadModel);
            LOGGER().info("********clean up done**********");
            LOGGER().error("Cannot write load metadata file as handoff failed");
            throw new Exception(str2);
        }
        SegmentStatus segmentStatus3 = (SegmentStatus) create.elem;
        SegmentStatus segmentStatus4 = SegmentStatus.SUCCESS;
        if (segmentStatus3 == null) {
            if (segmentStatus4 != null) {
                return;
            }
        } else if (!segmentStatus3.equals(segmentStatus4)) {
            return;
        }
        operationContext.setProperty("uuid", UUID.randomUUID().toString());
        OperationListenerBus.getInstance().fireEvent(new LoadEvents.LoadTablePreStatusUpdateEvent(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier(), carbonLoadModel), operationContext);
        boolean updateLoadMetadata = updateLoadMetadata(str, carbonLoadModel);
        OperationListenerBus.getInstance().fireEvent(new LoadEvents.LoadTablePostStatusUpdateEvent(carbonLoadModel), operationContext);
        if (updateLoadMetadata) {
            return;
        }
        LOGGER().error("Handoff failed due to failure in table status update.");
        throw new Exception(str2);
    }

    private boolean updateLoadMetadata(String str, CarbonLoadModel carbonLoadModel) {
        boolean z = false;
        String metadataPath = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getMetadataPath();
        AbsoluteTableIdentifier absoluteTableIdentifier = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
        String metadataPath2 = CarbonTablePath.getMetadataPath(absoluteTableIdentifier.getTablePath());
        if (FileFactory.isFileExist(metadataPath2)) {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxesRunTime.boxToBoolean(FileFactory.mkdirs(metadataPath2));
        }
        String tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(absoluteTableIdentifier.getTablePath());
        ICarbonLock tableStatusLock = new SegmentStatusManager(absoluteTableIdentifier).getTableStatusLock();
        try {
            if (tableStatusLock.lockWithRetries()) {
                LOGGER().info(new StringBuilder(48).append("Acquired lock for table").append(carbonLoadModel.getDatabaseName()).append(".").append(carbonLoadModel.getTableName()).append(" for table status update").toString());
                LoadMetadataDetails[] readLoadMetadata = SegmentStatusManager.readLoadMetadata(metadataPath);
                Option find = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(readLoadMetadata)).find(loadMetadataDetails -> {
                    return BoxesRunTime.boxToBoolean($anonfun$updateLoadMetadata$1(carbonLoadModel, loadMetadataDetails));
                });
                if (find.isEmpty()) {
                    throw new Exception("Failed to update table status for new segment");
                }
                ((LoadMetadataDetails) find.get()).setSegmentStatus(SegmentStatus.SUCCESS);
                ((LoadMetadataDetails) find.get()).setLoadEndTime(System.currentTimeMillis());
                CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry((LoadMetadataDetails) find.get(), carbonLoadModel.getSegmentId(), carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable());
                Option find2 = new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(readLoadMetadata)).find(loadMetadataDetails2 -> {
                    return BoxesRunTime.boxToBoolean($anonfun$updateLoadMetadata$2(str, loadMetadataDetails2));
                });
                if (find2.isEmpty()) {
                    throw new Exception("Failed to update table status for streaming segment");
                }
                ((LoadMetadataDetails) find2.get()).setSegmentStatus(SegmentStatus.COMPACTED);
                ((LoadMetadataDetails) find2.get()).setMergedLoadName(carbonLoadModel.getSegmentId());
                SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFilePath, readLoadMetadata);
                z = true;
            } else {
                LOGGER().error(new StringBuilder(64).append("Not able to acquire the lock for Table status update for table ").append(carbonLoadModel.getDatabaseName()).append(".").append(carbonLoadModel.getTableName()).toString());
            }
            return z;
        } finally {
            if (tableStatusLock.unlock()) {
                LOGGER().info(new StringBuilder(54).append("Table unlocked successfully after table status update").append(carbonLoadModel.getDatabaseName()).append(".").append(carbonLoadModel.getTableName()).toString());
            } else {
                LOGGER().error(new StringBuilder(65).append("Unable to unlock Table lock for table").append(carbonLoadModel.getDatabaseName()).append(".").append(carbonLoadModel.getTableName()).append(" during table status update").toString());
            }
        }
    }

    private Object readResolve() {
        return MODULE$;
    }

    public static final /* synthetic */ boolean $anonfun$iterateStreamingHandoff$1(LoadMetadataDetails loadMetadataDetails) {
        SegmentStatus segmentStatus = loadMetadataDetails.getSegmentStatus();
        SegmentStatus segmentStatus2 = SegmentStatus.STREAMING_FINISH;
        return segmentStatus != null ? segmentStatus.equals(segmentStatus2) : segmentStatus2 == null;
    }

    public static final /* synthetic */ void $anonfun$executeStreamingHandoff$1(ObjectRef objectRef, Tuple2 tuple2) {
        if (tuple2._2$mcZ$sp()) {
            return;
        }
        objectRef.elem = SegmentStatus.LOAD_FAILURE;
    }

    public static final /* synthetic */ boolean $anonfun$updateLoadMetadata$1(CarbonLoadModel carbonLoadModel, LoadMetadataDetails loadMetadataDetails) {
        return loadMetadataDetails.getLoadName().equals(carbonLoadModel.getSegmentId());
    }

    public static final /* synthetic */ boolean $anonfun$updateLoadMetadata$2(String str, LoadMetadataDetails loadMetadataDetails) {
        return loadMetadataDetails.getLoadName().equals(str);
    }

    private StreamHandoffRDD$() {
        MODULE$ = this;
        this.LOGGER = LogServiceFactory.getLogService(getClass().getCanonicalName());
    }
}
