0%

基于交叉熵优化算法的非线性系统的MPC求解器

交叉熵优化算法,是一种启发式算法,能够用来求解复杂的非线性优化问题。本人的最近两篇论文(under review),都采用了交叉熵优化算法去生成控制轨迹作为“专家知识”,因此,为了仿真和测试的方便,开发了一个通用的交叉熵优化求解器,专门用于求解离散时间仿射非线性系统的MPC优化问题(模型预测控制),欢迎朋友们批评指正。

开发背景

我做的是非线性系统的最优控制问题,2022年上半年的时候写了3篇文章,都用到了MPC生成的轨迹作为专家知识来指导强化学习控制器的学习。在仿真的过程中,非线性系统的MPC求解问题成为了仿真需要解决的前提问题。在多种优化器的尝试下(包括商用优化器Gurobi),进行对比后发现,对于我这个问题来说,决定还是使用交叉熵优化算法来进行MPC优化问题的求解。2022年4,5,6月份写了两篇文章,都是使用交叉熵优化,但是没有好好整理一下代码,最近有点空闲,适当整理。

与现有求解器的对比(Gurobi)

和gurobi进行对比过:

  1. gurobi建模的时候有些函数没法建模,例如arctan,一般情况下,都是将arctan近似线性化来求解。本人还有一篇文章(under review)是通过gurobi来求解MPC问题的,并生成轨迹,gurobi语法简单,文档丰富,可以说是用起来比较方便的一个优化器了。
  2. gurobi计算的速度有点慢,背后的算法我只能通过官网上有限的介绍得到,好像是局部线性化然后将非线性优化问题转化为线性优化问题,然后用牛顿法之类的方法去求解,这一个过程非常缓慢。
  3. 经过测试,交叉熵优化算法没有精度保证(启发式算法的特点,都没有最优性保证的),但是优化解和Gurobi相差不大,对于某些问题,还能获得比Gurobi更优的解(Gurobi求解时陷入了局部最优)。

交叉熵优化算法的简介

后续等论文发表了可以放我的论文链接。

其余参考文献多的很,google scholar搜索一下即可大致了解CEOM的原理了。

代码实现

CUDA版本

尝试过numba,但是感觉不够底层,并且很多numpy的函数都不支持,所以没搞出来(希望有会的大佬能搞一下,我近期也要继续研究一下CUDA版本的实现)

用C++直接写CUDA的话我又不会

多进程版本

相对CUDA来说,多进程版本就简单的多了,再次注明一下:本程序本身开发的目的是为了求解多个初始状态的MPC问题,例如你有40000个点的MPC问题想要求解,本工具可以大幅提升速度,当然,求一个MPC问题也是可以的,简单封装了一下,后续结合字符串匹配等方式希望将求解器做到全自动化,现在定义目标函数这个过程还是需要手动定义,所以算是半自动吧。

什么是全自动:对于仿射非线性系统来说,输入fx表达式(字符串形式),gx表达式(字符串形式),MPC的控制时域N,初始状态集合,定义并行的进程数,程序直接输出所有的轨迹

主程序调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# 定义fx
fx = ['-0.3*x2*cosx1', '1.01x2+0.2*sin(x1^2)']
# 定义gx
gx = ['1, 0', '0, 1']
# 上述的方式涉及到字符串匹配等问题,还包括字符关系映射成数学逻辑关系,思考了一下感觉挺复杂的,还没有实现自动创建目标函数
# 定义目标函数
def generate_cost_func():
"""
功能:生成优化问题,返回的是一个lambdify函数
输入:
1. 初始状态点

输出:
1. 构建的数值表达式:obj_f
"""
N = 10
state_dim = 2
action_dim = 2
# 通过符号表达式定义目标函数
action_list = [symengine.Symbol("a{}".format(i)) for i in range(action_dim * N)]
initial_state_list = [symengine.Symbol("x{}".format(i)) for i in range(state_dim)]
state1_list = []
state2_list = []
state1_list.append(initial_state_list[0])
state2_list.append(initial_state_list[1])
obj = 0

for i in range(N):
state1_list.append(
-0.3 * state2_list[i] * symengine.cos(state1_list[i])
+ action_list[action_dim * i]
)
state2_list.append(
1.01 * state2_list[i]
+ 0.2 * symengine.sin(state1_list[i] ** 2)
+ action_list[action_dim * i + 1]
)
obj += state1_list[i] ** 2 + state2_list[i] ** 2 + action_list[i] ** 2
obj += state1_list[-1] ** 2 + state2_list[-1] ** 2

cost_func = symengine.lambdify(
action_list + initial_state_list, [obj], dtype=np.float64, backend="llvm"
)
return cost_func
cost_func = generate_cost_func()
# 定义步长
N = 10
# 定义初始状态集合
x_0 = np.arange(-2.0, 2.0, 0.02)
x_1 = np.arange(-2.0, 2.0, 0.02)
xx_0, xx_1 = np.meshgrid(x_0, x_1) # 为一维的矩阵
initial_states = np.transpose(np.array([xx_0.ravel(), xx_1.ravel()]))
# 定义进程数
process = os.cpu_count()
# 调用求解器求解
mpc_solver(cost_func, N, initial_states)

计算单步的损失

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def compute_loss(initial_state: np.array, action: np.array):
"""
功能:给定一个初始状态,一串动作轨迹,计算状态轨迹和损失
输入:
1. 初始状态
2. 动作轨迹

输出:
1. 状态轨迹
2. 损失函数目标值
"""
steps = N
state_trajectory = initial_state[np.newaxis, :]
for i in range(int(steps)):
state1_temp = (
-0.3 * state_trajectory[i][1] * np.cos(state_trajectory[i][0])
+ action[2 * i]
)
state2_temp = (
1.01 * state_trajectory[i][1]
+ 0.2 * np.sin(state_trajectory[i][0] ** 2)
+ action[2 * i + 1]
)
state_trajectory = np.append(
state_trajectory, np.array([[state1_temp, state2_temp]]), axis=0
)
return state_trajectory, np.sum(action ** 2) + np.sum(state_trajectory ** 2)

CEOM算法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def cem_optimize(
cost_func,
sample_N: int,
Nel: int,
initial_state: np.array,
means: list,
stds: list,
):
"""
功能:CEOM实现

输入:
1. 符号表达式
3. 采样个数:sample_N
4. 选取最好的个数:Nel
5. 初始状态
6. initial_mean
7. initial_std

输出:
1. mean_list也等于action_trajectory
2. std_list
3. state_tracejectory
5. running_time
"""
start_time = time.time()
action_sampled = scipy.stats.norm(loc=means, scale=stds).rvs(
size=(sample_N, action_dim * N)
)
obj_values = cost_func(
np.concatenate((action_sampled, np.tile(initial_state, (sample_N, 1))), axis=1)
)
# 进行排序
sort_index = np.argsort(obj_values)
nel_sort_index = sort_index[0:Nel]
means[:] = action_sampled[nel_sort_index].mean(axis=0)
stds[:] = action_sampled[nel_sort_index].std(axis=0)
state_trajectory, loss = compute_loss(initial_state, means)
return state_trajectory, loss, time.time() - start_time

构建MPC求解器接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def qp_solver(cost_func, initial_state):
"""
功能:使用CEOM算法求解一次二次型优化问题

输入:
1. cost_func: 目标函数
3. initial_state: 初始状态

输出:
1. action:
2. state:
3. loss:
"""
# pdb.set_trace()
pre_loss = 1.0
# 定义参数
sample_N = 2000
Nel = 10
means = np.array([0 for i in range(action_dim * N)], dtype=np.float64)
stds = np.array([1 for i in range(action_dim * N)], dtype=np.float64)
for i in range(100):
state_trajectory, loss, runtime = cem_optimize(
cost_func,
sample_N=sample_N,
Nel=10,
initial_state=initial_state,
means=means,
stds=stds,
)
if np.abs(pre_loss - loss) < 1e-8:
# print("损失是:", loss)
break
pre_loss = loss
return means[0:2], state_trajectory[1], loss


def mpc_solver(cost_func, initial_state, trajectory, log=True):
"""
功能:使用qp_solver来求解MPC问题:一个MPC问题相当于使用qp_solver求解N次数,得到一传动作和轨迹

输入:
1. cost_func: 目标函数
2. initial_state: 初始状态
4. trajectory: 轨迹(嵌套的List),使用multiprocess定义的共享变量
5. 是否打印计算过程

输出:
1. 无输出,所有的轨迹被保存到共享变量trajectory中

"""
trajectory_single = []
state = initial_state
for step in range(N):
action, next_state, obj = qp_solver(cost_func, initial_state=state)
trajectory_single.append((state, action))
state = next_state
trajectory.append(trajectory_single)
if log:
if len(trajectory) % 5 == 0:
print("正在获取第{}个MPC最优解决,其损失为{}".format(len(trajectory), obj))

总的项目结构分为3个文件

1
2
3
1. test.py: 定义初始状态集合,定义函数
2. funcs.py: 定义action_dim, state_dim, N, generate_cost_func(), compute_loss()
3. cem_mpc_solver.py: CEOM-MPC求解器

具体的代码可以见:https://github.com/olixu/CEOM-MPC-Solver

If you like my blog, please donate for me.