@[toc]
Введение в оптимизацию роя частиц
Цель
Решить задачу планирования целевой функции, например найти минимальное или максимальное значение функции X в определенном диапазоне в функции F(x1,x2,x3).
принцип
Алгоритм роя частиц, моделируя миграцию популяций птиц, адаптивно находит положение, удовлетворяющее требованиям, в диапазоне итераций, что отражено в функции Алгоритм роя частиц может найти значение многомерной переменной X с определенной точностью. В пределах диапазона значений вектор X, удовлетворяющий функции F, может получить максимальное/минимальное значение, то есть значение x1 x2 x3....
Цель этой статьи
Реализация алгоритма роя частиц на основе движка распределенной обработки потоков Flink (минимальное значение целевой функции получается по умолчанию)
Базовая настройка среды
Java 1.8 (годы)
maven 3.6x
Flink 1.10.1
Есть ли в инете туториал, не знаю, не нашел!
Конфигурация выглядит следующим образом:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.10.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId> org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>0.8.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
Структура проекта V0.5 Beta
Тестовая версия V0.5: имеет базовые функции, многоразовый интерфейс закрыт для использования. (Дело не в том, что его нельзя использовать, он просто не спроектирован)
Структура всего проекта понятна с первого взгляда. Во-первых, тестовый пакет, это старое правило, это тестовый пакет, он бесполезен, просто игнорируйте его!
дизайн-мышление
Поскольку Flink — это механизм потоковой обработки, с операциями на нашей стороне легко справиться, и мы можем напрямую отказаться от предыдущего метода использования матриц для выполнения операций. Вместо этого мы выбираем более простой и интуитивно понятный способ представления и работы, то есть напрямую моделируем птиц. Мы определяем птицу, а затем используем эту штуку для непрерывной обработки в нашем потоке!
А преимущество прямого использования класса Bird состоит в том, что мы можем напрямую использовать Bird для записи нашего собственного оптимального положения, то есть индивидуального оптимального положения, так что системе не нужно выполнять общий расчет индивидуального оптимального положения. Определение класса птиц
package com.java.PSO.StreamPso;
import com.java.PSO.ConfigPso.ConfigPso;
import jdk.nashorn.internal.objects.annotations.Constructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.lang.reflect.Array;
import java.util.ArrayList;
@Data
@ToString
@NoArgsConstructor
public class Bird implements Cloneable {
//大鸟的编号
private Integer id;
private ArrayList<Double> Pbest;
private ArrayList<Double> Gbest;
private Double Functionresult;
private Double LFunctionresult;
private ArrayList<Double> Xpostion;
private ArrayList<Double> Vpersent;
private Integer InterTimes;
public Bird(Integer id, ArrayList<Double> pbest, ArrayList<Double> gbest, Double functionresult, Double LFunctionresult, ArrayList<Double> xpostion, ArrayList<Double> vpersent, Integer interTimes) {
this.id = id;
this.Pbest = pbest;
this.Gbest = gbest;
this.Functionresult = functionresult;
this.LFunctionresult = LFunctionresult;
this.InterTimes = interTimes;
this.setXpostion(xpostion);
this.setVpersent(vpersent);
}
public void setXpostion(ArrayList<Double> xpostion) {
//越界处理
int index = 0;
for (Double aDouble : xpostion) {
if(aDouble > ConfigPso.X_up)
xpostion.set(index,ConfigPso.X_up);
else if (aDouble < ConfigPso.X_down)
xpostion.set(index,ConfigPso.X_down);
index++;
}
Xpostion = xpostion;
}
public void setVpersent(ArrayList<Double> vpersent) {
int index = 0;
for (Double aDouble : vpersent) {
if(aDouble > ConfigPso.V_max)
vpersent.set(index,ConfigPso.V_max);
else if (aDouble < ConfigPso.V_min)
vpersent.set(index,ConfigPso.V_min);
index++;
}
Vpersent = vpersent;
}
@Override
protected Object clone() throws CloneNotSupportedException {
return super.clone();
}
}
ConfigPso
ConfigPso, несомненно, является опцией конфигурации.
Требования к конфигурации описаны следующим образом:
package com.java.PSO.ConfigPso;
public class ConfigPso {
//关于粒子群算法的相关参数设置
/**
*X(i+1) = X(i) + V(i+1)
* V(i+1) = w*V(i) +c1*r1*(Pbest-X(i)) + c2*r2*(Gbest-X(i))
* r1,r2为随机数【0,1】这边不设置
*/
public static final Double C1 = 2.0;
public static final Double C2 = 2.0;
public static final Double w = 0.4;
public static final Double X_down = -2.0;
public static final Double X_up = 2.0;
public static final Double V_min = -4.0;
public static final Double V_max = 4.0;
public static final Integer PopulationNumber = 2; //种群个数
public static final Integer IterationsNumber = 20;//迭代次数不能为0
public static final Integer ParamesNumber = 1;
}
Это соответствует нашей предыдущей математической формуле.
Function
Этот пакет подходит для хранения целевых функций, например, нам нужно оптимизировать
F(x) = x^2 (X — одномерная матрица, а длина матрицы указывает размерность)
FunctionMake — это фабричный класс
Демонстрационный код выглядит следующим образом сверху вниз:
package com.java.PSO.Function.FunctionImp;
import java.util.ArrayList;
public interface FunctionsImpl {
Double FourFunction(ArrayList<Double> parames);
}
package com.java.PSO.Function;
import com.java.PSO.Function.FunctionImp.FunctionsImpl;
import java.util.ArrayList;
public class FunctionMake {
static FunctionsImpl functions=new Functions();
public static Double FourFunction(ArrayList<Double> List){
Double rest = functions.FourFunction(List);
return rest;
}
}
package com.java.PSO.Function;
import com.java.PSO.Function.FunctionImp.FunctionsImpl;
import java.util.ArrayList;
public class Functions implements FunctionsImpl {
@Override
public Double FourFunction(ArrayList<Double> parames) {
//测试函数,寻找最小值,x 假设都在 [5,-5] vmax = [-10,10] w=0.4 c1=c2=2默认初始
Double res = 0.0;
int index = 0;
for (Object parame : parames) {
res = res + Math.pow((Double) parames.get(index),2);
index ++;
}
return res;
}
}
Обратите внимание, что есть много методов, которые являются статическими, причина очень проста, его нужно использовать позже, и лучше использовать статические методы для удобства вызова.
StreamPso
Под этим есть подпакет
Core
В этом пакете хранится ядро нашего алгоритма, которое
Однако здесь это просто реализовать. В конце концов, вещи, которые вы можете придумать, на самом деле просты, но скудная информация заставляет вас думать, что это сложно.
package com.java.PSO.StreamPso.Core;
import com.java.PSO.ConfigPso.ConfigPso;
import com.java.PSO.StreamPso.Bird;
import java.util.ArrayList;
import java.util.Random;
public class Core {
static Random random = new Random();
public static ArrayList<Double> UpdateSpeed(Bird bird){
ArrayList<Double> CurrentSpeed = bird.getVpersent();
//更新速度,传入大鸟,会自动更新大鸟的速度,同时返回更新后的速度向量
Double fai1 = ConfigPso.C1 * random.nextDouble(); //c1*r1
Double fai2 = ConfigPso.C2 * random.nextDouble(); //c2*r2
int index = 0;
for (Double aDouble : CurrentSpeed) {
aDouble = ConfigPso.w * aDouble + fai1*(bird.getPbest().get(index) - bird.getXpostion().get(index))
+ fai2*(bird.getGbest().get(index) - bird.getXpostion().get(index));
CurrentSpeed.set(index,aDouble);
index ++ ;
}
//完成对速度的更新
bird.setVpersent(CurrentSpeed);
return CurrentSpeed;
}
public static ArrayList<Double> UpdatePosition(Bird bird){
//更新位置,传入大鸟,会自动更新大鸟的位置,同时返回更新后的位置的向量
int index = 0;
ArrayList<Double> CurrentXposition = bird.getXpostion();
for (Double aDouble : CurrentXposition) {
// System.out.println(aDouble+"<--->"+bird.getVpersent().get(index));
aDouble = aDouble+bird.getVpersent().get(index);
CurrentXposition.set(index,aDouble);
index++;
}
//完成对位置的更新
bird.setXpostion(CurrentXposition);
return CurrentXposition;
}
public static Bird UpDataBird(Bird bird){
//返回Bird,负责对前面的方法进行调度。只需要调用这一个方法就可以实现位置和速度更新
//先更新速度然后才能够更新位置
//由于每一个个体过来都会需要执行一下算子,所以每一次在执行的时候fai1,fai2都是不同的
//也就是每一个在每一轮当中的fai都是不同的,有可能会提高拟真度。
UpdateSpeed(bird);
UpdatePosition(bird);
return bird;
}
}
Dostream
Перед ним также находится BirdFactory, класс-фабрика для птиц, который должен генерировать птиц.Стоит упомянуть, что здесь используется клон. Но это не ключевой момент, а реализовать его тоже очень просто, поэтому я не буду его здесь показывать, а предоставлю читателям доработать код.
Так что в этом задействовано много базовых операторов, и как добиться оптимальной сортировки людей и всего мира.
индивидуальный оптимальный
Это относительно просто реализовать.Поскольку Flink является потоковой обработкой, нам нужно только записать предыдущее состояние, чтобы обработать его.
static class MinMapsP implements MapFunction<Bird,Bird>{
@Override
public Bird map(Bird bird) throws Exception{
//此时状态由Bird自己进行管理,Lfunctionresult记录的就是t-1次的个体最优的值,我们这边是找最小的的函数值
if(bird.getFunctionresult()<bird.getLFunctionresult()){
bird.setPbest(bird.getXpostion());
//更新最优值
bird.setLFunctionresult(bird.getFunctionresult());
}
return bird;
}
}
глобальный оптимум
Для этого нам нужна глобальная запись состояния
вот это
Запись состояния после
static class MinMapsG implements MapFunction<Bird,Bird>{
//这个是通用的不存在初始化例外使用的情况
@Override
public Bird map(Bird bird) throws Exception {
//状态流,状态由系统记录
if(bird1!=null){
if( bird.getFunctionresult()> bird1.getFunctionresult())
bird.setGbest(bird1.getXpostion());
else {
bird.setGbest(bird.getXpostion());
bird1=bird;
}
}
else{
bird1 = bird;
bird.setGbest(bird.getXpostion());
}
return bird;
}
}
Но тут есть проблема, я думаю вы тоже заметили, что мы получили глобальный оптимум, но нам же нужно зафиксировать каждую Птицу, то есть сказать Птице, кто лучшая (глобальная) и потом ввести расчет, конечно вы также. Причина, по которой вы можете выбрать использование bird1 напрямую, заключается в том, что bird1 записывает человека с глобальным оптимальным Pbest (с Pbest, но это не означает, что он лучший), но, хотя это решение, Flink является многопоточным, поэтому Знаете это решение не может быть реализовано напрямую. , все равно нужно это делать в операторе, а здесь я переиспользую MinMapsG напрямую.
глобальный код вызова
Это код в Dostream и основной код.
package com.java.PSO.StreamPso;
import com.java.PSO.ConfigPso.ConfigPso;
import com.java.PSO.Function.FunctionImp.FunctionsImpl;
import com.java.PSO.Function.FunctionMake;
import com.java.PSO.Function.Functions;
import com.java.PSO.StreamPso.Core.Core;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Random;
public class DoSteam {
static Bird bird1;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Bird> BirdInitStream = env.addSource(new InitBirds());
KeyedStream<Bird, Integer> Birdtimeing = BirdInitStream.keyBy(Bird::getInterTimes);
//进行初始化,获取全局最优,获取全局最优需要调用两次MinMapsG这个算子
//由于是基于流处理,不使用窗口所以必须使用状态流进行全局最优筛选,第一次调用只是选择出全局最优
//第二次调用是为了给所有的个体赋值,和个体的最优处理
SingleOutputStreamOperator<Bird> map = Birdtimeing.map(new MinMapsG());
KeyedStream<Bird, Tuple> id = map.keyBy("id");
SingleOutputStreamOperator<Bird> map1 = id.map(new MinMapsG());
SingleOutputStreamOperator<Bird> RealStream = map1.map(new MinMapsPinitial());
// RealStream.print("init");
//完成初始化后的数据流,到这里开始进行循环
IterativeStream<Bird> iterateStream = RealStream.iterate();
SingleOutputStreamOperator<Bird> IterationBody = iterateStream.keyBy(Bird::getInterTimes) //分组
.map(new MinMapsG()) //首次寻早最优解
.keyBy("id") //再次分组两个原因
.map(new MinMapsG()) // 再次统计最优解,为全局的位置最优解
.map(new MinMapsP())// 循环处理当中的个体最优解决
.map(new CalculationPso());//这一步是进行粒子群的运算,也是比较重要的一环
//需要进入循环的条件
SingleOutputStreamOperator<Bird> IterationFlag = IterationBody.filter(new FilterFunction<Bird>() {
@Override
public boolean filter(Bird bird) throws Exception {
return bird.getInterTimes() < ConfigPso.IterationsNumber;
}
});
iterateStream.closeWith(IterationFlag);
SingleOutputStreamOperator<Bird> Outstream = IterationBody.filter(new FilterFunction<Bird>() {
@Override
public boolean filter(Bird bird) throws Exception {
return bird.getInterTimes() >= ConfigPso.IterationsNumber;
}
});
// Outstream.print("1-->");
//到这一步的话我们的程序已经进行了最后一次的运行,但是此时的是没有进行排序的,所以需要进行最后一次排序
//这里由于只输出一个,所以这里打算直接开个技术窗口,然后输出最值!
SingleOutputStreamOperator<Bird> MinBrid = Outstream.countWindowAll(ConfigPso.PopulationNumber).min("Functionresult");
MinBrid.print("The best bird");
env.execute();
}
static class CalculationPso implements MapFunction<Bird,Bird>{
@Override
public Bird map(Bird bird) throws Exception {
/**
* @Huterox
* @Time:2021-11-1
* 目标,通过Core的Update实现对大鸟(粒子)的位置和速度更新
* 之后通过更新后的位置计算出目标函数的值,进行设置,前面的算子再进行一个新的轮回
* 更新粒子迭代次数
*/
Core.UpDataBird(bird);
bird.setFunctionresult(FunctionMake.FourFunction(bird.getXpostion()));
bird.setInterTimes(bird.getInterTimes()+1);
return bird;
}
}
static class MinMapsP implements MapFunction<Bird,Bird>{
@Override
public Bird map(Bird bird) throws Exception{
//此时状态由Bird自己进行管理,Lfunctionresult记录的就是t-1次的个体最优的值,我们这边是找最小的的函数值
if(bird.getFunctionresult()<bird.getLFunctionresult()){
bird.setPbest(bird.getXpostion());
//更新最优值
bird.setLFunctionresult(bird.getFunctionresult());
}
return bird;
}
}
static class MinMapsPinitial implements MapFunction<Bird,Bird>{
// 计算个体最优的都是无序的数据流,系统不好记录同时为了性能,所以个体状态由个体自己记录
@Override
public Bird map(Bird bird) throws Exception {
//本次进行初始化
//为了减少条件判读,所以直接把个体最优的算子进行拆分
bird.setPbest(bird.getXpostion());
bird.setLFunctionresult(bird.getFunctionresult());
return bird;
}
}
static class MinMapsG implements MapFunction<Bird,Bird>{
//这个是通用的不存在初始化例外使用的情况
@Override
public Bird map(Bird bird) throws Exception {
//状态流,状态由系统记录
if(bird1!=null){
if( bird.getFunctionresult()> bird1.getFunctionresult())
bird.setGbest(bird1.getXpostion());
else {
bird.setGbest(bird.getXpostion());
bird1=bird;
}
}
else{
bird1 = bird;
bird.setGbest(bird.getXpostion());
}
return bird;
}
}
static class InitBirds implements SourceFunction<Bird>{
@Override
public void run(SourceContext<Bird> ctx) throws Exception {
for(int i=1;i<=ConfigPso.PopulationNumber; i++) {
Bird bird = BirdFactory.MakeBird(i);
Double functionresult = FunctionMake.FourFunction(bird.getXpostion());
bird.setFunctionresult(functionresult);
bird.setInterTimes(0);//表示正在初始化
ctx.collect(bird);
}
}
@Override
public void cancel() {
}
}
}
контрольная работа
Когда мы получаем предыдущий тест файла конфигурации напрямую, F(x) = x^2
Минимальное значение этой функции, повторите 20 раз, чтобы увидеть результат
Видно, что после 20 раз есть хороший эффект, и значение близко к 0.
Поскольку другие функциональные тесты требуют корректировки соответствующих параметров конфигурации, они не будут здесь демонстрироваться.