Принцип и практика Flink 1.10 Native Kubernetes

Flink

Автор: Чжоу Кайбо (Бао Ню).

Это было долгожданным.Сегодня, когда Kubernetes находится в самом разгаре, сообщество Flink, наконец, предоставляет нативную поддержку Kubernetes в версии 1.10, котораяNative Kubernetes Integration. Однако это всего лишь бета-версия, и ожидается, что она будет полностью поддерживаться в версии 1.11.

Мы знаем, что в Flink 1.9 и более ранних версиях, если вы хотите запускать задачи Flink в Kubernetes, вам необходимо заранее указать необходимое количество TaskManager(TM), ЦП и памяти. Проблема в том, что в большинстве случаев вы не можете точно предсказать, сколько TM потребуется задаче перед запуском задачи. Если указано слишком много ТМ, ресурсы будут потрачены впустую, если количество указанных ТМ слишком мало, задачи не будут запланированы. Основная причина заключается в том, что задачи Flink, работающие в Kubernetes, не обращаются за ресурсами напрямую к кластеру Kubernetes.

Флинк сделан в версии 1.10Active Kubernetes IntegrationПервая фаза поддерживает кластеры сеансов. Последующий второй этап обеспечит более полную поддержку, такую ​​как поддержка отправки задач для каждого задания, высокая доступность на основе собственного API Kubernetes и поддержка дополнительных параметров Kubernetes, таких как допуск, метка и селектор узла.Active Kubernetes IntegrationсерединаActiveЭто означает, что ResourceManager (KubernetesResourceManager) Flink может напрямую взаимодействовать с Kubernetes и подавать заявки на новые поды по запросу, аналогично тому, что Flink делает с интеграцией Yarn и Mesos. В многопользовательской среде пользователи могут использовать пространство имен в Kubernetes для изоляции ресурсов, чтобы запускать разные кластеры Flink. Конечно, учетные записи пользователей и авторизации в кластере Kubernetes нужно подготовить заранее.

принцип

flink_1.10_nativek8s.png

Принцип работы следующий (порядковый номер в начале абзаца соответствует номеру, указанному стрелкой на рисунке):

  1. Клиент Flink сначала подключается к серверу API Kubernetes и отправляет файл описания ресурсов кластера Flink, включая карту конфигурации, службу диспетчера заданий, развертывание диспетчера заданий иOwner Reference.

  2. Мастер Kubernetes создаст соответствующие объекты Kubernetes на основе этих файлов описания ресурсов. Возьмем в качестве примера развертывание диспетчера заданий, о котором мы заботимся больше всего. После того, как узел в кластере Kubernetes получит запрос, процесс Kubelet загрузит образ Flink из центрального репозитория, подготовит и смонтирует том, а затем выполнит запуск. команда. После запуска pod мастера flink также запускаются Dispacher и KubernetesResourceManager.

После выполнения первых двух шагов запускается весь кластер сеансов Flink, и запрос задачи может быть принят.

  1. Пользователи могут отправлять задачи в этот сеансовый кластер через командную строку Flink, то есть через клиент flink. В это время граф задания будет создан на стороне клиента flink, а затем загружен вместе с пользовательским jar-пакетом через RestClinet.

  2. Как только задание будет успешно отправлено, JobSubmitHandler отправит задание диспетчеру после получения запроса. Затем будет создан мастер задания.

  3. JobMaster запрашивает слоты у KubernetesResourceManager.

  4. KubernetesResourceManager назначает диспетчеры задач из кластеров Kubernetes. Каждый TaskManager — это Pod с уникальным представлением. KubernetesResourceManager создаст новый файл конфигурации для TaskManager с именем службы Flink Master в качестве адреса. Таким образом, после аварийного переключения Flink Master TaskManager все еще можно будет повторно подключить.

  5. После того, как кластер Kubernetes выделит новый Pod, запустите на нем TaskManager.

  6. TaskManager регистрируется в SlotManager после его запуска.

  7. SlotManager запрашивает слоты у TaskManager.

  8. TaskManager предоставляет слоты для JobMaster. Затем задача будет назначена для запуска в этом слоте.

упражняться

ФлинкаДокументацияВыше более подробно описано, как его использовать, но в начале всегда будут какие-то ямы. Если вы новичок в Kubernetes, это может занять некоторое время.

(1) Во-первых, должен быть кластер Kubernetes, будет~/.kube/configдокумент. Попробуйте выполнить kubectl get nodes, чтобы убедиться, что кластер в порядке.

без этого~/.kube/configфайл, сообщит об ошибке:

2020-02-17 22:27:17,253 WARN  io.fabric8.kubernetes.client.Config                           - Error reading service account token from: [/var/run/secrets/kubernetes.io/serviceaccount/token]. Ignoring.
2020-02-17 22:27:17,437 ERROR org.apache.flink.kubernetes.cli.KubernetesSessionCli          - Error while running the Flink session.
io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]  for kind: [Service]  with name: [flink-cluster-81832d75-662e-40fd-8564-cd5a902b243c]  in namespace: [default]  failed.
	at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
	at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
	at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:231)
	at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:164)
	at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getService(Fabric8FlinkKubeClient.java:334)
	at org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient.getInternalService(Fabric8FlinkKubeClient.java:246)
	at org.apache.flink.kubernetes.cli.KubernetesSessionCli.run(KubernetesSessionCli.java:104)
	at org.apache.flink.kubernetes.cli.KubernetesSessionCli.lambda$main$0(KubernetesSessionCli.java:185)
	at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
	at org.apache.flink.kubernetes.cli.KubernetesSessionCli.main(KubernetesSessionCli.java:185)
Caused by: java.net.UnknownHostException: kubernetes.default.svc: nodename nor servname provided, or not known

(2) Создайте пользователей и полномочия заранее (RBAC)

kubectl create serviceaccount flink
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=default:flink

Если пользователь не создан и для отправки используется пользователь по умолчанию, будет сообщено об ошибке:

Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.10.0.1/api/v1/namespaces/default/pods?labelSelector=app%3Dkaibo-test%2Ccomponent%3Dtaskmanager%2Ctype%3Dflink-native-kubernetes. 

Message: Forbidden!Configured service account doesn't have access. 
Service account may have been revoked. pods is forbidden: 
User "system:serviceaccount:default:default" cannot list resource "pods" in API group "" in the namespace "default".

(3) Этот шаг является необязательным. По умолчанию JobManager и TaskManager будут записывать журналы только в /opt/flink/log соответствующих модулей. Если вы хотите посмотреть лог через логи kubectl, вам нужно вывести лог на консоль. Чтобы сделать следующее, измените файл log4j.properties в каталоге FLINK_HOME/conf.

log4j.rootLogger=INFO, file, console

# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

Затем в командную строку для запуска сессионного кластера нужно вывести параметры:

-Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%"

(4) Наконец, вы можете запустить кластер сеансов. Следующая команда запускает кластер сеансов с памятью 4G, 2 ЦП и 4 слотами на диспетчер задач.

bin/kubernetes-session.sh -Dkubernetes.container-start-command-template="%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%" -Dkubernetes.cluster-id=kaibo-test -Dtaskmanager.memory.process.size=4096m -Dkubernetes.taskmanager.cpu=2 -Dtaskmanager.numberOfTaskSlots=4

Дополнительные параметры см. в документации:this.apache.org/projects/legal…

использоватьkubectl logs kaibo-test-6f7dffcbcf-c2p7g -fВы можете посмотреть журнал.

Если появляется большое количество таких журналов (которые в настоящее время обнаруживаются при обнаружении живучести LoadBalance поставщика облачных служб):

2020-02-17 14:58:56,323 WARN  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Unhandled exception
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:377)
	at org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:247)

Вы можете временно настроить его в log4j.properties:

log4j.logger.org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint=ERROR, file

Слишком большое количество журналов приведет к тому, что журнал менеджера по работе в веб-интерфейсе будет пустым, поскольку файл слишком велик для отображения во внешнем интерфейсе.

Если не выполнить предыдущие шаги (1) и (2), будут возникать различные исключения, а логи можно будет легко посмотреть через логи kubectl.

После запуска сессионного кластера можно проверить нормально ли он через kubectl get pods, svc.

Для просмотра веб-интерфейса с переадресацией портов:

kubectl port-forward service/kaibo-test 8081

Открытымhttp://127.0.0.1:8001Вы можете увидеть WebUI Flink.

(5) Отправить задачу

./bin/flink run -d -e kubernetes-session -Dkubernetes.cluster-id=kaibo-test examples/streaming/TopSpeedWindowing.jar

На странице веб-интерфейса Flink мы видим, что в начале запуска пользовательский интерфейс показывает, что общее количество/доступных слотов задач равно 0, а диспетчеры задач также равны 0. Ресурсы динамически увеличиваются по мере отправки задач. После остановки задачи ресурс освобождается.

После отправки задачи вы можете увидеть, что Flink назначил новый модуль диспетчеру задач через kubectl get pods.

pods.png

(6) Остановить кластер сеансов

echo 'stop' | ./bin/kubernetes-session.sh -Dkubernetes.cluster-id=kaibo-test -Dexecution.attached=true

Вы также можете удалить ресурсы вручную:

kubectl delete service/<ClusterID>

Суммировать

Как видите, Flink 1.10 предпринял хорошую попытку интеграции с Kubernetes. С нетерпением ждем следующей версии сообщества 1.11, которая обеспечит поддержку для каждого задания и глубокую интеграцию с Kubernetes, например, высокую доступность на основе собственного API Kubernetes. Пожалуйста, обратите внимание на последний прогрессFLINK-14460.