cola架构:有限状态机(FSM)源码分析
目录
0. cola状态机简述
1.cola状态机使用实例
2.cola状态机源码解析
2.1 语义模型源码
2.1.1 Condition和Action接口
2.1.2 State
2.1.3 Transition接口
2.1.4 StateMachine接口
2.2 Builder模式
2.2.1 StateMachine Builder模式
2.2.2 ExternalTransitionBuilder---转移构造Builder模式
2.2.2.1 链式调用顺序保障
2.2.2.2 链式调用具体实现
0. cola状态机简述
cola状态机采用无状态设计,不存储中间状态,重点关注状态之间的转移,这样的设计使得cola更加的简单、轻量、易于上手,在高并发多线程场景下应用单例模式可以实现更高的性能;
cola状态机框架根据实际使用场景进行抽象领域建模,抽象出如下的语义模型:
- State:状态
- Event:事件,状态由事件触发,引起变化
- Transition:流转,表示从一个状态到另一个状态
- External Transition:外部流转,两个不同状态之间的流转
- Internal Transition:内部流转,同一个状态之间的流转
- Condition:条件,表示是否允许到达某个状态
- Action:动作,到达某个状态之后,可以做什么
- StateMachine:状态机
上述语义模型之间的实体关系模型如下:
- 一个状态机实例(StateMachine)可以包含多个状态(State)
- 每个状态可以包含多个状态转移(Transition)
- 每个状态转移(Transition)包含初始状态、目标状态、驱动事件(Event),以及状态转移条件(Condition)和命中转移条件之后的动作(Action)
理清上述语义模型的概念后,下面通过源码分析来探究cola状态机的内部实现;
1.cola状态机使用实例
在深入源码解析之前,首先通过一个具体的状态机实例了解状态机的使用方法,后面再逐步探究其执行过程:
public class StateMachineTest {
static String MACHINE_ID = "TestStateMachine";
static enum States {
STATE1,
STATE2,
STATE3,
STATE4
}
static enum Events {
EVENT1,
EVENT2,
EVENT3,
EVENT4,
INTERNAL_EVENT
}
static class Context {
String operator = "frank";
String entityId = "123465";
}
@Test
public void testExternalNormal() {
StateMachineBuilder<States, Events, Context> builder = StateMachineBuilderFactory.create();
builder.externalTransition()
.from(States.STATE1)
.to(States.STATE2)
.on(Events.EVENT1)
.when(checkCondition())
.perform(doAction());
StateMachine<States, Events, Context> stateMachine = builder.build(MACHINE_ID);
States target = stateMachine.fireEvent(States.STATE1, Events.EVENT1, new Context());
Assert.assertEquals(States.STATE2, target);
}
private Condition<Context> checkCondition() {
return new Condition<Context>() {
@Override
public boolean isSatisfied(Context context) {
System.out.println("Check condition : " + context);
return true;
}
};
}
private Action<States, Events, Context> doAction() {
return (from, to, event, ctx) -> {
System.out.println(
ctx.operator + " is operating " + ctx.entityId + " from:" + from + " to:" + to + " on:" + event);
};
}
}
上面首先通过Builder模式创建StateMachine实例,其中定义了STATE1到STATE2的转移,然后通过fireEvent触发状态转移,在满足Condition时,执行了具体的动作Action;
2.cola状态机源码解析
cola状态机的核心逻辑都是围绕着上述的语义模型展开的,下面首先看下语义模型的源码;
2.1 语义模型源码
2.1.1 Condition和Action接口
Condition接口:
/**
* Condition
*
* @author Frank Zhang
* @date 2020-02-07 2:50 PM
*/
public interface Condition<C> {
/**
* @param context context object
* @return whether the context satisfied current condition
*/
boolean isSatisfied(C context);
default String name(){
return this.getClass().getSimpleName();
}
}
Action接口:
/**
* Generic strategy interface used by a state machine to respond
* events by executing an {@code Action} with a {@link StateContext}.
*
* @author Frank Zhang
* @date 2020-02-07 2:51 PM
*/
public interface Action<S, E, C> {
// /**
// * Execute action with a {@link StateContext}.
// *
// * @param context the state context
// */
// void execute(StateContext<S, E> context);
public void execute(S from, S to, E event, C context);
}
Condition接口定义了isSatisfied方法评估是否命中条件;
Action接口通过方法execute执行具体的动作;
2.1.2 State
State接口定义如下:
/**
* State
*
* @param <S> the type of state
* @param <E> the type of event
*
* @author Frank Zhang
* @date 2020-02-07 2:12 PM
*/
public interface State<S,E,C> extends Visitable{
/**
* Gets the state identifier.
*
* @return the state identifiers
*/
S getId();
/**
* Add transition to the state
* @param event the event of the Transition
* @param target the target of the transition
* @return
*/
Transition<S,E,C> addTransition(E event, State<S, E, C> target, TransitionType transitionType);
List<Transition<S,E,C>> getEventTransitions(E event);
Collection<Transition<S,E,C>> getAllTransitions();
}
State接口实现:
/**
* StateImpl
*
* @author Frank Zhang
* @date 2020-02-07 11:19 PM
*/
public class StateImpl<S,E,C> implements State<S,E,C> {
protected final S stateId;
private EventTransitions eventTransitions = new EventTransitions();
StateImpl(S stateId){
this.stateId = stateId;
}
@Override
public Transition<S, E, C> addTransition(E event, State<S,E,C> target, TransitionType transitionType) {
Transition<S, E, C> newTransition = new TransitionImpl<>();
newTransition.setSource(this);
newTransition.setTarget(target);
newTransition.setEvent(event);
newTransition.setType(transitionType);
Debugger.debug("Begin to add new transition: "+ newTransition);
eventTransitions.put(event, newTransition);
return newTransition;
}
@Override
public List<Transition<S, E, C>> getEventTransitions(E event) {
return eventTransitions.get(event);
}
@Override
public Collection<Transition<S, E, C>> getAllTransitions() {
return eventTransitions.allTransitions();
}
@Override
public S getId() {
return stateId;
}
@Override
public String accept(Visitor visitor) {
String entry = visitor.visitOnEntry(this);
String exit = visitor.visitOnExit(this);
return entry + exit;
}
@Override
public boolean equals(Object anObject){
if(anObject instanceof State){
State other = (State)anObject;
if(this.stateId.equals(other.getId()))
return true;
}
return false;
}
@Override
public String toString(){
return stateId.toString();
}
}
可以看出State聚合了多个状态转移(Transition) ,
状态转移是通过EventTransitions类来进行封装管理的,源码如下:
/**
* EventTransitions
*
* 同一个Event可以触发多个Transitions,https://github.com/alibaba/COLA/pull/158
*
* @author Frank Zhang
* @date 2021-05-28 5:17 PM
*/
public class EventTransitions<S,E,C> {
private HashMap<E, List<Transition<S,E,C>>> eventTransitions;
public EventTransitions(){
eventTransitions = new HashMap<>();
}
public void put(E event, Transition<S, E, C> transition){
if(eventTransitions.get(event) == null){
List<Transition<S,E,C>> transitions = new ArrayList<>();
transitions.add(transition);
eventTransitions.put(event, transitions);
}
else{
List existingTransitions = eventTransitions.get(event);
verify(existingTransitions, transition);
existingTransitions.add(transition);
}
}
/**
* Per one source and target state, there is only one transition is allowed
* @param existingTransitions
* @param newTransition
*/
private void verify(List<Transition<S,E,C>> existingTransitions, Transition<S,E,C> newTransition) {
for (Transition transition : existingTransitions) {
if (transition.equals(newTransition)) {
throw new StateMachineException(transition + " already Exist, you can not add another one");
}
}
}
public List<Transition<S,E,C>> get(E event){
return eventTransitions.get(event);
}
public List<Transition<S,E,C>> allTransitions(){
List<Transition<S,E,C>> allTransitions = new ArrayList<>();
for(List<Transition<S,E,C>> transitions : eventTransitions.values()){
allTransitions.addAll(transitions);
}
return allTransitions;
}
}
2.1.3 Transition接口
接口定义如下:
/**
* {@code Transition} is something what a state machine associates with a state
* changes.
*
* @author Frank Zhang
*
* @param <S> the type of state
* @param <E> the type of event
* @param <C> the type of user defined context, which is used to hold application data
*
* @date 2020-02-07 2:20 PM
*/
public interface Transition<S, E, C>{
/**
* Gets the source state of this transition.
*
* @return the source state
*/
State<S,E,C> getSource();
void setSource(State<S, E, C> state);
E getEvent();
void setEvent(E event);
void setType(TransitionType type);
/**
* Gets the target state of this transition.
*
* @return the target state
*/
State<S,E,C> getTarget();
void setTarget(State<S, E, C> state);
/**
* Gets the guard of this transition.
*
* @return the guard
*/
Condition<C> getCondition();
void setCondition(Condition<C> condition);
Action<S,E,C> getAction();
void setAction(Action<S, E, C> action);
/**
* Do transition from source state to target state.
*
* @return the target state
*/
State<S, E, C> transit(C ctx, boolean checkCondition);
/**
* Verify transition correctness
*/
void verify();
}
具体实现如下:
/**
* TransitionImpl。
*
* This should be designed to be immutable, so that there is no thread-safe risk
*
* @author Frank Zhang
* @date 2020-02-07 10:32 PM
*/
public class TransitionImpl<S,E,C> implements Transition<S,E,C> {
private State<S, E, C> source;
private State<S, E, C> target;
private E event;
private Condition<C> condition;
private Action<S,E,C> action;
private TransitionType type = TransitionType.EXTERNAL;
@Override
public State<S, E, C> getSource() {
return source;
}
@Override
public void setSource(State<S, E, C> state) {
this.source = state;
}
@Override
public E getEvent() {
return this.event;
}
@Override
public void setEvent(E event) {
this.event = event;
}
@Override
public void setType(TransitionType type) {
this.type = type;
}
@Override
public State<S, E, C> getTarget() {
return this.target;
}
@Override
public void setTarget(State<S, E, C> target) {
this.target = target;
}
@Override
public Condition<C> getCondition() {
return this.condition;
}
@Override
public void setCondition(Condition<C> condition) {
this.condition = condition;
}
@Override
public Action<S, E, C> getAction() {
return this.action;
}
@Override
public void setAction(Action<S, E, C> action) {
this.action = action;
}
@Override
public State<S, E, C> transit(C ctx, boolean checkCondition) {
Debugger.debug("Do transition: "+this);
this.verify();
if (!checkCondition || condition == null || condition.isSatisfied(ctx)) {
if(action != null){
action.execute(source.getId(), target.getId(), event, ctx);
}
return target;
}
Debugger.debug("Condition is not satisfied, stay at the "+source+" state ");
return source;
}
@Override
public final String toString() {
return source + "-[" + event.toString() +", "+type+"]->" + target;
}
@Override
public boolean equals(Object anObject){
if(anObject instanceof Transition){
Transition other = (Transition)anObject;
if(this.event.equals(other.getEvent())
&& this.source.equals(other.getSource())
&& this.target.equals(other.getTarget())){
return true;
}
}
return false;
}
@Override
public void verify() {
if(type== TransitionType.INTERNAL && source != target) {
throw new StateMachineException(String.format("Internal transition source state '%s' " +
"and target state '%s' must be same.", source, target));
}
}
}
可以看出,Transition聚合了源State、目标State、Event、Condition、Action;
在上述方法transit中,Transition在满足条件Condition的条件下,执行了具体动作Action,并且返回目标State;
2.1.4 StateMachine接口
/**
* StateMachine
*
* @author Frank Zhang
*
* @param <S> the type of state
* @param <E> the type of event
* @param <C> the user defined context
* @date 2020-02-07 2:13 PM
*/
public interface StateMachine<S, E, C> extends Visitable{
/**
* Verify if an event {@code E} can be fired from current state {@code S}
* @param sourceStateId
* @param event
* @return
*/
boolean verify(S sourceStateId,E event);
/**
* Send an event {@code E} to the state machine.
*
* @param sourceState the source state
* @param event the event to send
* @param ctx the user defined context
* @return the target state
*/
S fireEvent(S sourceState, E event, C ctx);
/**
* MachineId is the identifier for a State Machine
* @return
*/
String getMachineId();
/**
* Use visitor pattern to display the structure of the state machine
*/
void showStateMachine();
String generatePlantUML();
}
状态机具体实现:
/**
* For performance consideration,
* The state machine is made "stateless" on purpose.
* Once it's built, it can be shared by multi-thread
* <p>
* One side effect is since the state machine is stateless, we can not get current state from State Machine.
*
* @author Frank Zhang
* @date 2020-02-07 5:40 PM
*/
public class StateMachineImpl<S, E, C> implements StateMachine<S, E, C> {
private String machineId;
private final Map<S, State<S, E, C>> stateMap;
private boolean ready;
private FailCallback<S, E, C> failCallback;
public StateMachineImpl(Map<S, State<S, E, C>> stateMap) {
this.stateMap = stateMap;
}
@Override
public boolean verify(S sourceStateId, E event) {
isReady();
State sourceState = getState(sourceStateId);
List<Transition<S, E, C>> transitions = sourceState.getEventTransitions(event);
return transitions != null && transitions.size() != 0;
}
@Override
public S fireEvent(S sourceStateId, E event, C ctx) {
isReady();
Transition<S, E, C> transition = routeTransition(sourceStateId, event, ctx);
if (transition == null) {
Debugger.debug("There is no Transition for " + event);
failCallback.onFail(sourceStateId, event, ctx);
return sourceStateId;
}
return transition.transit(ctx, false).getId();
}
private Transition<S, E, C> routeTransition(S sourceStateId, E event, C ctx) {
State sourceState = getState(sourceStateId);
List<Transition<S, E, C>> transitions = sourceState.getEventTransitions(event);
if (transitions == null || transitions.size() == 0) {
return null;
}
Transition<S, E, C> transit = null;
for (Transition<S, E, C> transition : transitions) {
if (transition.getCondition() == null) {
transit = transition;
} else if (transition.getCondition().isSatisfied(ctx)) {
transit = transition;
break;
}
}
return transit;
}
private State getState(S currentStateId) {
State state = StateHelper.getState(stateMap, currentStateId);
if (state == null) {
showStateMachine();
throw new StateMachineException(currentStateId + " is not found, please check state machine");
}
return state;
}
private void isReady() {
if (!ready) {
throw new StateMachineException("State machine is not built yet, can not work");
}
}
@Override
public String accept(Visitor visitor) {
StringBuilder sb = new StringBuilder();
sb.append(visitor.visitOnEntry(this));
for (State state : stateMap.values()) {
sb.append(state.accept(visitor));
}
sb.append(visitor.visitOnExit(this));
return sb.toString();
}
@Override
public void showStateMachine() {
SysOutVisitor sysOutVisitor = new SysOutVisitor();
accept(sysOutVisitor);
}
@Override
public String generatePlantUML() {
PlantUMLVisitor plantUMLVisitor = new PlantUMLVisitor();
return accept(plantUMLVisitor);
}
@Override
public String getMachineId() {
return machineId;
}
public void setMachineId(String machineId) {
this.machineId = machineId;
}
public void setReady(boolean ready) {
this.ready = ready;
}
public void setFailCallback(FailCallback<S, E, C> failCallback) {
this.failCallback = failCallback;
}
}
状态机方法fireEvent的执行逻辑如下:
-
通过routeTransition方法,获取目标转移Transition(只会返回一个)
-
执行目标转移Transition的转移方法transit,并返回目标State
在方法routeTransition中:
- 首先根据源State和Event获取关联的转移列表
- 遍历转移列表,返回命中转移条件Condition的转移
2.2 Builder模式
上述cola状态机语义模型的源码就解析完成了,在实际使用时,需要将语义模型进行组合构造编排,这里是通过Builder模式来完成的,下面进行展开说明;
2.2.1 StateMachine Builder模式
StateMachineBuilder顶层接口定义如下:
/**
* StateMachineBuilder
*
* @author Frank Zhang
* @date 2020-02-07 5:32 PM
*/
public interface StateMachineBuilder<S, E, C> {
/**
* Builder for one transition
*
* @return External transition builder
*/
ExternalTransitionBuilder<S, E, C> externalTransition();
/**
* Builder for multiple transitions
*
* @return External transition builder
*/
ExternalTransitionsBuilder<S, E, C> externalTransitions();
/**
* Start to build internal transition
*
* @return Internal transition builder
*/
InternalTransitionBuilder<S, E, C> internalTransition();
/**
* set up fail callback, default do nothing {@code NumbFailCallbackImpl}
*
* @param callback
*/
void setFailCallback(FailCallback<S, E, C> callback);
StateMachine<S, E, C> build(String machineId);
}
具体实现如下:
/**
* StateMachineBuilderImpl
*
* @author Frank Zhang
* @date 2020-02-07 9:40 PM
*/
public class StateMachineBuilderImpl<S, E, C> implements StateMachineBuilder<S, E, C> {
/**
* StateMap is the same with stateMachine, as the core of state machine is holding reference to states.
*/
private final Map<S, State<S, E, C>> stateMap = new ConcurrentHashMap<>();
private final StateMachineImpl<S, E, C> stateMachine = new StateMachineImpl<>(stateMap);
private FailCallback<S, E, C> failCallback = new NumbFailCallback<>();
@Override
public ExternalTransitionBuilder<S, E, C> externalTransition() {
return new TransitionBuilderImpl<>(stateMap, TransitionType.EXTERNAL);
}
@Override
public ExternalTransitionsBuilder<S, E, C> externalTransitions() {
return new TransitionsBuilderImpl<>(stateMap, TransitionType.EXTERNAL);
}
@Override
public InternalTransitionBuilder<S, E, C> internalTransition() {
return new TransitionBuilderImpl<>(stateMap, TransitionType.INTERNAL);
}
@Override
public void setFailCallback(FailCallback<S, E, C> callback) {
this.failCallback = callback;
}
@Override
public StateMachine<S, E, C> build(String machineId) {
stateMachine.setMachineId(machineId);
stateMachine.setReady(true);
stateMachine.setFailCallback(failCallback);
StateMachineFactory.register(stateMachine);
return stateMachine;
}
}
StateMachineBuilder定义中:
- 通过build方法完成状态机实例的创建
- 通过externalTransition方法完成外部转移的构造
- 通过internalTransition方法完成内部转移的构造
在build方法中,构造StateMachine实例,并注册到StateMachineFactory中 ,然后返回StateMachine实例,完成构建;
对于转移Transition的构造,下面以externalTransition返回的ExternalTransitionBuilder为例进行具体分析,内部转移同理不再展开;
2.2.2 ExternalTransitionBuilder---转移构造Builder模式
2.2.2.1 链式调用顺序保障
在上述状态机使用实例中,是通过链式编程来完成外部转移构造的,如下:
这里通过接口约束,限制了:
- externalTransition方法之后只能调用from方法
- from方法之后只能调用to方法等调用顺序
- ......依次类推
内部原理是:
- externalTransition方法返回了ExternalTransitionBuilder接口,而ExternalTransitionBuilder接口只定义了from方法,并且返回From接口
- From接口中又只定义了to方法,并且返回To接口
- ......依次类推
相关接口定义说明如下:
/**
* ExternalTransitionBuilder
*
* @author Frank Zhang
* @date 2020-02-07 6:11 PM
*/
public interface ExternalTransitionBuilder<S, E, C> {
/**
* Build transition source state.
* @param stateId id of state
* @return from clause builder
*/
From<S, E, C> from(S stateId);
}
/**
* From
*
* @author Frank Zhang
* @date 2020-02-07 6:13 PM
*/
public interface From<S, E, C> {
/**
* Build transition target state and return to clause builder
* @param stateId id of state
* @return To clause builder
*/
To<S, E, C> to(S stateId);
}
/**
* To
*
* @author Frank Zhang
* @date 2020-02-07 6:14 PM
*/
public interface To<S, E, C> {
/**
* Build transition event
* @param event transition event
* @return On clause builder
*/
On<S, E, C> on(E event);
}
public interface On<S, E, C> extends When<S, E, C>{
/**
* Add condition for the transition
* @param condition transition condition
* @return When clause builder
*/
When<S, E, C> when(Condition<C> condition);
}
public interface When<S, E, C>{
/**
* Define action to be performed during transition
*
* @param action performed action
*/
void perform(Action<S, E, C> action);
}
通过上述严格的顺序调用,保证了Transition构造的正确性和易读性;
2.2.2.2 链式调用具体实现
下面看一下上述链式调用相关接口的具体实现逻辑:
ExternalTransitionBuilder的具体实现类如下:
/**
* TransitionBuilderImpl
*
* @author Frank Zhang
* @date 2020-02-07 10:20 PM
*/
class TransitionBuilderImpl<S,E,C> extends AbstractTransitionBuilder<S,E,C> implements ExternalTransitionBuilder<S,E,C>, InternalTransitionBuilder<S,E,C> {
private State<S, E, C> source;
private Transition<S, E, C> transition;
public TransitionBuilderImpl(Map<S, State<S, E, C>> stateMap, TransitionType transitionType) {
super(stateMap, transitionType);
}
@Override
public From<S, E, C> from(S stateId) {
source = StateHelper.getState(stateMap, stateId);
return this;
}
@Override
public To<S, E, C> within(S stateId) {
source = target = StateHelper.getState(stateMap, stateId);
return this;
}
@Override
public When<S, E, C> when(Condition<C> condition) {
transition.setCondition(condition);
return this;
}
@Override
public On<S, E, C> on(E event) {
transition = source.addTransition(event, target, transitionType);
return this;
}
@Override
public void perform(Action<S, E, C> action) {
transition.setAction(action);
}
}
其中AbstractTransitionBuilder抽象类具体实现如下,实现了接口From、To、On:
abstract class AbstractTransitionBuilder<S,E,C> implements From<S,E,C>,On<S,E,C>,To<S,E,C>{
final Map<S, State<S, E, C>> stateMap;
protected State<S, E, C> target;
final TransitionType transitionType;
public AbstractTransitionBuilder(Map<S, State<S, E, C>> stateMap, TransitionType transitionType) {
this.stateMap = stateMap;
this.transitionType = transitionType;
}
@Override
public To<S, E, C> to(S stateId) {
target = StateHelper.getState(stateMap, stateId);
return this;
}
}
上述用到的StateHelper帮助类说明如下:
public class StateHelper {
public static <S, E, C> State<S, E, C> getState(Map<S, State<S, E, C>> stateMap, S stateId){
State<S, E, C> state = stateMap.get(stateId);
if (state == null) {
state = new StateImpl<>(stateId);
stateMap.put(stateId, state);
}
return state;
}
}
下面对链式调用执行过程具体说明如下:
- from方法实现中,通过StateHelper帮助类完成源State的创建;
- to方法实现中,同样通过StateHelper帮助类完成目标State的创建;
- on方法实现中,在源State中添加新的转移;
- when方法实现中,对构造的转移添加转移条件Condition;
- perform方法实现中,对构造的转移添加具体动作Action;
最后通过状态机build方法,完成状态机的构造并注入所有State列表;
至此,状态机实例、状态机包含的状态、状态关联的所有转移都构造完毕,后续就可以通过状态机的触发方法fireEvent完成状态转移了。