Kubernetes is a container orchestration system.
A basic Flink cluster deployment in Kubernetes has three components: a Deployment for the JobManager, a Deployment for the TaskManagers, and a Service exposing the JobManager.
Using the resource definitions found below, launch the cluster with:

```
kubectl create -f jobmanager-deployment.yaml
kubectl create -f taskmanager-deployment.yaml
kubectl create -f jobmanager-service.yaml
```
You can then access the Flink UI via `kubectl proxy`: run `kubectl proxy` in a terminal, then navigate to the JobManager Service through the proxy in your browser.
Use `kubectl` to delete the cluster:

```
kubectl delete -f jobmanager-deployment.yaml
kubectl delete -f jobmanager-service.yaml
kubectl delete -f taskmanager-deployment.yaml
```
You can check whether your account has the necessary permissions with:

```
kubectl auth can-i <list|create|edit|delete> pods
```
Use the following command to build an image containing the user jar:
Use the following command to start a session:
This command will show you the following overview:
Example: Issue the following command to allocate 4 Task Managers, with 8GB of memory and 32 processing slots each:
The Blob Server and Task Manager must use non-random RPC ports. These can be configured with the following config options, either in flink-conf.yaml or as -D flags when starting the session.
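As a minimal sketch, assuming the standard Flink keys `blob.server.port` and `taskmanager.rpc.port` (the port numbers below are arbitrary examples, not values from this document):

```yaml
# flink-conf.yaml -- pin the otherwise random RPC ports
blob.server.port: 6124
taskmanager.rpc.port: 6122
```

The same values can be passed on the command line, e.g. `-Dblob.server.port=6124`, when starting the session.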
Once Flink is deployed in your Kubernetes cluster, it will show you the connection details of the Job Manager.
In detached mode, the Flink client will exit after submitting the service to the Kubernetes cluster. If you want to stop the Kubernetes session, use the Kubernetes utilities (`kubectl delete service <ServiceName>`). You can also start another client, attach to the session, and stop it from there.
There are several ways to expose a Service on an external (outside of your cluster) IP address; this can be changed via the `kubernetes.service.exposed.type` option. With a node-port type, `<NodeIP>:<NodePort>` can be used to contact the Job Manager Service. With a load-balancer type, use `kubectl get services/<ServiceName>` to obtain the EXTERNAL-IP for the ServiceAddress argument.
See the Kubernetes documentation on publishing services for more information.
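As an illustration of exposing the Job Manager via a node port, a minimal Service manifest could look like the following sketch (the name, labels, and port are assumptions, not taken from this document):

```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager    # hypothetical service name
spec:
  type: NodePort            # expose on every node's IP at an allocated port
  selector:
    app: flink              # assumed labels on the JobManager pods
    component: jobmanager
  ports:
  - name: ui
    port: 8081              # Flink's default REST/UI port
```

With `type: LoadBalancer` instead, the cloud provider would assign the EXTERNAL-IP shown by `kubectl get services`.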
Use the following command to submit a job:
You can set a name for the session cluster with `-nm` when starting a session. If you do not specify a name, the Flink client will generate a UUID for your session cluster.
The address is either `<NodeIP>:<NodePort>` or the EXTERNAL-IP, depending on the exposed type.
Use the following command to attach to a session:
The documentation above describes how to start a Flink cluster within a Kubernetes environment. It is also possible to launch a new Flink cluster for executing each individual job with better isolation.
The command line options of the Kubernetes session are also available with the ./bin/flink tool. They are prefixed with `k` or `kubernetes` (for the long argument options).
Note: In attach mode, the `-kn` argument (number of TaskManagers) is required, and `kubernetes.service.exposed.type` must be a type that is reachable from outside the cluster (for example, a node port or load balancer).
Note: You can use a different configuration directory per job by setting the environment variable FLINK_CONF_DIR. To use this, copy the conf directory from the Flink distribution and modify, for example, the logging settings on a per-job basis.
Note: It is also possible to “fire and forget” a Flink job to the Kubernetes cluster in detached mode. Use -m to specify the kubernetes-cluster and -d for detached mode. The -kn argument will not take effect; resources are allocated on demand. Also, in this case your application will not get any accumulator results or exceptions from the ExecutionEnvironment.execute() call!
Note: If you want to access the Job Manager UI or retrieve the logs, set kubernetes.destroy-perjob-cluster.after-job-finished=false and the Flink cluster will not be destroyed after the job finishes.
Use the following command to retrieve the logs of the Job Manager and Task Managers:
Namespaces in Kubernetes are a way to divide cluster resources between multiple users (via resource quota), similar to the queue concept in a YARN cluster. Flink on Kubernetes can use namespaces to launch Flink clusters. The namespace can be specified with the `-ns` argument when starting a Flink cluster.
ResourceQuota provides constraints that limit aggregate resource consumption per namespace. It can limit the quantity of objects that can be created in a namespace by type, as well as the total amount of compute resources that may be consumed by resources in that namespace.
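A sketch of such a quota, using the standard Kubernetes ResourceQuota schema (the name, namespace, and limits below are illustrative, not from this document):

```yaml
apiVersion: v1
kind: ResourceQuota
metadata:
  name: flink-quota        # hypothetical name
  namespace: flink         # hypothetical namespace for the Flink cluster
spec:
  hard:
    pods: "20"             # at most 20 pods in this namespace
    requests.cpu: "16"     # total CPU requested across all pods
    requests.memory: 64Gi  # total memory requested across all pods
```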
Role-based access control (RBAC) is a method of regulating access to compute or network resources based on the roles of individual users within an enterprise. Users can therefore configure RBAC roles and service accounts used by the Flink JobManager to access the Kubernetes API server within the Kubernetes cluster.
Every namespace has a default service account. However, the default service account may not have permission to create or delete pods within the Kubernetes cluster, so users may need to specify another service account that has the right role bound. The configuration option `kubernetes.jobmanager.service-account` can be used to set the service account.
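For example, in flink-conf.yaml (assuming a service account named `flink`, as in the following paragraphs):

```yaml
# use a dedicated, suitably privileged service account for the JobManager
kubernetes.jobmanager.service-account: flink
```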
Use the following command to make the JobManager pod use the
flink service account to create and delete TaskManager pods.
If the flink service account does not exist, use the following command to create a new one and set the role binding.
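Equivalently, the account and binding can be declared as manifests. This sketch assumes the `default` namespace and grants the built-in `edit` ClusterRole, which permits creating and deleting pods; it is one reasonable choice, not necessarily the exact role intended by this document:

```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink
  namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-role-binding   # hypothetical name
  namespace: default
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: edit                 # built-in role that allows pod create/delete
subjects:
- kind: ServiceAccount
  name: flink
  namespace: default
```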
See the Kubernetes RBAC Authorization documentation for more information.
An early version of a Flink Helm chart is available on GitHub.
This section briefly describes how Flink and Kubernetes interact.
When starting a Kubernetes session, the Flink client will first (step 1) contact the Kubernetes ApiServer to submit the cluster description, including the ConfigMap spec, Job Manager Service spec, Job Manager Replica Controller spec and Owner Reference.
In the next step (step 2), the Kubernetes Master creates the required components. The Kubelet pulls the image, prepares and mounts the volumes, and then executes the start command.
Once the Flink JobManager pod is launched, the ResourceManager will allocate (step 3) the specified number of Task Managers. The JobManager generates a new configuration for the TaskManagers, with the address of the Job Manager set to the ServiceName. This allows the TaskManagers to connect back to the JobManager after a failover.
After all TaskManagers are launched and registered with the ResourceManager and JobManager, the session is ready to accept jobs.