Configuration

For single-node setups Flink is ready to go out of the box and you don’t need to change the default configuration to get started.

The out-of-the-box configuration uses your default Java installation. To override the Java runtime, set the environment variable JAVA_HOME or the configuration key env.java.home in conf/flink-conf.yaml.
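As a minimal sketch of the override described above, the entry below pins the Java runtime in the configuration file. The path is a hypothetical example, not a value taken from this page; substitute the directory of the Java installation on your machines.

```yaml
# conf/flink-conf.yaml
# Hypothetical path: point this at the Java installation Flink should use.
env.java.home: /usr/lib/jvm/java-8-openjdk-amd64
```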

This page lists the most common options that are typically needed to set up a well-performing (distributed) installation. In addition, a full list of all available configuration parameters is given here.

All configuration is done in conf/flink-conf.yaml, which is expected to be a flat collection of YAML key value pairs with format key: value.
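To illustrate the flat key: value format, here is a small fragment that uses only keys and default values listed on this page. It is a sketch of the file layout, not a recommended configuration.

```yaml
# conf/flink-conf.yaml: one "key: value" pair per line, no nested sections.
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
```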

The system and run scripts parse the config at startup time. Changes to the configuration file require restarting the Flink JobManager and TaskManagers.

The configuration files for the TaskManagers can differ, because Flink does not assume uniform machines in the cluster.

Common Options

  • env.java.home: The path to the Java installation to use (DEFAULT: system’s default Java installation, if found). Needs to be specified if the startup scripts fail to automatically resolve the java home directory. Can be specified to point to a specific java installation or version. If this option is not specified, the startup scripts also evaluate the $JAVA_HOME environment variable.

  • env.java.opts: Set custom JVM options. This value is respected by Flink’s start scripts, both JobManager and TaskManager, and Flink’s YARN client. This can be used to set different garbage collectors or to include remote debuggers into the JVMs running Flink’s services. Enclosing options in double quotes delays parameter substitution allowing access to variables from Flink’s startup scripts. Use env.java.opts.jobmanager and env.java.opts.taskmanager for JobManager or TaskManager-specific options, respectively.

  • env.java.opts.jobmanager: JobManager-specific JVM options. These are used in addition to the regular env.java.opts.

  • env.java.opts.taskmanager: TaskManager-specific JVM options. These are used in addition to the regular env.java.opts.

  • jobmanager.rpc.address: The external address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost). Note: The address (host name or IP) should be accessible by all nodes including the client.

  • jobmanager.rpc.port: The port number of the JobManager (DEFAULT: 6123).

  • jobmanager.heap.mb: JVM heap size (in megabytes) for the JobManager. You may have to increase the heap size for the JobManager if you are running very large applications (with many operators), or if you are keeping a long history of them.

  • taskmanager.heap.mb: JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of the system. In contrast to Hadoop, Flink runs operators (e.g., join, aggregate) and user-defined functions (e.g., Map, Reduce, CoGroup) inside the TaskManager (including sorting/hashing/caching), so this value should be as large as possible. If the cluster is exclusively running Flink, the total amount of available memory per machine minus some memory for the operating system (maybe 1-2 GB) is a good value. On YARN setups, this value is automatically configured to the size of the TaskManager’s YARN container, minus a certain tolerance value.

  • taskmanager.numberOfTaskSlots: The number of parallel operator or user function instances that a single TaskManager can run (DEFAULT: 1). If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager’s machine has (e.g., equal to the number of cores, or half the number of cores). More about task slots.

  • parallelism.default: The default parallelism to use for programs that have no parallelism specified. (DEFAULT: 1). For setups that have no concurrent jobs running, setting this value to NumTaskManagers * NumSlotsPerTaskManager will cause the system to use all available execution resources for the program’s execution. Note: The default parallelism can be overwritten for an entire job by calling setParallelism(int parallelism) on the ExecutionEnvironment or by passing -p <parallelism> to the Flink Command-line frontend. It can be overwritten for single transformations by calling setParallelism(int parallelism) on an operator. See Parallel Execution for more information about parallelism.

  • fs.default-scheme: The default filesystem scheme to be used, with the necessary authority to contact, e.g. the host:port of the NameNode in the case of HDFS (if needed). By default, this is set to file:/// which points to the local filesystem. This means that the local filesystem is going to be used to search for user-specified files without an explicit scheme definition. As another example, if this is set to hdfs://localhost:9000/, then a user-specified file path without explicit scheme definition, such as /user/USERNAME/in.txt, is going to be transformed into hdfs://localhost:9000/user/USERNAME/in.txt. This scheme is used ONLY if no other scheme is specified (explicitly) in the user-provided URI.

  • classloader.resolve-order: Whether Flink should use a child-first ClassLoader when loading user-code classes or a parent-first ClassLoader. Can be one of parent-first or child-first. (default: child-first)

  • classloader.parent-first-patterns.default: A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. By default, this is set to "java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback". To extend this list beyond the default it is recommended to configure classloader.parent-first-patterns.additional instead of modifying this setting directly.

  • classloader.parent-first-patterns.additional: A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. This list is appended to classloader.parent-first-patterns.default.
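The common options above could be combined as in the following sketch. It assumes a hypothetical standalone cluster of 4 TaskManager machines with 8 cores and 16 GB of memory each; the host name, memory sizes, JVM flag, and class name prefix are illustrative assumptions, not recommendations from this page.

```yaml
# JVM options for all Flink processes; double quotes delay parameter substitution.
env.java.opts: "-XX:+UseG1GC"

# Hypothetical host name; it must be reachable from all nodes and from the client.
jobmanager.rpc.address: master-node.example.com
jobmanager.rpc.port: 6123

# Heap sizes in megabytes. 14336 MB assumes 16 GB per machine minus about 2 GB for the OS.
jobmanager.heap.mb: 2048
taskmanager.heap.mb: 14336

# One slot per physical core (assumed: 8 cores per machine).
taskmanager.numberOfTaskSlots: 8

# NumTaskManagers * NumSlotsPerTaskManager = 4 * 8, for setups without concurrent jobs.
parallelism.default: 32

# Paths without an explicit scheme are resolved against this filesystem.
fs.default-scheme: hdfs://localhost:9000/

# Keep the default resolve order and add a hypothetical parent-first prefix.
classloader.resolve-order: child-first
classloader.parent-first-patterns.additional: "com.example.shared."
```

With fs.default-scheme set this way, a path such as /user/USERNAME/in.txt would be resolved as hdfs://localhost:9000/user/USERNAME/in.txt, as described in the option's entry above.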

Advanced Options

Compute

  • taskmanager.compute.numa: When enabled a TaskManager is started on each NUMA node for each worker listed in conf/slaves (DEFAULT: false). Note: only supported when deploying Flink as a standalone cluster.
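A minimal sketch of enabling this option for a standalone cluster:

```yaml
# Start one TaskManager per NUMA node on each worker listed in conf/slaves.
# Only supported for standalone deployments, per the note above.
taskmanager.compute.numa: true
```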

Managed Memory

Managed memory helps Flink to run the batch operators efficiently. It prevents OutOfMemoryExceptions because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data to convert it into Java objects. All in all, managed memory improves the robustness and speed of the system.

In session mode, Flink does not allocate any managed memory by default; the amount can be configured with the taskmanager.managed.memory.size parameter. In per-job mode, the managed memory of each TaskManager is calculated from the resources of all slots located on that TaskManager. See TaskManager Resource for how to set resources for each operator. If the resources of the operators are not specified in per-job mode, a fraction of 0.7 of the free memory (total JVM memory minus the memory used for network buffers) is used. If desired, the managed memory may be allocated outside the JVM heap. This may improve performance in setups with large memory sizes.

  • taskmanager.managed.memory.size: The amount of memory (in megabytes) that the task manager reserves on-heap or off-heap (depending on taskmanager.memory.off-heap) for sorting, hash tables, and caching of intermediate results. If unspecified (-1), the memory manager will take a fixed ratio with respect to the size of the task manager JVM as specified by taskmanager.memory.fraction. (DEFAULT: -1)

  • taskmanager.memory.fraction: The relative amount of memory (with respect to taskmanager.heap.mb, after subtracting the amount of memory used by network buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of 0.8 means that a task manager reserves 80% of its memory (on-heap or off-heap depending on taskmanager.memory.off-heap) for internal data buffers, leaving 20% of free memory for the task manager’s heap for objects created by user-defined functions. (DEFAULT: 0.7) This parameter is only evaluated if taskmanager.managed.memory.size is not set.

  • taskmanager.memory.off-heap: If set to true, the task manager allocates memory which is used for sorting, hash tables, and caching of intermediate results outside of the JVM heap. For setups with larger quantities of memory, this can improve the efficiency of the operations performed on the memory (DEFAULT: false).

  • taskmanager.memory.segment-size: The size of memory buffers used by the memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)).

  • taskmanager.memory.preallocate: Either true or false. Specifies whether task managers should allocate all managed memory when starting up. (DEFAULT: false). When taskmanager.memory.off-heap is set to true, it is advised that this option is also set to true. If it is set to false, the allocated off-heap memory is cleaned up only when the configured JVM parameter MaxDirectMemorySize is reached, which triggers a full GC. Note: For streaming setups, we highly recommend setting this value to false, as the core state backends currently do not use the managed memory.
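The interplay of these options can be sketched as follows. The 4096 MB size is an assumed example, and the arithmetic in the comments is a rough reading of the descriptions above (network buffers subtracted first, then the fraction applied), not an exact account of how Flink sizes its memory.

```yaml
# Assumed example size: 4096 MB of TaskManager memory.
taskmanager.heap.mb: 4096

# Network buffers: 0.1 * 4096 MB = about 410 MB (the default fraction).
taskmanager.network.memory.fraction: 0.1

# Managed memory as a fraction of what remains:
# (4096 MB - 410 MB) * 0.7 = roughly 2580 MB for sorting, hash tables, and caching.
taskmanager.memory.fraction: 0.7

# Allocate managed memory outside the JVM heap and reserve it at startup,
# following the advice to enable preallocation together with off-heap memory.
# For streaming setups the text above recommends leaving preallocate at false.
taskmanager.memory.off-heap: true
taskmanager.memory.preallocate: true

# Alternative: fix the amount in megabytes instead of using the fraction.
# taskmanager.managed.memory.size: 2048
```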

Memory and Performance Debugging

These options are useful for debugging a Flink application for memory and garbage collection related issues, such as performance and out-of-memory process kills or exceptions.

  • taskmanager.debug.memory.startLogThread: Causes the TaskManagers to periodically log memory and Garbage collection statistics. The statistics include current heap-, off-heap, and other memory pool utilization, as well as the time spent on garbage collection, by heap memory pool.

  • taskmanager.debug.memory.logIntervalMs: The interval (in milliseconds) in which the TaskManagers log the memory and garbage collection statistics. Only has an effect, if taskmanager.debug.memory.startLogThread is set to true.
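A sketch of switching on the periodic memory log with the two keys described in this section. The 5000 ms interval is an assumed example value.

```yaml
# Log heap, off-heap, and garbage collection statistics on every TaskManager.
taskmanager.debug.memory.startLogThread: true

# Assumed example: log every 5 seconds. Only effective when the log thread is enabled.
taskmanager.debug.memory.logIntervalMs: 5000
```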

Kerberos-based Security

Flink supports Kerberos authentication for the following services:

  • Hadoop Components, such as HDFS, YARN, or HBase (version 2.6.1 and above; all other versions have critical bugs which might fail the Flink job unexpectedly).
  • Kafka Connectors (version 0.9 and above).
  • ZooKeeper

Configuring Flink for Kerberos security involves three aspects, explained separately in the following sub-sections.

1. Providing the cluster with a Kerberos credential (i.e. a keytab or a ticket via kinit)

To provide the cluster with a Kerberos credential, Flink supports using a Kerberos keytab file or ticket caches managed by kinit.

  • security.kerberos.login.use-ticket-cache: Indicates whether to read from your Kerberos ticket cache (default: true).

  • security.kerberos.login.keytab: Absolute path to a Kerberos keytab file that contains the user credentials.

  • security.kerberos.login.principal: Kerberos principal name associated with the keytab.

If both security.kerberos.login.keytab and security.kerberos.login.principal have values provided, keytabs will be used for authentication. It is preferable to use keytabs for long-running jobs, to avoid ticket expiration issues. If you prefer to use the ticket cache, talk to your administrator about increasing the Hadoop delegation token lifetime.

Note that authentication using ticket caches is only supported when deploying Flink as a standalone cluster or on YARN.
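A keytab-based setup, as preferred above for long-running jobs, might look like the following sketch. The keytab path and principal are hypothetical placeholders.

```yaml
# Do not read credentials from the kinit ticket cache.
security.kerberos.login.use-ticket-cache: false

# Hypothetical values: use the absolute keytab path and principal issued by your administrator.
security.kerberos.login.keytab: /path/to/flink.keytab
security.kerberos.login.principal: flink-user@EXAMPLE.COM
```

Because both the keytab and the principal are set, the keytab is used for authentication.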

2. Making the Kerberos credential available to components and connectors as needed

For Hadoop components, Flink will automatically detect if the configured Kerberos credentials should be used when connecting to HDFS, HBase, and other Hadoop components depending on whether Hadoop security is enabled (in core-site.xml).

For any connector or component that uses a JAAS configuration file, make the Kerberos credentials available to them by configuring JAAS login contexts for each one respectively, using the following configuration:

  • security.kerberos.login.contexts: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, Client,KafkaClient to use the credentials for ZooKeeper authentication and for Kafka authentication).

This allows enabling Kerberos authentication for different connectors or components independently. For example, you can enable Hadoop security without necessitating the use of Kerberos for ZooKeeper, or vice versa.

You may also provide a static JAAS configuration file using the mechanisms described in the Java SE Documentation, whose entries will override those produced by the above configuration option.
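Using the login context names given as an example above, the following sketch makes the Kerberos credentials available to both ZooKeeper and Kafka:

```yaml
# Client is used for ZooKeeper authentication, KafkaClient for Kafka authentication.
security.kerberos.login.contexts: Client,KafkaClient
```

Listing only one of the two contexts would enable Kerberos for that component alone.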

3. Configuring the component and/or connector to use Kerberos authentication

Finally, be sure to configure the connector within your Flink program or component as necessary to use Kerberos authentication.

The following connectors and components currently have first-class support for Kerberos authentication in Flink:

  • Kafka: see here for details on configuring the Kafka connector to use Kerberos authentication.

  • ZooKeeper (for HA): see here for details on ZooKeeper security configuration to work with the Kerberos-based security configurations mentioned here.

For more information on how Flink security internally sets up Kerberos authentication, please see here.

Other

  • taskmanager.tmp.dirs: The directory for temporary files, or a list of directories separated by the system’s directory delimiter (for example ‘:’ (colon) on Linux/Unix). If multiple directories are specified, then the temporary files will be distributed across the directories in a round-robin fashion. The I/O manager component will spawn one reading and one writing thread per directory. A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: The system’s tmp dir).

  • taskmanager.log.path: The config parameter defining the taskmanager log file location

  • jobmanager.web.address: Address of the JobManager’s web interface (DEFAULT: anyLocalAddress()).

  • jobmanager.web.port: Port of the JobManager’s web interface (DEFAULT: 8081).

  • jobmanager.web.tmpdir: This configuration parameter allows defining the Flink web directory to be used by the web interface. The web interface will copy its static files into the directory. Also uploaded job jars are stored in the directory if not overridden. By default, the temporary directory is used.

  • jobmanager.web.upload.dir: The config parameter defining the directory for uploading the job jars. If not specified a dynamic directory will be used under the directory specified by jobmanager.web.tmpdir.

  • fs.overwrite-files: Specifies whether file output writers should overwrite existing files by default. Set to true to overwrite by default, false otherwise. (DEFAULT: false)

  • fs.output.always-create-directory: File writers running with a parallelism larger than one create a directory for the output file path and put the different result files (one per parallel writer task) into that directory. If this option is set to true, writers with a parallelism of 1 will also create a directory and place a single result file into it. If the option is set to false, the writer will create the file directly at the output path, without creating a containing directory. (DEFAULT: false)

  • taskmanager.network.memory.fraction: Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. (DEFAULT: 0.1)

  • taskmanager.network.memory.min: Minimum memory size for network buffers in bytes (DEFAULT: 64 MB)

  • taskmanager.network.memory.max: Maximum memory size for network buffers in bytes (DEFAULT: 1 GB)

  • state.backend: The backend that will be used to store operator state checkpoints if checkpointing is enabled. Supported backends:
    • jobmanager: In-memory state, backup to JobManager’s/ZooKeeper’s memory. Should be used only for minimal state (Kafka offsets) or testing and local debugging.
    • filesystem: State is in-memory on the TaskManagers, and state snapshots are stored in a file system. Supported are all filesystems supported by Flink, for example HDFS, S3, …
    • rocksdb: State in rocksDB on the TaskManagers, and state snapshots are stored in a file system. Should be used for large state.
  • state.checkpoints.dir: The target directory for meta data of externalized checkpoints.

  • state.checkpoints.num-retained: The number of completed checkpoint instances to retain. Having more than one allows recovery to fall back to an earlier checkpoint if the latest checkpoint is corrupt. (Default: 1)

  • high-availability.zookeeper.storageDir: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this key was named recovery.zookeeper.storageDir.

  • blob.storage.directory: Directory for storing blobs (such as user JARs) on the TaskManagers. If not set or empty, Flink will fall back to taskmanager.tmp.dirs and select one temp directory at random.

  • blob.service.cleanup.interval: Cleanup interval (in seconds) of transient blobs at server and caches as well as permanent blobs at the caches (DEFAULT: 1 hour). Whenever a job is not referenced at the cache anymore, we set a TTL for its permanent blob files and let the periodic cleanup task (executed every blob.service.cleanup.interval seconds) remove them after this TTL has passed. We do the same for transient blob files at both server and caches, but immediately after accessing them, i.e. after a put or get operation. This means that a blob will be retained at most 2 * blob.service.cleanup.interval seconds after not being referenced anymore (permanent blobs) or after its last access (transient blobs). For permanent blobs, this means that a recovery still has the chance to use existing files rather than downloading them again.

  • blob.server.port: Port definition for the blob server (serving user JARs) on the TaskManagers. By default the port is set to 0, which means that the operating system is picking an ephemeral port. Flink also accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple JobManagers are running on the same machine.

  • blob.service.ssl.enabled: Flag to enable ssl for the blob client/server communication. This is applicable only when the global ssl flag security.ssl.enabled is set to true (DEFAULT: true).

  • restart-strategy: Default restart strategy to use in case no restart strategy has been specified for the job. The options are:
    • fixed delay strategy: fixed-delay.
    • failure rate strategy: failure-rate.
    • no restarts: none

    Default value is none unless checkpointing is enabled for the job in which case the default is fixed-delay with Integer.MAX_VALUE restart attempts and 10s delay.

  • restart-strategy.fixed-delay.attempts: Number of restart attempts, used if the default restart strategy is set to “fixed-delay”. Default value is 1, unless “fixed-delay” was activated by enabling checkpoints, in which case the default is Integer.MAX_VALUE.

  • restart-strategy.fixed-delay.delay: Delay between restart attempts, used if the default restart strategy is set to “fixed-delay”. (default: 1 s)

  • restart-strategy.failure-rate.max-failures-per-interval: Maximum number of restarts in given time interval before failing a job in “failure-rate” strategy. Default value is 1.

  • restart-strategy.failure-rate.failure-rate-interval: Time interval for measuring failure rate in “failure-rate” strategy. Default value is 1 minute.

  • restart-strategy.failure-rate.delay: Delay between restart attempts, used if the default restart strategy is set to “failure-rate”. Default value is the akka.ask.timeout.

  • jobstore.cache-size: The job store cache size in bytes which is used to keep completed jobs in memory (DEFAULT: 52428800 (50 MB)).

  • jobstore.expiration-time: The time in seconds after which a completed job expires and is purged from the job store (DEFAULT: 3600).
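Several of the options in this list could be combined as in the sketch below. All directories, the HDFS address, and the chosen numbers are hypothetical examples; only the keys and the allowed values come from the list above.

```yaml
# Two temporary directories (assumed to be on separate disks), separated by ':' on Linux/Unix.
taskmanager.tmp.dirs: /data1/flink-tmp:/data2/flink-tmp

# Network buffers: raise the fraction if jobs report that not enough buffers are available.
taskmanager.network.memory.fraction: 0.1

# Keep large state in RocksDB on the TaskManagers; snapshots go to a file system.
state.backend: rocksdb
state.checkpoints.dir: hdfs://localhost:9000/flink/checkpoints
state.checkpoints.num-retained: 3

# Retry a failed job up to 3 times, waiting 10 seconds between attempts.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 10 s

# A port range avoids collisions when several JobManagers run on one machine.
blob.server.port: 50100-50200
```

Retaining three checkpoints instead of one trades storage space for the ability to fall back to an earlier checkpoint if the latest one is corrupt.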

Full Reference

HDFS

Note: These keys are deprecated and it is recommended to configure the Hadoop path with the environment variable HADOOP_CONF_DIR instead.

These parameters configure the default HDFS used by Flink. Setups that do not specify an HDFS configuration have to specify the full path to HDFS files (hdfs://address:port/path/to/files). Files will also be written with default HDFS parameters (block size, replication factor).

  • fs.hdfs.hadoopconf: The absolute path to the Hadoop File System’s (HDFS) configuration directory (OPTIONAL VALUE). Specifying this value allows programs to reference HDFS files using short URIs (hdfs:///path/to/files, without including the address and port of the NameNode in the file URI). Without this option, HDFS files can be accessed, but require fully qualified URIs like hdfs://address:port/path/to/files. This option also causes file writers to pick up the HDFS’s default values for block sizes and replication factors. Flink will look for the “core-site.xml” and “hdfs-site.xml” files in the specified directory.

  • fs.hdfs.hdfsdefault: The absolute path of Hadoop’s own configuration file “hdfs-default.xml” (DEFAULT: null).

  • fs.hdfs.hdfssite: The absolute path of Hadoop’s own configuration file “hdfs-site.xml” (DEFAULT: null).
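For setups that still use these deprecated keys, a minimal sketch could look like this. The directory is a hypothetical example; as noted above, the HADOOP_CONF_DIR environment variable is the recommended alternative.

```yaml
# Hypothetical directory containing core-site.xml and hdfs-site.xml.
# Allows short URIs such as hdfs:///path/to/files.
fs.hdfs.hadoopconf: /etc/hadoop/conf
```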

Core

| Key | Default | Description |
| --- | --- | --- |
| chain.eagerly.enabled | false | Whether operators are chained more eagerly when the parallelism is one. |
| classloader.parent-first-patterns.additional | (none) | A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. These patterns are appended to "classloader.parent-first-patterns.default". |
| classloader.parent-first-patterns.default | "java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback" | A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. This setting should generally not be modified. To add another pattern we recommend to use "classloader.parent-first-patterns.additional" instead. |
| classloader.resolve-order | "child-first" | Defines the class resolution strategy when loading classes from user code, meaning whether to first check the user code jar ("child-first") or the application classpath ("parent-first"). The default settings indicate to load classes first from the user code jar, which means that user code jars can include and load different dependencies than Flink uses (transitively). |
| io.tmp.dirs | System.getProperty("java.io.tmpdir") | |
| mode | "new" | Switch to select the execution mode. Possible values are 'new' and 'legacy'. |
| parallelism.default | 1 | |
| partitioner.default | "REBALANCE" | The default stream partitioner, used when the upstream and downstream parallelisms are not equal and partitioner is not specified. Possible values are 'RESCALE' and 'REBALANCE'. |
| resource.cpu.cores.default | 0.01 | CPU cores for operators. The value is a double, so fractional values such as 0.1 can be specified. |
| resource.heap.mb.default | 16 | Java heap size (in megabytes) for operators. |
| user-jars.upload.disabled | false | |

JobManager

| Key | Default | Description |
| --- | --- | --- |
| jobmanager.archive.fs.dir | (none) | |
| jobmanager.execution.attempts-history-size | 16 | The maximum number of prior execution attempts kept in history. |
| jobmanager.execution.failover-strategy | "full" | The strategy to handle task failures. 'full' failover strategy will restart all tasks in the job. 'region' failover strategy will restart the tasks in the same region with the failed task. Regions are PIPELINED connected task groups in a job. |
| jobmanager.execution.failover-strategy.region.attempts | 100 | The maximum number of times a region can attempt to restart before triggering a job failure. This only works with the 'region' failover strategy. |
| jobmanager.execution.graph-manager-plugin | (none) | The class name of the graph manager plugin. |
| jobmanager.failover.operation-log-flush-interval | 3000 | The operation log store flush interval in ms. |
| jobmanager.failover.operation-log-store | "none" | The operation log store type for job master failover. |
| jobmanager.failover.reconcile-timeout | 60 | The timeout for job master to reconcile with task executors for recovering the execution status. |
| jobmanager.heap.mb | 1024 | JVM heap size (in megabytes) for the JobManager. |
| jobmanager.resourcemanager.reconnect-interval | 2000 | This option specifies the interval in order to trigger a resource manager reconnection if the connection to the resource manager has been lost. This option is only intended for internal use. |
| jobmanager.rpc.address | (none) | The config parameter defining the network address to connect to for communication with the job manager. This value is only interpreted in setups where a single JobManager with static name or address exists (simple standalone setups, or container setups with dynamic service name resolution). It is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers. |
| jobmanager.rpc.port | 6123 | The config parameter defining the network port to connect to for communication with the job manager. Like jobmanager.rpc.address, this value is only interpreted in setups where a single JobManager with static name/address and port exists (simple standalone setups, or container setups with dynamic service name resolution). This config option is not used in many high-availability setups, when a leader-election service (like ZooKeeper) is used to elect and discover the JobManager leader from potentially multiple standby JobManagers. |
| jobmanager.update-partition-info.send-interval | 10 | The interval at which update-partition-info messages are sent. |
| jobstore.cache-size | 52428800 | The job store cache size in bytes which is used to keep completed jobs in memory. |
| jobstore.expiration-time | 3600 | The time in seconds after which a completed job expires and is purged from the job store. |
| slot.enable-shared-slot | true | Whether to enable slot sharing group when allocating slots in Slot Pool. |
| slot.idle.timeout | 50000 | The timeout in milliseconds for an idle slot in Slot Pool. |
| slot.request.timeout | 300000 | The timeout in milliseconds for requesting a slot from Slot Pool. |

TaskManager

Key Default Description
io.manager.async.num-read-write-thread
-1 The number of async read write thread. If not positive, it will be adjusted to max(1, number of temp dirs) for TM shuffle and max(2, 2 * number of disks) for YARN shuffle.
io.manager.buffered.read.size
-1 The buffer size of io manager buffered read, -1 mean not use buffered read, this will reduce random IO, but will result in more than one copy.
io.manager.buffered.write.size
-1 The buffer size of io manager buffered write, -1 mean not use buffered write, this will reduce random IO, but will result in more than one copy.
task.blocking.shuffle.type
"TM" The type of shuffle service used for blocking edge. Currently it can be configured to TM or YARN.
task.cancellation.interval
30000 Time interval between two successive task cancellation attempts in milliseconds.
task.cancellation.timeout
180000 Timeout in milliseconds after which a task cancellation times out and leads to a fatal TaskManager error. A value of 0 deactivates the watch dog.
task.cancellation.timers.timeout
7500
task.checkpoint.alignment.max-size
-1 The maximum number of bytes that a checkpoint alignment may buffer. If the checkpoint alignment buffers more than the configured amount of data, the checkpoint is aborted (skipped). A value of -1 indicates that there is no limit.
task.external.shuffle.compression.buffer-size
65536 The max buffer size to compress external shuffle data.
task.external.shuffle.compression.codec
"lz4" The codec to use when compress or decompress external shuffle data. Currently supported codecs are lz4, bzip2, gzip. User can also implement interface BlockCompressionFactory and set its class to specify other codecs.
task.external.shuffle.compression.enable
true Whether to enable compress shuffle data when using external shuffle.
task.external.shuffle.consumed-partition-ttl-in-seconds
3600 The time interval to delete the fully consumed shuffle data directories since they become inactive.
task.external.shuffle.max-concurrent-requests
2000 The maximum number of concurrent requests in the reduce-side tasks.
task.external.shuffle.partial-consumed-partition-ttl-in-seconds
43200 The time interval to delete the partially consumed shuffle data directories since they become inactive.
task.external.shuffle.unconsumed-partition-ttl-in-seconds
43200 TThe time interval to delete the unconsumed shuffle data directories since they are ready to consume.
task.external.shuffle.unfinished-partition-ttl-in-seconds
3600 The time interval to delete the writing shuffle data directories since the last writing.
taskmanager.capacity.cpu.core
-1.0 The overall cpu cores allocated to the task manager.
taskmanager.capacity.memory.mb
-1 The overall memory in MB that allocated to the task manager.
taskmanager.cpu.core
1.0 How many physical cpu cores a task manager will supply for user
taskmanager.data.port
0 The task manager’s port used for data exchange operations.
taskmanager.data.ssl.enabled
true Enable SSL support for the taskmanager data transport. This is applicable only when the global ssl flag security.ssl.enabled is set to true
taskmanager.debug.memory.log
false Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.
taskmanager.debug.memory.log-interval
5000 The interval (in ms) for the log thread to log the current memory usage.
taskmanager.direct.memory.mb
0 How many direct memory (in megabytes) a task manager will supply for user.
taskmanager.exit-on-fatal-akka-error
false Whether the quarantine monitor for task managers shall be started. The quarantine monitor shuts down the actor system if it detects that it has quarantined another actor system or if it has been quarantined by another actor system.
taskmanager.extended.resources
(none) Extended resources will supply for user. Specified as resource-type:value pairs separated by commas. such as GPU:1,FPGA:1.
taskmanager.floating.memory.size
0
taskmanager.heap.mb
1024 How many heap memory (in megabytes) a task manager will supply for user, not including managed memory.
taskmanager.host
(none) The hostname of the network interface that the TaskManager binds to. By default, the TaskManager searches for network interfaces that can connect to the JobManager and other TaskManagers. This option can be used to define a hostname if that strategy fails for some reason. Because different TaskManagers need different values for this option, it usually is specified in an additional non-shared TaskManager-specific config file.
taskmanager.jvm-exit-on-oom
false Whether to kill the TaskManager when the task thread throws an OutOfMemoryError.
taskmanager.jvm.memory.dynamic.young.ratio
0.25 Ratio of young generation for dynamic memory in task manager.
taskmanager.jvm.memory.persistent.young.ratio
0.1 Ratio of young generation for persistent memory in task manager.
taskmanager.managed.memory.size
-1 Amount of memory to be allocated by the task manager's memory manager (in megabytes). If not set, a relative fraction will be allocated.
taskmanager.memory.fraction
0.7 The relative amount of memory (after subtracting the amount of memory used by network buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of `0.8` means that a task manager reserves 80% of its memory for internal data buffers, leaving 20% of free memory for the task manager's heap for objects created by user-defined functions. This parameter is only evaluated if taskmanager.managed.memory.size is not set.
taskmanager.memory.off-heap
false Memory allocation method (JVM heap or off-heap), used for managed memory of the TaskManager as well as the network buffers.
taskmanager.memory.preallocate
false Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.
taskmanager.memory.segment-size
32768 Size of memory buffers used by the network stack and the memory manager (in bytes).
taskmanager.multi-slots.max.cpu.core
1.0 Cpu core limitation, used to decide how many slots can be placed on a taskmanager.
taskmanager.multi-slots.max.extended-resources
(none) Extended resources limitation, used to decide how many slots can be placed on a taskmanager. The string format is like "GPU=10,FPGA=12".
taskmanager.multi-slots.max.memory.mb
32768 Memory (in megabytes) limitation, used to decide how many slots can be placed on a taskmanager.
taskmanager.multi-slots.min.cpu.core
1.0 Min cpu core for a taskmanager.
taskmanager.multi-slots.min.memory.mb
1024 Min memory (in megabytes) for taskmanager.
taskmanager.native.memory.mb
0 How much native memory (in megabytes) a task manager will supply for the user.
taskmanager.network.check-partition-producer-state
false Boolean flag indicating whether to check the partition producer state when a task's partition request fails and the task wants to re-trigger the request. The task re-triggers the partition request if the producer is healthy and fails otherwise.
taskmanager.network.detailed-metrics
false Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue lengths.
taskmanager.network.memory.buffers-per-channel
2 Maximum number of network buffers to use for each outgoing/incoming channel (subpartition/input channel). In credit-based flow control mode, this indicates how many credits are exclusive in each input channel. It should be set to at least 2 for good performance: 1 buffer is for receiving in-flight data in the subpartition and 1 buffer is for parallel serialization.
taskmanager.network.memory.buffers-per-external-blocking-channel
16 The number of buffers available for each external blocking channel.
taskmanager.network.memory.buffers-per-subpartition
2
taskmanager.network.memory.floating-buffers-per-external-blocking-gate
0 taskmanager.network.memory.floating-buffers-per-external-blocking-gate
taskmanager.network.memory.floating-buffers-per-gate
8 Number of extra network buffers to use for each outgoing/incoming gate (result partition/input gate). In credit-based flow control mode, this indicates how many floating credits are shared among all the input channels. The floating buffers are distributed based on backlog (real-time output buffers in the subpartition) feedback, and can help relieve back-pressure caused by unbalanced data distribution among the subpartitions. This value should be increased in case of higher round trip times between nodes and/or larger number of machines in the cluster.
taskmanager.network.memory.fraction
0.1 Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system does not have enough buffers available, increase this value or the min/max values below. Also note that "taskmanager.network.memory.min" and "taskmanager.network.memory.max" may override this fraction.
taskmanager.network.memory.max
1073741824 Maximum memory size for network buffers (in bytes).
taskmanager.network.memory.min
67108864 Minimum memory size for network buffers (in bytes).
taskmanager.network.request-backoff.initial
100 Minimum backoff for partition requests of input channels.
taskmanager.network.request-backoff.max
10000 Maximum backoff for partition requests of input channels.
taskmanager.numberOfTaskSlots
1 The number of parallel operator or user function instances that a single TaskManager can run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores).
taskmanager.output.hash.max-subpartitions
200 The maximum number of subpartitions supported by the hash writer.
taskmanager.output.local-disk.type
(none) The disk type preferred to write the shuffle data. If not specified, all the root directories are feasible. If specified, only directories with the configured type are feasible.
taskmanager.output.local-output-dirs
(none) The available directories for the external shuffle service. It will be configured automatically and should not be configured manually.
taskmanager.output.memory.mb
200 The write buffer size for each output in a task.
taskmanager.output.merge.enable-async-merge
false Whether to start merge while writing has not been finished.
taskmanager.output.merge.factor
64 The maximum number of files to merge at once when using the merge writer.
taskmanager.output.merge.merge-to-one-file
true Whether to finally merge to one file when using the merge writer. If not, the merge stops once the number of files is less than taskmanager.output.merge.factor.
taskmanager.process.heap.memory.mb
128 The heap memory (in megabytes) used for task manager process.
taskmanager.process.native.memory.mb
0 The native memory (in megabytes) used for task manager process.
taskmanager.process.netty.memory.mb
64 The direct memory (in megabytes) used for netty framework in the task manager process.
taskmanager.reconnection.timeout
"1 min" Defines the maximum time the TaskManager reconnection can take. If the duration is exceeded without a successful reconnection, the TaskManager disassociates from the JobManager.
taskmanager.registration.initial-backoff
"500 ms" The initial registration backoff between two consecutive registration attempts. The backoff is doubled for each new registration attempt until it reaches the maximum registration backoff.
taskmanager.registration.max-backoff
"30 s" The maximum registration backoff between two consecutive registration attempts. The max registration backoff requires a time unit specifier (ms/s/min/h/d).
taskmanager.registration.refused-backoff
"10 s" The backoff after a registration has been refused by the job manager before retrying to connect.
taskmanager.registration.timeout
"5 min" Defines the timeout for the TaskManager registration. If the duration is exceeded without a successful registration, then the TaskManager terminates.
taskmanager.resourceProfile
(none) The resource profile of a slot in a task executor.
taskmanager.rpc.port
"0" The task manager’s IPC port. Accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine.
taskmanager.total.resourceProfile
(none) The total resource profile of all the slots in a task executor.
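
As a rough illustration of how several of the TaskManager options above fit together, the following conf/flink-conf.yaml fragment is a minimal sketch for a hypothetical 4-core worker machine. All values are assumptions chosen for illustration, not recommendations. Durations are written with a time unit, matching the defaults listed in the table.

```yaml
# Illustrative values only; the 4-core machine is an assumption.
# Heap memory per TaskManager, in megabytes.
taskmanager.heap.mb: 4096
# One slot per core; the description above also suggests half the cores.
taskmanager.numberOfTaskSlots: 4
# Network buffers: fraction of JVM memory, bounded by min/max (in bytes).
taskmanager.network.memory.fraction: 0.15
# 128 MiB
taskmanager.network.memory.min: 134217728
# 2 GiB
taskmanager.network.memory.max: 2147483648
# A port range avoids collisions when several TaskManagers share a machine.
taskmanager.rpc.port: 50100-50200
taskmanager.registration.timeout: 10 min
```

Because the min and max settings may override the fraction, the effective network memory in this sketch would stay between 128 MiB and 2 GiB regardless of the fraction chosen.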

Distributed Coordination (via Akka)

Key Default Description
akka.ask.timeout
"10 s" Timeout used for all futures and blocking Akka calls. If Flink fails due to timeouts then you should try to increase this value. Timeouts can be caused by slow machines or a congested network. The timeout value requires a time-unit specifier (ms/s/min/h/d).
akka.client.timeout
"60 s" Timeout for all blocking calls on the client side.
akka.framesize
"10485760b" Maximum size of messages which are sent between the JobManager and the TaskManagers. If Flink fails because messages exceed this limit, then you should increase it. The message size requires a size-unit specifier.
akka.jvm-exit-on-fatal-error
true Exit JVM on fatal Akka errors.
akka.log.lifecycle.events
false Turns on Akka’s remote logging of events. Set this value to ‘true’ when debugging.
akka.lookup.timeout
"10 s" Timeout used for the lookup of the JobManager. The timeout value has to contain a time-unit specifier (ms/s/min/h/d).
akka.retry-gate-closed-for
50 Milliseconds a gate should be closed for after a remote connection was disconnected.
akka.ssl.enabled
true Turns on SSL for Akka’s remote communication. This is applicable only when the global ssl flag security.ssl.enabled is set to true.
akka.startup-timeout
(none) Timeout after which the startup of a remote component is considered to have failed.
akka.tcp.timeout
"20 s" Timeout for all outbound connections. If you should experience problems with connecting to a TaskManager due to a slow network, you should increase this value.
akka.throughput
15 Number of messages that are processed in a batch before returning the thread to the pool. Low values denote a fair scheduling whereas high values can increase the performance at the cost of unfairness.
akka.transport.heartbeat.interval
"1000 s" Heartbeat interval for Akka’s transport failure detector. Since Flink uses TCP, the detector is not necessary. Therefore, the detector is disabled by setting the interval to a very high value. In case you should need the transport failure detector, set the interval to some reasonable value. The interval value requires a time-unit specifier (ms/s/min/h/d).
akka.transport.heartbeat.pause
"6000 s" Acceptable heartbeat pause for Akka’s transport failure detector. Since Flink uses TCP, the detector is not necessary. Therefore, the detector is disabled by setting the pause to a very high value. In case you should need the transport failure detector, set the pause to some reasonable value. The pause value requires a time-unit specifier (ms/s/min/h/d).
akka.transport.threshold
300.0 Threshold for the transport failure detector. Since Flink uses TCP, the detector is not necessary and, thus, the threshold is set to a high value.
akka.watch.heartbeat.interval
"10 s" Heartbeat interval for Akka’s DeathWatch mechanism to detect dead TaskManagers. If TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, then you should decrease this value or increase akka.watch.heartbeat.pause. A thorough description of Akka’s DeathWatch can be found at http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector.
akka.watch.heartbeat.pause
"60 s" Acceptable heartbeat pause for Akka’s DeathWatch mechanism. A low value does not allow an irregular heartbeat. If TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, then you should increase this value or decrease akka.watch.heartbeat.interval. A higher value increases the time to detect a dead TaskManager. A thorough description of Akka’s DeathWatch can be found at http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector.
akka.watch.threshold
12 Threshold for the DeathWatch failure detector. A low value is prone to false positives whereas a high value increases the time to detect a dead TaskManager. A thorough description of Akka’s DeathWatch can be found at http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector.
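
The descriptions above suggest raising timeouts when machines are slow or the network is congested. A minimal sketch of such an adjustment follows. The specific values are assumptions, and suitable numbers depend on the cluster.

```yaml
# Illustrative values only.
# Time values need a unit specifier (ms/s/min/h/d).
akka.ask.timeout: 30 s
akka.lookup.timeout: 30 s
# Tolerate longer gaps between DeathWatch heartbeats before a
# TaskManager is marked dead; this also delays real failure detection.
akka.watch.heartbeat.pause: 120 s
# Message size limit with a size-unit specifier: 20 MiB in bytes.
akka.framesize: 20971520b
```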

REST

Key Default Description
rest.address
(none) The address that should be used by clients to connect to the server.
rest.await-leader-timeout
30000 The time in ms that the client waits for the leader address, e.g., Dispatcher or WebMonitorEndpoint.
rest.bind-address
(none) The address that the server binds itself to.
rest.client.max-content-length
104857600 The maximum content length in bytes that the client will handle.
rest.connection-timeout
15000 The maximum time in ms for the client to establish a TCP connection.
rest.poll.wait-strategy.max-interval
2000 The maximum wait time in milliseconds between retries when polling an asynchronously created resource that is not yet complete.
rest.port
8081 The port that the server listens on / the client connects to.
rest.retry.delay
3000 The time in ms that the client waits between retries (See also `rest.retry.max-attempts`).
rest.retry.max-attempts
20 The number of retries the client will attempt if a retryable operation fails.
rest.server.max-content-length
104857600 The maximum content length in bytes that the server will handle.
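
For illustration, a REST endpoint that binds to all interfaces but is advertised to clients under a fixed hostname could be sketched as follows. The hostname is a placeholder and an assumption, not something this page defines.

```yaml
# Illustrative values only; flink-master.example.com is a placeholder.
rest.bind-address: 0.0.0.0
rest.address: flink-master.example.com
rest.port: 8081
# Client-side retry behaviour: number of retries and delay between them (ms).
rest.retry.max-attempts: 20
rest.retry.delay: 3000
```

With these two retry settings (which equal the defaults), a client waits roughly 20 × 3000 ms = 60 s in total between retries before giving up, not counting the time spent on the requests themselves.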

Blob Server

Key Default Description
blob.fetch.backlog
1000 The config parameter defining the backlog of BLOB fetches on the JobManager.
blob.fetch.num-concurrent
50 The config parameter defining the maximum number of concurrent BLOB fetches that the JobManager serves.
blob.fetch.retries
5 The config parameter defining the number of retries for failed BLOB fetches.
blob.offload.minsize
1048576 The minimum size for messages to be offloaded to the BlobServer.
blob.server.port
"0" The config parameter defining the server port of the blob service.
blob.service.cleanup.interval
3600 Cleanup interval of the blob caches at the task managers (in seconds).
blob.service.ssl.enabled
true Flag to override ssl support for the blob service transport.
blob.storage.directory
(none) The config parameter defining the storage directory to be used by the blob server.

Heartbeat Manager

Key Default Description
heartbeat.interval
10000 Time interval for requesting heartbeat from sender side.
heartbeat.timeout
50000 Timeout for requesting and receiving heartbeat for both sender and receiver sides.

SSL Settings

Key Default Description
security.ssl.algorithms
"TLS_RSA_WITH_AES_128_CBC_SHA" The comma-separated list of standard SSL algorithms to be supported. For the standard names, see http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#ciphersuites.
security.ssl.enabled
false Turns on SSL for internal network communication. This can be optionally overridden by flags defined in different transport modules.
security.ssl.key-password
(none) The secret to decrypt the server key in the keystore.
security.ssl.keystore
(none) The Java keystore file to be used by the flink endpoint for its SSL Key and Certificate.
security.ssl.keystore-password
(none) The secret to decrypt the keystore file.
security.ssl.protocol
"TLSv1.2" The SSL protocol version to be supported for the ssl transport. Note that it doesn’t support a comma-separated list.
security.ssl.truststore
(none) The truststore file containing the public CA certificates to be used by flink endpoints to verify the peer’s certificate.
security.ssl.truststore-password
(none) The secret to decrypt the truststore.
security.ssl.verify-hostname
true Flag to enable peer’s hostname verification during ssl handshake.
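
The following fragment is a minimal sketch of how the SSL options above might be combined. The file paths and passwords are placeholders (assumptions), and the per-transport flag at the end only illustrates the override mechanism that the security.ssl.enabled description mentions.

```yaml
# Illustrative values only; paths and secrets are placeholders.
security.ssl.enabled: true
security.ssl.keystore: /path/to/flink.keystore
security.ssl.keystore-password: keystore_password_placeholder
security.ssl.key-password: key_password_placeholder
security.ssl.truststore: /path/to/flink.truststore
security.ssl.truststore-password: truststore_password_placeholder
# A single protocol version; a comma-separated list is not supported.
security.ssl.protocol: TLSv1.2
security.ssl.verify-hostname: true
# Per-transport flags only apply when the global flag is true.
# This one switches SSL off for the blob service transport only.
blob.service.ssl.enabled: false
```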

Network communication (via Netty)

These parameters allow for advanced tuning. The default values are sufficient when running concurrent high-throughput jobs on a large cluster.

Key Default Description
taskmanager.network.netty.client.connectTimeoutSec
120 The Netty client connection timeout.
taskmanager.network.netty.client.numThreads
-1 The number of Netty client threads.
taskmanager.network.netty.max-order
9 The power of 2 of the number of pages in each chunk.
taskmanager.network.netty.num-arenas
-1 The number of Netty arenas.
taskmanager.network.netty.sendReceiveBufferSize
0 The Netty send and receive buffer size. This defaults to the system buffer size (cat /proc/sys/net/ipv4/tcp_[rw]mem) and is 4 MiB in modern Linux.
taskmanager.network.netty.server.backlog
0 The netty server connection backlog.
taskmanager.network.netty.server.numThreads
-1 The number of Netty server threads.
taskmanager.network.netty.transport
"nio" The Netty transport type, either "nio" or "epoll"

Web Frontend

Key Default Description
web.access-control-allow-origin
"*"
web.address
(none)
web.backpressure.cleanup-interval
600000
web.backpressure.delay-between-samples
50
web.backpressure.num-samples
100
web.backpressure.refresh-interval
60000
web.checkpoints.history
10
web.history
5
web.log.path
(none)
web.refresh-interval
3000
web.ssl.enabled
true
web.submit.enable
true
web.timeout
10000
web.tmpdir
System.getProperty("java.io.tmpdir")
web.upload.dir
(none)

File Systems

Key Default Description
fs.default-scheme
(none) The default filesystem scheme, used for paths that do not declare a scheme explicitly. May contain an authority, e.g. host:port in case of a HDFS NameNode.
fs.output.always-create-directory
false File writers running with a parallelism larger than one create a directory for the output file path and put the different result files (one per parallel writer task) into that directory. If this option is set to "true", writers with a parallelism of 1 will also create a directory and place a single result file into it. If the option is set to "false", the writer will directly create the file directly at the output path, without creating a containing directory.
fs.overwrite-files
false Specifies whether file output writers should overwrite existing files by default. Set to "true" to overwrite by default, "false" otherwise.
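
As a sketch of the file system options above, the fragment below sets an HDFS default scheme with an authority. The NameNode host and port are placeholders (assumptions).

```yaml
# Illustrative values only; namenode.example.com:9000 is a placeholder.
# Paths without an explicit scheme resolve against this file system.
fs.default-scheme: hdfs://namenode.example.com:9000/
# Writers with parallelism 1 also create a directory with one result file.
fs.output.always-create-directory: true
fs.overwrite-files: true
```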

Compiler/Optimizer

Key Default Description
compiler.delimited-informat.max-line-samples
10 The maximum number of line samples taken by the compiler for delimited inputs. The samples are used to estimate the number of records. This value can be overridden for a specific input with the input format’s parameters.
compiler.delimited-informat.max-sample-len
2097152 The maximal length of a line sample that the compiler takes for delimited inputs. If the length of a single sample exceeds this value (possible because of misconfiguration of the parser), the sampling aborts. This value can be overridden for a specific input with the input format’s parameters.
compiler.delimited-informat.min-line-samples
2 The minimum number of line samples taken by the compiler for delimited inputs. The samples are used to estimate the number of records. This value can be overridden for a specific input with the input format’s parameters.

Runtime Algorithms

Key Default Description
taskmanager.runtime.hashjoin-bloom-filters
false Flag to activate/deactivate bloom filters in the hybrid hash join implementation. In cases where the hash join needs to spill to disk (datasets larger than the reserved fraction of memory), these bloom filters can greatly reduce the number of spilled records, at the cost of some CPU cycles.
taskmanager.runtime.max-fan
128 The maximal fan-in for external merge joins and fan-out for spilling hash tables. Limits the number of file handles per operator, but may cause intermediate merging/partitioning, if set too small.
taskmanager.runtime.sort-spilling-threshold
0.8 A sort operation starts spilling when this fraction of its memory budget is full.

Resource Manager

The configuration keys in this section are independent of the used resource management framework (YARN, Mesos, Standalone, …)

Key Default Description
containerized.heap-cutoff-min
600 Minimum amount of heap memory to remove in containers, as a safety margin. This option does not take effect in session mode on YARN, Kubernetes, or Standalone; use the fine-grained options instead (taskmanager.process.heap.memory.mb, taskmanager.process.netty.memory.mb, taskmanager.process.native.memory.mb).
containerized.heap-cutoff-ratio
0.25 Percentage of heap space to remove from containers (YARN / Mesos), to compensate for other JVM memory usage. This option does not take effect in session mode on YARN, Kubernetes, or Standalone; use the fine-grained options instead (taskmanager.process.heap.memory.mb, taskmanager.process.netty.memory.mb, taskmanager.process.native.memory.mb).
local.number-resourcemanager
1
resourcemanager.job.timeout
"5 minutes" Timeout for jobs which don't have a job manager as leader assigned.
resourcemanager.rpc.port
0 Defines the network port to connect to for communication with the resource manager. By default, this is the port of the JobManager, because the same ActorSystem is used. It is not possible to use this configuration key to define port ranges.

YARN

Key Default Description
job.app-master-core
1
yarn.application-attempts
(none) Number of ApplicationMaster restarts. Note that the entire Flink cluster will restart and the YARN Client will lose the connection. Also, the JobManager address will change and you’ll need to set the JM host:port manually. It is recommended to leave this option at 1.
yarn.application-master.port
"0" With this configuration option, users can specify a port, a range of ports or a list of ports for the Application Master (and JobManager) RPC port. By default we recommend using the default value (0) to let the operating system choose an appropriate port. In particular when multiple AMs are running on the same physical host, fixed port assignments prevent the AM from starting. For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports.
yarn.appmaster.rpc.address
(none) The hostname or address where the application master RPC system is listening.
yarn.appmaster.rpc.port
-1 The port where the application master RPC system is listening.
yarn.container-launcher-number
10
yarn.container-register-timeout
120
yarn.heartbeat-delay
5 Time between heartbeats with the ResourceManager in seconds.
yarn.maximum-failed-containers
(none) Maximum number of containers the system is going to reallocate in case of a failure.
yarn.per-job-cluster.include-user-jar
"ORDER" Defines whether user-jars are included in the system class path for per-job-clusters as well as their positioning in the path. They can be positioned at the beginning ("FIRST"), at the end ("LAST"), or be positioned based on their name ("ORDER"). Setting this parameter to "DISABLED" causes the jar to be included in the user class path instead.
yarn.properties-file.location
(none) When a Flink job is submitted to YARN, the JobManager’s host and the number of available processing slots is written into a properties file, so that the Flink client is able to pick those details up. This configuration parameter allows changing the default location of that file (for example for environments sharing a Flink installation between users).
yarn.tags
(none) A comma-separated list of tags to apply to the Flink YARN application.
yarn.vcore-ratio
1
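
For a YARN environment with a restrictive firewall, as mentioned in the yarn.application-master.port description, a configuration could look roughly like the sketch below. The port range and tag names are assumptions chosen for illustration.

```yaml
# Illustrative values only.
# Restrict the Application Master RPC port to a range the firewall allows.
# The default (0) lets the operating system choose a port.
yarn.application-master.port: 50100-50200
# Tags applied to the Flink YARN application.
yarn.tags: flink,streaming
# Seconds between heartbeats with the ResourceManager.
yarn.heartbeat-delay: 5
# Put user jars at the beginning of the system class path.
yarn.per-job-cluster.include-user-jar: FIRST
```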

Mesos

Key Default Description
mesos.failover-timeout
600 The failover timeout in seconds for the Mesos scheduler, after which running tasks are automatically shut down.
mesos.initial-tasks
0 The initial workers to bring up when the master starts
mesos.master
(none) The Mesos master URL. The value should be in one of the following forms: "host:port", "zk://host1:port1,host2:port2,.../path", "zk://username:password@host1:port1,host2:port2,.../path" or "file:///path/to/file"
mesos.maximum-failed-tasks
-1 The maximum number of failed workers before the cluster fails. May be set to -1 to disable this feature
mesos.resourcemanager.artifactserver.port
0 The config parameter defining the Mesos artifact server port to use. Setting the port to 0 will let the OS choose an available port.
mesos.resourcemanager.artifactserver.ssl.enabled
true Enables SSL for the Flink artifact server. Note that security.ssl.enabled also needs to be set to true to enable encryption.
mesos.resourcemanager.framework.name
"Flink" Mesos framework name
mesos.resourcemanager.framework.principal
(none) Mesos framework principal
mesos.resourcemanager.framework.role
"*" Mesos framework role definition
mesos.resourcemanager.framework.secret
(none) Mesos framework secret
mesos.resourcemanager.framework.user
(none) Mesos framework user
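
A minimal sketch of the Mesos options above, using the ZooKeeper form of the master URL, might look as follows. Host names, the role, and the worker count are placeholders and assumptions.

```yaml
# Illustrative values only; zk1/zk2.example.com are placeholders.
mesos.master: zk://zk1.example.com:2181,zk2.example.com:2181/mesos
# Workers to bring up when the master starts.
mesos.initial-tasks: 3
# Seconds after which running tasks are shut down following a failover.
mesos.failover-timeout: 600
# -1 disables the limit on failed workers.
mesos.maximum-failed-tasks: -1
mesos.resourcemanager.framework.name: Flink
mesos.resourcemanager.framework.role: flink-role
```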

Mesos TaskManager

Key Default Description
mesos.constraints.hard.hostattribute
(none) Constraints for task placement on mesos.
mesos.resourcemanager.tasks.bootstrap-cmd
(none)
mesos.resourcemanager.tasks.container.docker.parameters
(none) Custom parameters to be passed into docker run command when using the docker containerizer. Comma separated list of "key=value" pairs. The "value" may contain '='.
mesos.resourcemanager.tasks.container.image.name
(none) Image name to use for the container.
mesos.resourcemanager.tasks.container.type
"mesos" Type of the containerization used: “mesos” or “docker”.
mesos.resourcemanager.tasks.container.volumes
(none) A comma separated list of [host_path:]container_path[:RO|RW]. This allows for mounting additional volumes into your container.
mesos.resourcemanager.tasks.cpus
0.0 CPUs to assign to the Mesos workers.
mesos.resourcemanager.tasks.gpus
0
mesos.resourcemanager.tasks.hostname
(none)
mesos.resourcemanager.tasks.mem
1024 Memory to assign to the Mesos workers in MB.
mesos.resourcemanager.tasks.taskmanager-cmd
"$FLINK_HOME/bin/mesos-taskmanager.sh"
taskmanager.numberOfTaskSlots
1 The number of parallel operator or user function instances that a single TaskManager can run. If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores).
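
To illustrate the Mesos TaskManager options above, the following sketch runs workers in Docker containers. The image name and volume paths are hypothetical, and the resource sizes are assumptions.

```yaml
# Illustrative values only; image name and paths are hypothetical.
mesos.resourcemanager.tasks.container.type: docker
mesos.resourcemanager.tasks.container.image.name: example/flink:latest
# Format: [host_path:]container_path[:RO|RW]
mesos.resourcemanager.tasks.container.volumes: /host/data:/data:RO
mesos.resourcemanager.tasks.cpus: 2.0
# Memory per worker, in MB.
mesos.resourcemanager.tasks.mem: 4096
# One slot per assigned CPU in this sketch.
taskmanager.numberOfTaskSlots: 2
```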

Kubernetes

Key Default Description
kubernetes.cluster-id
(none) The custom name for the Flink cluster on Kubernetes. It can be specified with the -nm argument. If it is not set, the client generates a random UUID name.
kubernetes.connection.retry.interval.ms
1000 The retry interval in milliseconds for RM talking to kubernetes.
kubernetes.connection.retry.times
120 The max retry attempts for RM talking to kubernetes.
kubernetes.container-start-command-template
"%java% %classpath% %jvmmem% %jvmopts% %logging% %class%" Template for the kubernetes container start invocation
kubernetes.container.files
(none) Files to be used for Flink containers, will be transferred to flink conf directory and appended to classpath in containers.
kubernetes.container.image
"flink-k8s:latest" Container image to use for Flink containers. Individual container types (e.g. jobmanager or taskmanager) can also be configured to use different images if desired, by setting the container type-specific image name.
kubernetes.container.image.pullPolicy
"IfNotPresent" Kubernetes image pull policy. Valid values are Always, Never, and IfNotPresent.
kubernetes.destroy-perjob-cluster.after-job-finished
true Whether to kill the per-job cluster on Kubernetes after the job has finished. If you want to check logs and view the dashboard after the job has finished, set this to false.
kubernetes.flink.conf.dir
"/etc/flink/conf" The conf dir will be mounted in pod.
kubernetes.jobmanager.container.image
(none) Container image to use for the jobmanager.
kubernetes.jobmanager.container.name
"flink-kubernetes-jobmanager" Name of the jobmanager container.
kubernetes.jobmanager.cpu
1.0 The number of CPUs used by the job manager.
kubernetes.jobmanager.pod.name
"jobmanager" Name of the jobmanager pod.
kubernetes.jobmanager.service-account
"default" Service account that is used by jobmanager within kubernetes cluster. The job manager uses this service account when requesting taskmanager pods from the API server.
kubernetes.master.url
"localhost:8080" The kubernetes master url.
kubernetes.namespace
"default" The namespace that will be used for running the jobmanager and taskmanager pods.
kubernetes.program.args
(none) Arguments specified for user program.
kubernetes.program.entrypoint.class
(none) Class with the program entry point ("main" method or "getPlan()" method). Only needed if the JAR file does not specify the class in its manifest.
kubernetes.service.exposed.type
"CLUSTER_IP" It could be CLUSTER_IP(default)/NODE_PORT/LOAD_BALANCER/EXTERNAL_NAME.
kubernetes.service.external.address
"localhost" The exposed address of kubernetes service to submit job and view dashboard.
kubernetes.taskmanager.count
1 The task manager count for session cluster.
kubernetes.taskmanager.register-timeout
120 The registration timeout (in seconds) for a task manager before it is released by the resource manager. This covers the case where a task manager takes a very long time to launch.
kubernetes.workernode.max-failed-attempts
100 The maximum number of failed attempts for a worker node.
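
As an illustration of the Kubernetes options above, a session cluster with three task managers could be sketched like this. The cluster ID, namespace, and service account names are hypothetical, and the sketch assumes the namespace and service account already exist.

```yaml
# Illustrative values only; names are hypothetical.
kubernetes.cluster-id: my-flink-session
kubernetes.namespace: flink
kubernetes.container.image: flink-k8s:latest
kubernetes.container.image.pullPolicy: IfNotPresent
# Service account the job manager uses to request taskmanager pods.
kubernetes.jobmanager.service-account: flink
kubernetes.service.exposed.type: NODE_PORT
kubernetes.taskmanager.count: 3
# Keep a per-job cluster around after the job finishes, to inspect logs.
kubernetes.destroy-perjob-cluster.after-job-finished: false
```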

High Availability (HA)

Key Default Description
high-availability
"NONE" Defines high-availability mode used for the cluster execution. To enable high-availability, set this mode to "FILESYSTEM" or "ZOOKEEPER".
high-availability.cluster-id
"/default" The ID of the Flink cluster, used to separate multiple Flink clusters from each other. Needs to be set for standalone clusters but is automatically inferred in YARN and Mesos.
high-availability.filesystem.path.jobgraphs
"/tmp/jobgraphs" FileSystem root path for job graphs.
high-availability.job.delay
(none) The time a JobManager waits after a failover before it recovers the current jobs.
high-availability.jobmanager.port
"0" Optional port (range) used by the job manager in high-availability mode.
high-availability.storageDir
(none) File system path (URI) where Flink persists metadata in high-availability setups.

ZooKeeper-based HA Mode

Key Default Description
high-availability.zookeeper.client.acl
"open" Defines the ACL (open|creator) to be configured on ZK node. The configuration value can be set to “creator” if the ZooKeeper server configuration has the “authProvider” property mapped to use SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos).
high-availability.zookeeper.client.connection-timeout
15000 Defines the connection timeout for ZooKeeper in ms.
high-availability.zookeeper.client.max-retry-attempts
3 Defines the number of connection retries before the client gives up.
high-availability.zookeeper.client.retry-wait
5000 Defines the pause between consecutive retries in ms.
high-availability.zookeeper.client.session-timeout
60000 Defines the session timeout for the ZooKeeper session in ms.
high-availability.zookeeper.path.checkpoint-counter
"/checkpoint-counter" ZooKeeper root path (ZNode) for checkpoint counters.
high-availability.zookeeper.path.checkpoints
"/checkpoints" ZooKeeper root path (ZNode) for completed checkpoints.
high-availability.zookeeper.path.jobgraphs
"/jobgraphs" ZooKeeper root path (ZNode) for job graphs
high-availability.zookeeper.path.latch
"/leaderlatch" Defines the znode of the leader latch which is used to elect the leader.
high-availability.zookeeper.path.leader
"/leader" Defines the znode of the leader which contains the URL to the leader and the current leader session ID.
high-availability.zookeeper.path.mesos-workers
"/mesos-workers" ZooKeeper root path (ZNode) for Mesos workers.
high-availability.zookeeper.path.root
"/flink" The root path under which Flink stores its entries in ZooKeeper.
high-availability.zookeeper.path.running-registry
"/running_job_registry/"
high-availability.zookeeper.quorum
(none) The ZooKeeper quorum to use, when running Flink in a high-availability mode with ZooKeeper.
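
Putting the general HA options and the ZooKeeper options together, a minimal sketch of a ZooKeeper-based HA setup might look like the following. The ZooKeeper hosts, the storage path, and the cluster ID are placeholders and assumptions.

```yaml
# Illustrative values only; hosts and paths are placeholders.
high-availability: ZOOKEEPER
high-availability.zookeeper.quorum: zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
# File system path (URI) where Flink persists HA metadata.
high-availability.storageDir: hdfs:///flink/ha/
# Separates this cluster from others that share the same ZooKeeper root.
high-availability.cluster-id: /cluster_one
high-availability.zookeeper.path.root: /flink
# Session timeout in ms.
high-availability.zookeeper.client.session-timeout: 60000
```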

FileSystem-based HA Mode

  • high-availability.filesystem.path.jobgraphs: FileSystem root path for job graphs (DEFAULT: "/tmp/jobgraphs").

ZooKeeper Security

  • zookeeper.sasl.disable (DEFAULT: false).
  • zookeeper.sasl.login-context-name (DEFAULT: "Client").
  • zookeeper.sasl.service-name (DEFAULT: "zookeeper").

Kerberos-based Security

  • security.kerberos.login.contexts: A comma-separated list of login contexts to provide the Kerberos credentials to (for example, `Client,KafkaClient` to use the credentials for ZooKeeper authentication and for Kafka authentication) (DEFAULT: none).
  • security.kerberos.login.keytab: Absolute path to a Kerberos keytab file that contains the user credentials (DEFAULT: none).
  • security.kerberos.login.principal: Kerberos principal name associated with the keytab (DEFAULT: none).
  • security.kerberos.login.use-ticket-cache: Indicates whether to read from your Kerberos ticket cache (DEFAULT: true).
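
A minimal sketch of a keytab-based setup using the four keys above. The keytab path and principal are hypothetical, and disabling the ticket cache is an illustrative choice rather than a requirement stated in this page.

```yaml
# Illustrative choice: rely on the keytab instead of the ticket cache.
security.kerberos.login.use-ticket-cache: false
# Hypothetical keytab location and principal; substitute your own.
security.kerberos.login.keytab: /etc/security/keytabs/flink.keytab
security.kerberos.login.principal: flink-user@EXAMPLE.COM
# Login contexts taken from the example in the description above.
security.kerberos.login.contexts: Client,KafkaClient
```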

Environment

  • env.java.opts (DEFAULT: none).
  • env.java.opts.jobmanager (DEFAULT: none).
  • env.java.opts.taskmanager (DEFAULT: none).
  • env.log.dir: Defines the directory where the Flink logs are saved. It has to be an absolute path. If unset, the log directory under Flink’s home is used (DEFAULT: none).
  • env.log.max: The maximum number of old log files to keep (DEFAULT: 5).
  • env.ssh.opts: Additional command line options passed to SSH clients when starting or stopping JobManager, TaskManager, and ZooKeeper services (start-cluster.sh, stop-cluster.sh, start-zookeeper-quorum.sh, stop-zookeeper-quorum.sh) (DEFAULT: none).
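
The environment keys above could be combined roughly as follows. The JVM flags are standard HotSpot options picked purely as examples, and the log directory is a hypothetical path.

```yaml
# JVM options for both JobManager and TaskManager (example flag only).
env.java.opts: -XX:+UseG1GC
# Extra JobManager-only options, used in addition to env.java.opts.
env.java.opts.jobmanager: -XX:+HeapDumpOnOutOfMemoryError
# Must be an absolute path; hypothetical location.
env.log.dir: /var/log/flink
# Keep up to 10 old log files instead of the default 5.
env.log.max: 10
```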

Checkpointing

  • state.backend: The state backend to be used to store and checkpoint state. Supported values are 'jobmanager' for MemoryStateBackend, 'filesystem' for FsStateBackend, and 'rocksdb' for RocksDBStateBackend (DEFAULT: none).
  • state.backend.async: Whether the state backend should use an asynchronous snapshot method where possible and configurable. Some state backends may not support asynchronous snapshots, or may only support asynchronous snapshots, and ignore this option (DEFAULT: true).
  • state.backend.fs.memory-threshold: The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file (DEFAULT: 1024).
  • state.backend.incremental: Whether the state backend should create incremental checkpoints, if possible. For an incremental checkpoint, only a diff from the previous checkpoint is stored, rather than the complete checkpoint state. Some state backends may not support incremental checkpoints and ignore this option (DEFAULT: false).
  • state.backend.local-recovery (DEFAULT: false).
  • state.backend.rocksdb.localdir: The local directory (on the TaskManager) where RocksDB puts its files (DEFAULT: none).
  • state.backend.working-dirs: The working directories for the file-based state backend (DEFAULT: none).
  • state.checkpoints.create-subdirs: Whether to create sub-directories named by job ID to store the data files and metadata of checkpoints. The default of true lets users run several jobs with the same checkpoint directory simultaneously. If this value is set to false, take care not to run several jobs with the same directory simultaneously (DEFAULT: true).
  • state.checkpoints.dir: The default directory used for storing the data files and metadata of checkpoints in a Flink-supported filesystem. The storage path must be accessible from all participating processes/nodes (i.e., all TaskManagers and JobManagers) (DEFAULT: none).
  • state.checkpoints.num-retained: The maximum number of completed checkpoints to retain (DEFAULT: 1).
  • state.savepoints.dir: The default directory for savepoints. Used by the state backends that write savepoints to file systems (MemoryStateBackend, FsStateBackend, RocksDBStateBackend) (DEFAULT: none).
  • taskmanager.state.local.root-dirs (DEFAULT: none).
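
As a sketch of how the checkpointing keys fit together, the fragment below selects the RocksDB backend with incremental checkpoints. The HDFS paths are hypothetical, and the retention count is an illustrative value.

```yaml
# Use RocksDBStateBackend and store only diffs between checkpoints.
state.backend: rocksdb
state.backend.incremental: true
# Hypothetical directories; they must be reachable from all
# TaskManagers and JobManagers.
state.checkpoints.dir: hdfs:///flink/checkpoints
state.savepoints.dir: hdfs:///flink/savepoints
# Retain the three most recent completed checkpoints (default is 1).
state.checkpoints.num-retained: 3
```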

Queryable State

  • query.client.network-threads: Number of network (Netty's event loop) threads for the queryable state client (DEFAULT: 0).
  • query.proxy.network-threads: Number of network (Netty's event loop) threads for the queryable state proxy (DEFAULT: 0).
  • query.proxy.ports: The port range of the queryable state proxy. The specified range can be a single port: "9123", a range of ports: "50100-50200", or a list of ranges and ports: "50100-50200,50300-50400,51234" (DEFAULT: "9069").
  • query.proxy.query-threads: Number of query threads for the queryable state proxy. Uses the number of slots if set to 0 (DEFAULT: 0).
  • query.server.network-threads: Number of network (Netty's event loop) threads for the queryable state server (DEFAULT: 0).
  • query.server.ports: The port range of the queryable state server. The specified range can be a single port: "9123", a range of ports: "50100-50200", or a list of ranges and ports: "50100-50200,50300-50400,51234" (DEFAULT: "9067").
  • query.server.query-threads: Number of query threads for the queryable state server. Uses the number of slots if set to 0 (DEFAULT: 0).
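
For illustration, the port-range syntax described above could be used like this. The ranges are copied from the examples in the descriptions, and the thread count is an arbitrary illustrative value.

```yaml
# A single range for the proxy, and a list of a range plus one port for the server.
query.proxy.ports: 50100-50200
query.server.ports: 50300-50400,51234
# Fixed number of proxy query threads (illustrative value).
query.proxy.query-threads: 4
# 0 falls back to the number of slots, as described above.
query.server.query-threads: 0
```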

Metrics

  • metrics.latency.history-size: Defines the number of measured latencies to maintain at each operator (DEFAULT: 128).
  • metrics.reporter.<name>.<parameter>: Configures the parameter <parameter> for the reporter named <name> (DEFAULT: none).
  • metrics.reporter.<name>.class: The reporter class to use for the reporter named <name> (DEFAULT: none).
  • metrics.reporter.<name>.interval: The reporter interval to use for the reporter named <name> (DEFAULT: none).
  • metrics.reporters (DEFAULT: none).
  • metrics.scope.delimiter (DEFAULT: ".").
  • metrics.scope.jm: Defines the scope format string that is applied to all metrics scoped to a JobManager (DEFAULT: "<host>.jobmanager").
  • metrics.scope.jm.job: Defines the scope format string that is applied to all metrics scoped to a job on a JobManager (DEFAULT: "<host>.jobmanager.<job_name>").
  • metrics.scope.operator: Defines the scope format string that is applied to all metrics scoped to an operator (DEFAULT: "<host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>").
  • metrics.scope.task: Defines the scope format string that is applied to all metrics scoped to a task (DEFAULT: "<host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>").
  • metrics.scope.tm: Defines the scope format string that is applied to all metrics scoped to a TaskManager (DEFAULT: "<host>.taskmanager.<tm_id>").
  • metrics.scope.tm.job: Defines the scope format string that is applied to all metrics scoped to a job on a TaskManager (DEFAULT: "<host>.taskmanager.<tm_id>.<job_name>").
  • metrics.tracing.sample.count-interval (DEFAULT: 100).
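
A hedged sketch of a reporter definition using the <name> and <parameter> placeholders above. The reporter name my_jmx is hypothetical. This page does not describe metrics.reporters, so the example assumes it lists the reporter names to activate. It also assumes that Flink's bundled JMX reporter class is available and that it accepts a port parameter.

```yaml
# Assumption: metrics.reporters lists the reporter names to activate.
metrics.reporters: my_jmx
# Reporter class for the reporter named my_jmx (assumed to be on the classpath).
metrics.reporter.my_jmx.class: org.apache.flink.metrics.jmx.JMXReporter
# Example of a reporter-specific <parameter>; assumed to be supported by this reporter.
metrics.reporter.my_jmx.port: 9020-9040
# Shorter TaskManager scope, built only from variables used in the defaults above.
metrics.scope.tm: <host>.tm.<tm_id>
```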

History Server

You have to configure jobmanager.archive.fs.dir in order to archive terminated jobs and add it to the list of monitored directories via historyserver.archive.fs.dir if you want to display them via the HistoryServer’s web frontend.

  • jobmanager.archive.fs.dir: Directory to upload information about terminated jobs to. You have to add this directory to the list of monitored directories of the history server via historyserver.archive.fs.dir.
  • historyserver.archive.fs.dir: Comma-separated list of directories to fetch archived jobs from. The history server will monitor these directories for archived jobs. You can configure the JobManager to archive jobs to a directory via `jobmanager.archive.fs.dir` (DEFAULT: none).
  • historyserver.archive.fs.refresh-interval: Interval in milliseconds for refreshing the archived job directories (DEFAULT: 10000).
  • historyserver.web.address: Address of the HistoryServer's web interface (DEFAULT: none).
  • historyserver.web.port: Port of the HistoryServer's web interface (DEFAULT: 8082).
  • historyserver.web.refresh-interval (DEFAULT: 10000).
  • historyserver.web.ssl.enabled: Enable HTTPS access to the HistoryServer web frontend. This is applicable only when the global SSL flag security.ssl.enabled is set to true (DEFAULT: false).
  • historyserver.web.tmpdir: This configuration parameter allows defining the Flink web directory to be used by the history server web interface. The web interface will copy its static files into the directory (DEFAULT: none).
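
The pairing of the two directory keys described above can be sketched as follows. The HDFS path is a hypothetical placeholder. The key point is that the JobManager's archive directory also appears in the HistoryServer's monitored list.

```yaml
# JobManager uploads information about terminated jobs here (hypothetical path).
jobmanager.archive.fs.dir: hdfs:///flink/completed-jobs
# HistoryServer monitors the same directory; further directories may be
# appended as a comma-separated list.
historyserver.archive.fs.dir: hdfs:///flink/completed-jobs
# Defaults restated for clarity.
historyserver.archive.fs.refresh-interval: 10000
historyserver.web.port: 8082
```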

Slot Manager

The configuration keys in this section are relevant for the SlotManager running in the ResourceManager.

  • slotmanager.request-timeout: The timeout for a slot request to be discarded (DEFAULT: 600000).
  • slotmanager.slot-placement-policy (DEFAULT: "RANDOM").
  • slotmanager.taskmanager-timeout: The timeout for an idle task manager to be released (DEFAULT: 30000).
  • slotmanager.taskmanager.checker-initial-delay (DEFAULT: 180000).

Legacy

  • mode: Execution mode of Flink. Possible values are legacy and new. In order to start the legacy components, you have to specify legacy (DEFAULT: new).

Background

Configuring the Network Buffers

If you ever see the exception java.io.IOException: Insufficient number of network buffers, you need to increase the amount of memory used for network buffers so that your program can run on your task managers.

Network buffers are a critical resource for the communication layers. They are used to buffer records before transmission over a network, and to buffer incoming data before dissecting it into records and handing them to the application. A sufficient number of network buffers is critical to achieve a good throughput.

Since Flink 1.3, you may follow the idiom "more is better" without any penalty on latency (we prevent excessive buffering in each outgoing and incoming channel, i.e. buffer bloat, by limiting the actual number of buffers used by each channel).

In general, configure the task manager to have enough buffers that each logical network connection you expect to be open at the same time has a dedicated buffer. A logical network connection exists for each point-to-point exchange of data over the network, which typically happens at repartitioning or broadcasting steps (shuffle phase). In those, each parallel task inside the TaskManager has to be able to talk to all other parallel tasks.

Note: Since Flink 1.5, network buffers will always be allocated off-heap, i.e. outside of the JVM heap, irrespective of the value of taskmanager.memory.off-heap. This way, we can pass these buffers directly to the underlying network stack layers.

Setting Memory Fractions

Previously, the number of network buffers had to be set manually, which was quite error-prone (see below). Since Flink 1.3, it is possible to define a fraction of memory to be used for network buffers with the following configuration parameters:

  • taskmanager.network.memory.fraction: Fraction of JVM memory to use for network buffers (DEFAULT: 0.1),
  • taskmanager.network.memory.min: Minimum memory size for network buffers in bytes (DEFAULT: 64 MB),
  • taskmanager.network.memory.max: Maximum memory size for network buffers in bytes (DEFAULT: 1 GB), and
  • taskmanager.memory.segment-size: Size of memory buffers used by the memory manager and the network stack in bytes (DEFAULT: 32768 (= 32 KiBytes)).
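
A sketch of the fraction-based settings. One plausible reading of these parameters, stated here as an assumption because the text above does not spell it out, is that the network memory is the configured fraction of JVM memory, bounded below by the minimum and above by the maximum. All values are illustrative.

```yaml
# Use 20% of JVM memory for network buffers (the default is 0.1).
taskmanager.network.memory.fraction: 0.2
# Lower bound in bytes: 128 MB = 134217728 bytes.
taskmanager.network.memory.min: 134217728
# Upper bound in bytes: 2 GB = 2147483648 bytes.
taskmanager.network.memory.max: 2147483648
# Buffer size in bytes (the default, 32 KiBytes).
taskmanager.memory.segment-size: 32768
```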

Setting the Number of Network Buffers directly

Note: This way of configuring the amount of memory used for network buffers is deprecated. Please consider using the method above by defining a fraction of memory to use.

The required number of buffers on a task manager is total-degree-of-parallelism (number of targets) * intra-node-parallelism (number of sources in one task manager) * n with n being a constant that defines how many repartitioning-/broadcasting steps you expect to be active at the same time. Since the intra-node-parallelism is typically the number of cores, and more than 4 repartitioning or broadcasting channels are rarely active in parallel, it frequently boils down to

#slots-per-TM^2 * #TMs * 4

where #slots-per-TM is the number of slots per TaskManager and #TMs is the total number of task managers.

To support, for example, a cluster of 20 8-slot machines, the formula gives 8^2 * 20 * 4 = 5120, so you should use roughly 5000 network buffers for optimal throughput.

Each network buffer has by default a size of 32 KiBytes. In the example above, the system would thus allocate roughly 160 MiBytes (5120 * 32 KiBytes) for network buffers.

The number and size of network buffers can be configured with the following parameters:

  • taskmanager.network.numberOfBuffers, and
  • taskmanager.memory.segment-size.
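
For the 20-machine, 8-slot example, the deprecated direct setting could look like the fragment below. The arithmetic in the comments applies the formula above. The cluster size is the one from the example, not a recommendation.

```yaml
# #slots-per-TM^2 * #TMs * 4 = 8^2 * 20 * 4 = 5120 buffers
taskmanager.network.numberOfBuffers: 5120
# 32 KiBytes per buffer, so 5120 * 32 KiBytes = 160 MiBytes in total
taskmanager.memory.segment-size: 32768
```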

Configuring Temporary I/O Directories and Threads

Although Flink aims to process as much data in main memory as possible, it is not uncommon that more data needs to be processed than memory is available. Flink’s runtime is designed to write temporary data to disk to handle these situations.

The taskmanager.tmp.dirs parameter specifies a list of directories into which Flink writes temporary files. The directory paths must be separated by ‘:’ (colon character). The io.manager.async.num-read-write-thread parameter specifies the number of I/O threads. Flink will concurrently write (or read) one or more temporary files to (from) each configured directory. Each file is bound to a directory and an I/O thread, and directories and threads are selected in a round-robin way. This way, temporary I/O can be evenly distributed over multiple independent I/O devices such as hard disks to improve performance. To leverage fast I/O devices (e.g., SSD, RAID, NAS), it is possible to specify a directory multiple times.

If the taskmanager.tmp.dirs parameter is not explicitly specified, Flink writes temporary data to the temporary directory of the operating system, such as /tmp on Linux systems. The default value of io.manager.async.num-read-write-thread is 1.
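
As an illustration of the colon-separated list, the fragment below spreads temporary files over two hard disks and one SSD. The SSD directory is listed twice so that it receives a larger share in the round-robin selection. All paths and the thread count are hypothetical.

```yaml
# Two hard disks plus one SSD that is listed twice (hypothetical mount points).
taskmanager.tmp.dirs: /disk1/flink-tmp:/disk2/flink-tmp:/ssd1/flink-tmp:/ssd1/flink-tmp
# Two I/O threads instead of the default 1 (illustrative value).
io.manager.async.num-read-write-thread: 2
```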

Configuring TaskManager processing slots

Flink executes a program in parallel by splitting it into subtasks and scheduling these subtasks to processing slots.

Each Flink TaskManager provides processing slots in the cluster. The number of slots is typically proportional to the number of available CPU cores of each TaskManager. As a general recommendation, the number of available CPU cores is a good default for taskmanager.numberOfTaskSlots.

When starting a Flink application, users can supply the default number of slots to use for that job. The corresponding command-line option is -p (for parallelism). In addition, it is possible to set the number of slots in the programming APIs for the whole application and for individual operators.

Configure the jobs using external shuffle services

Note: The jobs using the external shuffle services should run in the corresponding environments. For example, the jobs using the YARN shuffle service should be submitted to a YARN cluster.

External shuffle services provide an alternative mechanism to shuffle intermediate data for batch jobs. To declare batch jobs with the Table API, the following configuration needs to be set:

  • sql.exec.data-exchange-mode.all-batch: When set to true, the job will be executed in batch mode (DEFAULT: false).

By default, batch jobs use the TaskManager to shuffle the intermediate data. To use the external shuffle service, add the following configuration:

  • task.blocking.shuffle.type: Currently only TM or YARN are supported (DEFAULT: TM).
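
Putting the two settings above together, a batch job that shuffles through the YARN shuffle service might be configured as sketched here. This assumes the job is submitted to a YARN cluster where that service is running, as noted at the start of this section.

```yaml
# Run the Table API job in batch mode.
sql.exec.data-exchange-mode.all-batch: true
# Shuffle through the YARN shuffle service instead of the TaskManager.
task.blocking.shuffle.type: YARN
```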

Configure the disk type preferred

If there are multiple available root directories, by default Flink will randomly choose one of them to write the shuffle data. If you want to use only directories on a specific type of disk, you can configure

  • taskmanager.output.local-disk.type: The disk type preferred to write the shuffle data. The corresponding shuffle services should be configured to be aware of the disk types, like the one described in Configure the root directories for the YARN shuffle service.

Configure the map-side tasks

For the map-side tasks, the intermediate data can be written to disk with either the hash writer or the merge writer. The hash writer writes the data for each reduce-side task to a separate file. It is suitable when the number of reduce-side tasks is limited and there are enough write buffers to open all the files concurrently:

  • taskmanager.output.hash.max-subpartitions: The maximum number of subpartitions supported by the hash writer.
  • taskmanager.output.memory.mb: The write buffer size for each output edge.

The merge writer writes the data for different reduce-side tasks to the same files, with the data for each reduce-side task stored contiguously within each file. These files will be merged if the number of files is larger than the threshold:

  • taskmanager.output.merge.factor: The maximum number of files to merge at once.
  • taskmanager.output.merge.merge-to-one-file: Whether to finally merge to one file. If not, the merge stops once the number of files is less than taskmanager.output.merge.factor.
  • taskmanager.output.merge.enable-async-merge: Whether to merge while writing has not been finished.
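
The map-side options above might be set together as in the following sketch. This page does not state their defaults, so every value here is a hypothetical placeholder chosen only to show the syntax.

```yaml
# Hash writer: upper bound on subpartitions and per-edge write buffer in MB
# (placeholder values).
taskmanager.output.hash.max-subpartitions: 200
taskmanager.output.memory.mb: 200
# Merge writer: merge at most 64 files at once (placeholder value).
taskmanager.output.merge.factor: 64
# Keep merging until a single file remains.
taskmanager.output.merge.merge-to-one-file: true
# Start merging before writing has finished.
taskmanager.output.merge.enable-async-merge: true
```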

Configure the reduce side tasks

For the reduce-side tasks, it is better to limit the number of concurrent requests so that more receive buffers can be assigned to each request. The shuffle service stops serving a request once it has no receive buffers, so a larger receive buffer reduces the number of switches and increases the overall throughput. The relevant options are:

  • task.external.shuffle.max-concurrent-requests: The maximum number of concurrent requests.
  • taskmanager.network.memory.buffers-per-external-blocking-channel: The number of buffers for each request. The size of the buffers is configured by taskmanager.memory.segment-size (DEFAULT: 32768).
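
A sketch of the trade-off described above, with fewer concurrent requests and more buffers per request. The request limit and buffer count are hypothetical values. The comment works out the receive memory per request under the default segment size.

```yaml
# Limit the number of concurrent requests (placeholder value).
task.external.shuffle.max-concurrent-requests: 100
# 32 buffers per request (placeholder value):
#   32 * 32768 bytes = 1048576 bytes = 1 MiByte of receive buffers per request.
taskmanager.network.memory.buffers-per-external-blocking-channel: 32
taskmanager.memory.segment-size: 32768
```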

Compression

Compression is supported to reduce the amount of data written to disk and sent over the network.

  • task.external.shuffle.compression.enable: Whether to enable compression (DEFAULT: false).
  • task.external.shuffle.compression.codec: The compression algorithm to use. Currently supported codecs are lz4, bzip2, and gzip (DEFAULT: lz4).
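
Enabling compression with the default codec could look like this. The codec line is redundant with the default and is shown only to make the choice explicit.

```yaml
# Compress shuffle data written to disk and sent over the network.
task.external.shuffle.compression.enable: true
# One of lz4, bzip2, gzip.
task.external.shuffle.compression.codec: lz4
```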
