bamboo-pipeline 是蓝鲸智云旗下saas标准运维的流程编排引擎。其具备以下特点:
多种流程模式 :支持串行、并行,支持子流程,可以根据全局参数自动选择分支执行,节点失败处理机制可配置。参数引擎 :支持参数共享,支持参数替换。可交互的任务执行 :任务执行中可以随时暂停、继续、撤销,节点失败后可以重试、跳过。1.准备
开始之前,你要确保python和pip已经成功安装在电脑上,如果没有,可以访问这篇文章:超详细python安装指南 进行安装。
(可选1) 如果你用python的目的是数据分析,可以直接安装anaconda:python数据分析与挖掘好帮手—anaconda,它内置了python和pip.
(可选2) 此外,推荐大家用vscode编辑器,它有许多的优点:python 编程的最好搭档—vscode 详细指南。
请选择以下任一种方式输入命令安装依赖 :
windows 环境 打开 cmd (开始-运行-cmd)。macos 环境 打开 terminal (command+空格输入terminal)。如果你用的是 vscode编辑器 或 pycharm,可以直接使用界面下方的terminal.pip install bamboo-enginepip install bamboo-pipelinepip install djangopip install celery2. 项目初始化
(选项一:无django项目) 如果你没有任何的现成django项目,请按下面的流程初始化
由于 ** bamboo-pipeline ** 运行时基于 django 实现,所以需要新建一个 django 项目:
django-admin startproject easy_pipelinecd easy_pipeline在 ** easy_pipeline.settings.py ** 下添加如下配置:
from pipeline.eri.celery.queues import *from celery import celeryapp = celery(proj)app.config_from_object(django.conf:settings)installed_apps = [ ... pipeline, pipeline.engine, pipeline.component_framework, pipeline.eri, ...]在 ** easy_pipeline **目录下初始化数据库:
python manage.py migrate(选项二:有django项目需要使用流程引擎) 如果你有现成的pipeline项目需要使用此流程引擎,请在项目的** settings.py **下添加如下配置:
from pipeline.eri.celery.queues import *from celery import celeryapp = celery(proj)app.config_from_object(django.conf:settings)installed_apps = [ ... pipeline, pipeline.engine, pipeline.component_framework, pipeline.eri, ...]然后重新执行migrate,生成pipeline相关的流程模型:
python manage.py migratemigrate 执行完毕后会如下图所示:
由于是在原有项目上使用流程引擎,可能会遇到一些版本不匹配的问题,如果遇到报错,请排查解决或到蓝鲸官网上进行询问。
3. 简单的流程例子
首先在项目目录下启动 celery worker:
python manage.py celery worker -q er_execute,er_schedule --pool=solo -l info启动成功类似下图所示:
(注意) 如果你是在你的原有django项目上做改造,它并不一定能够顺利地启动成功,这是因为pipeline使用了 django 2.2.24,会存在许多版本不兼容的情况。如果遇到报错,请排查解决或到蓝鲸官网上进行询问。
在下面的例子中,我们将会创建并执行一个简单的流程:
3.1 创建流程app在 bamboo_pipeline 中,一个流程由多个组件组成,官方推荐使用app统一管控组件:
python manage.py create_plugins_app big_calculator该命令会在 django 工程根目录下生成拥有以下目录结构的 app:
big_calculator├── __init__.py├── components│ ├── __init__.py│ └── collections│ ├── __init__.py│ └── plugins.py├── migrations│ └── __init__.py└── static └── big_calculator └── plugins.js别忘了把新创建的这个插件添加到 django 配置的 **installed_apps **中:
installed_apps = ( ... 'big_calculator', ...)3.2 编写流程的service原子组件服务 ** service是组件的核心,service ** 定义了组件被调用时执行的逻辑,下面让我们实现一个计算传入的参数n的阶乘,并把结果写到输出中的 ** service ** ,在 **big_calculator/components/collections/plugins.py ** 中输入以下代码:
import mathfrom pipeline.core.flow.activity import serviceclass factorialcalculateservice(service): def execute(self, data, parent_data): 组件被调用时的执行逻辑 :param data: 当前节点的数据对象 :param parent_data: 该节点所属流程的数据对象 :return: n = data.get_one_of_inputs('n') if not isinstance(n, int): data.outputs.ex_data = 'n must be a integer!' return false data.outputs.factorial_of_n = math.factorial(n) return true def inputs_format(self): 组件所需的输入字段,每个字段都包含字段名、字段键、字段类型及是否必填的说明。 :return:必须返回一个 inputitem 的数组,返回的这些信息能够用于确认该组件需要获取什么样的输入数据。 return [ service.inputitem(name='integer n', key='n', type='int', required=true) ] def outputs_format(self): 组件执行成功时输出的字段,每个字段都包含字段名、字段键及字段类型的说明 :return: 必须返回一个 outputitem 的数组, 便于在流程上下文或后续节点中进行引用 return [ service.outputitem(name='factorial of n', key='factorial_of_n', type='int') ]首先我们继承了 ** service基类,并实现了execute() ** 和 **outputs_format() ** 这两个方法,他们的作用如下:
execute :组件被调用时执行的逻辑。接收 data 和 parent_data 两个参数。
其中,data 是当前节点的数据对象,这个数据对象存储了用户传递给当前节点的参数的值以及当前节点输出的值。parent_data 则是该节点所属流程的数据对象,通常会将一些全局使用的常量存储在该对象中,如当前流程的执行者、流程的开始时间等。outputs_format :组件执行成功时输出的字段,每个字段都包含字段名、字段键及字段类型的说明。这个方法必须返回一个 outputitem 的数组,返回的这些信息能够用于确认某个组件在执行成功时输出的数据,便于在流程上下文或后续节点中进行引用。inputs_format :组件所需的输入字段,每个字段都包含字段名、字段键、字段类型及是否必填的说明。这个方法必须返回一个 inputitem 的数组,返回的这些信息能够用于确认某个组件需要获取什么样的输入数据。下面我们来看一下 **execute() **方法内部执行的逻辑,首先我们尝试从当前节点数据对象的输出中获取输入参数n,如果获取到的参数不是一个 int 实例,那么我们会将异常信息写入到当前节点输出的 **ex_data **字段中, 这个字段是引擎内部的保留字段,节点执行失败时产生的异常信息都应该写入到该字段中 。随后我们返回 ** false ** 代表组件本次执行失败,随后节点会进入失败状态:
n = data.get_one_of_inputs('n')if not isinstance(n, int): data.outputs.ex_data = 'n must be a integer!' return false若获取到的 n 是一个正常的 int,我们就调用 **math.factorial() **函数来计算 n 的阶乘,计算完成后,我们会将结果写入到输出的 factorial_of_n 字段中,以供流程中的其他节点使用:
data.outputs.factorial_of_n = math.factorial(n)return true3.3 编写流程组件,绑定service原子完成 service 的编写后,我们需要将其与一个 component 绑定起来,才能够注册到组件库中,在**big_calculatorcomponents__init__.py **文件下添加如下的代码:
import loggingfrom pipeline.component_framework.component import componentfrom big_calculator.components.collections.plugins import factorialcalculateservicelogger = logging.getlogger('celery')class factorialcalculatecomponent(component): name = 'factorialcalculatecomponent' code = 'fac_cal_comp' bound_service = factorialcalculateservice我们定义了一个继承自基类 **component **的类 factorialcalculatecomponent ,他拥有以下属性:
1.name:组件名。
2.code:组件代码,这个代码必须是全局唯一的。
3.bound_service:与该组件绑定的 service。
这样一来,我们就完成了一个流程原子的开发。
3.4 生成流程,测试刚编写的组件在 big_calculatortest.py 写入以下内容,生成一个流程,测试刚刚编写的组件:
# python 实用宝典# 2021/06/20import timefrom bamboo_engine.builder import *from big_calculator.components import factorialcalculatecomponentfrom pipeline.eri.runtime import bamboodjangoruntimefrom bamboo_engine import apifrom bamboo_engine import builderdef bamboo_playground(): 测试流程引擎 # 使用 builder 构造出流程描述结构 start = emptystartevent() # 这里使用 我们刚创建好的n阶乘组件 act = serviceactivity(component_code=factorialcalculatecomponent.code) # 传入参数 act.component.inputs.n = var(type=var.plain, value=4) end = emptyendevent() start.extend(act).extend(end) pipeline = builder.build_tree(start) api.run_pipeline(runtime=bamboodjangoruntime(), pipeline=pipeline) # 等待 1s 后获取流程执行结果 time.sleep(1) result = api.get_execution_data_outputs(bamboodjangoruntime(), act.id).data print(result)随后,在命令行输入:
python manage.py shell打开 django console, 输入以下命令,执行此流程:
from big_calculator.test import bamboo_playgroundbamboo_playground()流程运行完后,获取节点的执行结果,可以看到,该节点输出了 factorial_of_n,并且值为 24(4 * 3 * 2 *1),这正是我们需要的效果:
{'_loop': 0, '_result': true, 'factorial_of_n': 24}恭喜你,你已经成功的创建了一个流程并把它运行起来了!在这期间你可能会遇到不少的坑,建议尝试先自行解决,如果实在无法解决,可以前往 标准运维 仓库提 issues,或者前往蓝鲸智云官网提问。
机械设备物联网可以有效提高生产效率,打造智慧工厂
智慧医疗的未来及发展趋势
流量计与液位开关有什么区别
米家电磁炉青春版高清图赏
基于MES的RFID数据采集你了解吗
Bamboo-pipeline:Python高效流程编排引擎
《长安十二时辰》中竟还内涵了工业互联网的内在逻辑!
基于数字存内处理芯片的GPU的大规模计算系统
英飞凌发布IMC300电机控制器系列 是对IMC100系列的补充
电磁感应方式的工作原理
对交流/直流电源而言分立式还是组合式控制器更好
工信部联合手机应用市场推“高品质APP”
压差旁通阀工作原理_压差旁通阀的作用
专注计算与连接,高通深耕IoT生态
快充桩与慢充桩的对比以及快充桩的发展 150-240kW直流输出是未来趋势
众泰t700怎么样?国产颜值最高SUV,配置高动力强悍,亮点比哈佛H9还要大!
智能手机屏幕素质排行榜公布
360手机N7Pro评测 堪称同价位最均衡的手机代表
苹果计划在2020年第三季度推出5G手机来夺得市场主导地位
TDA8425各引脚功能的电压参数资料