package kafka.restore.schedulers;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import kafka.restore.messages.MessageResponse;
import kafka.restore.messages.MessageResult;
import kafka.restore.messages.MessageStatusCode;
import kafka.restore.messages.ObjectStoreRequest;
import kafka.restore.messages.RestoreObjectsInStoreRequest;
import kafka.restore.messages.RestoreObjectsInStoreResponse;
import kafka.restore.messages.UploadFtpsToStoreRequest;
import kafka.restore.messages.UploadFtpsToStoreResponse;
import kafka.restore.schedulers.AbstractAsyncServiceScheduler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:kafka/restore/schedulers/ObjectStoreManagerTest.class */
/**
 * Tests {@link ObjectStoreManager} against a mocked {@link ObjectStorePool}.
 *
 * <p>The test instance is passed to the manager as its results receiver, so every response the
 * manager reports ends up in {@link #responseQueue}, where the test methods pick it up.
 */
public class ObjectStoreManagerTest implements AsyncServiceSchedulerResultsReceiver {
    private static final int RESPONSE_QUEUE_CAPACITY = 1;
    /** Upper bound on how long a test waits for a response before failing instead of hanging. */
    private static final long RESPONSE_TIMEOUT_SECONDS = 30L;
    private static final String SEGMENT_OBJECT_STORE_PATH = "fakePath";
    private static final int REQUEST_UUID = 0;
    private static final String TOPIC = "fakeTopic";
    private static final int PARTITION = 1;
    private static final String FTPS_FILE_PATH = "ftpsFilePath";

    private ArrayBlockingQueue<MessageResponse> responseQueue;
    private ObjectStorePool objectStorePool;
    private ObjectStoreManager objectStoreManager;

    /**
     * Builds a manager around a mocked pool, starts it, and checks that it moves through the
     * RUNNING -> PAUSED -> RUNNING status transitions before each test runs.
     */
    @BeforeEach
    public void setUp() {
        responseQueue = new ArrayBlockingQueue<>(RESPONSE_QUEUE_CAPACITY);
        objectStorePool = Mockito.mock(ObjectStorePool.class);
        objectStoreManager = new ObjectStoreManager(this, objectStorePool);

        objectStoreManager.startUp();
        Assertions.assertEquals(
                AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING,
                objectStoreManager.getStatus());

        objectStoreManager.pause();
        Assertions.assertEquals(
                AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.PAUSED,
                objectStoreManager.getStatus());

        objectStoreManager.resume();
        Assertions.assertEquals(
                AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.RUNNING,
                objectStoreManager.getStatus());
    }

    /** Shuts the manager down after each test and checks that it reports SHUTDOWN. */
    @AfterEach
    public void shutdown() {
        objectStoreManager.shutdown();
        Assertions.assertEquals(
                AbstractAsyncServiceScheduler.AsyncServiceSchedulerStatus.SHUTDOWN,
                objectStoreManager.getStatus());
    }

    @Test
    public void testSubmitNonBatchedRestoreObjectsInStoreRequest() {
        // NOTE(review): 198 is presumably below the manager's batching threshold - confirm
        // against ObjectStoreManager.
        RestoreObjectsInStoreRequest request = constructRestoreObjectsInStoreRequest(198);
        mockObjectStorePoolToReplyWithSuccessfulResponse();

        objectStoreManager.submitRequest(request);

        MessageResponse response = getNextMessageResponse();
        Assertions.assertEquals(RestoreObjectsInStoreResponse.class, response.getClass());
        assertCorrectResponse(
                response, TOPIC, PARTITION, request.getUuid(), MessageStatusCode.OK, MessageResult.SUCCESS);
    }

    @Test
    public void testSubmitBatchedRestoreObjectsInStoreRequest() {
        // NOTE(review): 200 * 4 is presumably four full batches - confirm the batch size
        // against ObjectStoreManager.
        RestoreObjectsInStoreRequest request = constructRestoreObjectsInStoreRequest(200 * 4);
        mockObjectStorePoolToReplyWithSuccessfulResponse();

        objectStoreManager.submitRequest(request);

        MessageResponse response = getNextMessageResponse();
        Assertions.assertEquals(RestoreObjectsInStoreResponse.class, response.getClass());
        assertCorrectResponse(
                response, TOPIC, PARTITION, request.getUuid(), MessageStatusCode.OK, MessageResult.SUCCESS);
    }

    @Test
    public void testSubmitUploadFtpsToStoreRequest() {
        UploadFtpsToStoreRequest request = constructUploadFtpsToStoreRequest();
        mockObjectStorePoolToReplyWithSuccessfulResponse();

        objectStoreManager.submitRequest(request);

        MessageResponse response = getNextMessageResponse();
        Assertions.assertEquals(UploadFtpsToStoreResponse.class, response.getClass());
        assertCorrectResponse(
                response, TOPIC, PARTITION, request.getUuid(), MessageStatusCode.OK, MessageResult.SUCCESS);
    }

    /**
     * Creates a map with {@code segmentCount} random UUID keys, each mapped to the same fake
     * object-store path.
     *
     * @param segmentCount number of entries to generate
     * @return a mutable map of random UUIDs to {@link #SEGMENT_OBJECT_STORE_PATH}
     */
    private Map<UUID, String> createSegmentPathMap(int segmentCount) {
        Map<UUID, String> segmentPaths = new HashMap<>();
        for (int n = 0; n < segmentCount; n++) {
            segmentPaths.put(UUID.randomUUID(), SEGMENT_OBJECT_STORE_PATH);
        }
        return segmentPaths;
    }

    /**
     * Waits for the next response reported to this receiver.
     *
     * @return the next queued response, never {@code null}
     * @throws AssertionError if nothing arrives within {@link #RESPONSE_TIMEOUT_SECONDS} seconds
     * @throws RuntimeException if the waiting thread is interrupted
     */
    private MessageResponse getNextMessageResponse() {
        MessageResponse response;
        try {
            // Bounded wait: a manager that never reports back fails the test instead of hanging it.
            response = responseQueue.poll(RESPONSE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(
                    "ObjectStoreManager results receiver was interrupted prior to receiving result.", e);
        }
        if (response == null) {
            throw new AssertionError(
                    "No response was reported within " + RESPONSE_TIMEOUT_SECONDS + " seconds.");
        }
        return response;
    }

    /** Builds a restore request for the test topic/partition carrying {@code segmentCount} segments. */
    private RestoreObjectsInStoreRequest constructRestoreObjectsInStoreRequest(int segmentCount) {
        return new RestoreObjectsInStoreRequest(
                REQUEST_UUID, TOPIC, PARTITION, createSegmentPathMap(segmentCount));
    }

    /** Builds an FTPS upload request for the test topic/partition. */
    private UploadFtpsToStoreRequest constructUploadFtpsToStoreRequest() {
        return new UploadFtpsToStoreRequest(REQUEST_UUID, TOPIC, PARTITION, FTPS_FILE_PATH);
    }

    /** Asserts that the response carries the expected topic, partition, request id, status and result. */
    private void assertCorrectResponse(
            MessageResponse response,
            String expectedTopic,
            int expectedPartition,
            int expectedRequestId,
            MessageStatusCode expectedStatusCode,
            MessageResult expectedResult) {
        Assertions.assertEquals(expectedTopic, response.getTopic());
        Assertions.assertEquals(expectedPartition, response.getPartition());
        Assertions.assertEquals(expectedRequestId, response.getRequestID());
        Assertions.assertEquals(expectedStatusCode, response.getStatusCode());
        Assertions.assertEquals(expectedResult, response.getResult());
    }

    /**
     * Stubs the mocked pool so that each submitted request is immediately answered with a
     * successful response of the matching type, reported back to the manager under test.
     */
    private void mockObjectStorePoolToReplyWithSuccessfulResponse() {
        Answer<Void> replyWithSuccess = (InvocationOnMock invocation) -> {
            ObjectStoreRequest request = invocation.getArgument(0);
            // NOTE(review): the leading 0 is kept as a literal because the meaning of the first
            // constructor argument of the response classes is not visible from this file.
            if (request instanceof RestoreObjectsInStoreRequest) {
                objectStoreManager.reportServiceSchedulerResponse(
                        new RestoreObjectsInStoreResponse(
                                0,
                                TOPIC,
                                PARTITION,
                                request.getUuid(),
                                MessageStatusCode.OK,
                                MessageResult.SUCCESS,
                                new HashSet<>()));
            } else if (request instanceof UploadFtpsToStoreRequest) {
                objectStoreManager.reportServiceSchedulerResponse(
                        new UploadFtpsToStoreResponse(
                                0,
                                TOPIC,
                                PARTITION,
                                request.getUuid(),
                                MessageStatusCode.OK,
                                MessageResult.SUCCESS));
            } else {
                throw new RuntimeException("ObjectStorePool received request of unrecognized type.");
            }
            return null;
        };
        Mockito.doAnswer(replyWithSuccess)
                .when(objectStorePool)
                .submitObjectStoreRequest((ObjectStoreRequest) Mockito.any());
    }

    /**
     * Receives a response and queues it for the test method to consume. Blocks while the queue
     * is full.
     *
     * @param messageResponse the response to enqueue
     * @throws RuntimeException if the thread is interrupted while enqueueing, or if enqueueing
     *     fails for any other reason; the original exception is attached as the cause
     */
    @Override
    public void reportServiceSchedulerResponse(MessageResponse messageResponse) {
        try {
            responseQueue.put(messageResponse);
        } catch (InterruptedException e) {
            // Restore the flag so the reporting thread's owner can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(
                    "ObjectStoreManager results receiver was interrupted while result was being reported", e);
        } catch (RuntimeException e) {
            throw new RuntimeException(
                    "ObjectStoreManager results receiver failed while result was being reported", e);
        }
    }
}
