交叉熵优化算法,是一种启发式算法,能够用来求解复杂的非线性优化问题。本人的最近两篇论文(under review),都采用了交叉熵优化算法去生成控制轨迹作为“专家知识”,因此,为了仿真和测试的方便,开发了一个通用的交叉熵优化求解器,专门用于求解离散时间仿射非线性系统的MPC优化问题(模型预测控制),欢迎朋友们批评指正。
开发背景
我做的是非线性系统的最优控制问题,2022年上半年的时候写了3篇文章,都用到了MPC生成的轨迹作为专家知识来指导强化学习控制器的学习。在仿真的过程中,非线性系统的MPC求解问题成为了仿真需要解决的前提问题。在多种优化器的尝试下(包括商用优化器Gurobi),进行对比后发现,对于我这个问题来说,决定还是使用交叉熵优化算法来进行MPC优化问题的求解。2022年4,5,6月份写了两篇文章,都是使用交叉熵优化,但是没有好好整理一下代码,最近有点空闲,适当整理。
与现有求解器的对比(Gurobi)
和gurobi进行对比过:
- gurobi建模的时候有些函数没法建模,例如arctan,一般情况下,都是将arctan近似线性化来求解。本人还有一篇文章(under review)是通过gurobi来求解MPC问题的,并生成轨迹,gurobi语法简单,文档丰富,可以说是用起来比较方便的一个优化器了。
- gurobi计算的速度有点慢,背后的算法我只能通过官网上有限的介绍得到,好像是局部线性化然后将非线性优化问题转化为线性优化问题,然后用牛顿法之类的方法去求解,这一个过程非常缓慢。
- 经过测试,交叉熵优化算法没有精度保证(启发式算法的特点,都没有最优性保证的),但是优化解和Gurobi相差不大,对于某些问题,还能获得比Gurobi更优的解(Gurobi求解时陷入了局部最优)。
交叉熵优化算法的简介
后续等论文发表了可以放我的论文链接。
其余参考文献多的很,google scholar搜索一下即可大致了解CEOM的原理了。
代码实现
CUDA版本
尝试过numba,但是感觉不够底层,并且很多numpy的函数都不支持,所以没搞出来(希望有会的大佬能搞一下,我近期也要继续研究一下CUDA版本的实现)
用C++直接写CUDA的话我又不会
多进程版本
相对CUDA来说,多进程版本就简单的多了,再次注明一下:本程序本身开发的目的是为了求解多个初始状态的MPC问题,例如你有40000个点的MPC问题想要求解,本工具可以大幅提升速度,当然,求一个MPC问题也是可以的,简单封装了一下,后续结合字符串匹配等方式希望将求解器做到全自动化,现在定义目标函数这个过程还是需要手动定义,所以算是半自动吧。
什么是全自动:对于仿射非线性系统来说,输入fx表达式(字符串形式),gx表达式(字符串形式),MPC的控制时域N,初始状态集合,定义并行的进程数,程序直接输出所有的轨迹
主程序调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| fx = ['-0.3*x2*cosx1', '1.01x2+0.2*sin(x1^2)']
gx = ['1, 0', '0, 1']
def generate_cost_func(): """ 功能:生成优化问题,返回的是一个lambdify函数 输入: 1. 初始状态点 输出: 1. 构建的数值表达式:obj_f """ N = 10 state_dim = 2 action_dim = 2 action_list = [symengine.Symbol("a{}".format(i)) for i in range(action_dim * N)] initial_state_list = [symengine.Symbol("x{}".format(i)) for i in range(state_dim)] state1_list = [] state2_list = [] state1_list.append(initial_state_list[0]) state2_list.append(initial_state_list[1]) obj = 0
for i in range(N): state1_list.append( -0.3 * state2_list[i] * symengine.cos(state1_list[i]) + action_list[action_dim * i] ) state2_list.append( 1.01 * state2_list[i] + 0.2 * symengine.sin(state1_list[i] ** 2) + action_list[action_dim * i + 1] ) obj += state1_list[i] ** 2 + state2_list[i] ** 2 + action_list[i] ** 2 obj += state1_list[-1] ** 2 + state2_list[-1] ** 2
cost_func = symengine.lambdify( action_list + initial_state_list, [obj], dtype=np.float64, backend="llvm" ) return cost_func cost_func = generate_cost_func()
N = 10
x_0 = np.arange(-2.0, 2.0, 0.02) x_1 = np.arange(-2.0, 2.0, 0.02) xx_0, xx_1 = np.meshgrid(x_0, x_1) initial_states = np.transpose(np.array([xx_0.ravel(), xx_1.ravel()]))
process = os.cpu_count()
mpc_solver(cost_func, N, initial_states)
|
计算单步的损失
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| def compute_loss(initial_state: np.array, action: np.array): """ 功能:给定一个初始状态,一串动作轨迹,计算状态轨迹和损失 输入: 1. 初始状态 2. 动作轨迹 输出: 1. 状态轨迹 2. 损失函数目标值 """ steps = N state_trajectory = initial_state[np.newaxis, :] for i in range(int(steps)): state1_temp = ( -0.3 * state_trajectory[i][1] * np.cos(state_trajectory[i][0]) + action[2 * i] ) state2_temp = ( 1.01 * state_trajectory[i][1] + 0.2 * np.sin(state_trajectory[i][0] ** 2) + action[2 * i + 1] ) state_trajectory = np.append( state_trajectory, np.array([[state1_temp, state2_temp]]), axis=0 ) return state_trajectory, np.sum(action ** 2) + np.sum(state_trajectory ** 2)
|
CEOM算法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| def cem_optimize( cost_func, sample_N: int, Nel: int, initial_state: np.array, means: list, stds: list, ): """ 功能:CEOM实现
输入: 1. 符号表达式 3. 采样个数:sample_N 4. 选取最好的个数:Nel 5. 初始状态 6. initial_mean 7. initial_std
输出: 1. mean_list也等于action_trajectory 2. std_list 3. state_tracejectory 5. running_time """ start_time = time.time() action_sampled = scipy.stats.norm(loc=means, scale=stds).rvs( size=(sample_N, action_dim * N) ) obj_values = cost_func( np.concatenate((action_sampled, np.tile(initial_state, (sample_N, 1))), axis=1) ) sort_index = np.argsort(obj_values) nel_sort_index = sort_index[0:Nel] means[:] = action_sampled[nel_sort_index].mean(axis=0) stds[:] = action_sampled[nel_sort_index].std(axis=0) state_trajectory, loss = compute_loss(initial_state, means) return state_trajectory, loss, time.time() - start_time
|
构建MPC求解器接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| def qp_solver(cost_func, initial_state): """ 功能:使用CEOM算法求解一次二次型优化问题
输入: 1. cost_func: 目标函数 3. initial_state: 初始状态
输出: 1. action: 2. state: 3. loss: """ pre_loss = 1.0 sample_N = 2000 Nel = 10 means = np.array([0 for i in range(action_dim * N)], dtype=np.float64) stds = np.array([1 for i in range(action_dim * N)], dtype=np.float64) for i in range(100): state_trajectory, loss, runtime = cem_optimize( cost_func, sample_N=sample_N, Nel=10, initial_state=initial_state, means=means, stds=stds, ) if np.abs(pre_loss - loss) < 1e-8: break pre_loss = loss return means[0:2], state_trajectory[1], loss
def mpc_solver(cost_func, initial_state, trajectory, log=True): """ 功能:使用qp_solver来求解MPC问题:一个MPC问题相当于使用qp_solver求解N次数,得到一传动作和轨迹
输入: 1. cost_func: 目标函数 2. initial_state: 初始状态 4. trajectory: 轨迹(嵌套的List),使用multiprocess定义的共享变量 5. 是否打印计算过程 输出: 1. 无输出,所有的轨迹被保存到共享变量trajectory中
""" trajectory_single = [] state = initial_state for step in range(N): action, next_state, obj = qp_solver(cost_func, initial_state=state) trajectory_single.append((state, action)) state = next_state trajectory.append(trajectory_single) if log: if len(trajectory) % 5 == 0: print("正在获取第{}个MPC最优解决,其损失为{}".format(len(trajectory), obj))
|
总的项目结构分为3个文件
1 2 3
| 1. test.py: 定义初始状态集合,定义函数 2. funcs.py: 定义action_dim, state_dim, N, generate_cost_func(), compute_loss() 3. cem_mpc_solver.py: CEOM-MPC求解器
|
具体的代码可以见:https://github.com/olixu/CEOM-MPC-Solver