package org.apache.hyracks.control.nc.work;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hyracks.api.application.INCApplicationContext;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.IPartitionCollector;
import org.apache.hyracks.api.comm.IPartitionWriterFactory;
import org.apache.hyracks.api.comm.NetworkAddress;
import org.apache.hyracks.api.comm.PartitionChannel;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.ConnectorDescriptorId;
import org.apache.hyracks.api.dataflow.IActivity;
import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.dataflow.TaskId;
import org.apache.hyracks.api.dataflow.connectors.IConnectorPolicy;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.deployment.DeploymentId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.ActivityCluster;
import org.apache.hyracks.api.job.ActivityClusterGraph;
import org.apache.hyracks.api.job.JobFlag;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.partitions.PartitionId;
import org.apache.hyracks.comm.channels.NetworkInputChannel;
import org.apache.hyracks.control.common.deployment.DeploymentUtils;
import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.Joblet;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.Task;
import org.apache.hyracks.control.nc.partitions.MaterializedPartitionWriter;
import org.apache.hyracks.control.nc.partitions.MaterializingPipelinedPartition;
import org.apache.hyracks.control.nc.partitions.PipelinedPartition;
import org.apache.hyracks.control.nc.partitions.ReceiveSideMaterializingCollector;
import org.apache.hyracks.control.nc.profiling.ProfilingPartitionWriterFactory;

/* loaded from: input_file:org/apache/hyracks/control/nc/work/StartTasksWork.class */
/**
 * Work item that starts a batch of task attempts for a single job on this node controller.
 *
 * <p>When run, it looks up the {@link Joblet} for the job (creating and registering one from the
 * serialized activity cluster graph if none is registered yet) and then, for every
 * {@link TaskAttemptDescriptor}, builds a {@link Task}, wires one partition collector per input
 * connector and one frame writer per output connector, adds the task to the joblet and starts it.
 */
public class StartTasksWork extends AbstractWork {
    private static final Logger LOGGER = Logger.getLogger(StartTasksWork.class.getName());

    /**
     * Last constructor argument handed to every {@link NetworkInputChannel} created by this class.
     * NOTE(review): presumably the number of buffers per input channel - confirm against the
     * NetworkInputChannel constructor.
     */
    private static final int INPUT_CHANNEL_NUM_BUFFERS = 5;

    private final NodeControllerService ncs;
    private final DeploymentId deploymentId;
    private final JobId jobId;
    private final byte[] acgBytes;
    private final List<TaskAttemptDescriptor> taskDescriptors;
    private final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap;
    private final EnumSet<JobFlag> flags;

    /**
     * @param ncs                  node controller that owns the joblet map, executor, partition manager and network manager
     * @param deploymentId         deployment used when deserializing the activity cluster graph
     * @param jobId                job the tasks belong to
     * @param acgBytes             serialized activity cluster graph; only read when no joblet is registered yet for {@code jobId}
     * @param taskDescriptors      task attempts to start
     * @param connectorPoliciesMap connector policy per connector id, consulted for every input and output connector
     * @param flags                job flags; {@link JobFlag#PROFILE_RUNTIME} wraps the partition writer factories for profiling
     */
    public StartTasksWork(NodeControllerService ncs, DeploymentId deploymentId, JobId jobId, byte[] acgBytes, List<TaskAttemptDescriptor> taskDescriptors, Map<ConnectorDescriptorId, IConnectorPolicy> connectorPoliciesMap, EnumSet<JobFlag> flags) {
        this.ncs = ncs;
        this.deploymentId = deploymentId;
        this.jobId = jobId;
        this.acgBytes = acgBytes;
        this.taskDescriptors = taskDescriptors;
        this.connectorPoliciesMap = connectorPoliciesMap;
        this.flags = flags;
    }

    /**
     * Starts every described task attempt in order.
     *
     * @throws RuntimeException wrapping any exception raised while setting up or starting a task;
     *                          tasks started before the failure are not rolled back here
     */
    public void run() {
        try {
            Joblet joblet = getOrCreateLocalJoblet(deploymentId, jobId, ncs.getApplicationContext(), acgBytes);
            ActivityClusterGraph acg = joblet.getActivityClusterGraph();
            IRecordDescriptorProvider recordDescProvider = createRecordDescriptorProvider(acg);
            for (TaskAttemptDescriptor descriptor : taskDescriptors) {
                startTask(joblet, acg, recordDescProvider, descriptor);
            }
        } catch (Exception e) {
            // Report through the class logger instead of dumping the trace straight to stderr.
            LOGGER.log(Level.SEVERE, "Failure starting tasks for job " + jobId, e);
            throw new RuntimeException(e);
        }
    }

    /** Builds a provider that resolves record descriptors through the connector maps of the given graph. */
    private static IRecordDescriptorProvider createRecordDescriptorProvider(final ActivityClusterGraph acg) {
        return new IRecordDescriptorProvider() {
            public RecordDescriptor getOutputRecordDescriptor(ActivityId aid, int outputIndex) {
                ActivityCluster ac = (ActivityCluster) acg.getActivityMap().get(aid);
                IConnectorDescriptor conn = (IConnectorDescriptor) ((List<?>) ac.getActivityOutputMap().get(aid)).get(outputIndex);
                return (RecordDescriptor) ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
            }

            public RecordDescriptor getInputRecordDescriptor(ActivityId aid, int inputIndex) {
                ActivityCluster ac = (ActivityCluster) acg.getActivityMap().get(aid);
                IConnectorDescriptor conn = (IConnectorDescriptor) ((List<?>) ac.getActivityInputMap().get(aid)).get(inputIndex);
                return (RecordDescriptor) ac.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
            }
        };
    }

    /** Creates, wires, registers and starts the task for one task attempt descriptor. */
    private void startTask(Joblet joblet, ActivityClusterGraph acg, IRecordDescriptorProvider recordDescProvider, TaskAttemptDescriptor descriptor) throws Exception {
        TaskAttemptId attemptId = descriptor.getTaskAttemptId();
        TaskId taskId = attemptId.getTaskId();
        ActivityId activityId = taskId.getActivityId();
        ActivityCluster cluster = (ActivityCluster) acg.getActivityMap().get(activityId);
        IActivity activity = (IActivity) cluster.getActivityMap().get(activityId);
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("Initializing " + attemptId + " -> " + activity);
        }
        int partition = taskId.getPartition();
        List<IConnectorDescriptor> inputs = (List<IConnectorDescriptor>) cluster.getActivityInputMap().get(activityId);
        Task task = new Task(joblet, attemptId, activity.getClass().getName(), ncs.getExecutor(), ncs, createInputChannels(descriptor, inputs));
        IOperatorNodePushable operator = activity.createPushRuntime(task, recordDescProvider, partition, descriptor.getPartitionCount());

        // One collector per input connector; an activity without inputs has a null input list.
        List<IPartitionCollector> collectors = new ArrayList<IPartitionCollector>();
        if (inputs != null) {
            for (int i = 0; i < inputs.size(); i++) {
                IConnectorDescriptor conn = inputs.get(i);
                IConnectorPolicy policy = connectorPoliciesMap.get(conn.getConnectorId());
                if (LOGGER.isLoggable(Level.INFO)) {
                    LOGGER.info("input: " + i + ": " + conn.getConnectorId());
                }
                RecordDescriptor recordDesc = (RecordDescriptor) cluster.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                collectors.add(createPartitionCollector(descriptor, partition, task, i, conn, recordDesc, policy));
            }
        }

        // One frame writer per output connector; an activity without outputs has a null output list.
        List<IConnectorDescriptor> outputs = (List<IConnectorDescriptor>) cluster.getActivityOutputMap().get(activityId);
        if (outputs != null) {
            for (int i = 0; i < outputs.size(); i++) {
                IConnectorDescriptor conn = outputs.get(i);
                RecordDescriptor recordDesc = (RecordDescriptor) cluster.getConnectorRecordDescriptorMap().get(conn.getConnectorId());
                IConnectorPolicy policy = connectorPoliciesMap.get(conn.getConnectorId());
                IPartitionWriterFactory writerFactory = createPartitionWriterFactory(task, policy, jobId, conn, partition, attemptId, flags);
                if (LOGGER.isLoggable(Level.INFO)) {
                    LOGGER.info("output: " + i + ": " + conn.getConnectorId());
                }
                IFrameWriter writer = conn.createPartitioner(task, recordDesc, writerFactory, partition, descriptor.getPartitionCount(), descriptor.getOutputPartitionCounts()[i]);
                operator.setOutputFrameWriter(i, writer, recordDesc);
            }
        }

        task.setTaskRuntime(collectors.toArray(new IPartitionCollector[collectors.size()]), operator);
        joblet.addTask(task);
        task.start();
    }

    /**
     * Returns the joblet registered for {@code jobId} in the node controller's joblet map, creating and
     * registering a new one from the serialized activity cluster graph when none is present.
     *
     * @throws NullPointerException if no joblet is registered and {@code acgBytes} is {@code null}
     * @throws Exception            if deserializing the graph or constructing the joblet fails
     */
    private Joblet getOrCreateLocalJoblet(DeploymentId deploymentId, JobId jobId, INCApplicationContext appCtx, byte[] acgBytes) throws Exception {
        Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
        Joblet existing = jobletMap.get(jobId);
        if (existing != null) {
            return existing;
        }
        if (acgBytes == null) {
            throw new NullPointerException("JobActivityGraph was null");
        }
        ActivityClusterGraph acg = (ActivityClusterGraph) DeploymentUtils.deserialize(acgBytes, deploymentId, appCtx);
        Joblet created = new Joblet(ncs, deploymentId, jobId, appCtx, acg);
        jobletMap.put(jobId, created);
        return created;
    }

    /**
     * Creates the collector for one input of a task, wrapping it in a
     * {@link ReceiveSideMaterializingCollector} when the connector policy materializes on the receive side.
     */
    private IPartitionCollector createPartitionCollector(TaskAttemptDescriptor descriptor, int partition, Task task, int inputIndex, IConnectorDescriptor conn, RecordDescriptor recordDesc, IConnectorPolicy policy) throws HyracksDataException {
        IPartitionCollector collector = conn.createPartitionCollector(task, recordDesc, partition, descriptor.getInputPartitionCounts()[inputIndex], descriptor.getPartitionCount());
        if (policy.materializeOnReceiveSide()) {
            return new ReceiveSideMaterializingCollector(task, ncs.getPartitionManager(), collector, task.getTaskAttemptId(), ncs.getExecutor());
        }
        return collector;
    }

    /**
     * Chooses the partition writer factory for one output connector from its policy:
     * send-side materialization with the consumer waiting for the producer yields
     * {@link MaterializedPartitionWriter}s, send-side materialization without waiting yields
     * {@link MaterializingPipelinedPartition}s, and otherwise {@link PipelinedPartition}s are produced.
     * With {@link JobFlag#PROFILE_RUNTIME} set, the chosen factory is wrapped in a
     * {@link ProfilingPartitionWriterFactory}.
     */
    private IPartitionWriterFactory createPartitionWriterFactory(final IHyracksTaskContext ctx, IConnectorPolicy policy, final JobId jobId, final IConnectorDescriptor conn, final int senderIndex, final TaskAttemptId attemptId, EnumSet<JobFlag> jobFlags) {
        IPartitionWriterFactory factory;
        if (policy.materializeOnSendSide()) {
            if (policy.consumerWaitsForProducerToFinish()) {
                factory = new IPartitionWriterFactory() {
                    public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
                        PartitionId pid = new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex);
                        return new MaterializedPartitionWriter(ctx, ncs.getPartitionManager(), pid, attemptId, ncs.getExecutor());
                    }
                };
            } else {
                factory = new IPartitionWriterFactory() {
                    public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
                        PartitionId pid = new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex);
                        return new MaterializingPipelinedPartition(ctx, ncs.getPartitionManager(), pid, attemptId, ncs.getExecutor());
                    }
                };
            }
        } else {
            factory = new IPartitionWriterFactory() {
                public IFrameWriter createFrameWriter(int receiverIndex) throws HyracksDataException {
                    PartitionId pid = new PartitionId(jobId, conn.getConnectorId(), senderIndex, receiverIndex);
                    return new PipelinedPartition(ctx, ncs.getPartitionManager(), pid, attemptId);
                }
            };
        }
        if (jobFlags.contains(JobFlag.PROFILE_RUNTIME)) {
            factory = new ProfilingPartitionWriterFactory(ctx, conn, senderIndex, factory);
        }
        return factory;
    }

    /**
     * Builds, for each input of the task, the list of network channels to its known partition locations.
     * The outer list has one entry per row of the descriptor's input partition locations (an empty
     * list for a {@code null} row); the result is empty when the descriptor has no locations at all.
     *
     * @param descriptor task attempt whose input partition locations are read
     * @param inputs     input connectors of the task's activity, indexed like the location rows
     * @throws UnknownHostException if a location's IP address bytes cannot be turned into an address
     */
    private List<List<PartitionChannel>> createInputChannels(TaskAttemptDescriptor descriptor, List<IConnectorDescriptor> inputs) throws UnknownHostException {
        List<List<PartitionChannel>> channelsPerInput = new ArrayList<List<PartitionChannel>>();
        NetworkAddress[][] locations = descriptor.getInputPartitionLocations();
        if (locations == null) {
            return channelsPerInput;
        }
        // Loop-invariant: the receiving partition is this task's own partition.
        int receiverPartition = descriptor.getTaskAttemptId().getTaskId().getPartition();
        for (int inputIndex = 0; inputIndex < locations.length; inputIndex++) {
            List<PartitionChannel> channels = new ArrayList<PartitionChannel>();
            NetworkAddress[] senders = locations[inputIndex];
            if (senders != null) {
                for (int senderIndex = 0; senderIndex < senders.length; senderIndex++) {
                    NetworkAddress address = senders[senderIndex];
                    PartitionId pid = new PartitionId(jobId, inputs.get(inputIndex).getConnectorId(), senderIndex, receiverPartition);
                    InetSocketAddress remote = new InetSocketAddress(InetAddress.getByAddress(address.lookupIpAddress()), address.getPort());
                    channels.add(new PartitionChannel(pid, new NetworkInputChannel(ncs.getNetworkManager(), remote, pid, INPUT_CHANNEL_NUM_BUFFERS)));
                }
            }
            channelsPerInput.add(channels);
        }
        return channelsPerInput;
    }
}
