Эта статья приняла участие в мероприятии «Церемония создания новичков», чтобы вместе начать путь создания золота.
Почему-то в последнее время я скатился вниз. . .
1 Начало истории
В этот момент я взглянул на парня, сидевшего напротив: клетчатая рубашка, среднего телосложения, очки в черной оправе под чуть высокой переносицей, в чуть прищуренных глазах читался след усталости, он не моргал. смотри на меня.
Я все шептала в душе: какая же я хорошенькая? Это всего лишь инструмент, который вы используете для обмена на хлеб и машину, мистер А. Хотя он со мной уже пять лет, он такой~
Кстати говоря, я забыл представиться. Меня зовут Флинк, и, конечно, я предпочитаю, чтобы вы называли меня полным именем:Apache Flink
, потому что это звучит очень технично. В настоящее время я являюсь одним из самых популярных вычислительных движков для работы с большими данными в реальном времени.
Я осмеливаюсь сказать это, потому что в настоящее время实时领域
Он действительно занимает лидирующие позиции, если не верите, то посмотрите следующую статистику:
Здесь нужен @мой старший брат:Apache Spark
, я слышал, что он появился один раз"Flink的出现,Spark是否慢慢成为鸡肋
«Мы не смеем говорить или спрашивать, но мы по-прежнему уважаем и рационализируем наших старших.
"Кашель"~ Легкий кашель вернул меня к реальности, и Мистер А снова начал отлаживать код~
2 Я начинаю чувствовать стресс
На самом деле, на прошлой неделе я снова встретил мистера А. Я слышал, что он был у моего хорошего друга:Kafka
Останавливались там на неделю, как будто готовясь сделать что-то большое.
Только когда он нашел меня, компания была готова строить实时数仓
. Мне нужно присоединиться к моим братьям Кафка, чтобы обрабатывать миллиарды данных в реальном времени.
за实时数仓
Я, наверное, понимаю. Глядя на архитектурный план, предложенный начальником господина А, я втайне обрадовался: это моя профессиональная сфера.
Общая структура не сложна и проста для понимания.
- Программа получает исходные данные в режиме реального времени и помещает слой kafka ods на хранение
- Выполните расчет обработки в реальном времени слоя ods->dwd->dws и запишите результат в kafka.
- Добавьте автономный процесс обработки в качестве резервной копии
Я взглянул на жаждущих попробоватьKafka
Братцы, мы кивнули друг другу, начнем~
Как старые партнеры, я иKafka
Братья очень хорошо сотрудничали, а мистер А тоже ветеран, так что мы успешно выполнили начальную задачу всего за одну неделю.
Я могу показать вам некоторые из наших совместных результатов:
- src.main.scala.com.xxproject.xx
|--handler
|---FlinkODSHandler.scala
|---FlinkDWHandler.scala
|---FlinkADSHandler.scala
...
|--service
|---KafkaSchdulerService.scala
|---SchdulerService.scala
...
|--config/util/model
|---KafkaUtils.scala
|---XXDataModel.scala
...
Весенний ветерок гордится подковообразной болезнью~ У меня сейчас очень комфортное настроение, и мы втроем просто идеальные партнеры. .
Но это длилось недолго. Ко второй неделе я постепенно начал замедляться~
Конкретная производительность:
- Сначала работа идет нормально, а позже появляется большое количество Задач.
等待
- Небольшое количество задач Задачи начинает отчетность
checkpoint
проблема тайм-аута- Данные Kafka накапливаются и не могут быть использованы
Я немного запаниковал, посмотрел на свою ситуацию и был шокирован:
Независимо от того, является ли это вводом или выводом, буферная память заполнена. Данные не могут быть обработаны,barrier
очень медленное течение, большоеcheckpoint
Время генерации становится больше.
я случился背压
проблема! ! !
3 Мой механизм обратного давления
После периода молчаливой самонастройки проблема все еще не решена.
В то же время вокруг меня продолжали звенеть тревоги,内存
Частые аварийные ситуации. В мгновение ока моя страница выполнения задачи стала краснойHigh
Логотип заполнен~
Ни в коем случае, в конце концов, я все же сделал предупреждение мистеру А~
Когда г-н А. получил эту новость, он некоторое время смотрел на меня и вздыхал. Я чувствую себя немного смущенным и чувствую, что облажался. .
Он не стал много говорить, просто спросил о моем反压机制
, сказал, чтобы решить проблему из источника.
Ниже приводится разговор между мистером А и мной.
1) Каковы общие условия противодавления?
Согласно моему прошлому опыту, обратное давление обычно возникает, когда скорость обработки нисходящих данных не соответствует скорости генерации восходящих данных.
Можно разделить два случая:
-
当前Task
Скорость обработки задачи низкая.Например, сложная логика, такая как обработка алгоритма вызова в задаче задачи, приводит к недостаточному выделению памяти для восходящего потока. -
下游Task
Скорость обработки задачи низкая, например, несколько выходных данных collect() в нисходящий поток, поэтому текущий узел не может запросить достаточно памяти.
2) Каковы последствия частого противодавления?
Частая обратная нагрузка увеличит задержку данных при выполнении заданий потоковой обработки, а также повлияет наCheckpoint
.
нужно сделать контрольную точкуBarrier
Выравнивание, если в это время появляется Задача反压
, скорость потока Барьера уменьшится, что приведет к замедлению работы контрольной точки или даже к тайм-ауту, а также замедлится выполнение всей задачи.
Длительное или частое противодавление требует лечения, если оно вызвано
网络波动
илиGC
Иногда возникающее обратное давление может не нуждаться в устранении.
3) Как вы обнаружили обратное давление?
В моем веб-интерфейсе я переверну задачу с приемника на источник. посмотреть один за другимBackPressure
Для получения подробной информации найдите первую Задачу с противодавлением.
Это нормальная ситуация~
Мой внутренний принцип обнаружения
Интерфейс BackPressure периодически采样
Информация о стеке потоков задач, буфер запросов памяти потоков статистики阻塞频率
, чтобы определить, находится ли узел в состоянии противодавления.
- По умолчанию частота меньше
0.1
показать нормально -
(0.1,0.5)
НИЗКИЙ, противодавление незначительное - Превосходить
0.5
для ВЫСОКОГО,需要注意反压
В это время я указал на г-на А, чтобы увидеть текущий проектBackPressure
страница, что явно является ненормальным состоянием.
4) Каков принцип работы механизма противодавления?
Какой-то джентльмен сделал паузу в горле, напоминая мне быть здесь осторожнее. Я разобрался с мыслями и решил начать с ограничения тока:
- поток данных
Весь процесс можно сравнить с生产者->消费者
система. Восходящий производитель отправляет данные (2M/s
) в буфер отправки через передачу по сети (5M/s
) в буфер приема, и, наконец, нижестоящий потребитель потребляет (<1M/s
).
Очевидно, что это невозможно, скорость нисходящего потока ниже, чем скорость восходящего потока, данные久积成疾
~ Необходимо сделать ограничение тока.
- Ограничение
Это легко понять. Поскольку восходящая обработка выполняется быстрее, я добавляю механизм ограничения тока, чтобы замедлить ее, чтобы скорости восходящей и нисходящей передачи были в основном одинаковыми. Разве это не решит проблему? .
Не совсем так, есть несколько вопросов:
- Я не могу заранее предсказать фактическую скорость нисходящего потока (насколько установлен лимит потока)
- Часто встречаются колебания сети, а скорость потока вверх и вниз по течению неравномерна.
动态变化
из
Помня об этих причинах, шахта внутри обеспечивает мощный механизм обратного давления:
Динамическая обратная связь восходящего и нисходящего потока: если скорость нисходящего потока низкая, скорость восходящего потока будет ограничена, в противном случае скорость восходящего потока будет увеличена. Реализуйте эффект динамического автоматического обратного давления.
- механизм противодавления
Верхнее течение проходит самостоятельноNetwork Buffer
слой, а затем перенести вниз наChannel Buffer
слой (канал Netty). наконец прошло网络
Передача, послойная передача в нисходящий поток.
Сетевой буфер, буфер канала и буфер сокета обычно понимаются как
用户态
и内核态
Разница заключается в разных пространствах подкачки и операционных системах.
Для принципов режима ядра и пользовательского режима заинтересованные друзья могут добавить личный WeChat:youlong525
Давайте обсудим~
- Принцип действия механизма противодавления
Некоторые приготовления были сделаны ранее, здесь я резюмирую процесс работы моего механизма обратного давления для мистера А:
- каждый
TaskManager
доля обслуживанияNetwork BufferPool
(разделяемый пул памяти задач), при инициализации вOff-heap Memory
запрашивать память. - Каждая задача создает свою
Local BufferPool
(Задание локального пула памяти) и обмен памятью с Network BufferPool. - вверх по течению
Record Writer
Применить к Local BufferPool для буфера (памяти) для записи данных. Если локальному буферному пулу не хватает памяти,Network BufferPool
Применить и вернуть примененную память после использованияPool
. -
Netty Buffer
скопируйте буфер и пройдитеSocket Buffer
Он отправляется в сеть, и последующий нисходящий конец обрабатывает его по аналогичному механизму. - Когда нижестоящее приложение для буфера дает сбой, оно указывает текущий узел
内存
Если недостаточно, отправляйте слой за слоем反压信号
В восходящий поток медленно прекращает отправку данных, пока нисходящий поток не возобновит их снова.
Итак, мой механизм обратного давления аналогичен тому, что используется в Java.阻塞队列
, как показано на рисунке ниже для принципа работы моего обратного давления на уровне памяти.
Задачи задачи передаются сLocal BufferPool
иNetwork BufferPool
Совместное применение и освобождение памяти, в то время как использование памяти нисходящим потоком передается восходящему потоку в режиме реального времени для достижения динамического обратного давления.
Выслушав мой ответ, мистер А задумался~
4 Я хочу распаковать
На самом деле, я тоже был очень смущен. Я очень уверен в своем механизме противодавления. Могут ли быть другие причины, влияющие на управление противодавлением?
В это время мистер А сбоку открыл мой веб-интерфейс и пробормотал несколько слов:数据倾斜
и并发
.
4.1 Первая попытка
Я сразу понял это и повернулся, чтобы посмотреть на экран.
Я проверил ситуацию с каждой подзадачей отдельно и обнаружил, что соответствующее значение размера состояния в контрольной точке имеет отдельные исключения, которые на самом деле достигли10G
Правый и левый размер! !
Посмотрите на другие значения в разделе (как показано на рисунке):
Данные искажены~
У меня есть нижняя линия в моем сердце, и я сразу же узнал об этих особых вещах с мистером А.Key
, выполните предварительную агрегацию и разделение данных и запустите снова.
Я чувствую, что есть небольшой эффект, но все же есть довольно много высоких пиков. .
4.2 Вторая попытка
В этот момент снова тупик.
Ни в коем случае, я немного увеличил собственную память. Подумав, я снова увеличил оператора.并发度
, в конце концов, увеличение количества потоков всегда снижает вычислительную нагрузку.
После невольной настройки параметров результат все равно не сильно улучшился.
4.3 Третья попытка
Мистер А начал реорганизовывать мой общий процесс расчета, а затем изменил параметр.
Я посмотрел его и изменил его并发度
. Я так не думаю, я только что попробовал это. .
Что-то кажется неправильным. .
Вот результат, который я хочу! ! Я не мог не закричать.
Он улыбнулся и сказал мне, что это для меня.算子链
механизм.
цепочка операторов
Установив одинаковую степень параллелизма для нижестоящих и вышестоящих операторов, цепочки операторов могут формироваться автоматически.
Преимущества этого:
- Эффективно сократить переключение между потоками и накладные расходы кэша данных
- Увеличьте пропускную способность и уменьшите задержку
Во всем процессе формируется несколько цепочек операторов, что снижает накладные расходы потоков и использование памяти. Мое обратное давление, естественно, уменьшилось.
Я не могу не быть в шоке~~
5 Увидеть все цветы Чанъань за один день
В конце концов, с помощью мистера А, моя скорость вернулась. несколько дней高压
Дни полностью закончились, и момент стал шелковистым~
Я медленно выдохнул и с небольшим облегчением посмотрел на конечный результат:
Бессознательно он посмотрел на мистера А и тоже показал давно потерянную улыбку.
Я Флинк, сейчас нет никакого давления~
Эта статья закончилась.
》》》Чтобы получить больше хороших статей, пожалуйста, обратите внимание на мой публичный аккаунт: Арсенал больших данных