低代码平台后端搭建-阶段完结
前言
最近又要开始为跳槽做准备了,发现还是写博客学的效率高点,在总结其他技术栈之前准备先把这个专题小完结一波。在这一篇中我又试着添加了一些实际项目中可能会用到的功能点,用来验证这个平台的扩展性,以及总结一些学过的知识。在这一篇中会增加如下功能点:增加Python执行组件、支持断点调试组件流、展示每个组件的详细运行信息。
Python组件
实现过程
在实际的应用中,有些复杂的需求可能没办法用现有的组件去实现,比如希望对组件A的结果进行函数计算、数据格式转换等,此时可以考虑引入一个Python组件,在这个组件的入参中直接写Python代码进行需要的操作。具体代码用gpt即可搞定,示例如下:
lowcode.application.properties——修改
python.interpreter.path=/Library/Frameworks/Python.framework/Versions/3.10/bin/python3.10
在配置文件中引入当前机器的Python环境的位置
com.example.lowcode.util.PythonUtil——新增
然后创建一个类用于解析Python代码
package com.example.lowcode.util;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Service
public class PythonUtil {
@Value("${python.interpreter.path}")
private String pythonInterpreterPath;
public String executePythonCode(String code, Map<String,Object> params) throws IOException, InterruptedException {
String fullCode = buildFullPythonCode(code, params);
ProcessBuilder processBuilder = new ProcessBuilder(pythonInterpreterPath, "-c", fullCode);
Process process = processBuilder.start();
// Handle the process's output stream (Python's stdout)
String output = readFromStream(process.getInputStream());
// Handle the process's error stream (Python's stderr)
String errorOutput = readFromStream(process.getErrorStream());
boolean finished = process.waitFor(30, TimeUnit.SECONDS);
if (!finished) {
throw new RuntimeException("Python process did not finish within the timeout period.");
}
if (process.exitValue() != 0) {
throw new RuntimeException("Python execution error: " + errorOutput);
}
return output.replaceAll("\\n$", "");
}
private String buildFullPythonCode(String code, Map<String, Object> params) {
// 构建参数传递的代码
StringBuilder arguments = new StringBuilder();
for (Map.Entry<String, Object> entry : params.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if (value instanceof String) {
// 字符串参数需要加引号
arguments.append(String.format("%s = '%s'", key, value));
} else {
// 非字符串参数直接转换为字符串
arguments.append(String.format("%s = %s", key, value));
}
// 在参数之间添加换行符
arguments.append(System.lineSeparator());
}
return arguments + code;
}
private String readFromStream(InputStream inputStream) throws IOException {
StringBuilder output = new StringBuilder();
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
String line;
while ((line = reader.readLine()) != null) {
output.append(line).append(System.lineSeparator());
}
}
return output.toString();
}
}
com.example.lowcode.component.PythonScript——新增
最后再创建Python组件即可
package com.example.lowcode.component;
import com.example.lowcode.core.dto.ComponentInfo;
import com.example.lowcode.core.framework.AbstractComponent;
import com.example.lowcode.core.framework.ComponentContext;
import com.example.lowcode.core.model.*;
import com.example.lowcode.util.PythonUtil;
import com.google.common.collect.Maps;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.HashMap;
import java.util.Map;
@ComponentDefinition(name = "PythonScript", type = ComponentTypeEnum.SERVICE_CALL, desc = "python组件")
@InputParamDefinition({
@Param(name = "code", desc = "python函数模板", type = ParamTypeEnum.STRING, required = true),
@Param(name = "params", desc = "函数入参", type = ParamTypeEnum.MAP, required = false)
})
@OutputParamDefinition({@Param(name = "result", desc = "http接口返回结果", required = true)})
public class PythonScript extends AbstractComponent {
@Autowired
private PythonUtil pythonUtil;
@Override
public Map<String, Object> execute(ComponentContext context, ComponentInfo componentInfo) throws Exception {
String code = (String) parseInputParam("code", context, componentInfo);
Map<String, Object> paramMap = (Map<String, Object>) parseInputParam("params", context, componentInfo);
String output = parseOutputParam("result",componentInfo);
HashMap<String, Object> result = Maps.newHashMap();
String pythonResult = pythonUtil.executePythonCode(code,paramMap);
result.put(output, pythonResult);
return result;
}
}
测试
单独测试这个组件:
@Test
public void testPythonScript() {
try {
// 调用方法并打印结果
ComponentInfo componentInfo = new ComponentInfo();
componentInfo.setInputs(new HashMap<>() {{
String s = """
def main(response: str, length: int) -> str:
import re
response = response[:length]
match = re.search(r'[ABCDEFGH]', response)
if match:
return match.group()
else:
return 'other result'
""";
String s1 = """
def main(content1, content2):
return content1 + "" + content2
""";
String mainDef = """
result = main(response, length)
print(result, end='')
""";
put("code", new ComponentParam().setName("code").setValue(s+mainDef));
HashMap<Object, Object> map = new HashMap<>();
// map.put("content1","Hello World!");
// map.put("content2","hehe");
map.put("response","Hello World!");
map.put("length",20);
put("params", new ComponentParam().setName("params").setValue(map));
}});
componentInfo.setOutputs(new HashMap<>() {{
put("result", new ComponentParam().setName("result").setValue("result"));
}});
Map<String, Object> execute = pythonScript.execute(new ComponentContext(), componentInfo);
System.out.println(execute);
} catch (Exception e) {
e.printStackTrace();
}
}
运行结果:
断点调试组件流
实现过程
在使用低代码平台编辑组件流时,可能会遇到后面几个组件执行有问题或执行很慢的情况,可以考虑增加断点执行的能力,制定中间的某个组件为结束节点。
如上图所示,比如在调试的时候不想调用HttpClient组件,那就可以把PageFilter组件指定为结束节点,最下面的组件因为入参不够也会不执行。
实现的思路很简单,因为之前2.0版本的代码会根据组件之间的线去解析关联关系,只需要找到新的结束节点依赖的所有节点,把他们放到执行引擎中,不被依赖的节点自然就被剪掉了。
com.example.lowcode.core.dto2.FlowEngineBuilder——修改
剪枝部分的代码:
public DagEngine<O> buildDebug(String instanceName) {
check();
DagEngine<O> engineWithOpConfig = getEngineWithOpConfig(flow, instanceName);
clear();
return engineWithOpConfig;
}
private DagEngine<O> getEngineWithOpConfig(Flow flow, String instanceName) {
DagEngine<O> engine = new DagEngine<>(executor);
List<OperatorWrapper<?, ?>> operatorWrappers = getWrappersWithOpConfig(flow, engine);
// 单节点执行逻辑,根据当前节点解析依赖节点
Set<String> dependNode = new HashSet<>();
resolveDependenciesForCut(flow, operatorWrappers, instanceName, dependNode);
// 遍历wrapperMap,保留debug节点的所有依赖节点
Map<String, OperatorWrapper<?, ?>> debugWrapperMap = new HashMap<>();
engine.getWrapperMap().forEach(
(k, v) -> {
if (dependNode.contains(k)) {
debugWrapperMap.put(k, v);
}
}
);
engine.setWrapperMap(debugWrapperMap);
return engine;
}
private void resolveDependenciesForCut(Flow flow, List<OperatorWrapper<?, ?>> operatorWrappers, String instanceName, Set<String> dependNode) {
final Map<String, OperatorWrapper<?, ?>> wrapperMap = operatorWrappers.stream()
.collect(Collectors.toMap(OperatorWrapper::getInstanceName, e -> e));
final Map<String, List<Edge>> groupBySource = flow.getEdgeInstances().stream().collect(Collectors.groupingBy(
Edge::getSourceName
));
groupBySource.forEach((id, followings) -> {
for (Edge following : followings) {
final OperatorWrapper<?, ?> targetOp = wrapperMap.get(following.getTargetName());
targetOp.depend(id);
}
});
Map<String, List<String>> sourceNameMap = new HashMap<>();
groupBySource.forEach(
(k, v) -> {
List<String> collect = v.stream().map(Edge::getTargetName).collect(Collectors.toList());
sourceNameMap.put(k, collect);
}
);
dependNode.add(instanceName);
// 查找当前节点的依赖节点
findDependNode(instanceName, dependNode, sourceNameMap);
}
private void findDependNode(String start, Set<String> dependNode, Map<String, List<String>> sourceNameMap) {
List<String> list = new ArrayList<>();
list.add(start);
while (!list.isEmpty()) {
String node = list.remove(0);
for (Map.Entry<String, List<String>> entry : sourceNameMap.entrySet()) {
if (entry.getValue().contains(node)) {
dependNode.add(entry.getKey());
list.add(entry.getKey());
}
}
}
}
另外指定新的结束节点需要一个组件标志,可以随意选择只要保证唯一即可,这里为了方便演示选择用nodeName(组件的自定义名称)作为组件标志,同时ComponentInfo类也需要加上private String instanceName;属性。
然后引擎类DagEngine也需要加上set方法。
public void setWrapperMap(Map<String, OperatorWrapper<?, ?>> wrapperMap){
this.wrapperMap = wrapperMap;
}
最后修改接口层,把构建引擎的方法替换为新建的:
测试
详细运行信息
实现过程
实现这个功能需要小改动一下架构,之前的上下文类是用于存放每个组件的变量->变量值,是整个流层面的对象;现在需要保存每个节点的运行信息,且因为是并行需要线程安全。这里我加上原开源框架的DagContext类,来保存每个组件的运行信息。
作为示例,本篇实现展示每个组件的组件名、输入输出、耗时、异常报错,经过分析,组件名、输入、异常报错可以直接从flowNode中获取,而输出和耗时需要在执行组件时添加。
com.example.lowcode.core.framework2.DagContext——新增
引擎上下文,和ComponentContext不同,后者是整个流用一个ComponentContext对象,这个类是用于记录多线程环境每个组件的执行过程。
package com.example.lowcode.core.framework2;
import com.example.lowcode.core.dto2.OperatorResult;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* DAG执行引擎上下文
* 上下文的生命周期是引擎执行期间,即从开始节点到结束节点之间
*/
public class DagContext<O> {
/**
* 保存每个节点返回的结果
* key: 节点id
* value: result
*/
private Map<String, OperatorResult> operatorResultMap = new ConcurrentHashMap<>();
private OperatorResult<O> output;
public void putOperatorResult(String wrapperId, OperatorResult<?> operatorResult) {
operatorResultMap.put(wrapperId, operatorResult);
}
public OperatorResult getOperatorResult(String wrapperId) {
return operatorResultMap.get(wrapperId);
}
public synchronized void setOutput(OperatorResult<O> endResult) {
this.output = endResult;
}
public OperatorResult<O> getOutput () {
return output;
}
public Map<String, OperatorResult> getOperatorResultMap() {
return operatorResultMap;
}
}
com.example.lowcode.core.framework2.DagContextHolder——新增
包装DagContext,线程安全
package com.example.lowcode.core.framework2;
import com.alibaba.ttl.TransmittableThreadLocal;
import com.example.lowcode.core.dto2.OperatorResult;
/**
* 获取DagContext上下文的工具类
*/
public class DagContextHolder {
private static ThreadLocal<DagContext> holder = new TransmittableThreadLocal<>();
protected static void set(DagContext dagContext) {
holder.set(dagContext);
}
public static DagContext get() {
return holder.get();
}
protected static void remove() {
holder.remove();
}
public static void putOperatorResult(String instanceName, OperatorResult<?> operatorResult) {
holder.get().putOperatorResult(instanceName, operatorResult);
}
public static OperatorResult getOperatorResult(String instanceName) {
return holder.get().getOperatorResult(instanceName);
}
}
com.example.lowcode.core.framework2.DagEngine——修改
然后修改引擎类的代码
↑初始化DagContext
↑在getRunningTask方法中更新上下文的运行结果
↑最后在执行run组件的前后记录耗时,OperatorResult类也需要加上duration属性。这个实现方式很不好,在下文会修改这段实现。
com.example.lowcode.core.service.RunServiceImpl——修改
下一步是在实现类中新写一个接口,把组件的运行信息都取出来:
@Override
public Map<String, Map<String, Object>> runFlowDebug(long flowId, Map<String, ComponentInfo> inputParams, String instanceName) {
FlowSnapshot flowSnapshot = FlowSnapshotDO.selectByFlowId(flowId);
assert flowSnapshot != null;
Flow flow = JSON.parseObject(flowSnapshot.getJsonParam(), new TypeReference<>() {
});
DagEngine<Map<String, Object>> engine = new FlowEngineBuilder<Map<String, Object>>()
.setFlow(flow)
.setInputParams(inputParams)
.setExecutor(THREAD_POOL_EXECUTOR)
.buildDebug(instanceName);
engine.runAndWait();
// if (engine.getEx() != null) {
// throw new FlowExecutionException(String.format("【%s:%s】执行异常,原因:%s", flow.getId(), flow.getName(), engine.getEx().getMessage()), engine.getEx());
// }
Map<String, Map<String, Object>> flowResult = new HashMap<>();
// 遍历存放每个组件的信息
for(FlowNode node : flow.getNodeInstances()) {
Map<String, Object> flowInfo = Maps.newHashMap();
flowResult.put(node.getNodeName(), flowInfo);
flowInfo.put("nodeName", node.getNodeName());
flowInfo.put("componentName", node.getComponentName());
OperatorWrapper<?, ?> operatorWrapper = engine.getWrapperMap().get(node.getNodeName());
// 当且仅当node执行才设置详细信息
if(operatorWrapper == null || operatorWrapper.getOperatorResult() == null
|| Objects.equals(ResultState.DEFAULT, operatorWrapper.getOperatorResult().getResultState())) {
continue;
}
// 设置input信息
Map<String, Object> inputMap = Maps.newHashMap();
inputMap.putAll(node.getComponentInfo().getInputs());
flowInfo.put("input", inputMap);
// 设置output信息
OperatorResult operatorResult = engine.getDagContext().getOperatorResultMap().get(node.getNodeName());
flowInfo.put("output", operatorResult.getResult());
// 设置duration
flowInfo.put("duration", operatorWrapper.getOperatorResult().getDuration());
// 设置log信息
Map<String, Object> logMap = Maps.newHashMap();
if(operatorResult.getEx() != null) {
logMap.put("stderr", operatorResult.getEx().getStackTrace());
}
flowInfo.put("log", logMap);
}
return flowResult;
}
上面我把异常判断注释了,当发现组件流有异常时不再抛异常,而是返回结果。这里遍历的是flowNode,里面有写node可能没有被执行(断点执行)。
测试
修改测试代码
可以看到result中展示了每个组件的运行结果。
之后我手动造了一个异常,用于测试报错信息的展示↓
虽然在这里报错了,却没有执行到下面这块↓,也就没有记录异常信息。
原因是我之前写切面的时候把异常捕获了还没有抛出↓
把这块删掉后,再次运行即可看到报错信息:
补充
上面记录耗时的写法并不好,如果未来要进行一些复杂操作,会写的比较杂乱,不过目前来看其实不需要重构,可以当做参考看看。原开源代码中提供了两种思路:类似抽象类的execute方法,再抽象出start方法、success方法、error方法,分别对应组件的执行前、执行完成、执行异常,调用方式和执行的方法execute类似;另一个思路是用回调方法来实现。这里我用前者来扩展实现:
实现过程
com.example.lowcode.core.framework.ComponentInterface——修改
package com.example.lowcode.core.framework;
import com.example.lowcode.core.dto.ComponentInfo;
import com.example.lowcode.core.dto2.OperatorResult;
import com.example.lowcode.core.framework2.OperatorWrapper;
import java.util.Map;
/**
* @author llxzdmd
* @version IComponent.java, v 0.1 2024年01月02日 19:00 llxzdmd
*/
public interface ComponentInterface {
default Object defaultValue(ComponentContext context, ComponentInfo componentInfo) {
return null;
}
Map<String, Object> execute(ComponentContext context, ComponentInfo componentInfo) throws Exception;
default void onStart(ComponentContext context, ComponentInfo componentInfo){};
default void onSuccess(ComponentContext context, ComponentInfo componentInfo, OperatorResult<Object> result){};
default void onError(ComponentContext context, ComponentInfo componentInfo, OperatorResult<Object> result){};
}
首先在接口和抽象类中增加三个准备监听阶段的方法。这几个方法是组件去执行时的调用,可以在里面写额外逻辑,但目前需求不需要,直接置空即可。
com.example.lowcode.core.framework2.IOperator——修改
package com.example.lowcode.core.framework2;
import com.example.lowcode.core.dto2.OperatorResult;
import com.example.lowcode.core.framework.ComponentContext;
/**
* Operator接口
*
* @author llxzdmd
* @version IOperator.java, 2024年02月18日 16:13 llxzdmd
*/
public interface IOperator<P, V> {
/**
* 自定义OP的默认返回值
*/
default V defaultValue(P param) {
return null;
}
/**
* 该方法实现OP的具体处理逻辑
*/
V execute(P param, ComponentContext context) throws Exception;
void onStart(OperatorWrapper<?, ?> param, ComponentContext context);
void onSuccess(OperatorWrapper<?, ?> param, ComponentContext context, OperatorResult<Object> result);
void onError(OperatorWrapper<?, ?> param, ComponentContext context, OperatorResult<Object> result);
}
因为引擎在执行的过程中无法获取到组件对象去执行对应的方法,需要获取到封装的IOperator类,由这个类再去执行接口的方法,因此在此处也定义几个阶段。之后需要修改IOperator接口的实现类。
com.example.lowcode.core.framework2.DefaultInvokeMethodComponent——修改
package com.example.lowcode.core.framework2;
import com.example.lowcode.core.dto2.OperatorResult;
import com.example.lowcode.core.exception.FlowConfigException;
import com.example.lowcode.core.framework.AbstractComponent;
import com.example.lowcode.core.framework.ComponentContext;
import com.example.lowcode.core.framework.SpringUtil;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* @author llxzdmd
* @version DefaultInvokeMethodComponent.java, 2024年02月18日 19:26 llxzdmd
*/
@Component
public class DefaultInvokeMethodComponent implements IOperator<OperatorWrapper<?, ?>, Object> {
@Override
public Object defaultValue(OperatorWrapper<?, ?> param) {
return new Object();
}
@Override
public Object execute(OperatorWrapper<?, ?> param, ComponentContext context) throws Exception {
return invokeMethod(param, context);
}
@Override
public void onStart (OperatorWrapper<?, ?> param, ComponentContext context) {
invokeMethod(param, context, "onStart", null);
}
@Override
public void onSuccess (OperatorWrapper<?, ?> param, ComponentContext context, OperatorResult<Object> result) {
invokeMethod(param, context, "onSuccess", result);
}
@Override
public void onError (OperatorWrapper<?, ?> param, ComponentContext context, OperatorResult<Object> result) {
invokeMethod(param, context, "onError", result);
}
private Object invokeMethod(OperatorWrapper<?, ?> param, ComponentContext context) {
OpConfig opConfig = param.getOpConfig();
try {
Class<?> aClass = Class.forName(opConfig.getClassName());
AbstractComponent abstractComponent = (AbstractComponent) SpringUtil.getBean(aClass);
return abstractComponent.execute(context, opConfig.getComponentInfo());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void invokeMethod(OperatorWrapper<?, ?> param, ComponentContext context, String methodName, OperatorResult<Object> result){
OpConfig opConfig = param.getOpConfig();
try {
Class<?> aClass = Class.forName(opConfig.getClassName());
AbstractComponent abstractComponent = (AbstractComponent) SpringUtil.getBean(aClass);
switch (methodName) {
case "onStart" -> abstractComponent.onStart(context, opConfig.getComponentInfo());
case "onSuccess" -> abstractComponent.onSuccess(context, opConfig.getComponentInfo(), result);
case "onError" -> abstractComponent.onError(context, opConfig.getComponentInfo(), result);
default -> abstractComponent.defaultValue(context, opConfig.getComponentInfo());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
这里新写了一个多一个参数的invokeMethod方法,因为需要得到onSuccess和onError时的运行结果。invokeMethod方法也可以再抽象一层,用反射来执行对应的方法,尝试了一下由于需要获取到每个方法的入参类型,用枚举的话和上面的写法类似;否则需要再定义一个记录需要执行的方法的入参类型、入参值,再在此处解析,成本太大,就不继续抽象了。
com.example.lowcode.core.framework2.DagEngine——修改
在引擎类的对应位置让执行节点调用对应的方法,节点就会调用到组件的对应方法。
之后可以在切面中监听到组件执行这几个方法的动作,进行相应的处理。
之后再把这两行注释掉,就准备就绪可以测试了。
测试
效果符合预期
↑把之前制造的bug去掉,正常运行
总结
需要博客源码可私信免费获取,看到就会回复。