package org.apache.pinot.integration.tests;

import com.google.common.base.Function;
import com.yammer.metrics.core.MetricsRegistry;
import javax.annotation.Nullable;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.helix.participant.statemachine.StateModelInfo;
import org.apache.helix.participant.statemachine.Transition;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.NetUtil;
import org.apache.pinot.controller.validation.RealtimeSegmentValidationManager;
import org.apache.pinot.server.realtime.ControllerLeaderLocator;
import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.class */
public class SegmentCompletionIntegrationTests extends LLCRealtimeClusterIntegrationTest {
    private static final int NUM_KAFKA_PARTITIONS = 1;
    private String _serverInstance;
    private HelixManager _serverHelixManager;
    private String _currentSegment;

    /* loaded from: input_file:org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests$FakeServerSegmentStateModelFactory.class */
    public class FakeServerSegmentStateModelFactory extends StateModelFactory<StateModel> {

        @StateModelInfo(states = {"{'OFFLINE', 'ONLINE', 'CONSUMING', 'DROPPED'}"}, initialState = "OFFLINE")
        /* loaded from: input_file:org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests$FakeServerSegmentStateModelFactory$FakeSegmentStateModel.class */
        public class FakeSegmentStateModel extends StateModel {
            public FakeSegmentStateModel() {
            }

            @Transition(from = "OFFLINE", to = "CONSUMING")
            public void onBecomeConsumingFromOffline(Message message, NotificationContext notificationContext) {
                SegmentCompletionIntegrationTests.this._currentSegment = message.getPartitionName();
            }

            @Transition(from = "CONSUMING", to = "ONLINE")
            public void onBecomeOnlineFromConsuming(Message message, NotificationContext notificationContext) {
            }

            @Transition(from = "CONSUMING", to = "OFFLINE")
            public void onBecomeOfflineFromConsuming(Message message, NotificationContext notificationContext) {
            }

            @Transition(from = "OFFLINE", to = "ONLINE")
            public void onBecomeOnlineFromOffline(Message message, NotificationContext notificationContext) {
            }

            @Transition(from = "ONLINE", to = "OFFLINE")
            public void onBecomeOfflineFromOnline(Message message, NotificationContext notificationContext) {
            }

            @Transition(from = "OFFLINE", to = "DROPPED")
            public void onBecomeDroppedFromOffline(Message message, NotificationContext notificationContext) {
            }

            @Transition(from = "ONLINE", to = "DROPPED")
            public void onBecomeDroppedFromOnline(Message message, NotificationContext notificationContext) {
            }

            @Transition(from = "ERROR", to = "OFFLINE")
            public void onBecomeOfflineFromError(Message message, NotificationContext notificationContext) {
            }
        }

        public FakeServerSegmentStateModelFactory() {
        }

        public StateModel createNewStateModel(String str, String str2) {
            return new FakeSegmentStateModel();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pinot.integration.tests.BaseClusterIntegrationTest
    public int getNumKafkaPartitions() {
        return NUM_KAFKA_PARTITIONS;
    }

    @Override // org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest, org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest
    @BeforeClass
    public void setUp() throws Exception {
        startZk();
        startController();
        startBroker();
        startFakeServer();
        startKafka();
        setUpTable(null);
    }

    private void startFakeServer() throws Exception {
        this._serverInstance = "Server_" + NetUtil.getHostAddress() + "_8098";
        this._serverHelixManager = HelixManagerFactory.getZKHelixManager(this._clusterName, this._serverInstance, InstanceType.PARTICIPANT, "localhost:2191");
        this._serverHelixManager.getStateMachineEngine().registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), new FakeServerSegmentStateModelFactory());
        this._serverHelixManager.connect();
        this._serverHelixManager.getClusterManagmentTool().addInstanceTag(this._clusterName, this._serverInstance, TableNameBuilder.REALTIME.tableNameWithType("DefaultTenant"));
        ControllerLeaderLocator.create(this._serverHelixManager);
    }

    @Test
    public void testStopConsumingAndAutoFix() throws Exception {
        final String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
        TestUtils.waitForCondition(new Function<Void, Boolean>() { // from class: org.apache.pinot.integration.tests.SegmentCompletionIntegrationTests.1
            @Nullable
            public Boolean apply(@Nullable Void r5) {
                try {
                    return Boolean.valueOf(((String) SegmentCompletionIntegrationTests.this._helixAdmin.getResourceExternalView(SegmentCompletionIntegrationTests.this._clusterName, tableNameWithType).getStateMap(SegmentCompletionIntegrationTests.this._currentSegment).get(SegmentCompletionIntegrationTests.this._serverInstance)).equals("CONSUMING"));
                } catch (Exception e) {
                    return null;
                }
            }
        }, 60000L, "Failed to reach CONSUMING state");
        ServerSegmentCompletionProtocolHandler serverSegmentCompletionProtocolHandler = new ServerSegmentCompletionProtocolHandler(new ServerMetrics(new MetricsRegistry()));
        SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params();
        params.withOffset(45688L).withSegmentName(this._currentSegment).withReason("RandomReason").withInstanceId(this._serverInstance);
        Assert.assertEquals(serverSegmentCompletionProtocolHandler.segmentStoppedConsuming(params).getStatus(), SegmentCompletionProtocol.ControllerResponseStatus.PROCESSED);
        TestUtils.waitForCondition(new Function<Void, Boolean>() { // from class: org.apache.pinot.integration.tests.SegmentCompletionIntegrationTests.2
            @Nullable
            public Boolean apply(@Nullable Void r5) {
                try {
                    return Boolean.valueOf(((String) SegmentCompletionIntegrationTests.this._helixAdmin.getResourceExternalView(SegmentCompletionIntegrationTests.this._clusterName, tableNameWithType).getStateMap(SegmentCompletionIntegrationTests.this._currentSegment).get(SegmentCompletionIntegrationTests.this._serverInstance)).equals("OFFLINE"));
                } catch (Exception e) {
                    return null;
                }
            }
        }, 60000L, "Failed to reach OFFLINE state");
        final String str = this._currentSegment;
        RealtimeSegmentValidationManager realtimeSegmentValidationManager = this._controllerStarter.getRealtimeSegmentValidationManager();
        realtimeSegmentValidationManager.start();
        realtimeSegmentValidationManager.run();
        TestUtils.waitForCondition(new Function<Void, Boolean>() { // from class: org.apache.pinot.integration.tests.SegmentCompletionIntegrationTests.3
            @Nullable
            public Boolean apply(@Nullable Void r6) {
                try {
                    if (SegmentCompletionIntegrationTests.this._currentSegment.equals(str)) {
                        return false;
                    }
                    return Boolean.valueOf(((String) SegmentCompletionIntegrationTests.this._helixAdmin.getResourceExternalView(SegmentCompletionIntegrationTests.this._clusterName, tableNameWithType).getStateMap(SegmentCompletionIntegrationTests.this._currentSegment).get(SegmentCompletionIntegrationTests.this._serverInstance)).equals("CONSUMING") && new LLCSegmentName(SegmentCompletionIntegrationTests.this._currentSegment).getSequenceNumber() == new LLCSegmentName(str).getSequenceNumber() + SegmentCompletionIntegrationTests.NUM_KAFKA_PARTITIONS);
                } catch (Exception e) {
                    return null;
                }
            }
        }, 60000L, "Failed to get a new segment reaching CONSUMING state");
    }

    @Override // org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest, org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet
    @Test(enabled = false)
    public void testQueriesFromQueryFile() throws Exception {
    }

    @Override // org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest, org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet
    @Test(enabled = false)
    public void testGeneratedQueriesWithMultiValues() throws Exception {
    }

    @Override // org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest, org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet
    @Test(enabled = false)
    public void testInstanceShutdown() throws Exception {
    }

    @Override // org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest
    @Test(enabled = false)
    public void testSegmentFlushSize() throws Exception {
    }

    @Override // org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest
    @Test(enabled = false)
    public void testDictionaryBasedQueries() throws Exception {
    }

    @Override // org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest, org.apache.pinot.integration.tests.BaseClusterIntegrationTestSet
    @Test(enabled = false)
    public void testQueryExceptions() throws Exception {
    }

    @Override // org.apache.pinot.integration.tests.LLCRealtimeClusterIntegrationTest
    @Test(enabled = false)
    public void testConsumerDirectoryExists() {
    }

    @Override // org.apache.pinot.integration.tests.RealtimeClusterIntegrationTest
    @AfterClass
    public void tearDown() throws Exception {
        stopFakeServer();
        stopBroker();
        stopController();
        stopZk();
    }

    private void stopFakeServer() {
        this._serverHelixManager.disconnect();
    }
}
