Included Problems

The Moving Peaks Benchmark (MPB) for dynamic optimization problems.

This module provides an implementation of the Moving Peaks Benchmark (MPB) generator, a widely used tool for creating dynamic, multi-peaked optimization landscapes. It is designed to test the ability of optimization algorithms to adapt to changing environments.

The MPB landscape is defined by a number of peaks, each with its own height, width, and position. At specified intervals (controlled by change_frequency), these peak properties are updated, causing the landscape to shift, change shape, or both.

While the benchmark is naturally a maximization problem, this implementation negates the fitness value upon evaluation, allowing it to be used directly with standard minimization algorithms.
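As a minimal sketch of the negation described above: the cone-shaped peak function and the names `cone_peak`, `mpb_value`, and `negated_fitness` are assumptions made for illustration, not the module's API. The point is only that minimizing the negated value selects the same solution as maximizing the original one.

```python
import math

# Hypothetical cone-shaped peak: height minus width times the distance to the
# peak position. The module's own peak function is not shown in this section.
def cone_peak(x, position, height, width):
    return height - width * math.dist(x, position)

def mpb_value(x, peaks, base_value=0.0):
    """Maximization form: the best peak value at x, floored at base_value."""
    return max([base_value] + [cone_peak(x, *p) for p in peaks])

def negated_fitness(x, peaks):
    """Minimization form: the negated landscape value."""
    return -mpb_value(x, peaks)

# Each peak is (position, height, width).
peaks = [((20.0, 20.0), 50.0, 2.0), ((70.0, 60.0), 65.0, 5.0)]
candidates = [(20.0, 20.0), (70.0, 60.0), (0.0, 0.0)]

best_by_max = max(candidates, key=lambda x: mpb_value(x, peaks))
best_by_min = min(candidates, key=lambda x: negated_fitness(x, peaks))
print(best_by_max == best_by_min)  # True
```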


The 28 Standard Problem Classes & The generate_mpb_configs Function

This module also includes the generate_mpb_configs helper function, which programmatically creates parameter dictionaries for the 28 standard problem classes defined by Duhain and Engelbrecht.

These classes are identified by a 3-letter acronym (e.g., 'A1L'), which combines one code from each of the three classification schemes detailed below.

1. Duhain & Engelbrecht: Spatial and Temporal Severity (First Letter)

Defines the magnitude and frequency of changes.

  • Progressive ('P'): Frequent, small changes.

    • change_frequency: Low value (high temporal change).
    • change_severity (s): Low value.
    • height_severity: Low value.
  • Abrupt ('A'): Infrequent, large changes.

    • change_frequency: High value (low temporal change).
    • change_severity (s): High value.
    • height_severity: High value.
  • Chaotic ('C'): Frequent, large changes.

    • change_frequency: Low value (high temporal change).
    • change_severity (s): High value.
    • height_severity: High value.

2. Hu & Eberhart / Shi & Eberhart: Optima Modification (Second Letter)

Defines what changes about the peaks (position, value, or both).

  • Type I ('1'): Locations change, heights are constant.

    • height_severity: Set to 0.0.
    • Requires change_severity (s) != 0 for movement.
  • Type II ('2'): Locations are static, heights change.

    • height_severity: Set to a non-zero value.
    • Requires change_severity (s) = 0 to prevent movement.
  • Type III ('3'): Both locations and heights change.

    • height_severity: Set to a non-zero value.
    • Requires change_severity (s) != 0 for movement.

3. Angeline: Optima Trajectory (Third Letter)

Defines the pattern of peak movement.

  • Linear ('L'): Peaks move in a straight, correlated line.

    • lambda_param: Set to 1.0.
    • Requires change_severity (s) != 0 for movement.
  • Circular ('C'): Peaks have a periodic movement pattern. This is achieved in the parameterization by preventing translational movement.

    • Requires change_severity (s) = 0.
  • Random ('R'): Peaks move randomly without a discernible pattern.

    • lambda_param: Set to 0.0.
    • Requires change_severity (s) != 0 for movement.
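To make the role of lambda_param concrete, here is a hedged sketch of one way a shift vector can blend a random direction with the previous shift. The module's own peak update is not shown in this section, so the function `next_shift` and the choice to rescale the blended vector to length `severity` are assumptions, not the library's implementation.

```python
import math
import random

def next_shift(prev_shift, severity, lambda_param, rng=random):
    """Blend a random direction with the previous shift, rescaled to length `severity`."""
    # Random direction, scaled to length `severity`.
    r = [rng.gauss(0.0, 1.0) for _ in prev_shift]
    r_norm = math.sqrt(sum(c * c for c in r))
    r = [severity * c / r_norm for c in r]

    # lambda_param = 0.0 keeps only the random part; 1.0 keeps only the old direction.
    blended = [
        (1.0 - lambda_param) * rc + lambda_param * pc
        for rc, pc in zip(r, prev_shift)
    ]
    norm = math.sqrt(sum(c * c for c in blended))
    if norm == 0.0:
        return r  # no usable previous direction, fall back to the random one
    return [severity * c / norm for c in blended]

linear_step = next_shift([1.0, 0.0], severity=2.0, lambda_param=1.0)
random_step = next_shift([1.0, 0.0], severity=2.0, lambda_param=0.0)
```

With `lambda_param=1.0` the step keeps the previous direction (a straight line); with `lambda_param=0.0` the direction is redrawn at every change, while the step length stays at `severity` in both cases.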

Conflict Resolution

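Some combinations impose contradictory requirements on s (e.g., a Type II problem requires s = 0, while Linear movement requires s != 0). According to its source, generate_mpb_configs resolves these conflicts by giving the modification type priority: Type II combined with Linear or Random movement gets change_severity = 0.0, and Type I or Type III combined with Circular movement gets change_severity = 1.0. The string 'XXX' is only a fallback placeholder for a conflict not covered by these two rules, which is not expected to occur.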
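The contradictory combinations can be enumerated directly from the s requirements of the second and third letters. A short sketch, with the dictionary and variable names chosen for illustration:

```python
import itertools

# Requirement each code places on change_severity (s), per the rules above.
MODIFICATION_S = {"1": "!=0", "2": "=0", "3": "!=0"}
MOVEMENT_S = {"L": "!=0", "C": "=0", "R": "!=0"}

# All 3 * 3 * 3 dynamic acronyms, e.g. "P1L", "A2C", "C3R".
acronyms = [
    f"{sev}{mod}{mov}"
    for sev, mod, mov in itertools.product("PAC", "123", "LCR")
]

# A combination conflicts when its two s requirements disagree.
conflicting = [a for a in acronyms if MODIFICATION_S[a[1]] != MOVEMENT_S[a[2]]]

print(len(acronyms))  # 27 (the static class brings the total to 28)
print(conflicting)
```

Each severity letter contributes four conflicting acronyms: the Type I and Type III classes with Circular movement, and the Type II classes with Linear or Random movement.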

MovingPeaksBenchmark

Bases: Problem[ndarray, float]

An implementation of the Moving Peaks Benchmark (MPB) generator.

This class conforms to the Problem interface and produces dynamic, unconstrained optimization problems. The objective is to find the maximum value in a landscape composed of several moving peaks.

Note

Since most solvers are minimizers, the evaluate method returns the negated value of the MPB function. Minimizing this value is equivalent to maximizing the original function.

Attributes:

peaks (List[_Peak]): A list of the peak objects in the landscape.

Source code in cilpy/problem/mpb.py
class MovingPeaksBenchmark(Problem[np.ndarray, float]):
    """An implementation of the Moving Peaks Benchmark (MPB) generator.

    This class conforms to the `Problem` interface and produces dynamic,
    unconstrained optimization problems. The objective is to find the maximum
    value in a landscape composed of several moving peaks.

    Note:
        Since most solvers are minimizers, the `evaluate` method returns the
        *negated* value of the MPB function. Minimizing this value is
        equivalent to maximizing the original function.

    Attributes:
        peaks (List[_Peak]): A list of the peak objects in the landscape.
    """

    def __init__(
        self,
        dimension: int = 2,
        num_peaks: int = 10,
        domain: Tuple[float, float] = (0.0, 100.0),
        min_height: float = 30.0,
        max_height: float = 70.0,
        min_width: float = 1.0,
        max_width: float = 12.0,
        change_frequency: int = 5000,
        change_severity: float = 1.0,
        height_severity: float = 7.0,
        width_severity: float = 1.0,
        lambda_param: float = 0.0,
        name: str = "MovingPeaksBenchmark",
    ):
        """Initializes the Moving Peaks Benchmark problem.

        Args:
            dimension (int): The dimensionality of the search landscape.
            num_peaks (int): The number of peaks in the landscape.
            domain (Tuple[float, float]): The `(min, max)` bounds applied to
                every dimension of the search space.
            min_height (float): The minimum initial height of a peak.
            max_height (float): The maximum initial height of a peak.
            min_width (float): The minimum initial width of a peak.
            max_width (float): The maximum initial width of a peak.
            change_frequency (int): The number of iterations (calls to
                `begin_iteration`) between landscape changes. A value of 0
                or less disables changes.
            change_severity (float): Controls how severely peak positions change.
            height_severity (float): Controls how severely peak heights change.
            width_severity (float): Controls how severely peak widths change.
            lambda_param (float): Correlates peak movement over time. A value of
                0.0 results in random movement direction at each change.
            name (str): The name of the problem instance.
        """
        min_bounds = np.array([domain[0]] * dimension)
        max_bounds = np.array([domain[1]] * dimension)
        super().__init__(dimension, (min_bounds, max_bounds), name)

        self._change_frequency = change_frequency
        self._change_sev = change_severity
        self._height_sev = height_severity
        self._width_sev = width_severity
        self._lambda = lambda_param
        self._max_height = max_height

        self.peaks: List[_Peak] = []
        for _ in range(num_peaks):
            pos = np.random.uniform(domain[0], domain[1], size=dimension)
            height = random.uniform(min_height, max_height)
            width = random.uniform(min_width, max_width)
            self.peaks.append(_Peak(pos, height, width))

        self._base_value = 0.0  # As per Equation 4.2
        self._eval_count = 0
        self._iteration_count = 0

    def evaluate(self, solution: np.ndarray) -> Evaluation[float]:
        """Evaluates a solution and returns its fitness.

        This method increments the evaluation counter and calculates the
        function value as the maximum of the base value and all peak
        evaluations. Environment changes are triggered by `begin_iteration`,
        not by this method.

        Args:
            solution (np.ndarray): The candidate solution to be evaluated.

        Returns:
            Evaluation[float]: An Evaluation object containing the negated
                fitness value for use with minimization solvers.
        """
        self._eval_count += 1

        peak_values = [p.evaluate(solution) for p in self.peaks]
        fitness = float(max([self._base_value] + peak_values))
        return Evaluation(fitness=-fitness)

    def is_dynamic(self) -> Tuple[bool, bool]:
        """Indicates that the problem's objectives are dynamic.

        Returns:
            Tuple[bool, bool]: A tuple `(True, False)` as the objective
                function changes over time but there are no constraints.
        """
        return (True, False)

    def is_multi_objective(self) -> bool:
        """Indicates that the problem is not multi-objective."""
        return False

    def begin_iteration(self) -> None:
        """
        This method is called by the runner once per iteration.
        It handles the logic for changing the environment.
        """
        self._iteration_count += 1

        if (
            self._change_frequency > 0
            and self._iteration_count > 0
            and self._iteration_count % self._change_frequency == 0
        ):
            self.update_all_peaks()

    def get_fitness_bounds(self) -> Tuple[float, float]:
        """
        Returns the known theoretical min and max fitness values for the
        problem.

        This is used for calculating normalized performance metrics.

        Returns:
            A tuple containing (global_minimum_fitness, global_maximum_fitness).
        """
        return (-self._max_height, -self._base_value)

    def update_all_peaks(self) -> None:
        """Updates all peaks of the mpb."""
        for peak in self.peaks:
            peak.update(
                height_sev=self._height_sev,
                width_sev=self._width_sev,
                change_sev=self._change_sev,
                lambda_param=self._lambda,
                bounds=self.bounds,
                max_height_cap=self._max_height,
            )

__init__(dimension=2, num_peaks=10, domain=(0.0, 100.0), min_height=30.0, max_height=70.0, min_width=1.0, max_width=12.0, change_frequency=5000, change_severity=1.0, height_severity=7.0, width_severity=1.0, lambda_param=0.0, name='MovingPeaksBenchmark')

Initializes the Moving Peaks Benchmark problem.

Parameters:

dimension (int): The dimensionality of the search landscape. Default: 2.

num_peaks (int): The number of peaks in the landscape. Default: 10.

domain (Tuple[float, float]): The (min, max) bounds applied to every dimension of the search space. Default: (0.0, 100.0).

min_height (float): The minimum initial height of a peak. Default: 30.0.

max_height (float): The maximum initial height of a peak. Default: 70.0.

min_width (float): The minimum initial width of a peak. Default: 1.0.

max_width (float): The maximum initial width of a peak. Default: 12.0.

change_frequency (int): The number of iterations (calls to begin_iteration) between landscape changes. A value of 0 or less disables changes. Default: 5000.

change_severity (float): Controls how severely peak positions change. Default: 1.0.

height_severity (float): Controls how severely peak heights change. Default: 7.0.

width_severity (float): Controls how severely peak widths change. Default: 1.0.

lambda_param (float): Correlates peak movement over time. A value of 0.0 results in random movement direction at each change. Default: 0.0.

name (str): The name of the problem instance. Default: 'MovingPeaksBenchmark'.

Source code in cilpy/problem/mpb.py
def __init__(
    self,
    dimension: int = 2,
    num_peaks: int = 10,
    domain: Tuple[float, float] = (0.0, 100.0),
    min_height: float = 30.0,
    max_height: float = 70.0,
    min_width: float = 1.0,
    max_width: float = 12.0,
    change_frequency: int = 5000,
    change_severity: float = 1.0,
    height_severity: float = 7.0,
    width_severity: float = 1.0,
    lambda_param: float = 0.0,
    name: str = "MovingPeaksBenchmark",
):
    """Initializes the Moving Peaks Benchmark problem.

    Args:
        dimension (int): The dimensionality of the search landscape.
        num_peaks (int): The number of peaks in the landscape.
        domain (Tuple[float, float]): The `(min, max)` bounds applied to
            every dimension of the search space.
        min_height (float): The minimum initial height of a peak.
        max_height (float): The maximum initial height of a peak.
        min_width (float): The minimum initial width of a peak.
        max_width (float): The maximum initial width of a peak.
        change_frequency (int): The number of iterations (calls to
            `begin_iteration`) between landscape changes. A value of 0
            or less disables changes.
        change_severity (float): Controls how severely peak positions change.
        height_severity (float): Controls how severely peak heights change.
        width_severity (float): Controls how severely peak widths change.
        lambda_param (float): Correlates peak movement over time. A value of
            0.0 results in random movement direction at each change.
        name (str): The name of the problem instance.
    """
    min_bounds = np.array([domain[0]] * dimension)
    max_bounds = np.array([domain[1]] * dimension)
    super().__init__(dimension, (min_bounds, max_bounds), name)

    self._change_frequency = change_frequency
    self._change_sev = change_severity
    self._height_sev = height_severity
    self._width_sev = width_severity
    self._lambda = lambda_param
    self._max_height = max_height

    self.peaks: List[_Peak] = []
    for _ in range(num_peaks):
        pos = np.random.uniform(domain[0], domain[1], size=dimension)
        height = random.uniform(min_height, max_height)
        width = random.uniform(min_width, max_width)
        self.peaks.append(_Peak(pos, height, width))

    self._base_value = 0.0  # As per Equation 4.2
    self._eval_count = 0
    self._iteration_count = 0

begin_iteration()

This method is called by the runner once per iteration. It increments the iteration counter and, when change_frequency is positive, updates all peaks every change_frequency iterations.
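The resulting change schedule can be sketched on its own. The helper name `change_iterations` is hypothetical; it mirrors the modulo condition in the method's source, where the counter is incremented before the check.

```python
def change_iterations(change_frequency, total_iterations):
    """Return the 1-based iteration numbers at which the landscape would change."""
    if change_frequency <= 0:
        return []  # a non-positive frequency disables changes
    return [
        i for i in range(1, total_iterations + 1)
        if i % change_frequency == 0
    ]

print(change_iterations(20, 100))  # [20, 40, 60, 80, 100]
print(change_iterations(0, 100))   # []
```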

Source code in cilpy/problem/mpb.py
def begin_iteration(self) -> None:
    """
    This method is called by the runner once per iteration.
    It handles the logic for changing the environment.
    """
    self._iteration_count += 1

    if (
        self._change_frequency > 0
        and self._iteration_count > 0
        and self._iteration_count % self._change_frequency == 0
    ):
        self.update_all_peaks()

evaluate(solution)

Evaluates a solution and returns its fitness.

This method increments the evaluation counter and calculates the function value as the maximum of the base value and all peak evaluations. Environment changes are triggered by begin_iteration, not by this method.

Parameters:

solution (ndarray): The candidate solution to be evaluated. Required.

Returns:

Evaluation[float]: An Evaluation object containing the negated fitness value for use with minimization solvers.

Source code in cilpy/problem/mpb.py
def evaluate(self, solution: np.ndarray) -> Evaluation[float]:
    """Evaluates a solution and returns its fitness.

    This method increments the evaluation counter and calculates the
    function value as the maximum of the base value and all peak
    evaluations. Environment changes are triggered by `begin_iteration`,
    not by this method.

    Args:
        solution (np.ndarray): The candidate solution to be evaluated.

    Returns:
        Evaluation[float]: An Evaluation object containing the negated
            fitness value for use with minimization solvers.
    """
    self._eval_count += 1

    peak_values = [p.evaluate(solution) for p in self.peaks]
    fitness = float(max([self._base_value] + peak_values))
    return Evaluation(fitness=-fitness)

get_fitness_bounds()

Returns the known theoretical min and max fitness values for the problem.

This is used for calculating normalized performance metrics.

Returns:

Tuple[float, float]: A tuple (global_minimum_fitness, global_maximum_fitness). Because evaluate returns negated values, this is (-max_height, -base_value).
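One way such bounds can feed a normalized metric is a simple min-max rescaling. This is a sketch only: the helper `normalize_fitness` is hypothetical and not part of the library, and the bounds shown assume the default max_height of 70.0 and a base value of 0.0.

```python
def normalize_fitness(fitness, bounds):
    """Map a negated fitness to [0, 1], where 0.0 is the best possible value."""
    lowest, highest = bounds
    if highest == lowest:
        return 0.0  # degenerate range, nothing to scale
    return (fitness - lowest) / (highest - lowest)

bounds = (-70.0, 0.0)  # (-max_height, -base_value) for the default parameters
print(normalize_fitness(-70.0, bounds))  # 0.0
print(normalize_fitness(-35.0, bounds))  # 0.5
```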

Source code in cilpy/problem/mpb.py
def get_fitness_bounds(self) -> Tuple[float, float]:
    """
    Returns the known theoretical min and max fitness values for the
    problem.

    This is used for calculating normalized performance metrics.

    Returns:
        A tuple containing (global_minimum_fitness, global_maximum_fitness).
    """
    return (-self._max_height, -self._base_value)

is_dynamic()

Indicates that the problem's objectives are dynamic.

Returns:

Tuple[bool, bool]: A tuple (True, False), since the objective function changes over time but there are no constraints.

Source code in cilpy/problem/mpb.py
def is_dynamic(self) -> Tuple[bool, bool]:
    """Indicates that the problem's objectives are dynamic.

    Returns:
        Tuple[bool, bool]: A tuple `(True, False)` as the objective
            function changes over time but there are no constraints.
    """
    return (True, False)

is_multi_objective()

Indicates that the problem is not multi-objective.

Source code in cilpy/problem/mpb.py
def is_multi_objective(self) -> bool:
    """Indicates that the problem is not multi-objective."""
    return False

update_all_peaks()

Updates all peaks of the mpb.

Source code in cilpy/problem/mpb.py
def update_all_peaks(self) -> None:
    """Updates all peaks of the mpb."""
    for peak in self.peaks:
        peak.update(
            height_sev=self._height_sev,
            width_sev=self._width_sev,
            change_sev=self._change_sev,
            lambda_param=self._lambda,
            bounds=self.bounds,
            max_height_cap=self._max_height,
        )

demonstrate_mpb(params)

Helper function to run and print a scenario.

Source code in cilpy/problem/mpb.py
def demonstrate_mpb(params: dict):
    """Helper function to run and print a scenario."""
    print("-" * 50)
    print(f"Demonstration: {params.get('name')}")
    print("-" * 50)

    # Instantiate the problem
    problem = MovingPeaksBenchmark(**params)

    # We will track the position and value of a single peak to see how it moves.
    tracked_peak_index = 0

    # We will also evaluate a static point to see how the landscape changes underneath it.
    # Fall back to the constructor's default dimension (2) if none is given.
    static_point_to_test = np.array([50.0] * params.get("dimension", 2))

    num_changes_to_observe = 5
    total_evaluations = params["change_frequency"] * num_changes_to_observe

    for i in range(total_evaluations + 1):
        # Evaluate the static point, then advance the iteration counter;
        # begin_iteration() is what triggers environment changes.
        evaluation = problem.evaluate(static_point_to_test)
        problem.begin_iteration()

        # Report once per change interval, after the environment has changed
        if i > 0 and i % params["change_frequency"] == 0:
            change_num = i // params["change_frequency"]
            peak_pos = problem.peaks[tracked_peak_index].v
            peak_evaluation = problem.evaluate(peak_pos)

            print(f"\nEnvironment Change #{change_num} (at evaluation {i}):")
            print(f"  - Position of Peak {tracked_peak_index}: {peak_pos}")
            print(
                f"  - Value of Peak {tracked_peak_index}: [{peak_evaluation.fitness}]"
            )
            print(f"  - Value at static point: {evaluation.fitness:.2f}")

    print("\n")

generate_mpb_configs(dimension=5, num_peaks=10, domain=(0.0, 100.0), min_height=30.0, max_height=70.0, min_width=1.0, max_width=12.0, width_severity=0.05, s_for_random=1.0)

Programmatically generates parameter dictionaries for all 28 MPB classes.

This function combines the rules from three classification schemes to generate 27 dynamic problem configurations and 1 static configuration. It handles contradictions between rules as specified.

Parameters:

s_for_random (float): The non-zero value used as the low change_severity (s) level, i.e., for Progressive classes that require s != 0. Passing 0 raises ValueError. Default: 1.0.

Returns:

Dict[str, Dict[str, Any]]: A dictionary where keys are the 3-letter acronyms (e.g., "A1C", "P3L") and values are the corresponding parameter dictionaries for the MovingPeaksBenchmark constructor. A "STA" key is included for the static case.

Source code in cilpy/problem/mpb.py
def generate_mpb_configs(
    dimension: int = 5,
    num_peaks: int = 10,
    domain: Tuple[float, float] = (0.0, 100.0),
    min_height: float = 30.0,
    max_height: float = 70.0,
    min_width: float = 1.0,
    max_width: float = 12.0,
    width_severity: float = 0.05,
    s_for_random: float = 1.0,  # s value for s != 0
) -> Dict[str, Dict[str, Any]]:
    """
    Programmatically generates parameter dictionaries for all 28 MPB classes.

    This function combines the rules from three classification schemes to generate
    27 dynamic problem configurations and 1 static configuration. It handles
    contradictions between rules as specified.

    Args:
        s_for_random (float): The non-zero value to use for the change_severity
            parameter `s` when a non-zero value is required. Defaults to 1.0.

    Returns:
        Dict[str, Dict]: A dictionary where keys are the 3-letter acronyms
            (e.g., "A1C", "P3L") and values are the corresponding parameter
            dictionaries for the MovingPeaksBenchmark constructor. A "STA" key
            is included for the static case.
    """
    if s_for_random == 0:
        raise ValueError("'s_for_random' must be a non-zero value.")

    # 1. Base Configuration (common to all classes)
    base_params = {
        "dimension": dimension,
        "num_peaks": num_peaks,
        "domain": domain,
        "min_height": min_height,
        "max_height": max_height,
        "min_width": min_width,
        "max_width": max_width,
        "width_severity": width_severity,  # Often kept low
    }

    # 2. Define "Low" vs. "High" Values for severity and frequency
    LOW_S, HIGH_S = s_for_random, 10.0
    LOW_H, HIGH_H = 1.0, 10.0

    # High temporal frequency = low number of iterations between changes
    FREQ_PROGRESSIVE = 20
    FREQ_ABRUPT = 100
    FREQ_CHAOTIC = 30

    # 3. Classification Rules
    # Duhain & Engelbrecht: Severity (Spatial & Temporal)
    # Acronyms: P (Progressive), A (Abrupt), C (Chaotic)
    SEVERITY_CLASSES = {
        "P": {
            "change_severity": LOW_S,
            "height_severity": LOW_H,
            "change_frequency": FREQ_PROGRESSIVE,
        },
        "A": {
            "change_severity": HIGH_S,
            "height_severity": HIGH_H,
            "change_frequency": FREQ_ABRUPT,
        },
        "C": {
            "change_severity": HIGH_S,
            "height_severity": HIGH_H,
            "change_frequency": FREQ_CHAOTIC,
        },
    }

    # Hu & Eberhart / Shi & Eberhart: Optima Modification
    # Acronyms: 1 (Type I), 2 (Type II), 3 (Type III)
    # We use 's_req' to define the requirement for the change_severity (s)
    MODIFICATION_CLASSES = {
        "1": {"height_severity": 0.0, "s_req": "!=0"},
        "2": {"s_req": "=0"},  # height_severity will be taken from SEVERITY_CLASSES
        "3": {"s_req": "!=0"},  # height_severity will be taken from SEVERITY_CLASSES
    }

    # Angeline: Optima Trajectory
    # Acronyms: L (Linear), C (Circular), R (Random)
    MOVEMENT_CLASSES = {
        "L": {"lambda_param": 1.0, "s_req": "!=0"},
        "C": {"lambda_param": 0.0, "s_req": "=0"},  # lambda is irrelevant when s=0
        "R": {"lambda_param": 0.0, "s_req": "!=0"},
    }

    # --- Generation Logic ---
    all_configs = {}

    # 4. Iterate through all 3*3*3 = 27 combinations
    severity_codes = SEVERITY_CLASSES.keys()
    modification_codes = MODIFICATION_CLASSES.keys()
    movement_codes = MOVEMENT_CLASSES.keys()

    for sev_code, mod_code, mov_code in itertools.product(
        severity_codes, modification_codes, movement_codes
    ):
        acronym = f"{sev_code}{mod_code}{mov_code}"

        # Start with base and add severity parameters
        config = base_params.copy()
        config.update(SEVERITY_CLASSES[sev_code])
        config["name"] = acronym

        mod_rules = MODIFICATION_CLASSES[mod_code]
        mov_rules = MOVEMENT_CLASSES[mov_code]

        # 5. Resolve Conflicts for `change_severity` (s)
        s_req_mod = mod_rules["s_req"]
        s_req_mov = mov_rules["s_req"]

        is_conflict = (s_req_mod == "!=0" and s_req_mov == "=0") or (
            s_req_mod == "=0" and s_req_mov == "!=0"
        )

        if is_conflict:
            # *2L/*2R: 2 requires s = 0, but L&R requires s != 0
            #          2 gets priority since *3* requires s != 0
            if mod_code == "2" and (mov_code == "L" or mov_code == "R"):
                config["change_severity"] = 0.0
            # *1C/*3C: C requires s = 0, but 1&3 require s != 0
            #          1/3 (mod rule) gets priority, so s is set to a non-zero value
            elif mov_code == "C" and (mod_code == "1" or mod_code == "3"):
                config["change_severity"] = 1.0
            else:
                # This is not supposed to happen, but is kept for clarity
                config["change_severity"] = "XXX"  # Assign placeholder on conflict
        elif s_req_mod == "=0" or s_req_mov == "=0":
            # If either requires s=0 and there's no conflict, it must be 0
            config["change_severity"] = 0.0
        else:
            # Otherwise, s must be non-zero. Use the value from the severity class.
            # This is already set, but we make it explicit for clarity.
            pass

        # 6. Apply overrides from modification and movement rules
        # C1*: C requires hSeverity high, but 1 requires hSeverity = 0
        #      1 (mod rule) gets priority since *2*/*3* requires hSeverity != 0
        if "height_severity" in mod_rules:
            config["height_severity"] = mod_rules["height_severity"]

        if "lambda_param" in mov_rules:
            config["lambda_param"] = mov_rules["lambda_param"]

        all_configs[acronym] = config

    # Add the static problem class
    static_config = base_params.copy()
    static_config.update(
        {
            "change_frequency": 0,
            "change_severity": 0,
            "height_severity": 0,
            "width_severity": 0,
            "lambda_param": 0,
            "name": "STA",
        }
    )
    all_configs["STA"] = static_config

    return all_configs
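
The override order in step 6 can be illustrated in isolation. A minimal sketch using the Chaotic, Type I, and Linear values from the function above; the variable names are chosen for illustration, and the base dictionary is shortened to two keys:

```python
base = {"dimension": 5, "num_peaks": 10}
severity_chaotic = {
    "change_severity": 10.0,
    "height_severity": 10.0,
    "change_frequency": 30,
}
modification_type1 = {"height_severity": 0.0}  # Type I: heights stay constant
movement_linear = {"lambda_param": 1.0}

config = base.copy()
config.update(severity_chaotic)    # severity class is applied first
config.update(modification_type1)  # the Type I override replaces the high height severity
config.update(movement_linear)
config["name"] = "C1L"

print(config["height_severity"])  # 0.0
```

Because later `update` calls overwrite earlier keys, the modification rule wins over the severity class for height_severity, while change_severity and change_frequency keep their Chaotic values.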

The Constrained Moving Peaks Benchmark (CMPB).

This module provides an implementation of the CMPB generator, a dynamic and constrained optimization problem.

ConstrainedMovingPeaksBenchmark

Bases: Problem[ndarray, float]

An implementation of the Constrained Moving Peaks Benchmark (CMPB).

This class generates a dynamic constrained optimization problem by composing two independent MovingPeaksBenchmark instances: one for the objective function landscape (f) and one for the constraint landscape (g).

The problem is naturally a maximization problem defined as:

Maximize: h(x) = f(x) - g(x)

A solution is considered feasible if h(x) >= 0 (i.e., f(x) >= g(x)).

To align with standard minimization solvers, this class formulates the problem as:

Minimize: g(x) - f(x)
Subject to: g(x) - f(x) <= 0

This formulation correctly models the problem, where the objective function and the single inequality constraint are the same.
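A small sketch of this composition, using plain floats in place of the two landscapes. The helpers `compose` and `is_feasible` are hypothetical names for illustration, not part of the class.

```python
def compose(f_val, g_val):
    """Return (fitness, constraint) in minimization form from maximization values."""
    fitness = g_val - f_val     # minimize g(x) - f(x)
    constraint = g_val - f_val  # the same expression, feasible when <= 0
    return fitness, constraint

def is_feasible(constraint):
    return constraint <= 0.0

# f(x) = 60 and g(x) = 45: h(x) = 15 >= 0, so the point is feasible.
print(compose(60.0, 45.0))                  # (-15.0, -15.0)
# f(x) = 30 and g(x) = 50: h(x) = -20 < 0, so the point is infeasible.
print(is_feasible(compose(30.0, 50.0)[1]))  # False
```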

Attributes:

f_landscape (MovingPeaksBenchmark): The MPB instance for the objective function landscape.

g_landscape (MovingPeaksBenchmark): The MPB instance for the constraint function landscape.

Source code in cilpy/problem/cmpb.py
class ConstrainedMovingPeaksBenchmark(Problem[np.ndarray, float]):
    """An implementation of the Constrained Moving Peaks Benchmark (CMPB).

    This class generates a dynamic constrained optimization problem by composing
    two independent `MovingPeaksBenchmark` instances: one for the objective
    function landscape (`f`) and one for the constraint landscape (`g`).

    The problem is naturally a maximization problem defined as:
    Maximize: h(x) = f(x) - g(x)
    A solution is considered feasible if h(x) >= 0 (i.e., f(x) >= g(x)).

    To align with standard minimization solvers, this class formulates the
    problem as:
    Minimize: g(x) - f(x)
    Subject to: g(x) - f(x) <= 0

    This formulation correctly models the problem, where the objective function
    and the single inequality constraint are the same.

    Attributes:
        f_landscape (MovingPeaksBenchmark): The MPB instance for the objective
            function landscape.
        g_landscape (MovingPeaksBenchmark): The MPB instance for the constraint
            function landscape.
    """

    def __init__(
        self,
        f_params: Dict[str, Any],
        g_params: Dict[str, Any],
        name: str = "ConstrainedMovingPeaksBenchmark",
    ):
        """Initializes the Constrained Moving Peaks Benchmark generator.

        Args:
            f_params (Dict[str, Any]): A dictionary of parameters for the
                objective landscape (f), which will be passed to the
                `MovingPeaksBenchmark` constructor.
            g_params (Dict[str, Any]): A dictionary of parameters for the
                constraint landscape (g), which will be passed to the
                `MovingPeaksBenchmark` constructor.
            name (str): The name for the problem instance.

        Raises:
            ValueError: If the 'dimension' parameter is not specified or is
                different for `f_params` and `g_params`.
        """
        f_dim = f_params.get("dimension")
        g_dim = g_params.get("dimension")

        if f_dim is None or g_dim is None or f_dim != g_dim:
            raise ValueError(
                "The 'dimension' parameter must be specified and identical for "
                "both f_params and g_params."
            )

        self.f_landscape = MovingPeaksBenchmark(**f_params)
        self.g_landscape = MovingPeaksBenchmark(**g_params)

        # The problem's domain is defined by the 'f' landscape.
        super().__init__(
            dimension=self.f_landscape.dimension,
            bounds=self.f_landscape.bounds,
            name=name,
        )

        # Determine if landscapes are dynamic based on their change frequency.
        self._is_f_dynamic = f_params.get("change_frequency", 0) > 0
        self._is_g_dynamic = g_params.get("change_frequency", 0) > 0

    def evaluate(self, solution: np.ndarray) -> Evaluation[float]:
        """Evaluates a solution against the composed objective and constraint.

        This method calls the `evaluate` method of the underlying `f` and `g`
        landscapes exactly once, ensuring that their internal evaluation
        counters are updated correctly. It then composes the results to form the
        final fitness and constraint violation.

        Args:
            solution (np.ndarray): The candidate solution to be evaluated.

        Returns:
            Evaluation[float]: An Evaluation object containing the composed
                fitness and the single inequality constraint violation.
        """
        # Evaluate each landscape once. This triggers their internal update
        # logic and returns the negated maximization value.
        f_eval = self.f_landscape.evaluate(solution)
        g_eval = self.g_landscape.evaluate(solution)

        # The MPB implementation in cilpy is already posed as a minimization
        # problem (it negates its fitness), hence we convert the evaluations
        # back to the original maximization values.
        f_val = -f_eval.fitness
        g_val = -g_eval.fitness

        # The objective for maximization is h(x) = f(x) - g(x).  Infeasible
        # areas are indicated where h(x) < 0.
        composed_fitness = f_val - g_val

        # Fitness is negated for minimization solvers
        return Evaluation(
            fitness=-composed_fitness, constraints_inequality=[-composed_fitness]
        )

    def is_dynamic(self) -> Tuple[bool, bool]:
        """Indicates whether the objective or constraint landscapes can change.

        The composed objective `g(x) - f(x)` is dynamic if either `f` or `g`
        is dynamic. Similarly, the composed constraint is dynamic if either
        `f` or `g` is dynamic.

        Returns:
            Tuple[bool, bool]: A tuple `(is_objective_dynamic, is_constraint_dynamic)`.
        """
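        # Both the objective and the constraint are g(x) - f(x), so each is
        # dynamic as soon as either underlying landscape is dynamic.
        is_any_dynamic = self._is_f_dynamic or self._is_g_dynamic
        return (is_any_dynamic, is_any_dynamic)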

    def is_multi_objective(self) -> bool:
        return False

    def begin_iteration(self) -> None:
        """
        Notifies the underlying landscapes that a new solver iteration is
        beginning.

        This method acts as a delegate, calling the `begin_iteration` method on
        both the objective (f) and constraint (g) landscapes. This ensures
        that their internal iteration counters are incremented and environmental
        changes are triggered correctly and in sync.
        """
        self.f_landscape.begin_iteration()
        self.g_landscape.begin_iteration()

    def get_fitness_bounds(self) -> Tuple[float, float]:
        """
        Returns the known theoretical min and max fitness values for the
        problem.

        This is used for calculating normalized performance metrics.

        Returns:
            A tuple containing (global_minimum_fitness, global_maximum_fitness).
        """
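        # Lower bound: the negated maximum peak height of f. Upper bound: 0.0,
        # the feasibility boundary where f(x) == g(x).
        return (-self.f_landscape._max_height, 0.0)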

__init__(f_params, g_params, name='ConstrainedMovingPeaksBenchmark')

Initializes the Constrained Moving Peaks Benchmark generator.

Parameters:

Name | Type | Description | Default
f_params | Dict[str, Any] | A dictionary of parameters for the objective landscape (f), which will be passed to the MovingPeaksBenchmark constructor. | required
g_params | Dict[str, Any] | A dictionary of parameters for the constraint landscape (g), which will be passed to the MovingPeaksBenchmark constructor. | required
name | str | The name for the problem instance. | 'ConstrainedMovingPeaksBenchmark'

Raises:

Type | Description
ValueError | If the 'dimension' parameter is not specified or is different for f_params and g_params.
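The dimension check can be tried out without constructing real landscapes. The sketch below copies the validation logic from the constructor into a stand-alone helper; `check_dimensions` is a hypothetical name used only for illustration and is not part of cilpy.

```python
from typing import Any, Dict


def check_dimensions(f_params: Dict[str, Any], g_params: Dict[str, Any]) -> int:
    """Hypothetical helper mirroring the constructor's dimension check."""
    f_dim = f_params.get("dimension")
    g_dim = g_params.get("dimension")
    if f_dim is None or g_dim is None or f_dim != g_dim:
        raise ValueError(
            "The 'dimension' parameter must be specified and identical for "
            "both f_params and g_params."
        )
    return f_dim


# Matching dimensions pass the check.
matching = check_dimensions({"dimension": 2}, {"dimension": 2})

# A mismatched dimension is rejected.
try:
    check_dimensions({"dimension": 2}, {"dimension": 3})
    mismatch_rejected = False
except ValueError:
    mismatch_rejected = True

# A missing dimension is rejected as well.
try:
    check_dimensions({"dimension": 2}, {})
    missing_rejected = False
except ValueError:
    missing_rejected = True
```

Because the check runs before either landscape is built, an invalid pair of dictionaries fails fast without any side effects.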

Source code in cilpy/problem/cmpb.py
def __init__(
    self,
    f_params: Dict[str, Any],
    g_params: Dict[str, Any],
    name: str = "ConstrainedMovingPeaksBenchmark",
):
    """Initializes the Constrained Moving Peaks Benchmark generator.

    Args:
        f_params (Dict[str, Any]): A dictionary of parameters for the
            objective landscape (f), which will be passed to the
            `MovingPeaksBenchmark` constructor.
        g_params (Dict[str, Any]): A dictionary of parameters for the
            constraint landscape (g), which will be passed to the
            `MovingPeaksBenchmark` constructor.
        name (str): The name for the problem instance.

    Raises:
        ValueError: If the 'dimension' parameter is not specified or is
            different for `f_params` and `g_params`.
    """
    f_dim = f_params.get("dimension")
    g_dim = g_params.get("dimension")

    if f_dim is None or g_dim is None or f_dim != g_dim:
        raise ValueError(
            "The 'dimension' parameter must be specified and identical for "
            "both f_params and g_params."
        )

    self.f_landscape = MovingPeaksBenchmark(**f_params)
    self.g_landscape = MovingPeaksBenchmark(**g_params)

    # The problem's domain is defined by the 'f' landscape.
    super().__init__(
        dimension=self.f_landscape.dimension,
        bounds=self.f_landscape.bounds,
        name=name,
    )

    # Determine if landscapes are dynamic based on their change frequency.
    self._is_f_dynamic = f_params.get("change_frequency", 0) > 0
    self._is_g_dynamic = g_params.get("change_frequency", 0) > 0

begin_iteration()

Notifies the underlying landscapes that a new solver iteration is beginning.

This method acts as a delegate, calling the begin_iteration method on both the objective (f) and constraint (g) landscapes. This ensures that their internal iteration counters are incremented and environmental changes are triggered correctly and in sync.
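The delegation described above can be sketched with toy classes. `ToyLandscape` and `ToyComposite` are hypothetical stand-ins, and the rule "change every `change_frequency` iterations, never when it is 0" is an assumption made for this sketch rather than a statement about how MovingPeaksBenchmark counts internally.

```python
class ToyLandscape:
    """Hypothetical stand-in for a landscape with an iteration counter."""

    def __init__(self, change_frequency: int) -> None:
        self.change_frequency = change_frequency
        self.iteration = 0
        self.num_changes = 0

    def begin_iteration(self) -> None:
        self.iteration += 1
        # Assumption: a frequency of 0 means the landscape is static.
        if self.change_frequency > 0 and self.iteration % self.change_frequency == 0:
            self.num_changes += 1


class ToyComposite:
    """Hypothetical composite that forwards begin_iteration to both parts."""

    def __init__(self, f_landscape: ToyLandscape, g_landscape: ToyLandscape) -> None:
        self.f_landscape = f_landscape
        self.g_landscape = g_landscape

    def begin_iteration(self) -> None:
        self.f_landscape.begin_iteration()
        self.g_landscape.begin_iteration()


# A dynamic objective landscape combined with a static constraint landscape.
composite = ToyComposite(ToyLandscape(change_frequency=10), ToyLandscape(change_frequency=0))
for _ in range(25):
    composite.begin_iteration()
```

Both counters advance together on every call, so the two landscapes never disagree about which iteration they are in, even when only one of them changes.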

Source code in cilpy/problem/cmpb.py
def begin_iteration(self) -> None:
    """
    Notifies the underlying landscapes that a new solver iteration is
    beginning.

    This method acts as a delegate, calling the `begin_iteration` method on
    both the objective (f) and constraint (g) landscapes. This ensures
    that their internal iteration counters are incremented and environmental
    changes are triggered correctly and in sync.
    """
    self.f_landscape.begin_iteration()
    self.g_landscape.begin_iteration()

evaluate(solution)

Evaluates a solution against the composed objective and constraint.

This method calls the evaluate method of the underlying f and g landscapes exactly once, ensuring that their internal evaluation counters are updated correctly. It then composes the results to form the final fitness and constraint violation.

Parameters:

Name | Type | Description | Default
solution | ndarray | The candidate solution to be evaluated. | required

Returns:

Type | Description
Evaluation[float] | An Evaluation object containing the composed fitness and the single inequality constraint violation.
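The composition arithmetic can be checked by hand. The sketch below reproduces only the sign handling of `evaluate`; `compose` is a hypothetical helper, and the numeric inputs are made-up values standing in for the negated fitness that each landscape returns.

```python
from typing import List, Tuple


def compose(f_fitness: float, g_fitness: float) -> Tuple[float, List[float]]:
    """Hypothetical helper mirroring the arithmetic in evaluate().

    Both inputs are minimization-style (negated) landscape fitness values.
    """
    f_val = -f_fitness  # back to the maximization value of f
    g_val = -g_fitness  # back to the maximization value of g
    h = f_val - g_val   # maximization objective; h < 0 marks infeasibility
    return -h, [-h]     # negate again for minimization solvers


# f = 70 and g = 40 at the same point: h = 30, so the point is feasible.
feasible_fitness, feasible_violation = compose(-70.0, -40.0)

# f = 20 and g = 50 at the same point: h = -30, so the point is infeasible.
infeasible_fitness, infeasible_violation = compose(-20.0, -50.0)
```

The fitness and the violation are always equal, so a solver that drives the fitness below zero also satisfies the constraint.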

Source code in cilpy/problem/cmpb.py
def evaluate(self, solution: np.ndarray) -> Evaluation[float]:
    """Evaluates a solution against the composed objective and constraint.

    This method calls the `evaluate` method of the underlying `f` and `g`
    landscapes exactly once, ensuring that their internal evaluation
    counters are updated correctly. It then composes the results to form the
    final fitness and constraint violation.

    Args:
        solution (np.ndarray): The candidate solution to be evaluated.

    Returns:
        Evaluation[float]: An Evaluation object containing the composed
            fitness and the single inequality constraint violation.
    """
    # Evaluate each landscape once. This triggers their internal update
    # logic and returns the negated maximization value.
    f_eval = self.f_landscape.evaluate(solution)
    g_eval = self.g_landscape.evaluate(solution)

    # The MPB implementation in cilpy is already posed as a minimization
    # problem (it negates its fitness), hence we convert the evaluations
    # back to the original maximization values.
    f_val = -f_eval.fitness
    g_val = -g_eval.fitness

    # The objective for maximization is h(x) = f(x) - g(x).  Infeasible
    # areas are indicated where h(x) < 0.
    composed_fitness = f_val - g_val

    # Fitness is negated for minimization solvers
    return Evaluation(
        fitness=-composed_fitness, constraints_inequality=[-composed_fitness]
    )

get_fitness_bounds()

Returns the known theoretical min and max fitness values for the problem.

This is used for calculating normalized performance metrics.

Returns:

Type | Description
Tuple[float, float] | A tuple containing (global_minimum_fitness, global_maximum_fitness).
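One way such bounds might feed a normalized metric is plain min-max scaling. This is only a sketch: `normalize_fitness` is a hypothetical helper, the choice of min-max scaling is an assumption (the source does not say which metric is used), and the bounds below assume a maximum peak height of 70.

```python
from typing import Tuple


def normalize_fitness(fitness: float, bounds: Tuple[float, float]) -> float:
    """Hypothetical min-max scaling: 0.0 at the lower bound, 1.0 at the upper."""
    lower, upper = bounds
    if upper == lower:
        raise ValueError("Fitness bounds must span a non-empty range.")
    return (fitness - lower) / (upper - lower)


# Assumed bounds for a landscape whose maximum peak height is 70.
bounds = (-70.0, 0.0)

best = normalize_fitness(-70.0, bounds)      # at the global minimum
halfway = normalize_fitness(-35.0, bounds)   # midway between the bounds
boundary = normalize_fitness(0.0, bounds)    # at the upper bound
```

With minimization, a smaller normalized value is better under this scaling; a metric that prefers larger values would use one minus this result.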

Source code in cilpy/problem/cmpb.py
def get_fitness_bounds(self) -> Tuple[float, float]:
    """
    Returns the known theoretical min and max fitness values for the
    problem.

    This is used for calculating normalized performance metrics.

    Returns:
        A tuple containing (global_minimum_fitness, global_maximum_fitness).
    """
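    # Lower bound: the negated maximum peak height of f. Upper bound: 0.0,
    # the feasibility boundary where f(x) == g(x).
    return (-self.f_landscape._max_height, 0.0)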

is_dynamic()

Indicates whether the objective or constraint landscapes can change.

The composed objective g(x) - f(x) is dynamic if either f or g is dynamic. Similarly, the composed constraint is dynamic if either f or g is dynamic.

Returns:

Type | Description
Tuple[bool, bool] | A tuple (is_objective_dynamic, is_constraint_dynamic).
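The rule stated above, that either landscape being dynamic makes both composed parts dynamic, can be sketched as a small truth table. `composed_dynamics` is a hypothetical helper written for illustration, not a cilpy function.

```python
from typing import Tuple


def composed_dynamics(is_f_dynamic: bool, is_g_dynamic: bool) -> Tuple[bool, bool]:
    """Hypothetical helper: objective and constraint both depend on f and g."""
    either = is_f_dynamic or is_g_dynamic
    return (either, either)


static_f_dynamic_g = composed_dynamics(False, True)
dynamic_f_static_g = composed_dynamics(True, False)
both_static = composed_dynamics(False, False)
```

Only the fully static case reports `(False, False)`, because the expression g(x) - f(x) shifts whenever either term does.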

Source code in cilpy/problem/cmpb.py
def is_dynamic(self) -> Tuple[bool, bool]:
    """Indicates whether the objective or constraint landscapes can change.

    The composed objective `g(x) - f(x)` is dynamic if either `f` or `g`
    is dynamic. Similarly, the composed constraint is dynamic if either
    `f` or `g` is dynamic.

    Returns:
        Tuple[bool, bool]: A tuple `(is_objective_dynamic, is_constraint_dynamic)`.
    """
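    # Both the objective and the constraint are g(x) - f(x), so each is
    # dynamic as soon as either underlying landscape is dynamic.
    is_any_dynamic = self._is_f_dynamic or self._is_g_dynamic
    return (is_any_dynamic, is_any_dynamic)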

demonstrate_cmpb(name, f_params, g_params)

Helper function to run and print a constrained scenario.

Source code in cilpy/problem/cmpb.py
def demonstrate_cmpb(name: str, f_params: Dict[str, Any], g_params: Dict[str, Any]):
    """Helper function to run and print a constrained scenario."""
    print("=" * 60)
    print(f"Demonstration: {name}")
    print("=" * 60)

    # Instantiate the constrained problem
    problem = ConstrainedMovingPeaksBenchmark(f_params, g_params)
    change_frequency = max(
        f_params.get("change_frequency", 0), g_params.get("change_frequency", 0)
    )

    if change_frequency == 0:
        print("Both landscapes are static. No changes will occur.")
        return

    # Define a few points to track their feasibility and fitness over time.
    # The points are two-dimensional, so both parameter dictionaries are
    # expected to specify dimension=2.
    test_points = {
        "Center": np.array([50.0, 50.0]),
        "Corner": np.array([10.0, 10.0]),
    }

    num_changes_to_observe = 5
    total_iterations = change_frequency * num_changes_to_observe

    for i in range(total_iterations + 1):
        # 1. Notify the problem that a new iteration is beginning.
        problem.begin_iteration()

        # 2. Evaluate points in the (potentially new) landscape.
        evals = {
            label: problem.evaluate(pos) for label, pos in test_points.items()
        }

        # 3. Report once per environment change. Using a separate loop
        #    variable (label) avoids shadowing the `name` argument.
        if i > 0 and i % change_frequency == 0:
            print(
                f"\n--- Environment Change #{i // change_frequency} (at iteration {i}) ---"
            )
            for label, evaluation in evals.items():
                violation = evaluation.constraints_inequality[0]  # type: ignore
                is_feasible = violation <= 0
                print(
                    f"  - Point '{label}': Fitness = {evaluation.fitness:.2f}, "
                    f"Violation = {violation:.2f}, Feasible = {is_feasible}"
                )
    print("\n")