五、flink--state状态管理机制

一、flink状态原理

1.1 什么是flink中的状态?为什么需要状态管理?

​ flink运行计算任务的过程中,会有很多中间处理过程。在整个任务运行的过程中,中间存在着多个临时状态,比如说某些数据正在执行一个operator,但是只处理了一半数据,另外一般还没来得及处理,这也是一个状态。
​ 假设运行过程中,由于某些原因,任务挂掉了,或者flink中的多个task中的一个task挂掉了,那么它在内存中的状态都会丢失,如果这时候我们没有存储中间计算的状态,那么就意味着重启这个计算任务时,需要从头开始将原来处理过的数据重新计算一遍。如果存储了中间状态,就可以恢复到中间状态,并从该状态开始继续执行任务。这就是状态管理的意义。所以需要一种机制去保存记录执行过程中的中间状态,这种机制就是状态管理机制。

洱源网站制作公司哪家好,找创新互联!从网页设计、网站建设、微信开发、APP开发、成都响应式网站建设公司等网站项目制作,到程序开发,运营维护。创新互联2013年开创至今到现在10年的时间,我们拥有了丰富的建站经验和运维经验,来保证我们的工作的顺利进行。专注于网站建设就选创新互联

1.2 flink中状态的分类

flink中包括两种基础状态:keyed state(keyed状态)和operator state(operator状态)

1.2.1 keyed状态

keyed状态总是与key一起,且只能用在keyedStream中。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,可能都对应一个state。唯一组合成(operator--key,state)的形式。

1.2.2 operator状态

与Keyed State不同,Operator State跟一个特定operator的一个并发实例绑定,整个operator只对应一个state。相比较而言,在一个operator上,可能会有很多个key,从而对应多个keyed state。而且operator state可以应用于非keyed stream中。

举例来说,Flink中的Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射。

1.3 state的存在形式

所有state的存在形式有两种:managed(委托管理)和raw(原始)。
托管方式就是状态管理由flink提供的框架进行管理,通过flink状态管理框架提供的接口,来更新和管理状态的值。这里面包括用于存储状态数据的数据结构,现成的包装类等。

原始方式就是由用户自行管理状态具体的数据结构,框架在做checkpoint的时候(checkpoint是flink进行状态数据持久化存储的机制),使用byte[]来读写状态内容,对其内部数据结构一无所知。

通常在DataStream上的状态推荐使用托管的状态,当实现一个用户自定义的operator时,会使用到原始状态。一般来说,托管状态用的比较多。

1.4 managed 方式提供的接口

flink提供了很多存储state数据的类,类继承图如下:
五、flink--state状态管理机制
​ 图 1.4 flink--状态管理类继承图

可用于存储状态数据的类如下:

ValueState*这保留了一个可以更新和检索的值。即类型为T的单值状态。这个状态与对应的key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过value()方法获取状态值。

ListState*这保存了一个元素列表,即key上的状态值为一个列表。可以追加元素并检索Iterable所有当前存储的元素。可以通过add方法往列表中附加值;也可以通过get()方法返回一个Iterable来遍历状态值。

ReducingState*这保留了一个值,该值表示添加到状态的所有值的聚合。接口类似于ListState,但是添加的元素使用add(T)。每次调用add方法添加值的时候,会调用reduceFunction,最后合并到一个单一的状态值。

AggregatingState*这保留了一个值,该值表示添加到状态的所有值的聚合。和ReducingState不同的是,聚合类型可能与添加到状态的元素类型不同

FoldingState*这保留了一个值,该值表示添加到状态的所有值的聚合。违背ReducingState,聚合类型可能与添加到状态的元素类型不同。接口类似于ListState但是添加的元素使用add(T)使用指定的FoldFunction.这个基本弃用,请用AggregatingState代替

MapState*它保存了一个映射列表。可以将键值对放入状态并检索Iterable所有当前存储的映射。映射使用put(UK, UV)或putAll(Map)添加元素。使用get(UK)获取元素。映射、键和值的可迭代视图可以分别使用entries(), keys()和values()

需要注意的是,以上所述的State对象,仅仅用于与状态进行交互(更新、删除、清空等),而真正的状态值,有可能是存在内存、磁盘、或者其他分布式存储系统中。相当于我们只是持有了这个状态的句柄(state handle)。

接下来看下,我们如何得到这个状态句柄。Flink通过StateDescriptor来定义一个状态。这是一个抽象类,内部定义了状态名称、类型、序列化器等基础信息。与上面的状态类型对应。如下:

ValueStateDescriptor
ListStateDescriptor
ReducingStateDescriptor
FoldingStateDescriptor
AggregatingStateDescriptor
MapStateDescriptor

二、使用flink状态管理的方式

2.1 使用状态管理基本流程

以keyed state为例,
1、首先,普通Function接口是不支持状态管理的,也就是一般故障的情况下,状态并没有保存下来,后面需要将所有数据进行重新计算。如果需要支持状态管理,那么我们需要继承实现 RichFunction类。基本常用的function,flink都再封装了对应的RichFunction接口给我们使用,比如普通function中的MapFunction,对应的RichFunction抽象类为RichMapFunction。命名方式对应关系很简单,基本就是 xxxFunciotn -->RichxxxFunction。

2、接着,需要在覆盖实现RichFunction中的对应的算子方法(如map、flatMap等),里面需要实现算子业务逻辑,并将对keyed state进行更新、管理。然后还要重写open方式,用于获取状态句柄。

2.2 使用keyed state例子

下面使用ValueState为例,实现RichFlatMapFunction接口:

public class CountWindowAverage extends RichFlatMapFunction, Tuple2> {

    /**
     * ValueState状态句柄. 第一个值为count,第二个值为sum。
     */
    private transient ValueState> sum;

    @Override
    public void flatMap(Tuple2 input, Collector> out) throws Exception {
        // 获取当前状态值
        Tuple2 currentSum = sum.value();

        // 更新
        currentSum.f0 += 1;
        currentSum.f1 += input.f1;

        // 更新状态值
        sum.update(currentSum);

        // 如果count >=2 清空状态值,重新计算
        if (currentSum.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
            sum.clear();
        }
    }

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor> descriptor =
                new ValueStateDescriptor<>(
                        "average", // 状态名称
                        TypeInformation.of(new TypeHint>() {}), // 当个状态数据的类型,这里是tuple,也就是元祖
                        Tuple2.of(0L, 0L)); // 状态默认值
        //获取状态句柄
        sum = getRuntimeContext().getState(descriptor);
    }
}

// 主程序
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
        .keyBy(0)
        .flatMap(new CountWindowAverage())
        .print();

// the printed output will be (1,4) and (1,5)

三、状态管理源码分析

3.1 获取state操作句柄源码分析

以2.2 中的例子为例,看状态句柄是如何获取的。
首先从这里入手:

sum = getRuntimeContext().getState(descriptor);

首先RichFlatMapFunction类中没有这个方法,再到父类AbstractRichFunction,如下:

public abstract class AbstractRichFunction implements RichFunction, Serializable {
    private static final long serialVersionUID = 1L;
    private transient RuntimeContext runtimeContext;

    public AbstractRichFunction() {
    }

    public void setRuntimeContext(RuntimeContext t) {
        this.runtimeContext = t;
    }

    //返回RuntimeContext类的对象
    public RuntimeContext getRuntimeContext() {
        if (this.runtimeContext != null) {
            return this.runtimeContext;
        } else {
            throw new IllegalStateException("The runtime context has not been initialized.");
        }
    }
    ...........................
}

getRuntimeContext()这个方法在这里实际上是返回了StreamingRuntimeContext这个子类对象。所以调用了RichFlatMapFunction.getRuntimeContext().getState方法,最终会调用StreamingRuntimeContext.getState方法:

/* StreamingRuntimeContext */
public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
    ...........
    public  ValueState getState(ValueStateDescriptor stateProperties) {
        //这里获取到KeyedStateStore对象
        KeyedStateStore keyedStateStore = this.checkPreconditionsAndGetKeyedStateStore(stateProperties);
        stateProperties.initializeSerializerUnlessSet(this.getExecutionConfig());
        return keyedStateStore.getState(stateProperties);
    }      
    ...............
}

接着看this.checkPreconditionsAndGetKeyedStateStore

private KeyedStateStore checkPreconditionsAndGetKeyedStateStore(StateDescriptor stateDescriptor) {
        Preconditions.checkNotNull(stateDescriptor, "The state properties must not be null");
        KeyedStateStore keyedStateStore = this.operator.getKeyedStateStore();
        Preconditions.checkNotNull(keyedStateStore, "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.");
        return keyedStateStore;
    }

再接着看 this.operator.getKeyedStateStore();

/*AbstractStreamOperator*/
.....
public KeyedStateStore getKeyedStateStore() {
        return this.keyedStateStore;
    }
.....

返回了AbstractStreamOperator.keyedStateStore变量。这个变量的初始化在AbstractStreamOperator.initializeState方法中:

public final void initializeState() throws Exception {
        TypeSerializer keySerializer = this.config.getStateKeySerializer(this.getUserCodeClassloader());
        StreamTask containingTask = (StreamTask)Preconditions.checkNotNull(this.getContainingTask());
        CloseableRegistry streamTaskCloseableRegistry = (CloseableRegistry)Preconditions.checkNotNull(containingTask.getCancelables());
        //创建 StreamTaskStateInitializerImpl 对象
        StreamTaskStateInitializer streamTaskStateManager = (StreamTaskStateInitializer)Preconditions.checkNotNull(containingTask.createStreamTaskStateInitializer());
        //这里创建一个当前operator的state context对象
        StreamOperatorStateContext context = streamTaskStateManager.streamOperatorStateContext(this.getOperatorID(), this.getClass().getSimpleName(), this, keySerializer, streamTaskCloseableRegistry);
        //通过state context这个上下文对象获取keyed state和operator state的 backend配置
        this.operatorStateBackend = context.operatorStateBackend();
        this.keyedStateBackend = context.keyedStateBackend();
        //初始化keyedStateStore,将从配置中创建的statebackend封装到默认的statebackend
        if (this.keyedStateBackend != null) {
            this.keyedStateStore = new DefaultKeyedStateStore(this.keyedStateBackend, this.getExecutionConfig());
        }

        this.timeServiceManager = context.internalTimerServiceManager();
        CloseableIterable keyedStateInputs = context.rawKeyedStateInputs();
        CloseableIterable operatorStateInputs = context.rawOperatorStateInputs();

        try {
            StateInitializationContext initializationContext = new StateInitializationContextImpl(context.isRestored(), this.operatorStateBackend, this.keyedStateStore, keyedStateInputs, operatorStateInputs);
            this.initializeState(initializationContext);
        } finally {
            closeFromRegistry(operatorStateInputs, streamTaskCloseableRegistry);
            closeFromRegistry(keyedStateInputs, streamTaskCloseableRegistry);
        }

    }

containingTask.createStreamTaskStateInitializer(),也就是StreamTask.createStreamTaskStateInitializer()这里有初始化操作,看看

public StreamTaskStateInitializer createStreamTaskStateInitializer() {
        return new StreamTaskStateInitializerImpl(this.getEnvironment(), this.stateBackend, this.timerService);
    }

创建了一个StreamTaskStateInitializerImpl对象,里面传入了 this.stateBackend这个参数,估计就是获取用户指定的stateBackend类型,追踪this.stateBackend这个参数在哪初始化的,找到了StreamTask.invoke这个方法,如下:

public final void invoke() throws Exception {
        ..........
            this.stateBackend = this.createStateBackend();
        ............
            }

调用了this.createStateBackend(),即StreamTask.createStateBackend()继续看

private StateBackend createStateBackend() throws Exception {
    //从application中获取已经创建的statebackend object
        StateBackend fromApplication = this.configuration.getStateBackend(this.getUserCodeClassLoader());
    //加载指定的 statebackend类,并返回statebackend对象
        return StateBackendLoader.fromApplicationOrConfigOrDefault(fromApplication, this.getEnvironment().getTaskManagerInfo().getConfiguration(), this.getUserCodeClassLoader(), LOG);
    }

继续看this.configuration.getStateBackend, 即StreamConfing.getStateBackend

public StateBackend getStateBackend(ClassLoader cl) {
        try {
            //从application的config中获取"statebackend"这个key对应的object,也就是已创建的backend对象。
            return (StateBackend)InstantiationUtil.readObjectFromConfig(this.config, "statebackend", cl);
        } catch (Exception var3) {
            throw new StreamTaskException("Could not instantiate statehandle provider.", var3);
        }
    }

从application的config中获取"statebackend"这个key对应的object,也就是已创建的backend对象。如果application不存在,则从配置中读取应该使用的backend类型,然后加载。看这个方法StateBackendLoader.fromApplicationOrConfigOrDefault。这个方法就是从config配置中读取“state.backend”这个配置项值,然后加载对应的backend。

到最后,要记住this.keyedStateStore = new DefaultKeyedStateStore(this.keyedStateBackend, this.getExecutionConfig());这个是 this.keyedStateStore封装后的对象。
接着回到StreamingRuntimeContext.getState中,看到 this.keyedStateStore.getState,这一行,调用的是DefaultKeyedStateStore.getState,其实就再次进行初始化而已,这里不展开。

3.2 获取和更新状态值源码分析

首先,我们这里使用的是ValueState,它的直接实现子类是HeapValueState。它的存储数据结构,在它的父类AbstractHeapState中,以StateTable<K, N, SV> stateTable的形式存在的,其中K代表Key的类型,N代表state的namespace(这样属于不同namespace的state可以重名),SV代表state value的类型。


文章标题:五、flink--state状态管理机制
分享链接:http://pwwzsj.com/article/pggops.html