package com.twitter.finatra.kafkastreams.prerestore;

import com.twitter.app.App;
import com.twitter.app.Flag;
import com.twitter.app.Flaggable$;
import com.twitter.conversions.DurationOps$;
import com.twitter.conversions.DurationOps$RichDuration$;
import com.twitter.finatra.annotations.Experimental;
import com.twitter.finatra.kafkastreams.internal.utils.ReflectionUtils$;
import com.twitter.finatra.kafkastreams.partitioning.StaticPartitioning;
import com.twitter.finatra.kafkastreams.partitioning.StaticPartitioning$;
import com.twitter.util.Duration;
import com.twitter.util.TimeLike;
import com.twitter.util.logging.Logging;
import java.lang.reflect.Field;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.joda.time.DateTimeUtils;
import scala.MatchError;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.mutable.ArrayOps;
import scala.math.Numeric$DoubleIsFractional$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.control.NonFatal$;

/* compiled from: PreRestoreState.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u0015faB\u0001\u0003!\u0003\r\t!\u0004\u0002\u0010!J,'+Z:u_J,7\u000b^1uK*\u00111\u0001B\u0001\u000baJ,'/Z:u_J,'BA\u0003\u0007\u00031Y\u0017MZ6bgR\u0014X-Y7t\u0015\t9\u0001\"A\u0004gS:\fGO]1\u000b\u0005%Q\u0011a\u0002;xSR$XM\u001d\u0006\u0002\u0017\u0005\u00191m\\7\u0004\u0001M\u0019\u0001A\u0004\n\u0011\u0005=\u0001R\"\u0001\u0003\n\u0005E!!!G&bM.\f7\u000b\u001e:fC6\u001cHk^5ui\u0016\u00148+\u001a:wKJ\u0004\"a\u0005\f\u000e\u0003QQ!!\u0006\u0003\u0002\u0019A\f'\u000f^5uS>t\u0017N\\4\n\u0005]!\"AE*uCRL7\rU1si&$\u0018n\u001c8j]\u001eDQ!\u0007\u0001\u0005\u0002i\ta\u0001J5oSR$C#A\u000e\u0011\u0005qyR\"A\u000f\u000b\u0003y\tQa]2bY\u0006L!\u0001I\u000f\u0003\tUs\u0017\u000e\u001e\u0005\bE\u0001\u0011\r\u0011\"\u0003$\u0003=\u0001(/\u001a*fgR|'/Z*uCR,W#\u0001\u0013\u0011\u0007\u0015B#&D\u0001'\u0015\t9\u0003\"A\u0002baBL!!\u000b\u0014\u0003\t\u0019c\u0017m\u001a\t\u00039-J!\u0001L\u000f\u0003\u000f\t{w\u000e\\3b]\"9a\u0006\u0001b\u0001\n\u0013y\u0013A\b9sKJ+7\u000f^8sK\u0012+(/\u0019;j_:Le.\u001b;jC2$U\r\\1z+\u0005\u0001\u0004cA\u0013)cA\u0011!'N\u0007\u0002g)\u0011A\u0007C\u0001\u0005kRLG.\u0003\u00027g\tAA)\u001e:bi&|g\u000e\u0003\u00049\u0001\u0011UcAG\u0001\u001bGJ,\u0017\r^3B]\u0012\u001cF/\u0019:u\u0017\u000647.Y*ue\u0016\fWn\u001d\u0005\u0006u\u0001!IaO\u0001\u001egR\f'\u000f^,bSR4uN\u001d)sKJ+7\u000f^8sK\u0012$\u0006N]3bIR\u00111\u0004\u0010\u0005\u0006{e\u0002\rAP\u0001\u0017aJ,'+Z:u_J,7*\u00194lCN#(/Z1ngB\u0011q\bS\u0007\u0002\u0001*\u0011\u0011IQ\u0001\bgR\u0014X-Y7t\u0015\t\u0019E)A\u0003lC\u001a\\\u0017M\u0003\u0002F\r\u00061\u0011\r]1dQ\u0016T\u0011aR\u0001\u0004_J<\u0017BA%A\u00051Y\u0015MZ6b'R\u0014X-Y7t\u0011\u0015Y\u0005\u0001\"\u0003M\u0003e9\u0018-\u001b;G_J\u0004&/\u001a*fgR|'/\u001a$j]&\u001c\b.\u001a3\u0015\u0005mi\u0005\"B\u001fK\u0001\u0004q\u0004\"B(\u0001\t\u0013\u0001\u0016a\u00054j]\u0012$v\u000e^1m%\u0016\u001cHo\u001c:f\u0019\u0006<GCA)U!\ta\"+\u0003\u0002T;\t1Ai\\;cY\u0016DQ!\u0010(A\u0002yBQA\u0016\u0001\u0005\n]\u000bQdY8oM&<WO]3Qe\u0016\u0014Vm\u001d;pe\u0016\u0004&o\u001c9feRLWm\u001d\u000b\u00031~\u0003\"!W/\u000e\u0003iS!\u0001N.\u000b\u0003q\u000bAA[1wC&\u0011aL\u0017\u0002\u000b!J|\u0007/\u001a:uS\u0016\u001c\b\"\u00021V\u0001\u0004A\u0016A\u00039s_B,'\u000f^5fg\")!\r\u0001C\u00055\u0005\u0019\"/Z:fiN#(/Z1n)\"\u0014X-\u00193JI\")A\r\u0001C\u0005K\u0006ib-\u001b8e%\u0016\u001cHo\u001c:f\u0007>t7/^7fe2\u000bw-T3ue&\u001c7\u000f\u0006\u0002gqB\u0019qm\u001c:\u000f\u0005!lgBA5m\u001b\u0005Q'BA6\r\u0003\u0019a$o\\8u}%\ta$\u0003\u0002o;\u00059\u0001/Y2lC\u001e,\u0017B\u00019r\u0005\r\u0019V-\u001d\u0006\u0003]v\u0001\"a\u001d<\u000e\u0003QT!!\u001e\"\u0002\r\r|W.\\8o\u0013\t9HO\u0001\u0004NKR\u0014\u0018n\u0019\u0005\u0006s\u000e\u0004\rAP\u0001\rW\u000647.Y*ue\u0016\fWn\u001d\u0005\u0006w\u0002!I\u0001`\u0001\u0016M&tGmQ8ogVlWM\u001d'bO6+GO]5d)\ri\u0018Q\u0002\t\u00059y\f\t!\u0003\u0002��;\t1q\n\u001d;j_:\u0004b\u0001HA\u0002\u0003\u000f\u0011\u0018bAA\u0003;\t1A+\u001e9mKJ\u00022a]A\u0005\u0013\r\tY\u0001\u001e\u0002\u000b\u001b\u0016$(/[2OC6,\u0007bBA\bu\u0002\u0007\u0011\u0011C\u0001\u0010e\u0016\u001cHo\u001c:f\u0007>t7/^7feBA\u00111CA\u000f\u0003C\t\t#\u0004\u0002\u0002\u0016)!\u0011qCA\r\u0003!\u0019wN\\:v[\u0016\u0014(bAA\u000e\u0005\u000691\r\\5f]R\u001c\u0018\u0002BA\u0010\u0003+\u0011\u0001bQ8ogVlWM\u001d\t\u00069\u0005\r\u0012qE\u0005\u0004\u0003Ki\"!B!se\u0006L\bc\u0001\u000f\u0002*%\u0019\u00111F\u000f\u0003\t\tKH/\u001a\u0005\b\u0003_\u0001A\u0011BA\u0019\u0003)9W\r\u001e+ie\u0016\fGm\u001d\u000b\u0005\u0003g\t)\u0005E\u0003\u001d\u0003G\t)\u0004\u0005\u0003\u00028\u0005\u0005SBAA\u001d\u0015\u0011\tY$!\u0010\u0002\u0013%tG/\u001a:oC2\u001c(bAA \u0001\u0006I\u0001O]8dKN\u001cxN]\u0005\u0005\u0003\u0007\nID\u0001\u0007TiJ,\u0017-\u001c+ie\u0016\fG\r\u0003\u0004z\u0003[\u0001\rA\u0010\u0005\b\u0003\u0013\u0002A\u0011BA&\u0003I9W\r\u001e*fgR|'/Z\"p]N,X.\u001a:\u0015\t\u0005E\u0011Q\n\u0005\t\u0003\u001f\n9\u00051\u0001\u00026\u00051A\u000f\u001b:fC\u0012Da\"a\u0015\u0001!\u0003\r\t\u0011!C\u0005\u0003+\n9&\u0001\ttkB,'\u000f\n9s_B,'\u000f^5fgV\t\u0001,\u0003\u0002a!!q\u00111\f\u0001\u0011\u0002\u0007\u0005\t\u0011\"\u0003\u0002^\u0005\u0015\u0014AD:va\u0016\u0014H\u0005^8q_2|w-_\u000b\u0003\u0003?\u00022aPA1\u0013\r\t\u0019\u0007\u0011\u0002\t)>\u0004x\u000e\\8hs&\u0019\u0011q\r\t\u0002\u0011Q|\u0007o\u001c7pOfDa\"a\u001b\u0001!\u0003\r\t\u0011!C\u0005\u0003[\n\t(A\rtkB,'\u000fJ:fi\u0016C8-\u001a9uS>t\u0007*\u00198eY\u0016\u0014HcA\u000e\u0002p!1\u0011)!\u001bA\u0002yJ1!a\u001d\u0011\u0003M\u0019X\r^#yG\u0016\u0004H/[8o\u0011\u0006tG\r\\3s\u00115\t9\b\u0001I\u0001\u0004\u0003\u0005I\u0011\u0002\u000e\u0002z\u0005\u00013/\u001e9fe\u0012\u001a'/Z1uK\u0006sGm\u0015;beR\\\u0015MZ6b'R\u0014X-Y7t\u0013\tA\u0004\u0003\u0003\b\u0002~\u0001\u0001\n1!A\u0001\n\u0013\ty(a%\u0002;M,\b/\u001a:%CB\u0004H.[2bi&|gnU3sm\u0016\u00148i\u001c8gS\u001e,\"!!!\u0011\t\u0015B\u00131\u0011\t\u0005\u0003\u000b\u000biI\u0004\u0003\u0002\b\u0006%\u0005CA5\u001e\u0013\r\tY)H\u0001\u0007!J,G-\u001a4\n\t\u0005=\u0015\u0011\u0013\u0002\u0007'R\u0014\u0018N\\4\u000b\u0007\u0005-U$C\u0002\u0002\u0016B\tq#\u00199qY&\u001c\u0017\r^5p]N+'O^3s\u0007>tg-[4)\u0007\u0001\tI\n\u0005\u0003\u0002\u001c\u0006\u0005VBAAO\u0015\r\tyJB\u0001\fC:tw\u000e^1uS>t7/\u0003\u0003\u0002$\u0006u%\u0001D#ya\u0016\u0014\u0018.\\3oi\u0006d\u0007")
@Experimental
/* loaded from: input_file:com/twitter/finatra/kafkastreams/prerestore/PreRestoreState.class */
public interface PreRestoreState extends StaticPartitioning {
    void com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$_setter_$com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$preRestoreState_$eq(Flag<Object> flag);

    void com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$_setter_$com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$preRestoreDurationInitialDelay_$eq(Flag<Duration> flag);

    /* synthetic */ Properties com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$properties();

    /* synthetic */ Topology com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$topology();

    /* synthetic */ void com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$setExceptionHandler(KafkaStreams kafkaStreams);

    /* synthetic */ void com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$createAndStartKafkaStreams();

    /* synthetic */ Flag com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$applicationServerConfig();

    Flag<Object> com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$preRestoreState();

    Flag<Duration> com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$preRestoreDurationInitialDelay();

    default void createAndStartKafkaStreams() {
        if (!BoxesRunTime.unboxToBoolean(com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$preRestoreState().apply())) {
            com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$createAndStartKafkaStreams();
            return;
        }
        KafkaStreams kafkaStreams = new KafkaStreams(com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$topology(), configurePreRestoreProperties((Properties) com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$properties().clone()), kafkaStreamsClientSupplier());
        com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$setExceptionHandler(kafkaStreams);
        kafkaStreams.start();
        startWaitForPreRestoredThread(kafkaStreams);
    }

    /* JADX WARN: Type inference failed for: r0v0, types: [com.twitter.finatra.kafkastreams.prerestore.PreRestoreState$$anon$1] */
    private default void startWaitForPreRestoredThread(final KafkaStreams kafkaStreams) {
        new Thread(this, kafkaStreams) { // from class: com.twitter.finatra.kafkastreams.prerestore.PreRestoreState$$anon$1
            private final /* synthetic */ PreRestoreState $outer;
            private final KafkaStreams preRestoreKafkaStreams$1;

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                try {
                    this.$outer.com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$waitForPreRestoreFinished(this.preRestoreKafkaStreams$1);
                    this.$outer.info(() -> {
                        return "Closing pre-restoring server";
                    });
                    this.preRestoreKafkaStreams$1.close(1L, TimeUnit.MINUTES);
                    this.$outer.info(() -> {
                        return "Pre-restore complete.";
                    });
                    this.$outer.com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$resetStreamThreadId();
                    this.$outer.com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$createAndStartKafkaStreams();
                } catch (Throwable th) {
                    Option unapply = NonFatal$.MODULE$.unapply(th);
                    if (unapply.isEmpty()) {
                        throw th;
                    }
                    this.$outer.error(() -> {
                        return "PreRestore error";
                    }, (Throwable) unapply.get());
                    this.$outer.close(this.$outer.defaultCloseGracePeriod());
                    BoxedUnit boxedUnit = BoxedUnit.UNIT;
                }
            }

            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super("wait-for-pre-restoring-server-thread");
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.preRestoreKafkaStreams$1 = kafkaStreams;
            }
        }.start();
    }

    default void com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$waitForPreRestoreFinished(KafkaStreams kafkaStreams) {
        ((Logging) this).info(() -> {
            return new StringBuilder(83).append("Waiting for Total Restore Lag to be less than 1000 after an initial wait period of ").append(this.com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$preRestoreDurationInitialDelay().apply()).toString();
        });
        long currentTimeMillis = DateTimeUtils.currentTimeMillis() + ((TimeLike) com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$preRestoreDurationInitialDelay().apply()).inMillis();
        double d = Double.MAX_VALUE;
        while (true) {
            if (d < 10000 && DateTimeUtils.currentTimeMillis() >= currentTimeMillis) {
                return;
            }
            d = findTotalRestoreLag(kafkaStreams);
            Thread.sleep(1000L);
        }
    }

    private default double findTotalRestoreLag(KafkaStreams kafkaStreams) {
        double unboxToDouble = BoxesRunTime.unboxToDouble(((TraversableOnce) findRestoreConsumerLagMetrics(kafkaStreams).map(metric -> {
            return BoxesRunTime.boxToDouble($anonfun$findTotalRestoreLag$1(metric));
        }, Seq$.MODULE$.canBuildFrom())).sum(Numeric$DoubleIsFractional$.MODULE$));
        ((Logging) this).info(() -> {
            return new StringBuilder(19).append("Total Restore Lag: ").append(unboxToDouble).toString();
        });
        return unboxToDouble;
    }

    private default Properties configurePreRestoreProperties(Properties properties) {
        properties.put("application.server", new StringBuilder(1).append(Utils.getHost((String) com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$super$applicationServerConfig().apply())).append(":").append(StaticPartitioning$.MODULE$.PreRestoreSignalingPort()).toString());
        properties.put("poll.ms", "0");
        return properties;
    }

    default void com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$resetStreamThreadId() {
        try {
            Field declaredField = StreamThread.class.getDeclaredField("STREAM_THREAD_ID_SEQUENCE");
            declaredField.setAccessible(true);
            ((AtomicInteger) declaredField.get(null)).set(1);
        } catch (Throwable th) {
            Option unapply = NonFatal$.MODULE$.unapply(th);
            if (unapply.isEmpty()) {
                throw th;
            }
            ((Logging) this).error(() -> {
                return "Error resetting stream threads";
            }, (Throwable) unapply.get());
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        }
    }

    private default Seq<Metric> findRestoreConsumerLagMetrics(KafkaStreams kafkaStreams) {
        return (Seq) ((TraversableLike) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(getThreads(kafkaStreams))).toSeq().map(streamThread -> {
            return new Tuple2(streamThread, this.getRestoreConsumer(streamThread));
        }, Seq$.MODULE$.canBuildFrom())).flatMap(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            return Option$.MODULE$.option2Iterable(this.findConsumerLagMetric((Consumer) tuple2._2()).withFilter(tuple2 -> {
                return BoxesRunTime.boxToBoolean($anonfun$findRestoreConsumerLagMetrics$3(tuple2));
            }).map(tuple22 -> {
                if (tuple22 != null) {
                    return (Metric) tuple22._2();
                }
                throw new MatchError(tuple22);
            }));
        }, Seq$.MODULE$.canBuildFrom());
    }

    private default Option<Tuple2<MetricName, Metric>> findConsumerLagMetric(Consumer<byte[], byte[]> consumer) {
        return ((IterableLike) JavaConverters$.MODULE$.mapAsScalaMapConverter(consumer.metrics()).asScala()).find(tuple2 -> {
            return BoxesRunTime.boxToBoolean($anonfun$findConsumerLagMetric$1(tuple2));
        });
    }

    private default StreamThread[] getThreads(KafkaStreams kafkaStreams) {
        return (StreamThread[]) ReflectionUtils$.MODULE$.getFinalField(kafkaStreams, "threads");
    }

    private default Consumer<byte[], byte[]> getRestoreConsumer(StreamThread streamThread) {
        return (Consumer) ReflectionUtils$.MODULE$.getFinalField(streamThread, "restoreConsumer");
    }

    static /* synthetic */ double $anonfun$findTotalRestoreLag$1(Metric metric) {
        return BoxesRunTime.unboxToDouble(metric.metricValue());
    }

    static /* synthetic */ boolean $anonfun$findRestoreConsumerLagMetrics$3(Tuple2 tuple2) {
        return tuple2 != null;
    }

    static /* synthetic */ boolean $anonfun$findConsumerLagMetric$1(Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        String name = ((MetricName) tuple2._1()).name();
        return name != null ? name.equals("records-lag") : "records-lag" == 0;
    }

    static void $init$(PreRestoreState preRestoreState) {
        preRestoreState.com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$_setter_$com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$preRestoreState_$eq(((App) preRestoreState).flag().apply("kafka.prerestore.state", () -> {
            return true;
        }, "Pre-Restore state", Flaggable$.MODULE$.ofBoolean()));
        preRestoreState.com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$_setter_$com$twitter$finatra$kafkastreams$prerestore$PreRestoreState$$preRestoreDurationInitialDelay_$eq(((App) preRestoreState).flag().apply("kafka.prerestore.duration", () -> {
            return DurationOps$RichDuration$.MODULE$.minutes$extension(DurationOps$.MODULE$.RichDuration(2L));
        }, "Pre-Restore min delay", Flaggable$.MODULE$.ofDuration()));
    }
}
