Flink on Native K8S Source Code Analysis
Entry point
The launch script kubernetes-session.sh shows that the entry point for starting a Flink session cluster on K8s is:
org.apache.flink.kubernetes.cli.KubernetesSessionCli
Cluster creation flow
A Flink session cluster is created through the following call chain:
KubernetesSessionCli::main
KubernetesSessionCli::run
KubernetesClusterClientFactory::createClusterDescriptor
KubernetesClusterDescriptor::deploySessionCluster
KubernetesClusterDescriptor::createClusterClientProvider
ClusterClientProvider::getClusterClient // returns RestClusterClient
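The chain follows a factory -> descriptor -> provider -> client shape. The sketch below models only that shape and does not use Flink.

SessionDeploySketch, Client, RestClient and Descriptor are hypothetical stand-ins, not Flink classes. The address http://<cluster-id>-rest:8081 is also an assumption. It combines the -rest service name with Flink's default REST port 8081, and the real address depends on how the REST service is exposed.

```java
import java.util.function.Supplier;

// Hypothetical, Flink-free model of the session-deploy call chain:
// factory -> descriptor -> provider -> client. None of these are real Flink types.
public class SessionDeploySketch {

    // Stand-in for ClusterClient: the handle used to interact with the cluster.
    interface Client extends AutoCloseable {
        String webInterfaceUrl();

        @Override
        void close(); // narrowed: throws no checked exception
    }

    // Stand-in for RestClusterClient, which talks to the JobManager over HTTP.
    static final class RestClient implements Client {
        private final String url;

        RestClient(String url) { this.url = url; }

        @Override
        public String webInterfaceUrl() { return url; }

        @Override
        public void close() { /* a real client would release its HTTP resources here */ }
    }

    // Stand-in for KubernetesClusterDescriptor.
    static final class Descriptor {
        private final String clusterId;

        Descriptor(String clusterId) { this.clusterId = clusterId; }

        // Deploy first, then hand back a provider (Supplier stands in for ClusterClientProvider).
        Supplier<Client> deploySessionCluster() {
            // The real descriptor would create the JobManager Deployment and its resources here.
            return createClusterClientProvider();
        }

        Supplier<Client> createClusterClientProvider() {
            // Assumed address: "<cluster-id>-rest" service plus Flink's default REST port 8081.
            return () -> new RestClient("http://" + clusterId + "-rest:8081");
        }
    }

    // Stand-in for KubernetesClusterClientFactory::createClusterDescriptor.
    static Descriptor createClusterDescriptor(String clusterId) {
        return new Descriptor(clusterId);
    }

    public static void main(String[] args) {
        Supplier<Client> provider = createClusterDescriptor("my-session").deploySessionCluster();
        try (Client client = provider.get()) {
            System.out.println(client.webInterfaceUrl()); // http://my-session-rest:8081
        }
    }
}
```

Returning a provider rather than a client lets the caller create clients lazily, and close them, after deployment has finished.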
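The deployment thus ends with a RestClusterClient. This is an implementation of ClusterClient and serves as the interface for interacting with the cluster: over HTTP it can fetch cluster information, submit jobs, query jobs, and so on. Submitting a job with /bin/flink run --target kubernetes-session works the same way. The client first obtains a RestClusterClient by cluster-id and then calls submitJob(JobGraph). The call chain is: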
StreamExecutionEnvironment::execute
StreamExecutionEnvironment::executeAsync
StreamExecutionEnvironment::getPipelineExecutor
KubernetesSessionClusterExecutor::execute
AbstractSessionClusterExecutor::execute
KubernetesClusterClientFactory::createClusterDescriptor
KubernetesClusterDescriptor::retrieve
ClusterClientProvider::getClusterClient
ClusterClient::submitJob
ClusterClient::close
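Job submission differs from deployment in two ways: it retrieves an existing cluster by cluster-id instead of deploying one, and it closes the client once submission is done. The simplified sketch below shows this ordering under several assumptions:

- All names are hypothetical.
- The JobGraph is reduced to a string.
- The client is closed synchronously after the submission future completes, which may not match exactly how Flink schedules the close.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical model of the session-executor path: retrieve -> getClusterClient -> submitJob -> close.
// Not Flink's API; a JobGraph is represented by its name only.
public class SessionSubmitSketch {

    // Records the order of calls so the flow can be inspected.
    static final List<String> EVENTS = new ArrayList<>();

    // Stand-in for a REST-backed ClusterClient.
    static final class Client implements AutoCloseable {
        private final String clusterId;

        Client(String clusterId) { this.clusterId = clusterId; }

        CompletableFuture<String> submitJob(String jobGraphName) {
            EVENTS.add("submit " + jobGraphName + " to " + clusterId);
            // A real client would send the JobGraph over HTTP and complete with the job id.
            return CompletableFuture.completedFuture("job-id-of-" + jobGraphName);
        }

        @Override
        public void close() { EVENTS.add("close"); }
    }

    // Stand-in for descriptor.retrieve(clusterId): no deployment, only a lookup by cluster-id.
    static Supplier<Client> retrieve(String clusterId) {
        EVENTS.add("retrieve " + clusterId);
        return () -> new Client(clusterId);
    }

    // Mirrors the executor's shape; try-with-resources guarantees the client is closed.
    static String execute(String clusterId, String jobGraphName) {
        try (Client client = retrieve(clusterId).get()) {
            return client.submitJob(jobGraphName).join();
        }
    }

    public static void main(String[] args) {
        System.out.println(execute("my-session", "wordcount")); // job-id-of-wordcount
        System.out.println(EVENTS); // [retrieve my-session, submit wordcount to my-session, close]
    }
}
```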
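The key class for cluster creation is KubernetesClusterDescriptor, which can create both session clusters and application clusters. Both go through deployClusterInternal; they differ only in the entrypoint class and therefore in the Dispatcher that gets created.

Once an application cluster has started, the JobManager immediately runs the application code in its own process, and there is no entry point for a client to submit a JobGraph. A session cluster instead runs as a service. It waits for clients to submit JobGraphs and then starts TaskManagers to execute them.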
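The main job of KubernetesClusterDescriptor::deployClusterInternal is to create a Deployment that runs the JobManager, together with accompanying resources such as ConfigMaps. KubernetesJobManagerFactory::buildKubernetesJobManagerSpecification defines these resources.
KubernetesJobManagerFactory chains a series of AbstractKubernetesStepDecorator implementations. Based on the configuration, each decorator contributes to the Deployment and its accompanying resources through two hooks:
AbstractKubernetesStepDecorator::decorateFlinkPod // defines the JobManager Pod used by the Deployment
AbstractKubernetesStepDecorator::buildAccompanyingKubernetesResources // defines the accompanying resources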
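The decorator mechanism can be sketched as a loop: each decorator first transforms the pod and then adds its accompanying resources to a shared list. This is a simplified model that assumes a pod consists only of labels and volume mounts.

Pod, StepDecorator and the factory methods init, externalService and flinkConfMount are hypothetical stand-ins for FlinkPod and the real decorators. The label keys are illustrative only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the step-decorator chain; none of these are Flink types.
public class DecoratorChainSketch {

    // Minimal stand-in for FlinkPod: labels and volume mounts only.
    static final class Pod {
        final Map<String, String> labels = new LinkedHashMap<>();
        final List<String> mounts = new ArrayList<>();
    }

    // Mirrors the two hooks described above.
    interface StepDecorator {
        Pod decorateFlinkPod(Pod pod);

        default List<String> buildAccompanyingKubernetesResources() { return List.of(); }
    }

    // Like InitJobManagerDecorator: only touches the pod (labels are illustrative).
    static StepDecorator init(String clusterId) {
        return pod -> {
            pod.labels.put("app", clusterId);
            pod.labels.put("component", "jobmanager");
            return pod;
        };
    }

    // Like ExternalServiceDecorator: leaves the pod alone, contributes a Service.
    static StepDecorator externalService(String clusterId) {
        return new StepDecorator() {
            @Override public Pod decorateFlinkPod(Pod pod) { return pod; }
            @Override public List<String> buildAccompanyingKubernetesResources() {
                return List.of("Service/" + clusterId + "-rest");
            }
        };
    }

    // Like FlinkConfMountDecorator: mounts a ConfigMap and also creates it.
    static StepDecorator flinkConfMount(String clusterId) {
        return new StepDecorator() {
            @Override public Pod decorateFlinkPod(Pod pod) {
                pod.mounts.add("flink-config-" + clusterId);
                return pod;
            }
            @Override public List<String> buildAccompanyingKubernetesResources() {
                return List.of("ConfigMap/flink-config-" + clusterId);
            }
        };
    }

    // Applies decorators in order: pod changes are chained, resources are collected.
    static Pod build(List<StepDecorator> decorators, List<String> resourcesOut) {
        Pod pod = new Pod();
        for (StepDecorator decorator : decorators) {
            pod = decorator.decorateFlinkPod(pod);
            resourcesOut.addAll(decorator.buildAccompanyingKubernetesResources());
        }
        return pod;
    }

    public static void main(String[] args) {
        List<String> resources = new ArrayList<>();
        Pod pod = build(List.of(init("my-session"), externalService("my-session"), flinkConfMount("my-session")), resources);
        System.out.println(pod.labels);  // {app=my-session, component=jobmanager}
        System.out.println(pod.mounts);  // [flink-config-my-session]
        System.out.println(resources);   // [Service/my-session-rest, ConfigMap/flink-config-my-session]
    }
}
```

Keeping each concern in its own decorator means a feature such as Kerberos can be added or skipped by changing the list, without touching the others.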
JobManager Pod initialization
Decorator | Purpose |
---|---|
InitJobManagerDecorator | Sets the JobManager image, memory/CPU resource limits, ports, service account, labels, log directory, etc. |
EnvSecretsDecorator | Injects the user-specified kubernetes.env.secretKeyRef entries as environment variables |
MountSecretsDecorator | Mounts the user-specified kubernetes.secrets into the Pod |
CmdJobManagerDecorator | Defines the Pod start command, e.g. kubernetes-jobmanager.sh kubernetes-session |
InternalServiceDecorator | Creates the internal Service ${cluster-id} when the rest service is of type ClusterIP |
ExternalServiceDecorator | Creates the external Service ${cluster-id}-rest |
HadoopConfMountDecorator | Creates ConfigMap hadoop-config-${cluster-id} and mounts the Hadoop configuration files into the container at /opt/hadoop/conf |
KerberosMountDecorator | Creates Secret kerberos-keytab-${cluster-id} and ConfigMap kerberos-krb5conf-${cluster-id}, and mounts them into the container |
FlinkConfMountDecorator | Creates ConfigMap flink-config-${cluster-id}, mainly flink-conf.yaml with Flink's own settings, and mounts it into the Flink conf directory in the container, so that the JobManager picks up the Flink configuration from the machine where the session launch script was run |
PodTemplateMountDecorator | Creates ConfigMap pod-template-${cluster-id}, mainly taskmanager-pod-template.yaml, the template for TaskManager pods (the same as the JobManager's unless specified otherwise), and mounts it at /opt/flink/pod-template/taskmanager-pod-template.yaml to be read when TaskManagers start |
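All of the names in this table are derived from the cluster-id. That makes it easy to know what to look for when inspecting a deployment, for example with kubectl.

The helper below is hypothetical and not part of Flink; it only encodes the naming patterns listed above. Some of these resources presumably exist only under the conditions noted in the table, or when the corresponding feature (Hadoop configuration, Kerberos, pod template) is in use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: the JobManager-side resource names from the table, derived from a cluster-id.
public class JobManagerResourceNames {

    static Map<String, String> forCluster(String clusterId) {
        Map<String, String> names = new LinkedHashMap<>();
        names.put("internalService", clusterId);                         // InternalServiceDecorator
        names.put("restService", clusterId + "-rest");                   // ExternalServiceDecorator
        names.put("hadoopConfigMap", "hadoop-config-" + clusterId);      // HadoopConfMountDecorator
        names.put("keytabSecret", "kerberos-keytab-" + clusterId);       // KerberosMountDecorator
        names.put("krb5ConfigMap", "kerberos-krb5conf-" + clusterId);    // KerberosMountDecorator
        names.put("flinkConfigMap", "flink-config-" + clusterId);        // FlinkConfMountDecorator
        names.put("podTemplateConfigMap", "pod-template-" + clusterId);  // PodTemplateMountDecorator
        return names;
    }

    public static void main(String[] args) {
        // Prints one "role -> name" line per expected resource, in table order.
        forCluster("my-session").forEach((role, name) -> System.out.println(role + " -> " + name));
    }
}
```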
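Starting the JobManager
CmdJobManagerDecorator shows that the JobManager is started by kubernetes-jobmanager.sh. That script in turn calls flink-console.sh with kubernetes-session or kubernetes-application, and flink-console.sh maps these to the following entrypoint classes: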
kubernetes-session: org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
kubernetes-application: org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
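In flink-console.sh, the mapping from service name to entrypoint class is a shell case statement. The Java rendering below only illustrates that lookup and also includes the kubernetes-taskmanager entry used for TaskManagers. EntrypointLookup and entrypointFor are hypothetical names.

```java
// Illustrative Java version of the service-name -> entrypoint-class lookup done in flink-console.sh.
public class EntrypointLookup {

    static String entrypointFor(String service) {
        switch (service) {
            case "kubernetes-session":
                return "org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint";
            case "kubernetes-application":
                return "org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint";
            case "kubernetes-taskmanager":
                return "org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner";
            default:
                // The shell script has its own handling for other services; here we just reject them.
                throw new IllegalArgumentException("Unknown service: " + service);
        }
    }

    public static void main(String[] args) {
        System.out.println(entrypointFor("kubernetes-session"));
    }
}
```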
Creating the JobManager essentially means creating the Dispatcher, the ResourceManager and the RestServer. The Dispatcher that gets created determines whether the JobManager runs a job immediately after startup.
Component | Session | Application |
---|---|---|
Dispatcher | SessionDispatcherLeaderProcessFactoryFactory | ApplicationDispatcherLeaderProcessFactoryFactory |
ResourceManager | KubernetesResourceManagerFactory | KubernetesResourceManagerFactory |
RestServer | SessionRestEndpointFactory | JobRestEndpointFactory |
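The behavioral consequence of the Dispatcher difference can be shown with a toy model:

- **Application mode:** the job starts together with the JobManager, and client submissions are rejected.
- **Session mode:** the dispatcher starts idle and accepts submitted JobGraphs.

DispatcherModeSketch and its members are hypothetical. The model ignores leader election, HA and the REST layer.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of session vs application dispatchers; not Flink's classes.
public class DispatcherModeSketch {

    enum Mode { SESSION, APPLICATION }

    static final class Dispatcher {
        private final Mode mode;
        final List<String> runningJobs = new ArrayList<>();

        Dispatcher(Mode mode, String applicationJob) {
            this.mode = mode;
            if (mode == Mode.APPLICATION) {
                // Application mode: the user's code runs in the JobManager process at startup,
                // so its job is running without any client submission.
                runningJobs.add(applicationJob);
            }
        }

        void submitJob(String jobGraph) {
            if (mode == Mode.APPLICATION) {
                // Models "no entry point for a client to submit a JobGraph".
                throw new UnsupportedOperationException("application cluster accepts no client JobGraphs");
            }
            // Session mode: accept the job; TaskManagers would then be requested on demand.
            runningJobs.add(jobGraph);
        }
    }

    public static void main(String[] args) {
        Dispatcher app = new Dispatcher(Mode.APPLICATION, "my-app-job");
        System.out.println(app.runningJobs);     // [my-app-job]

        Dispatcher session = new Dispatcher(Mode.SESSION, null);
        System.out.println(session.runningJobs); // []
        session.submitJob("wordcount");
        System.out.println(session.runningJobs); // [wordcount]
    }
}
```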
Starting TaskManagers
Session and application modes share the same resource management logic, implemented by KubernetesResourceManagerDriver. A TaskManager is created as follows:
KubernetesResourceManagerDriver::requestResource // creates a new TaskManager when a resource request arrives
KubernetesResourceManagerDriver::createKubernetesTaskManagerParameters // builds the TaskManager parameters
KubernetesTaskManagerFactory::buildTaskManagerKubernetesPod // builds the Pod template
Fabric8FlinkKubeClient::createTaskManagerPod // creates the Pod
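The request path can be sketched as one new pod per resource request. This is a hypothetical model with the following simplifications:

- Driver and KubeClient stand in for KubernetesResourceManagerDriver and Fabric8FlinkKubeClient.
- The parameter-building and pod-building steps are collapsed into a single string.
- The pod-name pattern <cluster-id>-taskmanager-<attempt>-<index> is an assumption about how Flink names TaskManager pods; it is not stated in the text above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of: requestResource -> build parameters -> build pod -> create pod.
public class TaskManagerRequestSketch {

    // Stand-in for Fabric8FlinkKubeClient: records the pods it was asked to create.
    static final class KubeClient {
        final List<String> createdPods = new ArrayList<>();

        void createTaskManagerPod(String podSpec) { createdPods.add(podSpec); }
    }

    // Stand-in for KubernetesResourceManagerDriver.
    static final class Driver {
        private final String clusterId;
        private final int attemptId;
        private final KubeClient kubeClient;
        private int podIndex = 0;

        Driver(String clusterId, int attemptId, KubeClient kubeClient) {
            this.clusterId = clusterId;
            this.attemptId = attemptId;
            this.kubeClient = kubeClient;
        }

        // Each resource request becomes exactly one new TaskManager pod.
        String requestResource(int memoryMb, double cpu) {
            // Assumed naming pattern: <cluster-id>-taskmanager-<attempt>-<index>.
            String podName = String.format("%s-taskmanager-%d-%d", clusterId, attemptId, ++podIndex);
            // Parameter building and pod-template building, collapsed into one descriptive string.
            String podSpec = podName + " memory=" + memoryMb + "Mi cpu=" + cpu;
            kubeClient.createTaskManagerPod(podSpec);
            return podName;
        }
    }

    public static void main(String[] args) {
        KubeClient client = new KubeClient();
        Driver driver = new Driver("my-session", 1, client);
        driver.requestResource(1728, 1.0);
        driver.requestResource(1728, 1.0);
        client.createdPods.forEach(System.out::println);
    }
}
```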
KubernetesTaskManagerFactory::buildTaskManagerKubernetesPod builds the Pod template with the following decorators:
Decorator | Purpose |
---|---|
InitTaskManagerDecorator | Sets the TaskManager image, memory/CPU resource limits, ports, service account, environment variables, labels, log directory, etc. |
EnvSecretsDecorator | Injects the user-specified kubernetes.env.secretKeyRef entries as environment variables |
MountSecretsDecorator | Mounts the user-specified kubernetes.secrets into the Pod |
CmdTaskManagerDecorator | Defines the Pod start command, e.g. kubernetes-taskmanager.sh [args] |
HadoopConfMountDecorator | Creates ConfigMap hadoop-config-${cluster-id} and mounts the Hadoop configuration files into the container at /opt/hadoop/conf |
KerberosMountDecorator | Creates Secret kerberos-keytab-${cluster-id} and ConfigMap kerberos-krb5conf-${cluster-id}, and mounts them into the container |
FlinkConfMountDecorator | Creates ConfigMap flink-config-${cluster-id}, mainly flink-conf.yaml with Flink's own settings, and mounts it into the Flink conf directory in the container, so that the TaskManager picks up the Flink configuration from the machine where the session launch script was run |
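CmdTaskManagerDecorator shows that the TaskManager is started by kubernetes-taskmanager.sh, which in turn calls flink-console.sh kubernetes-taskmanager. The entry point is org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner.
KubernetesTaskExecutorRunner differs little from the TaskManager runners of other deployment modes; its call chain soon delegates to the generic TaskManagerRunner: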
KubernetesTaskExecutorRunner::main
KubernetesTaskExecutorRunner::runTaskManagerSecurely
TaskManagerRunner::runTaskManagerProcessSecurely
TaskManagerRunner::runTaskManager
TaskManagerRunner::start
TaskManagerRunner::startTaskManagerRunnerServices
TaskManagerRunner::createTaskExecutorService
TaskManagerRunner::startTaskManager
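Summary
That is the whole startup flow of Flink on K8s. At its core, Flink builds the Pod templates for the JobManager and TaskManagers and launches both as Pods. The execution of a Flink job itself is no different from that under other resource managers.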