package kafka.restore;

import io.confluent.kafka.storage.checksum.E2EChecksumStore;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import kafka.restore.db.Job;
import kafka.restore.messages.AsyncTaskRequest;
import kafka.restore.messages.KafkaRequest;
import kafka.restore.messages.Message;
import kafka.restore.messages.MessageRequest;
import kafka.restore.messages.MessageResponse;
import kafka.restore.messages.MessageStatusCode;
import kafka.restore.messages.ObjectStoreRequest;
import kafka.restore.schedulers.AbstractAsyncServiceScheduler;
import kafka.restore.schedulers.AsyncServiceSchedulerResultsReceiver;
import kafka.restore.schedulers.AsyncTaskScheduler;
import kafka.restore.schedulers.KafkaConnectionPoolImpl;
import kafka.restore.schedulers.KafkaManager;
import kafka.restore.schedulers.ObjectStoreManager;
import kafka.restore.schedulers.ObjectStorePoolImpl;
import kafka.restore.snapshot.FtpsStateForRestore;
import kafka.restore.snapshot.PointInTimeTierPartitionStateBuilder;
import kafka.restore.snapshot.TierTopicConsumerForRestore;
import kafka.restore.statemachine.StateMachineController;
import kafka.restore.statemachine.api.FiniteStateMachine;
import kafka.restore.statemachine.api.State;
import kafka.restore.statemachine.events.KafkaRestoreEvent;
import kafka.server.KafkaConfig;
import kafka.tier.TopicIdPartition;
import kafka.tier.store.AzureBlockBlobTierObjectStore;
import kafka.tier.store.AzureBlockBlobTierObjectStoreConfig;
import kafka.tier.store.GcsTierObjectStore;
import kafka.tier.store.GcsTierObjectStoreConfig;
import kafka.tier.store.MockInMemoryTierObjectStore;
import kafka.tier.store.MockInMemoryTierObjectStoreConfig;
import kafka.tier.store.S3TierObjectStore;
import kafka.tier.store.S3TierObjectStoreConfig;
import kafka.tier.store.TierObjectStore;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/restore/RestoreOrchestrator.class */
/**
 * Drives a restore job: routes {@link MessageRequest}s to the Kafka, object-store and
 * async-task schedulers, and feeds the {@link MessageResponse}s they report back into the
 * per-partition state machines held by the {@link StateMachineController}.
 *
 * <p>Responses (and requests that a scheduler did not accept) are buffered in a bounded queue
 * whose capacity equals the configured restore parallelism; a single background thread drains
 * that queue while the orchestrator is {@code RUNNING}.
 *
 * <p>Lifecycle methods ({@link #startUp()}, {@link #shutdown()}, {@link #forceShutdown()},
 * {@link #pause()}, {@link #resume()}) are synchronized on this instance.
 */
public class RestoreOrchestrator implements AsyncServiceSchedulerResultsReceiver, MessageEmitter {
    private static final Logger LOGGER = LoggerFactory.getLogger(RestoreOrchestrator.class);
    private static final String DEFAULT_FTPS_FILE_DIRECTORY = "/mnt/ftps";

    private volatile OrchestratorStatus status;
    // Written by the job thread started in runRestoreJob() and read by the response consumer
    // thread, hence volatile.
    private volatile StateMachineController stateMachineController;
    private KafkaManager kafkaManager;
    private ObjectStoreManager objectStoreManager;
    private final PointInTimeTierPartitionStateBuilder ftpsBuilder;
    private Thread responseConsumerThread;
    private final RestoreMetricsManager restoreMetricsManager;
    private final Time time;
    // Written by the job thread and read by shutdown(), hence volatile.
    private volatile TierTopicConsumerForRestore tierTopicConsumerForRestore;
    // NOTE: the two initializers below read restoreParallelism, so it must stay declared first.
    private final int restoreParallelism = Integer.parseInt(RestoreConfig.getProperty(RestoreConfig.RESTORE_PARALLELISM));
    private final ArrayBlockingQueue<Message> responsesQueue = new ArrayBlockingQueue<>(this.restoreParallelism);
    private AsyncTaskScheduler asyncTaskScheduler = new AsyncTaskScheduler(this, this.restoreParallelism, new RestorePartitionOperatorFactory());

    /**
     * Builds the schedulers and the FTPS state builder; the orchestrator starts in
     * {@code NOT_STARTED} and does nothing until {@link #startUp()} is called.
     *
     * @param restoreMetricsManager metrics sink shared with the schedulers and state machines
     * @param time clock handed to the schedulers and used to time FTPS construction
     */
    public RestoreOrchestrator(RestoreMetricsManager restoreMetricsManager, Time time) {
        this.restoreMetricsManager = restoreMetricsManager;
        this.time = time;
        this.kafkaManager = new KafkaManager(this, new KafkaConnectionPoolImpl(this, Math.min(this.restoreParallelism, 20), DEFAULT_FTPS_FILE_DIRECTORY, restoreMetricsManager, time));
        // The same pool and object store are shared by the object-store scheduler and the FTPS builder.
        ThreadPoolExecutor threadPool = RestoreUtil.createThreadPool(this.restoreParallelism * 10, 50);
        TierObjectStore tierObjectStore = getTierObjectStore();
        this.objectStoreManager = new ObjectStoreManager(this, new ObjectStorePoolImpl(this, threadPool, tierObjectStore, restoreMetricsManager, time));
        this.ftpsBuilder = new PointInTimeTierPartitionStateBuilder(tierObjectStore, threadPool, restoreMetricsManager);
        this.status = OrchestratorStatus.NOT_STARTED;
    }

    /**
     * Enqueues a scheduler response for the consumer thread, blocking while the queue is full.
     * If the calling thread is interrupted while waiting, the response is dropped, a warning is
     * logged and the thread's interrupt flag is restored.
     *
     * @param messageResponse response reported by one of the schedulers
     */
    @Override // kafka.restore.schedulers.AsyncServiceSchedulerResultsReceiver
    public void reportServiceSchedulerResponse(MessageResponse messageResponse) {
        try {
            this.responsesQueue.put(messageResponse);
        } catch (InterruptedException e) {
            LOGGER.warn("interrupted while adding response int responseQueue: {}", messageResponse, e);
            // Keep the interruption visible to the caller instead of silently clearing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Starts the three schedulers and the response consumer thread.
     *
     * @return {@code true} if the orchestrator is running when the call returns; {@code false}
     *     if the current status does not allow starting or any scheduler failed to start
     * @throws Exception declared by the original signature; kept for caller compatibility
     */
    public synchronized boolean startUp() throws Exception {
        if (this.status == OrchestratorStatus.RUNNING) {
            LOGGER.debug("Orchestrator is already running, no action");
            return true;
        }
        if (this.status != OrchestratorStatus.NOT_STARTED && this.status != OrchestratorStatus.SHUTDOWN) {
            LOGGER.error("startUp() can only be called on a service scheduler that has not been started or is shutdown.");
            return false;
        }
        boolean schedulersStarted = this.kafkaManager.startUp()
                && this.asyncTaskScheduler.startUp()
                && this.objectStoreManager.startUp();
        if (!schedulersStarted) {
            return false;
        }
        this.status = OrchestratorStatus.RUNNING;
        startResponseQueueConsumerThread();
        return true;
    }

    /**
     * Shuts the orchestrator down. From {@code PAUSED} or {@code ERROR} only the status is
     * flipped to {@code SHUTDOWN}; from {@code RUNNING} the schedulers are shut down, the
     * consumer thread is interrupted and joined, and the tier topic consumer (if any) is stopped.
     *
     * @return {@code true} if the status is {@code SHUTDOWN} when the call returns
     */
    public synchronized boolean shutdown() {
        if (this.status == OrchestratorStatus.PAUSED || this.status == OrchestratorStatus.ERROR) {
            this.status = OrchestratorStatus.SHUTDOWN;
            return true;
        }
        if (this.status == OrchestratorStatus.SHUTDOWN) {
            LOGGER.info("RestoreOrchestrator is already shutdown");
            return true;
        }
        if (this.status != OrchestratorStatus.RUNNING) {
            LOGGER.error("shutdown() can only be called when status is running, paused, or in error state, current status: {}", this.status);
            return false;
        }
        LOGGER.info("Shutting down RestoreOrchestrator");
        boolean schedulersStopped = this.kafkaManager.shutdown()
                && this.asyncTaskScheduler.shutdown()
                && this.objectStoreManager.shutdown();
        if (!schedulersStopped) {
            LOGGER.error("Shutdown async service schedulers failed");
            this.status = OrchestratorStatus.ERROR;
            return false;
        }
        try {
            // Status is still RUNNING here; the consumer loop leaves on the interrupt itself.
            if (this.responseConsumerThread != null && this.responseConsumerThread.isAlive()) {
                this.responseConsumerThread.interrupt();
                this.responseConsumerThread.join();
            }
            TierTopicConsumerForRestore tierTopicConsumer = this.tierTopicConsumerForRestore;
            if (tierTopicConsumer != null) {
                tierTopicConsumer.shutdown();
            }
            this.status = OrchestratorStatus.SHUTDOWN;
            LOGGER.info("RestoreOrchestrator is shutdown");
            return true;
        } catch (InterruptedException e) {
            LOGGER.error("shutdown() was interrupted prior to background thread terminating", e);
            this.status = OrchestratorStatus.ERROR;
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Shuts down the schedulers and interrupts the consumer thread without checking the current
     * status, without waiting for the thread to finish and without inspecting scheduler results.
     */
    public synchronized void forceShutdown() {
        this.kafkaManager.shutdown();
        this.asyncTaskScheduler.shutdown();
        this.objectStoreManager.shutdown();
        // The thread only exists once startUp()/resume() has succeeded.
        if (this.responseConsumerThread != null) {
            this.responseConsumerThread.interrupt();
        }
        this.status = OrchestratorStatus.SHUTDOWN;
    }

    /**
     * Stops the consumer thread and pauses the three schedulers.
     *
     * @return {@code true} if the orchestrator ended up {@code PAUSED}; {@code false} if it was
     *     not running, a scheduler refused to pause, or the wait for the consumer thread was
     *     interrupted (the status is then {@code ERROR} in the latter two cases)
     * @throws InterruptedException declared by the original signature; interruption while
     *     waiting for the consumer thread is caught here and reported through the return value
     */
    public synchronized boolean pause() throws InterruptedException {
        LOGGER.info("Pausing RestoreOrchestrator");
        if (this.status != OrchestratorStatus.RUNNING) {
            LOGGER.error("pause() can only be called on service scheduler that is running.");
            return false;
        }
        this.status = OrchestratorStatus.PAUSING;
        try {
            if (this.responseConsumerThread != null && this.responseConsumerThread.isAlive()) {
                this.responseConsumerThread.interrupt();
                this.responseConsumerThread.join();
            }
            boolean schedulersPaused = this.kafkaManager.pause()
                    && this.asyncTaskScheduler.pause()
                    && this.objectStoreManager.pause();
            if (schedulersPaused) {
                this.status = OrchestratorStatus.PAUSED;
                LOGGER.info("RestoreOrchestrator is Paused");
                return true;
            }
            LOGGER.error("Pause async service schedulers failed");
            this.status = OrchestratorStatus.ERROR;
            return false;
        } catch (InterruptedException e) {
            LOGGER.error("Pause was interrupted prior to completing", e);
            this.status = OrchestratorStatus.ERROR;
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Resumes the three schedulers and starts a fresh consumer thread.
     *
     * @return {@code true} if the orchestrator is {@code RUNNING} again; {@code false} if it was
     *     not {@code PAUSED} or a scheduler failed to resume (status becomes {@code ERROR})
     */
    public synchronized boolean resume() {
        if (this.status != OrchestratorStatus.PAUSED) {
            LOGGER.error("resume() can only be called on service scheduler in PAUSED state.");
            return false;
        }
        LOGGER.info("Resuming RestoreOrchestrator");
        boolean schedulersResumed = this.kafkaManager.resume()
                && this.asyncTaskScheduler.resume()
                && this.objectStoreManager.resume();
        if (!schedulersResumed) {
            LOGGER.error("resume async service schedulers failed");
            this.status = OrchestratorStatus.ERROR;
            return false;
        }
        this.status = OrchestratorStatus.RUNNING;
        startResponseQueueConsumerThread();
        LOGGER.info("RestoreOrchestrator is running");
        return true;
    }

    /** @return the current lifecycle status */
    public synchronized OrchestratorStatus getStatus() {
        return this.status;
    }

    /** Creates and starts a new thread running {@link #consumeMessagesFromResponseQueue()}. */
    private void startResponseQueueConsumerThread() {
        LOGGER.info("start ResponseQueue Consumer Thread");
        this.responseConsumerThread = new Thread(this::consumeMessagesFromResponseQueue);
        this.responseConsumerThread.start();
    }

    /**
     * Body of the consumer thread: takes messages off the response queue while the status is
     * {@code RUNNING}, re-submitting requests and dispatching responses to the state machines.
     * The loop ends when the status changes or the thread is interrupted; any other exception
     * is logged and the loop continues with the next message.
     */
    private void consumeMessagesFromResponseQueue() {
        LOGGER.info("start consuming response queue");
        while (this.status == OrchestratorStatus.RUNNING) {
            try {
                Message message = this.responsesQueue.take();
                LOGGER.debug("take one message from response queue: {}", message);
                if (message instanceof MessageRequest) {
                    submitRequest((MessageRequest) message);
                } else if (message instanceof MessageResponse) {
                    handleResponse((MessageResponse) message);
                }
            } catch (InterruptedException e) {
                LOGGER.info("ResponseConsumer loop interrupted, exit the loop.", e);
                // shutdown() interrupts while the status is still RUNNING and then joins this
                // thread, so the loop must really terminate here rather than re-enter take().
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                LOGGER.error("Unexpected exception caught in response consuming loop.", e);
            }
        }
        LOGGER.info("Stop consuming response queue");
    }

    /**
     * Fires the response at the state machine of its topic partition. When the machine reaches
     * {@code FAILED} or {@code END_STATE} the partition is moved to the failed map / completed
     * set, another waiting partition is started if there is spare parallelism, and the finished
     * machine is cleaned up.
     *
     * @param messageResponse response to dispatch
     */
    private void handleResponse(MessageResponse messageResponse) {
        TopicPartition topicPartition = new TopicPartition(messageResponse.getTopic(), messageResponse.getPartition());
        FiniteStateMachine stateMachine = this.stateMachineController.getFiniteStateMachineByTopicPartition(topicPartition);
        if (stateMachine == null) {
            LOGGER.warn("FiniteStateMachine not found for {}, skip.", topicPartition);
            return;
        }
        State newState = stateMachine.fire(new KafkaRestoreEvent(messageResponse));
        if (newState != State.FAILED && newState != State.END_STATE) {
            // Partition restore is still in flight; nothing more to do for this response.
            return;
        }
        if (newState == State.FAILED) {
            LOGGER.warn("[{}]: restore failed", topicPartition);
            this.stateMachineController.moveToFailMap(topicPartition);
        } else {
            this.stateMachineController.moveToCompleteSet(topicPartition);
            LOGGER.info("[{}]: restore completed", topicPartition);
        }
        if (this.stateMachineController.inProgressCount() < this.restoreParallelism) {
            pickOnePartitionToStartRestore();
        }
        stateMachine.cleanup();
    }

    /**
     * Hands the request to the scheduler matching its type. Requests of an unsupported type are
     * logged and skipped. If the scheduler does not report {@code SCHEDULED}, the request is put
     * on the response queue so the consumer thread submits it again later.
     *
     * <p>NOTE(review): the consumer thread itself calls this method; if the scheduler rejects
     * the request while the bounded queue is full, the {@code put} below would block the only
     * thread that drains the queue — confirm this cannot happen in practice.
     *
     * @param messageRequest request to schedule
     */
    @Override // kafka.restore.MessageEmitter
    public void submitRequest(MessageRequest messageRequest) {
        AbstractAsyncServiceScheduler scheduler;
        if (messageRequest instanceof KafkaRequest) {
            scheduler = this.kafkaManager;
        } else if (messageRequest instanceof AsyncTaskRequest) {
            scheduler = this.asyncTaskScheduler;
        } else if (messageRequest instanceof ObjectStoreRequest) {
            scheduler = this.objectStoreManager;
        } else {
            LOGGER.warn("[{}]: Request type not support: {}, skip", messageRequest.getTopicPartition(), messageRequest.getClass().getSimpleName());
            return;
        }
        if (scheduler != null) {
            LOGGER.info("[{}]: submit request to scheduler: {}", messageRequest.getTopicPartition(), messageRequest);
            if (MessageStatusCode.SCHEDULED == scheduler.submitRequest(messageRequest)) {
                return;
            }
            LOGGER.warn("[{}]: submit request to scheduler failed, maybe because request queue full.", messageRequest.getTopicPartition());
        }
        // Not scheduled: park the request on the response queue for a later retry.
        try {
            LOGGER.info("[{}]: submit request to responseQueue: {}", messageRequest.getTopicPartition(), messageRequest);
            this.responsesQueue.put(messageRequest);
        } catch (InterruptedException e) {
            LOGGER.warn("[{}]: interrupted while adding request into responseQueue: {}", messageRequest.getTopicPartition(), messageRequest, e);
            // Restore the flag so the consumer loop (or any other caller) notices the interrupt.
            Thread.currentThread().interrupt();
        }
    }

    /** @return the controller of the current restore job, or {@code null} before any job ran */
    public StateMachineController stateMachineController() {
        return this.stateMachineController;
    }

    /** Replaces the Kafka scheduler. */
    public void setKafkaManager(KafkaManager kafkaManager) {
        this.kafkaManager = kafkaManager;
    }

    /** Replaces the async-task scheduler. */
    public void setAsyncTaskScheduler(AsyncTaskScheduler asyncTaskScheduler) {
        this.asyncTaskScheduler = asyncTaskScheduler;
    }

    /** Replaces the object-store scheduler. */
    public void setObjectStoreManager(ObjectStoreManager objectStoreManager) {
        this.objectStoreManager = objectStoreManager;
    }

    /**
     * Starts the given job on a new thread: marks it {@code IN_PROGRESS}, builds the FTPS states
     * (continuing with an empty map if that fails), optionally starts the tier topic consumer,
     * creates the state machine controller and kicks off up to {@code restoreParallelism}
     * partitions. The method returns immediately.
     *
     * @param job restore job to run
     */
    public void runRestoreJob(Job job) {
        new Thread(() -> {
            Map<TopicIdPartition, FtpsStateForRestore> ftpsStates = new HashMap<>();
            try {
                job.status = Job.JobStatus.IN_PROGRESS;
                ftpsStates = buildFtpsFiles(job);
            } catch (Exception e) {
                // Best effort: the job proceeds without pre-built FTPS states.
                LOGGER.error("build ftps files for restore failed", e);
            }
            mayStartTierTopicConsumer(ftpsStates, job);
            StateMachineController controller = new StateMachineController(job, this, this.restoreMetricsManager, ftpsStates);
            this.stateMachineController = controller;
            this.restoreMetricsManager.startRestoreRecord();
            int initialBatch = Math.min(this.restoreParallelism, controller.waitingCount());
            for (int i = 0; i < initialBatch; i++) {
                pickOnePartitionToStartRestore();
            }
        }).start();
    }

    /**
     * Starts a {@link TierTopicConsumerForRestore} for the partitions in {@code map} and wires
     * each state's {@code liveConsumerRecords} to it. Does nothing when the map is empty or the
     * job carries no broker connection string.
     *
     * @param map FTPS states keyed by topic-id partition
     * @param job job providing the broker connection string
     */
    private void mayStartTierTopicConsumer(Map<TopicIdPartition, FtpsStateForRestore> map, Job job) {
        if (map.isEmpty()) {
            return;
        }
        Map<Integer, Long> lastMaterializedOffsets = PointInTimeTierPartitionStateBuilder.calculateTierTopicLastMaterializedOffsets(map);
        if (job.getBrokerConnectionString() == null) {
            LOGGER.warn("no broker connection string found in job, can't initialize TierTopicConsumerForRestore");
            return;
        }
        TierTopicConsumerForRestore consumer = new TierTopicConsumerForRestore(job.getBrokerConnectionString(), lastMaterializedOffsets, map.keySet());
        this.tierTopicConsumerForRestore = consumer;
        consumer.initialize();
        consumer.start();
        map.forEach((topicIdPartition, ftpsState) -> ftpsState.liveConsumerRecords = consumer.getRecords(topicIdPartition));
    }

    /**
     * Records the current waiting / in-progress / failed / completed counts, then asks the
     * controller for one more partition and, if there is one, submits its pre-condition check.
     */
    private void pickOnePartitionToStartRestore() {
        StateMachineController controller = this.stateMachineController;
        this.restoreMetricsManager.record(RestoreMetricsManager.RESTORE_PARTITIONS_WAITING_COUNT, controller.waitingCount());
        this.restoreMetricsManager.record(RestoreMetricsManager.RESTORE_PARTITIONS_IN_PROGRESS_COUNT, controller.inProgressCount());
        this.restoreMetricsManager.record(RestoreMetricsManager.RESTORE_PARTITIONS_FAILED_COUNT, controller.failedCount());
        this.restoreMetricsManager.record(RestoreMetricsManager.RESTORE_PARTITIONS_COMPLETED_COUNT, controller.completedCount());
        FiniteStateMachine nextStateMachine = controller.pickOneNewPartitionToStartRestore();
        if (nextStateMachine == null) {
            return;
        }
        submitRequest(StateMachineController.buildPreConditionCheckRequest(nextStateMachine));
        this.restoreMetricsManager.update(RestoreMetricsManager.RESTORE_STARTED, 1L);
    }

    /**
     * Builds the FTPS states for the job's partitions from snapshots and records how long the
     * build took.
     *
     * @param job job whose {@code partitionRestoreContextMap} lists the partitions
     * @return the built states, or an empty map when the job lists no partitions
     * @throws IOException propagated from the FTPS builder
     * @throws InterruptedException propagated from the FTPS builder
     */
    private Map<TopicIdPartition, FtpsStateForRestore> buildFtpsFiles(Job job) throws IOException, InterruptedException {
        if (job.partitionRestoreContextMap == null || job.partitionRestoreContextMap.size() == 0) {
            LOGGER.info("partition list is empty");
            return Collections.emptyMap();
        }
        long startMs = this.time.hiResClockMs();
        Map<TopicIdPartition, FtpsStateForRestore> ftpsStates = this.ftpsBuilder.buildFtpsFromSnapshot(job.partitionRestoreContextMap);
        this.restoreMetricsManager.record(RestoreMetricsManager.RESTORE_TIME_TO_BUILD_FTPS_STATES_FROM_SNAPSHOTS_MS, this.time.hiResClockMs() - startMs);
        return ftpsStates;
    }

    /**
     * Creates the tier object store selected by the Kafka config's tier backend
     * ({@code "S3"}, {@code "GCS"}, {@code "AzureBlockBlob"} or {@code "mock"}).
     *
     * @return the object store, or {@code null} when there is no Kafka/Confluent config or the
     *     tier feature is disabled
     * @throws IllegalStateException if the tier feature is enabled with an unknown backend
     */
    private TierObjectStore getTierObjectStore() {
        KafkaConfig kafkaConfig = RestoreConfig.kafkaConfig();
        if (kafkaConfig == null || kafkaConfig.confluentConfig() == null || !kafkaConfig.confluentConfig().tierFeature().booleanValue()) {
            return null;
        }
        String tierBackend = kafkaConfig.confluentConfig().tierBackend();
        switch (tierBackend) {
            case "S3":
                return new S3TierObjectStore(new S3TierObjectStoreConfig(Optional.empty(), kafkaConfig), Optional.empty());
            case "GCS":
                return new GcsTierObjectStore(Time.SYSTEM, (Metrics) null, new GcsTierObjectStoreConfig(Optional.empty(), kafkaConfig), Optional.<E2EChecksumStore>empty());
            case "AzureBlockBlob":
                return new AzureBlockBlobTierObjectStore(new AzureBlockBlobTierObjectStoreConfig(Optional.empty(), kafkaConfig));
            case "mock":
                return new MockInMemoryTierObjectStore(Time.SYSTEM, (Metrics) null, new MockInMemoryTierObjectStoreConfig(Optional.<String>empty(), kafkaConfig));
            default:
                throw new IllegalStateException(String.format("Unknown TierObjectStore type: %s", tierBackend));
        }
    }
}
