/*
 * Decompiled with CFR 0.152.
 */
package com.ibm.jbatch.container.impl;

import com.ibm.jbatch.container.artifact.proxy.InjectionReferences;
import com.ibm.jbatch.container.artifact.proxy.PartitionAnalyzerProxy;
import com.ibm.jbatch.container.artifact.proxy.PartitionMapperProxy;
import com.ibm.jbatch.container.artifact.proxy.PartitionReducerProxy;
import com.ibm.jbatch.container.artifact.proxy.ProxyFactory;
import com.ibm.jbatch.container.exception.BatchContainerRuntimeException;
import com.ibm.jbatch.container.exception.BatchContainerServiceException;
import com.ibm.jbatch.container.impl.BaseStepControllerImpl;
import com.ibm.jbatch.container.impl.PartitionedStepBuilder;
import com.ibm.jbatch.container.jobinstance.RuntimeJobExecutionHelper;
import com.ibm.jbatch.container.jsl.CloneUtility;
import com.ibm.jbatch.container.util.BatchPartitionPlan;
import com.ibm.jbatch.container.util.BatchWorkUnit;
import com.ibm.jbatch.container.util.PartitionDataWrapper;
import com.ibm.jbatch.container.validation.ArtifactValidationException;
import com.ibm.jbatch.jsl.model.Analyzer;
import com.ibm.jbatch.jsl.model.JSLJob;
import com.ibm.jbatch.jsl.model.JSLProperties;
import com.ibm.jbatch.jsl.model.PartitionMapper;
import com.ibm.jbatch.jsl.model.PartitionReducer;
import com.ibm.jbatch.jsl.model.Property;
import com.ibm.jbatch.jsl.model.Step;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Stack;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.batch.api.partition.PartitionPlan;
import javax.batch.api.partition.PartitionReducer;
import javax.batch.operations.JobOperator;
import javax.batch.operations.JobRestartException;
import javax.batch.operations.JobStartException;

public class PartitionedStepControllerImpl
extends BaseStepControllerImpl {
    private static final String sourceClass = PartitionedStepControllerImpl.class.getName();
    private static final Logger logger = Logger.getLogger(sourceClass);
    private static final int DEFAULT_PARTITION_INSTANCES = 1;
    private static final int DEFAULT_THREADS = 0;
    private int partitions = 1;
    private int threads = 0;
    private Properties[] partitionProperties = null;
    private volatile List<BatchWorkUnit> parallelBatchWorkUnits;
    private PartitionReducerProxy partitionReducerProxy = null;
    private PartitionAnalyzerProxy analyzerProxy = null;
    final List<JSLJob> subJobs = new ArrayList<JSLJob>();

    protected PartitionedStepControllerImpl(RuntimeJobExecutionHelper jobExecutionImpl, Step step) {
        super(jobExecutionImpl, step);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void stop() {
        this.stepContext.setBatchStatus(JobOperator.BatchStatus.STOPPING);
        List<JSLJob> list = this.subJobs;
        synchronized (list) {
            if (this.parallelBatchWorkUnits != null) {
                for (BatchWorkUnit subJob : this.parallelBatchWorkUnits) {
                    try {
                        batchKernel.stopJob(subJob.getJobExecutionImpl().getExecutionId());
                    }
                    catch (Exception e) {
                        throw new IllegalStateException(e);
                    }
                }
            }
        }
    }

    private PartitionPlan generatePartitionPlan() {
        BatchPartitionPlan plan = null;
        PartitionPlan previousPlan = null;
        PartitionMapper partitionMapper = this.step.getPartition().getMapper();
        if (this.stepStatus.getPlan() != null) {
            previousPlan = this.stepStatus.getPlan();
        }
        if (partitionMapper != null) {
            PartitionMapperProxy partitionMapperProxy;
            List<Property> propertyList = partitionMapper.getProperties() == null ? null : partitionMapper.getProperties().getPropertyList();
            InjectionReferences injectionRef = new InjectionReferences(this.jobExecutionImpl.getJobContext(), this.stepContext, propertyList);
            try {
                partitionMapperProxy = ProxyFactory.createPartitionMapperProxy(partitionMapper.getRef(), injectionRef, this.stepContext);
            }
            catch (ArtifactValidationException e) {
                throw new BatchContainerServiceException("Cannot create the PartitionMapper [" + partitionMapper.getRef() + "]", e);
            }
            PartitionPlan mapperPlan = partitionMapperProxy.mapPartitions();
            plan = new BatchPartitionPlan();
            plan.setPartitionsOverride(mapperPlan.getPartitionsOverride());
            if (mapperPlan.getPartitionsOverride() || previousPlan == null) {
                plan.setPartitions(mapperPlan.getPartitions());
            } else {
                plan.setPartitions(previousPlan.getPartitions());
            }
            if (mapperPlan.getThreads() == 0) {
                plan.setThreads(plan.getPartitions());
            } else {
                plan.setThreads(mapperPlan.getThreads());
            }
            plan.setPartitionProperties(mapperPlan.getPartitionProperties());
            if (logger.isLoggable(Level.FINE)) {
                logger.fine("Partition plan defined by partition mapper: " + plan);
            }
        } else if (this.step.getPartition().getPlan() != null) {
            int numThreads;
            String partitionsAttr = this.step.getPartition().getPlan().getPartitions();
            String threadsAttr = null;
            int numPartitions = Integer.MIN_VALUE;
            Properties[] partitionProps = null;
            if (partitionsAttr != null) {
                try {
                    numPartitions = Integer.parseInt(partitionsAttr);
                }
                catch (NumberFormatException e) {
                    throw new IllegalArgumentException("Could not parse partition instances value in stepId: " + this.step.getId() + ", with instances=" + partitionsAttr, e);
                }
                partitionProps = new Properties[numPartitions];
                if (numPartitions < 1) {
                    throw new IllegalArgumentException("Partition instances value must be 1 or greater in stepId: " + this.step.getId() + ", with instances=" + partitionsAttr);
                }
            }
            if ((threadsAttr = this.step.getPartition().getPlan().getThreads()) != null) {
                try {
                    numThreads = Integer.parseInt(partitionsAttr);
                    if (numThreads == 0) {
                        numThreads = numPartitions;
                    }
                }
                catch (NumberFormatException e) {
                    throw new IllegalArgumentException("Could not parse partition threads value in stepId: " + this.step.getId() + ", with instances=" + partitionsAttr, e);
                }
                if (numThreads < 0) {
                    throw new IllegalArgumentException("Threads value must be 0 or greater in stepId: " + this.step.getId() + ", with instances=" + partitionsAttr);
                }
            } else {
                numThreads = numPartitions;
            }
            if (this.step.getPartition().getPlan().getProperties() != null) {
                List<JSLProperties> jslProperties = this.step.getPartition().getPlan().getProperties();
                for (JSLProperties props : jslProperties) {
                    int targetPartition = Integer.parseInt(props.getPartition());
                    try {
                        partitionProps[targetPartition - 1] = CloneUtility.jslPropertiesToJavaProperties(props);
                    }
                    catch (ArrayIndexOutOfBoundsException e) {
                        throw new BatchContainerRuntimeException("There are only " + numPartitions + " partition instances, but there are " + jslProperties.size() + " partition properties lists defined.", e);
                    }
                }
            }
            plan = new BatchPartitionPlan();
            plan.setPartitions(numPartitions);
            plan.setThreads(numThreads);
            plan.setPartitionProperties(partitionProps);
            plan.setPartitionsOverride(false);
        }
        return plan;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void invokeCoreStep() throws JobRestartException, JobStartException {
        PartitionPlan plan = this.generatePartitionPlan();
        this.partitions = plan.getPartitions();
        this.threads = plan.getThreads();
        this.partitionProperties = plan.getPartitionProperties();
        this.stepStatus.setPlan(plan);
        if (plan.getPartitionsOverride() && this.partitionReducerProxy != null) {
            this.partitionReducerProxy.rollbackPartitionedStep();
        }
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("Number of partitions in step: " + this.partitions + " in step " + this.step.getId());
            logger.fine("Subjob properties defined by partition mapper: " + this.partitionProperties);
        }
        LinkedBlockingQueue<PartitionDataWrapper> analyzerQueue = null;
        if (this.analyzerProxy != null) {
            analyzerQueue = new LinkedBlockingQueue<PartitionDataWrapper>();
        }
        LinkedBlockingQueue<BatchWorkUnit> completedWorkQueue = new LinkedBlockingQueue<BatchWorkUnit>();
        this.subJobExitStatusQueue = new Stack();
        List<JSLJob> list = this.subJobs;
        synchronized (list) {
            if (this.jobExecutionImpl.getJobContext().getBatchStatus().equals((Object)JobOperator.BatchStatus.STOPPING)) {
                this.stepContext.setBatchStatus(JobOperator.BatchStatus.STOPPED);
                return;
            }
            for (int instance = 0; instance < this.partitions; ++instance) {
                this.subJobs.add(PartitionedStepBuilder.buildSubJob(this.jobExecutionImpl.getInstanceId(), this.jobExecutionImpl.getJobContext(), this.step, instance));
            }
            this.parallelBatchWorkUnits = this.stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride() ? batchKernel.buildRestartableParallelJobs(this.subJobs, this.partitionProperties, analyzerQueue, this.subJobExitStatusQueue, completedWorkQueue, this.containment, null) : batchKernel.buildNewParallelJobs(this.subJobs, this.partitionProperties, analyzerQueue, this.subJobExitStatusQueue, completedWorkQueue, this.containment, null);
        }
        Iterator<BatchWorkUnit> iterator = this.parallelBatchWorkUnits.iterator();
        for (int i = 0; i < this.threads && iterator.hasNext(); ++i) {
            if (this.stepStatus.getStartCount() > 1 && !plan.getPartitionsOverride()) {
                batchKernel.restartGeneratedJob(iterator.next());
                continue;
            }
            batchKernel.startGeneratedJob(iterator.next());
        }
        ArrayList completedWork = new ArrayList(this.partitions);
        int completedPartitions = 0;
        try {
            while (true) {
                if (this.analyzerProxy != null) {
                    PartitionDataWrapper dataWrapper = analyzerQueue.take();
                    switch (dataWrapper.getEventType()) {
                        case ANALYZE_COLLECTOR_DATA: {
                            this.analyzerProxy.analyzeCollectorData(dataWrapper.getCollectorData());
                            break;
                        }
                        case ANALYZE_STATUS: {
                            this.analyzerProxy.analyzeStatus(dataWrapper.getBatchstatus(), dataWrapper.getExitStatus());
                            ++completedPartitions;
                            if (!iterator.hasNext()) break;
                            completedWork.add(completedWorkQueue.take());
                            if (this.stepStatus.getStartCount() > 1) {
                                batchKernel.startGeneratedJob(iterator.next());
                                break;
                            }
                            batchKernel.restartGeneratedJob(iterator.next());
                            break;
                        }
                        case STEP_ALREADY_COMPLETED: {
                            ++completedPartitions;
                            if (!iterator.hasNext()) break;
                            completedWork.add(completedWorkQueue.take());
                            if (this.stepStatus.getStartCount() > 1) {
                                batchKernel.startGeneratedJob(iterator.next());
                                break;
                            }
                            batchKernel.restartGeneratedJob(iterator.next());
                            break;
                        }
                        default: {
                            throw new IllegalStateException("Invalid partition state");
                        }
                    }
                    if (completedPartitions != this.partitions) continue;
                } else if (iterator.hasNext()) {
                    completedWork.add(completedWorkQueue.take());
                    if (this.stepStatus.getStartCount() > 1) {
                        batchKernel.startGeneratedJob(iterator.next());
                        continue;
                    }
                    batchKernel.restartGeneratedJob(iterator.next());
                    continue;
                }
                break;
            }
        }
        catch (InterruptedException e) {
            throw new BatchContainerRuntimeException(e);
        }
        boolean rollback = false;
        boolean partitionFailed = false;
        int i = 0;
        while (completedWork.size() < this.partitions) {
            try {
                completedWork.add(completedWorkQueue.take());
            }
            catch (InterruptedException e) {
                throw new BatchContainerRuntimeException(e);
            }
            ++i;
        }
        for (BatchWorkUnit subJob : completedWork) {
            JobOperator.BatchStatus batchStatus = subJob.getJobExecutionImpl().getJobContext().getBatchStatus();
            if (batchStatus.equals((Object)JobOperator.BatchStatus.FAILED)) {
                if (logger.isLoggable(Level.FINE)) {
                    logger.fine("Subjob " + subJob.getJobExecutionImpl().getExecutionId() + " ended with status '" + batchStatus + "'");
                    logger.fine("Starting logical transaction rollback.");
                }
                rollback = true;
                partitionFailed = true;
                this.stepContext.setBatchStatus(JobOperator.BatchStatus.FAILED);
                continue;
            }
            if (!batchStatus.equals((Object)JobOperator.BatchStatus.STOPPED)) continue;
            if (logger.isLoggable(Level.FINE)) {
                logger.fine("Subjob " + subJob.getJobExecutionImpl().getExecutionId() + "ended with status '" + batchStatus + "'");
                logger.fine("Starting logical transaction rollback.");
            }
            rollback = true;
            if (JobOperator.BatchStatus.FAILED.equals((Object)this.stepContext.getBatchStatus())) continue;
            this.stepContext.setBatchStatus(JobOperator.BatchStatus.STOPPED);
        }
        if (rollback) {
            if (this.partitionReducerProxy != null) {
                this.partitionReducerProxy.rollbackPartitionedStep();
            }
        } else if (this.partitionReducerProxy != null) {
            this.partitionReducerProxy.beforePartitionedStepCompletion();
        }
        if (this.stepContext.getExitStatus() == null) {
            this.stepContext.setExitStatus((String)this.subJobExitStatusQueue.pop());
            this.subJobExitStatusQueue.clear();
        }
        if (partitionFailed) {
            throw new BatchContainerRuntimeException("One or more partitions failed");
        }
    }

    @Override
    protected void setupStepArtifacts() {
        PartitionReducer partitionReducer;
        Analyzer analyzer = this.step.getPartition().getAnalyzer();
        if (analyzer != null) {
            List<Property> propList = analyzer.getProperties() == null ? null : analyzer.getProperties().getPropertyList();
            InjectionReferences injectionRef = new InjectionReferences(this.jobExecutionImpl.getJobContext(), this.stepContext, propList);
            try {
                this.analyzerProxy = ProxyFactory.createPartitionAnalyzerProxy(analyzer.getRef(), injectionRef, this.stepContext);
            }
            catch (ArtifactValidationException e) {
                throw new BatchContainerServiceException("Cannot create the analyzer [" + analyzer.getRef() + "]", e);
            }
        }
        if ((partitionReducer = this.step.getPartition().getReducer()) != null) {
            List<Property> propList = partitionReducer.getProperties() == null ? null : partitionReducer.getProperties().getPropertyList();
            InjectionReferences injectionRef = new InjectionReferences(this.jobExecutionImpl.getJobContext(), this.stepContext, propList);
            try {
                this.partitionReducerProxy = ProxyFactory.createPartitionReducerProxy(partitionReducer.getRef(), injectionRef, this.stepContext);
            }
            catch (ArtifactValidationException e) {
                throw new BatchContainerServiceException("Cannot create the analyzer [" + partitionReducer.getRef() + "]", e);
            }
        }
    }

    @Override
    protected void invokePreStepArtifacts() {
        if (this.partitionReducerProxy != null) {
            this.partitionReducerProxy.beginPartitionedStep();
        }
    }

    @Override
    protected void invokePostStepArtifacts() {
        if (this.partitionReducerProxy != null) {
            if (JobOperator.BatchStatus.COMPLETED.equals((Object)this.stepContext.getBatchStatus())) {
                this.partitionReducerProxy.afterPartitionedStepCompletion(PartitionReducer.PartitionStatus.COMMIT);
            } else {
                this.partitionReducerProxy.afterPartitionedStepCompletion(PartitionReducer.PartitionStatus.ROLLBACK);
            }
        }
    }
}

