Faites sortir les amphipodes

J'ai déjà écrit un billet sur quelques techniques notables utilisées lors de ma participation à l'AdventOfCode 2021. Ici, j'aimerais revenir sur le problème du jour 23, assez difficile mais résolument fun.

Le but de cet article est de décrire le cheminement suivi, tout en présentant les techniques et algorithmes utilisés.

C'est parti !

Énoncé du problème

Le problème du jour 23 consistait à aider un sympathique groupe de crustacés (des amphipodes) à se ranger dans des chambres.

En l’occurrence, les amphipodes vivent dans un terrier constitué d'un couloir menant à quatre chambres. Il y a quatre types d'amphipodes (A, B, C et D), chacun vivant dans une chambre donnée.

Au début, les amphipodes sont répartis aléatoirement dans les chambres :

#############
#...........#
###B#C#B#D###
  #A#D#C#A#
  #########

On veut aider les amphipodes à regagner leurs chambres respectives :

#############
#...........#
###A#B#C#D###
  #A#B#C#D#
  #########

Les amphipodes peuvent se déplacer sur les cases libres qui leurs sont adjacentes (marquées par un point « . »). Chaque type d'amphipode, en se déplaçant d'une case, consomme une certaine quantité d'énergie (1 pour les A, 10 pour les B, 100 pour les C, 1000 pour les D).

On veut donc trouver quelle séquence de déplacements permet d'atteindre l'objectif (répartir les amphipodes dans leurs chambres) en minimisant la quantité totale d'énergie consommée.

Une approche manuelle

La première chose que j'ai tendance à faire pour approcher ce type de problème est de tenter une approche manuelle. Je dessine quelques étapes au stylo sur un cahier pour bien appréhender les règles de déplacement.

#############
#...B.......#
###B#C#.#D###
  #A#D#C#A#
  #########

Ok, un premier amphipode est sorti de sa tanière…

#############
#...B.......#
###B#.#C#D###
  #A#D#C#A#
  #########

Ah ! un amphipode s'est directement déplacé vers sa chambre d'arrivée.

Certains amphipodes n'ont pas besoin de se déplacer car ils sont déjà à la bonne place.

D'autres sont bien placés mais doivent sortir pour laisser passer ceux qui sont bloqués derrière.

Etc.

Cette première phase permet de bien comprendre toutes les contraintes imposées et de commencer à se faire une représentation mentale du problème.

Comment représenter les données ?

Avec certains problèmes (de l'AdventOfCode ou de LaVieRéelle), il est très facile de déterminer quelle structure de données est appropriée pour représenter le problème.

Or, choisir la bonne façon de représenter les données, c'est déjà résoudre 75% du problème.

Ici, toutefois, la question n'est pas triviale. Comment donc représenter un couloir menant à plusieurs pièces et dans lequel se baladent des crustacés ?

Nous allons devoir déplacer des amphipodes de points de départs à des points d'arrivée. Peut-être faudra-t-il utiliser une recherche de plus court chemin type Dijkstra ?

Dans ce cas, un graphe serait peut-être approprié ?

Ou alors, peut-être qu'un tableau en deux dimensions suffirait à effectuer des exploration via un parcours en largeur ?

Mmm… Il nous faut plus d'infos.

Relire l'énoncé en détail

À ce stade, c'est le moment de relire l'énoncé très en détail.

On s'aperçoit que de nombreuses contraintes réduisent très fortement les possibilités de mouvement pour les amphipodes :

  • les amphipodes ne peuvent pas se dépasser ou se chevaucher ;
  • les amphipodes ne peuvent pas s'arrêter devant la porte d'une chambre ;
  • les amphipodes ne peuvent pas rentrer dans une chambre si elle contient des amphipodes qui ne sont pas à leur place ;
  • une fois sortis dans le couloir, les amphipodes ne peuvent plus bouger sauf pour rentrer directement dans leur chambre de destination.

On s'aperçoit alors que ces règles réduisent très fortement la complexité du problème !

En effet, chaque amphipode va se déplacer très exactement deux fois :

  • une fois pour sortir dans le couloir ;
  • une fois pour rentrer dans leur chambre.

Bien sûr, cela ne s'applique pas aux amphipodes qui n'ont pas besoin de se déplacer du tout.

Choisir l'algorithme approprié

La solution que nous avons commencé à mettre en œuvre manuellement est la suivante :

  1. nous déplaçons un amphipode en notant l'énergie consommée ;
  2. nous effectuons un deuxième déplacement… et un troisième… etc.
  3. si on arrive à ranger les amphipodes dans leur chambre, on note l'énergie consommée, et on revient en arrière pour tester les autres solutions ;
  4. si on est bloqué, on revient en arrière pour tester d'autres combinaisons.

Cette technique — consistant à construire systématiquement toutes les solutions à un problème en itérant sur les solutions partielles — appartient à la famille des algorithmes de backtracking.

Un tel algorithme peut être employé dans les cas suivants :

  • on doit trouver une solution qui permet de respecter un certain nombre de contraintes (chaque amphipode est dans sa chambre) ;
  • il n'existe pas forcément d'ordre entre les différentes solutions (par exemple, ce n'est pas parce qu'on commence par déplacer les amphipodes les plus économes qu'on arrivera à la meilleure solution) ;
  • on peut construire itérativement une solution au problème à partir de solutions partielles (à partir d'un état de départ, je déplace un seul amphipode, ce qui me donne un nouvel état, et je recommence) ;
  • on peut déterminer qu'une solution partielle ne permettra pas d'arriver à une solution (si dans un état donné, plus aucun amphipode ne peut bouger, alors on ne pourra pas les ranger dans leurs chambres).

Première solution basique

À ce stade, nous avons déjà tous les éléments pour construire le squelette de notre solution.

Commençons par coder une API de haut niveau, nous permettant de lancer différents tests.

En l'occurrence, l'énoncé du problème stipule qu'il faut organiser le terrier des crustacés. Eh bien ! pour une solution claire, le point d'entrée sera une classe Burrow (terrier).

test_data = 
burrow = Burrow(test_data)
energy = burrow.organize()
assert energy == 44169


data = 
burrow = Burrow(data)
energy = burrow.organize()
print(energy)

Ici, les données de tests proviennent de l'énoncé même du problème et nous permettent de valider notre solution.

La classe Burrow n'a qu'une seule responsabilité : faire tourner l'algorithme de backtracking.

class Burrow:
    def __init__(self, state):
        self.initial_state = state

    def organize(self):
        """Lance l'algo de backtracking sur l'état initial."""

        energy = self.solve(self.initial_state, 0)
        return energy

    def solve(self, state, cost):
        """Étudie récursivement toutes les solutions possibles."""

        # Nous avons trouvé une solution, retourne le coût actuel
        if is_burrow_organized(state):
            return cost

        # Génère les déplacements possibles et les coûts associés
        # Ensuite, lance l'algorithme de backtracking sur chaque
        # nouvel état
        costs = [math.inf]
        new_states = generate_moves(state)
        while not new_states.empty():
            move_cost, new_state = new_states.get()
            costs.append(self.solve(new_state, cost + move_cost))

        # Si aucune solution n'a été trouvée, la fonction renvoie
        # `math.inf`
        result = min(costs)
        return result

L'algorithme de backtracking est désormais implémenté.

Cela dit, il manque quelques éléments :

  • comment représenter un état (state) ?
  • comment générer la liste des déplacements possibles (generate_moves)
  • comment vérifier si on a trouvé une solution viable (is_burrow_organized) ?

La classe Hallway

La classe Burrow a la responsabilité de manipuler différents états possibles du problème.

À ce stade, j'ai choisi de créer une nouvelle classe Hallway, qui aura pour responsabilité :

  • de représenter un état distinct ;
  • d'implémenter les règles de déplacements des amphipodes.

Cela nous permet d'écrire le code (presque) définitif de la classe Burrow :

class Burrow:
    def __init__(self, state):
        self.initial_state = state

    def organize(self):
        energy = self.solve(self.initial_state, 0)
        return energy

    def solve(self, state, cost):

        # Délègue la gestion des états à la classe Hallway
        hallway = Hallway(state)
        if hallway.is_organized():
            return cost

        # Délègue aussi la génération des nouveaux états
        costs = [math.inf]
        new_states = hallway.generate_moves()
        while not new_states.empty():
            move_cost, new_state = new_states.get()
            solution_cost = self.solve(new_state, cost + move_cost)
            costs.append(solution_cost)

        result = min(costs)
        return result

Retour sur la représentation du couloir

Après mûre réflexion — et aussi j'avoue en ayant trouvé l'inspiration notamment sur ce fil twitter — je me suis aperçu qu'une version textuelle très simple était tout à fait suffisante pour représenter un état donné.

Ainsi, cet état :

#############
#...........#
###B#C#B#D###
  #A#D#C#A#
  #########

…sera représenté ainsi :

..(10).(23).(12).(30)..

Et cet état :

#############
#.B.......CB#
###.#C#.#D###
  #A#D#.#A#
  #########

Ainsi :

.1(.0).(23).(..).(30)21

Chaque parenthèse représente le contenu d'une chambre.

Les amphipodes sont représentés par des chiffres au lieu des lettres. C'est très pratique parce qu'ainsi, l'amphipode 0 doit se trouver dans la chambre 0.

Les points en dehors des parenthèses représentent les cases du couloir.

On remarque qu'on ne représente pas les cases du couloir qui se trouvent à la sortie des chambres. C'est parce que c'est inutile, dans la mesure ou il est interdit à un amphipode de s'arrêter sur une telle case.

Par ailleurs, il est inutile de garder en mémoire si un amphipode s'est déjà déplacé ou pas. En effet, il n'y a que deux types de déplacements possibles :

  1. les déplacements permettant à un amphipode de sortir de sa chambre et…
  2. les déplacements permettant à un amphipode d'aller dans sa chambre.

Avoir une représentation de l'état du problème très concise est important dans la mesure ou nous allons générer récursivement de très nombreux états, et on veut éviter de faire exploser la mémoire.

Début d'implémentation pour la classe Hallway

Vous n'allez pas le croire mais implémenter le squelette de la classe Hallway est vraiment très facile.

import re
from queue import PriorityQueue


# L'état correspondant à un terrier rangé
ORGANIZED_STATE = '..(00).(11).(22).(33)..'

# L'énergie dépensée par chaque type amphipode
ENERGY_COST = [1, 10, 100, 1000]

# L'index du couloir auquel débouche chaque chambre
ROOM_EXIT = [2, 4, 6, 8]


class Hallway:
    def __init__(self, state):
        self.state = state

    def __repr__(self):
        return self.state

    def is_organized(self):
        return self.state == ORGANIZED_STATE

    def generate_moves(self):
        """Retourne tous les déplacements possibles."""
        exit_moves = self.generate_exit_moves()
        enter_moves = self.generate_enter_moves()

        # Les déplacements sont stockés dans une PriorityQueue
        # Cela nous permet de tester en priorité les déplacements
        # les moins coûteux. C'est finalement un
        # raffinement inutile puisque nous exlorerons toutes les
        # possibilités, mais je le garde pour la frime.
        moves = PriorityQueue()
        for cost, new_state in exit_moves + enter_moves:
            moves.put((cost, new_state))

        return moves


    def generate_exit_moves(self):
        pass

    def generate_enter_moves(self):
        pass

En construisant ainsi notre code en raisonnant par api first, nous avons finalement évacué toutes les questions d'intendance et obtenu un code plutôt lisible et ordonné.

Il ne reste plus qu'à aborder ce qui constitue vraiment la spécificité du problème : la génération de mouvements.

Générer les mouvements des amphipodes

Avant tout, ajoutons quelques petites fonctions utilitaires. L'objectif est de nous permettre de raisonner en considérant les chambres et le couloir de manière indépendante.

class Hallway:
    

    def get_hallway(self):
        """Retourne un tableau de case représentant le couloir."""
        return list('.'.join(re.split(r'\([\d\.]+\)', self.state)))

    def get_rooms(self):
        """Retourne un tableau de tableaux représentants les chambres."""
        return list(map(list, re.findall(r'\(([\d\.]+)\)', self.state)))

    def collapse_state(self, hallway, rooms):
        """Recréé une chaîne d'état à partir du couloir et des chambres.

        Comment ? Ce code est tout moche ?! Oui…
        """
        fragments = [
            ''.join(hallway[0:2]),
            '(', ''.join(rooms[0]), ')',
            hallway[3],
            '(', ''.join(rooms[1]), ')',
            hallway[5],
            '(', ''.join(rooms[2]), ')',
            hallway[7],
            '(', ''.join(rooms[3]), ')',
            ''.join(hallway[9:11]),
        ]
        state = ''.join(fragments)
        return state


hallway = Hallway('.1(.0).(23).(..).(30)21')

print(hallway.get_hallway())
['.', '1', '.', '.', '.', '.', '.', '.', '.', '2', '1']

print(hallway.get_rooms())
[['.', '0'], ['2', '3'], ['.', '.'], ['3', '0']]

Faites sortir les amphipodes

Générer la liste de déplacements possibles est finalement relativement « facile ». Par contre, je n'ai pas pris la peine de refactoriser le code qui suit, parce qu'il est 20h et que j'aimerais faire autre chose de ma journée.

La stratégie pour générer les mouvements de sortie est la suivante :

  • pour chaque chambre, on détermine quel amphipode est le plus proche de la sortie ;
  • on détermine ensuite si cet amphipode doit sortir (c'est le cas sauf si la chambre est déjà « rangée ») ;
  • si oui, on détermine tous les emplacements libres dans le couloir autour du point de sortie ;
  • chaque emplacement constitue une destination possible, dont on calcule le coût, et qu'on ajoute à la liste des mouvements possibles.
    def generate_exit_moves(self):
        """Génère les mouvements possibles de sortie.

        Renvoie une liste de tuples de la forme
        (coût, nouvel_état)
        """

        moves = []
        hallway = self.get_hallway()
        rooms = self.get_rooms()

        # For each room…
        for room_index, room in list(enumerate(rooms)):

            # … find amphipod closest to the exit
            amphipod_index, amphipod = self.find_closest_amphipod(room_index)

            # If the room is empty, move to the next room
            if amphipod is None:
                continue

            # The amphipod could stay in place if
            # 1/ it is already in the destination room…
            is_correct_amphipod = room_index == amphipod

            # 2/ and there are not other amphipods below that must move out.
            # Ce code est trop cradingue, mais c'est comme ça
            room_content = room[amphipod_index:]
            expected_content = [f'{room_index}'] * len(room_content)
            room_is_sorted = room_content == expected_content

            # The amphipod does not have to move out
            if is_correct_amphipod and room_is_sorted:
                continue

            # Where does it exits the room?
            exit_index = ROOM_EXIT[room_index]
            possible_destinations = []

            # Check all spaces to the left in the hallway
            i = exit_index
            while i > 0:
                i -= 1
                # The path is blocked
                if hallway[i] != '.':
                    break
                # This is an exit space, cannot stop
                if i in ROOM_EXIT:
                    continue
                possible_destinations.append(i)

            # Check all spaces to the right of the hallway
            i = exit_index
            while i < len(hallway) - 1:
                i += 1
                if hallway[i] != '.':
                    break
                if i in ROOM_EXIT:
                    continue
                possible_destinations.append(i)

            # How much does it cost to move out of the room?
            exit_cost = (amphipod_index + 1) * ENERGY_COST[amphipod]

            for destination in possible_destinations:
                move_cost = abs(destination - exit_index) * ENERGY_COST[amphipod]
                total_cost = exit_cost + move_cost

                # Build the new state
                new_hallway = hallway.copy()
                new_hallway[destination] = str(amphipod)
                new_rooms = deepcopy(rooms)
                new_rooms[room_index][amphipod_index] = '.'

                new_state = self.collapse_state(new_hallway, new_rooms)
                moves.append((total_cost, new_state))

        return moves

    def find_closest_amphipod(self, room_index):
        rooms = self.get_rooms()
        for index, room_content in enumerate(rooms[room_index]):
            if room_content != '.':
                return index, int(room_content)

(Notez que je ne me serais pas permis de publier ça si j'étais payé pour coder :))

Faites entrer les amphipodes

Nous allons maintenant générer les mouvements d'entrée. La stratégie sera la suivante :

  1. on parcourt toutes les cases du couloir contenant un amphipode ;
  2. pour chaque amphipode, on vérifie si la chambre est « emménageable », c'est à dire si elle contient de l'espace libre et des amphipodes du bon type ;
  3. on vérifie aussi si le chemin est dégagé entre l'amphipode et sa chambre ;
  4. on calcule le coût et génère l'état, vous connaissez la routine.
    def generate_enter_moves(self):
        """Génère les mouvements possibles d'entrées."""

        moves = []
        hallway = self.get_hallway()
        rooms = self.get_rooms()

        # Check every hallway space for amphipod
        for hallway_index, hallway_content in list(enumerate(hallway)):

            if hallway_content == '.':
                continue

            amphipod = int(hallway_content)

            # Can the room be moved into?
            room = rooms[amphipod]
            room_content = ''.join(room)
            sorted_re = r'\.*' + str(amphipod) + r'*'
            room_is_sorted = re.fullmatch(sorted_re, room_content)
            if not room_is_sorted:
                continue

            # Is the hallway to the room clear?
            destination = ROOM_EXIT[amphipod]
            min_index = min(destination, hallway_index)
            max_index = max(destination, hallway_index)
            hallway_section = hallway[min_index + 1:max_index]
            obstacles = list(filter(lambda x: x != '.', hallway_section))
            if obstacles:
                continue

            # So, the amphipod can enter the room, yeah \o/
            move_cost = abs(hallway_index - destination) * ENERGY_COST[amphipod]
            last_space = ''.join(room).rfind('.')
            enter_cost = (last_space + 1) * ENERGY_COST[amphipod]
            total_cost = move_cost + enter_cost

            # Build the new state
            new_hallway = hallway.copy()
            new_hallway[hallway_index] = '.'

            new_rooms = deepcopy(rooms)
            new_rooms[amphipod][last_space] = str(amphipod)

            new_state = self.collapse_state(new_hallway, new_rooms)
            moves.append((total_cost, new_state))

        return moves

J'ai honnêtement un peu honte de ce code, mais pas le temps de refactoriser.

Et voilà ! Nous avons tout ce qu'il faut. On lance le code et oh miracle, ça tourne !

Par contre, heu… c'est long…

Limiter l'explosion combinatoire

Mmm… Le code donné fonctionne, mais il tourne pendant bien trop longtemps ! Pourquoi donc ?

Pourtant…

1/ Le nombre d'états distincts du problème est relativement limité. En effet, le terrier est composé de 19 cases au total (11 cases de couloir et 4 * 2 cases de chambres), et chaque case peut prendre 5 valeurs possibles (vide ou chaque type d'amphipode).

À l'arrache, cela nous donne donc aux alentours de 19 ^ 5 = 2476099 états possibles, ce qui reste raisonnable.

2/ Chaque solution nécessite au maximum seize déplacements (chaque amphipodes ne peut se déplacer que deux fois).

À première vue, la surface d'exploration n'est donc pas gigantesque ?!

Oui, mais…

L'enchaînement de mouvements qui permet d'arriver à un même état est très élevé.

Pour s'en convaincre, on peut vérifier que les deux enchaînements suivants :

..(00).(11).(22).(33).. ->
0.(.0).(11).(22).(33).. ->
0.(.0).(11).(22).(.3).3

et…

..(00).(11).(22).(33).. ->
..(00).(11).(22).(.3).3 ->
0.(.0).(11).(22).(.3).3

…permettent d'arriver au même état intermédiaire. Pourtant, notre exploration systématique étudiera ces deux solutions de manière indépendante, ce qui nous amènera à effectuer deux fois les mêmes calculs.

On se retrouve donc avec un problème typique de programmation dynamique : petit nombre de sous-état uniques, mais nombre important de chemins pour les explorer.

J'ai déjà parlé de programmation dynamique dans mon dernier billet sur l'AoC, je vous laisse y jeter un œil si vous avez envie d'en savoir un peu plus.

Jouer à cache-cache

La fonction principale de la classe Burrow calcule le coût minimal des solutions possibles à partir d'un état et d'un coût donnés.

Une méthode très simple s'offre à nous pour éviter d'effectuer deux fois les mêmes calculs : il suffit de mettre en cache son résultat.

from functools import lru_cache

class Burrow:
    

    @lru_cache(maxsize=None)
    def solve(self, state, cost):
        

Grâce à cette simple optimisation, l'algo tourne en une trentaine de secondes sur ma machine.

Sans l'optimisation… je ne sais pas. Au moins de longues minutes… quelques heures ? des jours ? Je n'ai pas eu la patience d'attendre la réponse.

Nous pouvons faire mieux

Notre méthode de cache est trop grossière.

En effet, le décorateur lru_cache cache le résultat d'une fonction pour un jeu de paramètres donnés.

Cela signifie que si on arrive à un même état par deux chemins ayant des coûts différents, le résultat ne sera pas mis en cache.

Pour optimiser le bazar, nous n'allons passer par un cache maison en ne conservant qu'une seule clé : l'état.

Il nous faudra être attentif à une chose : puisqu'on peut arriver à un même état par deux chemins de coûts différents, il faudra prendre en compte cette différence en renvoyant la réponse.

Le code !

class Burrow:
    def __init__(self, state):
        self.initial_state = state
        self._cache = {}


    def solve(self, state, cost):

        # On vérifie si on a déjà calculé une solution pour
        # l'état donné
        if state in self._cache:
            cached_cost, cached_result = self._cache[state]

            # Dans le cache, on stocke aussi le coût de départ
            # ayant servi a calculer la solution.
            # Si on arrive au même état via un chemin moins coûteux,
            # ou plus coûteux, il faut prendre en compte
            # cette différence en renvoyant la valeur cachée.
            cost_delta = cached_cost - cost
            return cached_result - cost_delta

        # Solution trouvée !
        hallway = Hallway(state)
        if hallway.is_organized():
            # On n'oublie pas de remplir le cache
            self._cache[state] = cost, cost
            return cost

        costs = [math.inf]
        new_states = hallway.generate_moves()
        while not new_states.empty():
            move_cost, new_state = new_states.get()
            solution_cost = self.solve(new_state, cost + move_cost)
            costs.append(solution_cost)

        result = min(costs)

        # Ici non plus, n'oublions pas de remplir le cache.
        self._cache[state] = cost, result
        return result


data = '..(32).(32).(01).(01)..'
burrow = Burrow(data)
energy = burrow.organize()
print(energy)

Ce délicat algorithme tourne désormais en moins de 2 secondes sur ma machine.

La partie 2

La partie 2 du problème n'a nécessité absolument aucun changement dans le code utilisé, à part la mise à jour des états de départ. L'algo crache la bonne réponse en moins de 8 secondes.

Conclusion

Le problème du jour 23 était relativement difficile mais assez fun à traiter. J'espère vous avoir diverti via ce petit cheminement de résolution.

À la prochaine.