Presto-MultilevelSplitQueue Обсуждение

Большие данные

Общее обсуждение планирования выполнения

Сценарии проблем планирования

Для механизма запросов важным вопросом является решение проблемы планирования вычислительных задач. С какими проблемами мы столкнемся?

  1. Чтобы привести простой пример, например, у меня сейчас 10 запросов, задача с номером [0] занимает 10 с, а задача с номером [1]-[9] занимает 1 с, если lantency всех задач выполняется по порядку, lantency не будет будет высоким?
    • Простое решение состоит в том, чтобы сначала выполнить задачи [1]-[9] и, наконец, выполнить задачу [0].
  2. Но для системы онлайн-запросов, если она еще входит в процесс выполнения. Время выполнения всех задач 1s, значит ли это, что задача номер [0] никогда не может быть выполнена?
    • Поэтому нам нужна относительно честная система, и не может быть сценария, когда какие-то задачи вечно голодны.
  3. Задачи могут быть связаны друг с другом, например, задачи [0]-[5] должны быть выполнены до выполнения задачи [6].
    • Существует определенная корреляция между задачами, и может потребоваться единая стратегия планирования.
  4. Может быть, результат планирования в распределенной системе неуправляем? Например, запросы к зависимым службам блокируются. Как адаптироваться, когда фактическое время и ожидаемое время сильно отличаются?
    • Рассмотрите неконтролируемые распределенные среды и экстремальные сценарии.

Общие принципы и стратегии

Моделирование таких задач на самом деле во многих местах похоже Идея presto, вероятно, такова:

  1. Выделите квант времени задачам выполнения и поочередно разделите кванты времени между задачами, чтобы свести к минимуму задержки и избежать голодания.
  2. Presto имеет встроенную очередь с многоуровневым приоритетом, которая уравновешивает соотношение конкуренции между различными продолжительностями, накапливая время выполнения задач, и постоянно обновляет очередь, накапливая время выполнения, так что задачи одинаковой стоимости могут выполняться только с задачи одного уровня как можно больше соревнуются.
  3. Ссылаться наpresto-conceptЗависимости между задачами решаются путем создания DAG, который здесь не будет обсуждаться.В рамках одной задачи несколько разбиений часто связаны в определенной степени.Presto использует TaskPriorityTracker для сбора времени выполнения типов и управления приоритетным планированием унифицированным образом. Стратегия.

Абстракция модели для presto

Presto создает следующие классы для решения этой задачи планирования.

  1. TaskExecutor: существует только один в одном экземпляре, который принимает внешние запросы на планирование задач и управление состоянием задач, содержит ресурсы рабочего потока и поддерживает некоторую необходимую статистическую информацию. MultilevelSplitQueue поддерживается внутри, что поддерживает разделение, ожидающее планирования.
  2. MultilevelSplitQueue: встроенная иерархическая очередь приоритетов, которая поддерживает время планирования и статистику ожидания оценки приоритета в соответствии с уровнем
  3. PrioritizedSplitRunner: чтобы реализовать возможность разделения планирования аналогично операционной системе, presto абстрагирует интерфейс SplitRunner.При планировании он каждый раз будет планировать процессор этого интерфейса только для одного временного интервала, а затем повторно найдет подходящий задание на Выполнение следующего шарда. Основное различие между PrioritizedSplitRunner и SplitRunner заключается в том, что есть встроенный объект Priority, а Priority имеет встроенный текущий уровень и счет внутри уровня.
  4. TaskHandler: Смысл этого типа в том, чтобы пометить группу сплитов.Этот тип сплитов может принадлежать одной задаче, и у них одинаковая логика обработки.В то же время они будут иметь некоторые внешние ограничения при планировании, такие как требуется их совокупное время. Запланируйте вместе, параллелизм разделения может иметь определенные требования.

Общий процесс выполнения примерно следующий.

image.png

Реализация многоуровневой приоритетной очереди

Переменные-члены

Сначала подробно опишите внутреннюю реализацию MultilevelSplitQueue, начиная с основных переменных-членов.

private final List<PriorityQueue<PrioritizedSplitRunner>> levelWaitingSplits;
private final AtomicLong[] levelScheduledTime = new AtomicLong[LEVEL_THRESHOLD_SECONDS.length];
private final AtomicLong[] levelMinPriority;
private final List<CounterStat> selectedLevelCounters;
  • levelWaitingSplits: двумерный массив очередей с приоритетом размера 5.
  • levelSchedueTime: накопитель времени планирования с размером 5, указывающим используемое время планирования текущего уровня. PrioritizedSplitRunner увеличит время планирования текущего уровня после выполнения процесса задачи. Эта часть данных в основном используется для расчета того, какой уровень очереди следует получить при опросе данных.
  • levelTimeMultiplier: это поле используется для установки распределения процессорного времени между различными уровнями.
  • levelMinPriority — это оценка приоритета размером 5, и этот параметр — единственный параметр конструктора, который можно указать. Минимальная оценка приоритета текущего уровня названия таблицы, и оценка приоритета текущего уровня будут обновляться каждый раз при успешном выносе данных из команды.Когда использовать это будет рассмотрено позже.Как PriorityTracker управляет приоритетом группы шпагатов Обсудите эту часть еще раз.

Реализация функции очереди

Два самых основных метода для приоритетных очередей — это предлагать и принимать.

  1. Для сценария предложения это относительно просто, MultilevelSplitQueue примет Runner с приоритетом и поместит очередь в очередь указанного уровня.
  2. Для сценария take все будет немного сложнее, во-первых, у нас 5 очередей, как определить, какой уровень очереди наиболее подходит для планирования?

Presto делает это, количество уровней фиксируется на 5: Во-первых, предполагается, что ожидаемое распределение процессорного времени на разных уровнях определено, и конкретная реализация выбирает использование мощности levelMinPriority для определения коэффициента распределения.Например, когда levelMinPriority=2, коэффициент распределения времени уровни 0-5 это 1:2:4:8:16:32.

Как упоминалось ранее, мы храним массив levelSchedueTime, который идентифицирует запланированное время.Простое решение состоит в том, чтобы сравнить, соответствует ли соотношение распределения запланированного времени на разных уровнях ожидаемому, а затем найти наименьшее значение, соответствующее этому соотношению. уровень очереди, который в данный момент необходимо взять и обработать.

public PrioritizedSplitRunner take()
        throws InterruptedException
{
    while (true) {
        lock.lockInterruptibly();
        try {
            PrioritizedSplitRunner result;
            while ((result = pollSplit()) == null) {
                notEmpty.await();
            }

            if (result.updateLevelPriority()) {
                offer(result);
                continue;
            }

            int selectedLevel = result.getPriority().getLevel();
            levelMinPriority[selectedLevel].set(result.getPriority().getLevelPriority());
            selectedLevelCounters.get(selectedLevel).update(1);

            return result;
        }
        finally {
            lock.unlock();
        }
    }
}

@GuardedBy("lock")
private PrioritizedSplitRunner pollSplit()
{
    long targetScheduledTime = getLevel0TargetTime();
    double worstRatio = 1;
    int selectedLevel = -1;
    for (int level = 0; level < LEVEL_THRESHOLD_SECONDS.length; level++) {
        if (!levelWaitingSplits.get(level).isEmpty()) {
            long levelTime = levelScheduledTime[level].get();
            double ratio = levelTime == 0 ? 0 : targetScheduledTime / (1.0 * levelTime);
            if (selectedLevel == -1 || ratio > worstRatio) {
                worstRatio = ratio;
                selectedLevel = level;
            }
        }

        targetScheduledTime /= levelTimeMultiplier;
    }

    if (selectedLevel == -1) {
        return null;
    }

    PrioritizedSplitRunner result = levelWaitingSplits.get(selectedLevel).poll();
    checkState(result != null, "pollSplit cannot return null");

    return result;
}

public void offer(PrioritizedSplitRunner split)
{
    checkArgument(split != null, "split is null");

    split.setReady();
    int level = split.getPriority().getLevel();
    lock.lock();
    try {
        if (levelWaitingSplits.get(level).isEmpty()) {
            // Accesses to levelScheduledTime are not synchronized, so we have a data race
            // here - our level time math will be off. However, the staleness is bounded by
            // the fact that only running splits that complete during this computation
            // can update the level time. Therefore, this is benign.
            long level0Time = getLevel0TargetTime();
            long levelExpectedTime = (long) (level0Time / Math.pow(levelTimeMultiplier, level));
            long delta = levelExpectedTime - levelScheduledTime[level].get();
            levelScheduledTime[level].addAndGet(delta);
        }

        levelWaitingSplits.get(level).offer(split);
        notEmpty.signal();
    }
    finally {
        lock.unlock();
    }
}

Обсуждение level0TagetTime базового времени

Одна из интересных вещей здесь заключается в том, как реализация обрабатывает расчетный коэффициент.Реализация не использует напрямую константу для расчета, а вычисляет эталонное время target0, а затем использует это время bastime для каждого дубля.Выполнить расчет, и когда предложен новый уровень, он также будет установлен на предполагаемое значение, основанное на текущем рассчитанном базовом времени, умноженном на ранее заданный коэффициент, при этом предполагается, что задача была выполнена.

А почему? На самом деле я не думал об этом очень четко.Одна из возможных причин заключается в том, чтобы не добавлять слишком много запросов на один и тот же уровень [например, level0], а затем добавлять некоторые разбиения на другие уровни.Согласно нормальной логике, это может занять много времени. нет возможности запустить сплит уровня 0 внутри. Потому что время планирования, занимаемое ранее, слишком велико. Но это тоже имеет свою цену, то есть ее нужно рассчитывать каждый раз, когда вы берете.

@GuardedBy("lock")
private long getLevel0TargetTime()
{
    long level0TargetTime = levelScheduledTime[0].get();
    double currentMultiplier = levelTimeMultiplier;

    for (int level = 0; level < LEVEL_THRESHOLD_SECONDS.length; level++) {
        currentMultiplier /= levelTimeMultiplier;
        long levelTime = levelScheduledTime[level].get();
        level0TargetTime = Math.max(level0TargetTime, (long) (levelTime / currentMultiplier));
    }

    return level0TargetTime;
}

Как настроить приоритет группы сплитов.

В приведенном выше коде взятия есть такой абзац, который как бы указывает на то, что вынутые задачи могут быть возвращены в очередь? Почему это?


            if (result.updateLevelPriority()) {
                offer(result);
                continue;
            }

Возвращаясь к сценарию 3, который мы первоначально обсуждали, разбиения по группе задач на самом деле в определенной степени связаны, например, мы хотим использовать ЦП, занятый группой задач, для накопления вместо одного разбиения.

  • Во-первых, нам нужен трекер, который должен быть связан с группой расщеплений, потому что только общий объект может связать все объекты, связанные с задачей вместе. Это продолжает обновляться каждый раз, когда информация обновляется.
  • Во-вторых, просто иметь ссылку недостаточно.Одна проблема заключается в том, что разделение управляется в куче.Хотя функция сравнения реализована, расчет может запускаться только при входе и выходе из кучи, поэтому presto поддерживает каждое разделение.Приоритетный объект , а затем сохраните приоритет для группы объектов-дескрипторов. Приоритет TaskHandler фактически можно рассматривать как своего рода кэш.

Обновленная логика, вероятно, такова: Например, группа TaskHandler имеет 2 разделенных объекта (A, B).Когда разделение времени задачи A завершено, обновляются Priority A и Priority TaskHandler, который не обновляется для B, только когда B занимает из кучи При выходе обнаруживается, что Приоритет Б отличается от уровня TaskHandler, и текущий Б нужно заново заносить в кучу.

public boolean updateLevelPriority()
{
    Priority newPriority = taskHandle.getPriority();
    Priority oldPriority = priority.getAndSet(newPriority);
    return newPriority.getLevel() != oldPriority.getLevel();
}

Вопрос вот в чем, почему только изменение уровня нужно заново заносить в кучу? Мое собственное понимание, вероятно, таково. Теоретически самая строгая логика заключается в том, что все разбиения одного и того же TaskHandler пересчитывают и обновляют Cache. Конечно, это невозможно, потому что цена слишком высока. А почему уровень разный? Основная причина заключается в том, чтобы обеспечить точность статистики на уровне, потому что эта часть данных требует последующего принятия решений.После путаницы точность планирования будет сильно различаться.

Как реализовать настройку приоритета совокупного времени и разделения

Во-первых, логика реализации принятия решений проста, но на самом деле это жестко закодированный код, Presto задает порог для каждого уровня, и если накопленное время превысит этот порог, он перейдет на следующий уровень.

Вопрос в том, когда принять это решение? О том, как поддерживать Приоритет, уже говорилось ранее.

  • Во-первых, для первого разбиения на кучу уровень решения делается в соответствии с указанным Приоритетом.
  • Для объектов, которые уже находятся в очереди, Приоритет будет обновляться после выполнения временного среза, в это время в Трекере хранится накопленное время всех сплитов текущего TaskHandler, в это время уровень будет рассчитываться в соответствии с накопленным раз и снова вошли в кучу.
public Priority updatePriority(Priority oldPriority, long quantaNanos, long scheduledNanos)
{
    int oldLevel = oldPriority.getLevel();
    int newLevel = computeLevel(scheduledNanos);

    long levelContribution = Math.min(quantaNanos, LEVEL_CONTRIBUTION_CAP);

    if (oldLevel == newLevel) {
        addLevelTime(oldLevel, levelContribution);
        return new Priority(oldLevel, oldPriority.getLevelPriority() + quantaNanos);
    }

    long remainingLevelContribution = levelContribution;
    long remainingTaskTime = quantaNanos;

    // a task normally slowly accrues scheduled time in a level and then moves to the next, but
    // if the split had a particularly long quanta, accrue time to each level as if it had run
    // in that level up to the level limit.
    for (int currentLevel = oldLevel; currentLevel < newLevel; currentLevel++) {
        long timeAccruedToLevel = Math.min(SECONDS.toNanos(LEVEL_THRESHOLD_SECONDS[currentLevel + 1] - LEVEL_THRESHOLD_SECONDS[currentLevel]), remainingLevelContribution);
        addLevelTime(currentLevel, timeAccruedToLevel);
        remainingLevelContribution -= timeAccruedToLevel;
        remainingTaskTime -= timeAccruedToLevel;
    }

    addLevelTime(newLevel, remainingLevelContribution);
    long newLevelMinPriority = getLevelMinPriority(newLevel, scheduledNanos);
    return new Priority(newLevel, newLevelMinPriority + remainingTaskTime);
}

В этом коде также обсуждается очень интересный вопрос.Хотя presto делит сплит на кванты времени, это не означает, что этот квант времени строго действителен.Например, в некоторых сценариях весь раскол может быть запланирован из-за IO и других проблем. Время намного больше, чем временной отрезок, например несколько минут, в это время, если какая-то дополнительная обработка не выполняется, это может вызвать проблемы с голоданием:

  • С одной стороны, presto устанавливает верхний предел времени планирования LEVEL_CONTRIBUTION_CAP (30 с), то есть процесс можно рассматривать не более чем как 30 с.
  • С другой стороны, когда разница между новым уровнем и старым уровнем велика, время ЦП нового уровня не будет напрямую увеличиваться, но все время ЦП уровня в интервале будет заполнено в соответствии с к настройке порога.Это может максимально избежать голодания других разбиений нового уровня из-за одной задачи.