(1). Checkpoint开启和时间间隔指定
开启检查点,并且指定检查点时间间隔为1000ms.
env.enableCheckpointing(1000);
(2). exactly-ance和at-least-once语义选择
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
(3). Checkpoint超时时间
指定Checkpoint执行过程的上限时间范围,一旦Checkpoint执行时间超过该阀值,Flink将会中断Checkpoint过程,并按照超时时间处理(默认是10分钟)
env.getCheckpointConfig().setCheckpointTimeout(50000);
(4). 检查点之间最小时间间隔
设定两个Checkpoint之间的最小间隔,防止出现Checkpoint积压过多.
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
(5). 最大并行执行的检查点数量
能够最大同时执行的Checkpoint数量,在默认情况下只有一个检查点可以运行.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
(6). 是否删除Checkpoint中保存数据
RETAIN_ON_CANCELLATION : Flink被Cancel后,会保留Checkpoint数据 DELETE_ON_CANCELLATION : Flink被Cancel后,会删除Checkpoint数据,
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
(7). TolerableCheckpointFailureNumber
设置可以容忍的检查次数,超过这个数量Flink将会自动关闭和停止任务.
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(1)