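After a transformation is applied to a stream, Flink uses partitioners to control precisely where the data flows.
Flink's partitioners are shown in the figure below.
1. Random partitioner: ShufflePartitioner
Partitions elements randomly according to a uniform distribution.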
@Internal
public class ShufflePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private Random random = new Random();

    // Reused on every call so that no new array is allocated per record.
    private final int[] returnArray = new int[1];

    @Override
    public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
            int numberOfOutputChannels) {
        // Pick one output channel uniformly at random.
        returnArray[0] = random.nextInt(numberOfOutputChannels);
        return returnArray;
    }

    @Override
    public StreamPartitioner<T> copy() {
        return new ShufflePartitioner<T>();
    }

    @Override
    public String toString() {
        return "SHUFFLE";
    }
}
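To make the "uniform distribution" claim concrete, here is a minimal standalone sketch that repeats the same `random.nextInt(numberOfOutputChannels)` call as `selectChannels` above and counts how many records land on each channel. It does not use Flink at all; the class name `ShuffleSketch`, the method `countPerChannel`, and the fixed seed are assumptions made only for this illustration.

```java
import java.util.Random;

// Standalone sketch (no Flink dependency): mimics the channel choice
// made by ShufflePartitioner and tallies the result per channel.
class ShuffleSketch {

    // Draws `records` random channels out of `numChannels` and counts them.
    // The seed parameter is hypothetical; it only makes the demo repeatable.
    static int[] countPerChannel(int records, int numChannels, long seed) {
        Random random = new Random(seed);
        int[] counts = new int[numChannels];
        for (int i = 0; i < records; i++) {
            // Same call as in selectChannels: a uniform pick in [0, numChannels).
            counts[random.nextInt(numChannels)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int[] counts = countPerChannel(100_000, 4, 42L);
        for (int c = 0; c < counts.length; c++) {
            System.out.println("channel " + c + ": " + counts[c]);
        }
    }
}
```

With enough records, each channel should receive roughly the same share, but unlike round-robin distribution the counts are only approximately equal.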
2. Custom partitioner: CustomPartitionerWrapper
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
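The `partitioner` argument in the calls above is user-supplied logic that maps a key to a partition index. The following is a minimal sketch of what such logic might look like. To keep it runnable without Flink on the classpath, it uses a hypothetical stand-in interface, `KeyPartitioner`, that mirrors the shape of Flink's `Partitioner<K>` interface (`int partition(K key, int numPartitions)`); `HashModPartitioner` is likewise an illustrative name, not a Flink class.

```java
// Hypothetical stand-in with the same method shape as Flink's Partitioner<K>,
// declared here only so the sketch compiles without Flink.
interface KeyPartitioner<K> {
    int partition(K key, int numPartitions);
}

// Illustrative partitioner: routes each key by its hash code.
class HashModPartitioner implements KeyPartitioner<String> {
    @Override
    public int partition(String key, int numPartitions) {
        // floorMod keeps the result in [0, numPartitions)
        // even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        HashModPartitioner p = new HashModPartitioner();
        for (String key : new String[] {"a", "b", "someKey"}) {
            System.out.println(key + " -> partition " + p.partition(key, 4));
        }
    }
}
```

In a real job, the class would implement Flink's own interface instead of the stand-in and be passed as the first argument of `partitionCustom`. The same key always maps to the same partition, which is usually the property a custom partitioner is expected to preserve.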
3. Global partitioner: GlobalPartitioner
Sends every element to the first output channel (index 0).
@Internal
public class GlobalPartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    // Always channel 0, regardless of the record or the number of channels.
    private final int[] returnArray = new int[] { 0 };

    @Override
    public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record,
            int numberOfOutputChannels) {
        return returnArray;
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "GLOBAL";
    }
}
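4. Rebalance partitioner: RebalancePartitioner
Distributes elements round-robin, so that every partition receives an equal load. This is useful for performance optimization when the data is skewed.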
@Internal
public class RebalancePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    // Starts one below Integer.MAX_VALUE so the first increment triggers
    // the random initialization in resetValue.
    private final int[] returnArray = {Integer.MAX_VALUE - 1};

    @Override
    public int[] selectChannels(
            SerializationDelegate<StreamRecord<T>> record,
            int numChannels) {
        int newChannel = ++returnArray[0];
        if (newChannel >= numChannels) {
            returnArray[0] = resetValue(numChannels, newChannel);
        }
        return returnArray;
    }

    private static int resetValue(
            int numChannels,
            int newChannel) {
        if (newChannel == Integer.MAX_VALUE) {
            // Initializes the first partition, this branch is only entered when initializing.
            return ThreadLocalRandom.current().nextInt(numChannels);
        }
        // Wrap around to the first channel.
        return 0;
    }

    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "REBALANCE";
    }
}
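The initialization trick in `selectChannels` and `resetValue` is easy to miss: the counter starts at `Integer.MAX_VALUE - 1`, so the very first call picks a random starting channel, and every later call simply advances by one and wraps to 0. The sketch below reproduces only that logic in plain Java, without Flink; the class name `RoundRobinSketch` and the method `nextChannel` are assumptions made for this illustration.

```java
import java.util.concurrent.ThreadLocalRandom;

// Standalone sketch (no Flink dependency) of the round-robin logic
// used by RebalancePartitioner above.
class RoundRobinSketch {

    // Same sentinel start value as returnArray[0] in the original class.
    private int current = Integer.MAX_VALUE - 1;

    // Returns the channel for the next record.
    int nextChannel(int numChannels) {
        int next = ++current;
        if (next >= numChannels) {
            // First call only: next == Integer.MAX_VALUE, so start at a random channel.
            // Later calls: we ran past the last channel, so wrap around to 0.
            current = (next == Integer.MAX_VALUE)
                    ? ThreadLocalRandom.current().nextInt(numChannels)
                    : 0;
        }
        return current;
    }

    public static void main(String[] args) {
        RoundRobinSketch sketch = new RoundRobinSketch();
        for (int i = 0; i < 7; i++) {
            System.out.print(sketch.nextChannel(3) + " ");
        }
        System.out.println();
    }
}
```

The random starting point presumably keeps parallel senders from all hitting channel 0 first; after that, the sequence is strictly cyclic, which is what gives each partition an equal share.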
Reference: the official Flink documentation