# Parallel Breadth-First Search (pBFS)
## Edric Svarte

Graphs are incredibly useful data structures used to represent networks. In practice, graphs tend to be large, and so graph algorithms must be efficient. Here, we explore a scalable algorithm for solving the *single-source shortest path* problem.

Suppose we have a directed graph with weighted edges. How do we find the shortest paths from a given vertex to all other vertices? Dijkstra's algorithm is a natural candidate. Unfortunately, it settles vertices one at a time in order of increasing distance, which makes it inherently sequential and hard to parallelize. Breadth-first search (BFS), on the other hand, can easily be modified to run in parallel. You may recall that standard BFS can return incorrect distances on graphs with weighted edges. The parallel variant explored here avoids this problem by repeatedly updating distances until they stop changing. Below, we describe the parallel algorithm and provide a simple implementation in PySpark.

First, we install and load Spark.

In [None]:
from math import inf, isinf

# Install Java 8, download Spark 3.3.3 and install findspark (Colab shell commands)
! apt-get update -qq > /dev/null
! apt-get install openjdk-8-jdk-headless -qq > /dev/null
! wget -q https://dlcdn.apache.org/spark/spark-3.3.3/spark-3.3.3-bin-hadoop3.tgz
! tar xzf spark-3.3.3-bin-hadoop3.tgz
! pip install -q findspark

# Point Spark and findspark at the Java and Spark installations above
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
os.environ['SPARK_HOME'] = '/content/spark-3.3.3-bin-hadoop3'

import findspark
findspark.init()

from pyspark import SparkContext, SparkConf

# Create (or reuse) a local SparkContext that uses all available cores
spark_conf = SparkConf().setAppName('YourTest').setMaster('local[*]')
sc = SparkContext.getOrCreate(spark_conf)

## Setup

Consider the graph below. Suppose we want to find the minimum distance from the vertex labelled $A$ to every other vertex.

pBFS_drawing1.svg

What is the minimum distance from $A$ to $C$? The correct answer is 4, via the path $A \to B \to C$ (3 + 1 = 4). However, regular BFS would discover $C$ in the first iteration, through the direct edge of weight 5, and mark it as visited. Since visited nodes are never revisited, the algorithm would incorrectly return 5 as the minimum distance from $A$ to $C$.
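To make this failure concrete, here is a minimal plain-Python sketch (no Spark) of BFS that sums edge weights but, like regular BFS, fixes each node's distance the first time the node is discovered. The helper name `naive_weighted_bfs` and the dict-based graph are illustrative choices, not part of this notebook's implementation.

```python
from collections import deque

# Example graph from this notebook: node -> [(out-neighbour, edge weight), ...]
example_graph = {
    'A': [('B', 3), ('C', 5), ('D', 1), ('F', 8)],
    'B': [('C', 1), ('G', 2)],
    'C': [('A', 5), ('B', 1)],
    'D': [('E', 2)],
    'E': [('F', 3), ('I', 3)],
    'F': [],
    'G': [('H', 5), ('I', 10)],
    'H': [],
    'I': [('G', 10), ('H', 2)],
}

def naive_weighted_bfs(graph, source):
    """BFS that sums edge weights but fixes each distance on first discovery."""
    dist = {source: 0}
    queue = deque([source])
    while queue:
        node = queue.popleft()
        for neighbour, weight in graph[node]:
            if neighbour not in dist:  # first discovery wins; never updated later
                dist[neighbour] = dist[node] + weight
                queue.append(neighbour)
    return dist

print(naive_weighted_bfs(example_graph, 'A')['C'])  # 5, although the true distance is 4
```

On this graph the first-visit rule also reports 8 for $F$, 10 for $H$ and 15 for $I$, while the true distances are 6, 8 and 6.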

The pink edges in the graph below form a shortest-path tree rooted at $A$. Following the pink edges, we can find the minimum distance from $A$ to any other node.

pBFS_drawing2.svg
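Given the shortest distances, the pink tree can be read off mechanically: an edge $u \to v$ lies on a shortest path from $A$ exactly when $d(u) + w(u, v) = d(v)$. The sketch below picks one such "tight" in-edge per node. The distances are hard-coded from the output computed later in this notebook, the helper name `shortest_path_tree` is hypothetical, and the sketch assumes strictly positive edge weights (with zero-weight edges, tight edges could form a cycle).

```python
# Example graph from this notebook: node -> [(out-neighbour, edge weight), ...]
example_graph = {
    'A': [('B', 3), ('C', 5), ('D', 1), ('F', 8)],
    'B': [('C', 1), ('G', 2)],
    'C': [('A', 5), ('B', 1)],
    'D': [('E', 2)],
    'E': [('F', 3), ('I', 3)],
    'F': [],
    'G': [('H', 5), ('I', 10)],
    'H': [],
    'I': [('G', 10), ('H', 2)],
}

# Shortest distances from A, as produced by pBFS later in this notebook
shortest_dist = {'A': 0, 'B': 3, 'C': 4, 'D': 1, 'E': 3,
                 'F': 6, 'G': 5, 'H': 8, 'I': 6}

def shortest_path_tree(graph, dist, source):
    """Return {child: parent}, using one tight in-edge per non-source node."""
    parent = {}
    for u, edges in graph.items():
        for v, w in edges:
            # u -> v is on a shortest path iff dist[u] + w == dist[v]
            if v != source and v not in parent and dist[u] + w == dist[v]:
                parent[v] = u
    return parent

tree = shortest_path_tree(example_graph, shortest_dist, 'A')
print(sorted(tree.items()))
# [('B', 'A'), ('C', 'B'), ('D', 'A'), ('E', 'D'), ('F', 'E'), ('G', 'B'), ('H', 'I'), ('I', 'E')]
```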

Let's represent this graph as an adjacency list, in particular:


* The graph is stored as a list of tuples
* Each tuple is a key-value pair
* The key is a unique vertex/node ID
* The value is a list of tuples. Each tuple within this list stores: (ID of out-neighbour, edge weight)


In [None]:
toy_graph = [('A', [('B', 3), ('C', 5), ('D', 1), ('F', 8)]),
             ('B', [('C', 1), ('G', 2)]),
             ('C', [('A', 5), ('B', 1)]),
             ('D', [('E', 2)]),
             ('E', [('F', 3), ('I', 3)]),
             ('F', []),
             ('G', [('H', 5), ('I', 10)]),
             ('H', []),
             ('I', [('G', 10), ('H', 2)])]

Now that we have our graph, let's convert it to a Resilient Distributed Dataset (RDD): the Spark distributed-memory abstraction.

In [None]:
toy_graph = sc.parallelize(toy_graph)

## The Algorithm

The RDD, *toy_graph*, will serve as the input to the pBFS algorithm. However, a slightly more complicated graph representation is required while running pBFS. Let's augment the RDD as follows:

* Currently, the value for each key-value pair is a list of out-neighbours and edge weights
* Let's extend this value so that it stores: (distance to source node, list of out-neighbours)
* Each node's distance to the source is initially set to infinity, except for the source node itself, whose distance is zero

Recall that we set $A$ to be the source node. The augmented RDD should look something like this (do not run the following cell; the main function performs the augmentation):

In [None]:
[('A', (0, [('B', 3), ('C', 5), ('D', 1), ('F', 8)])), ('B', (inf, [('C', 1), ('G', 2)])), ...]
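For readers who want to see the augmentation without Spark, here is a plain-Python sketch of the same transformation, applied to an ordinary list rather than an RDD. The names `graph_pairs` and `augmented_list` are illustrative; inside `pBFS` the equivalent result is built with `map` followed by `join`.

```python
from math import inf

# Toy graph as a plain list of (node ID, adjacency list) pairs
graph_pairs = [('A', [('B', 3), ('C', 5), ('D', 1), ('F', 8)]),
               ('B', [('C', 1), ('G', 2)]),
               ('C', [('A', 5), ('B', 1)]),
               ('D', [('E', 2)]),
               ('E', [('F', 3), ('I', 3)]),
               ('F', []),
               ('G', [('H', 5), ('I', 10)]),
               ('H', []),
               ('I', [('G', 10), ('H', 2)])]
source = 'A'

# Wrap each adjacency list with the node's initial distance to the source
augmented_list = [(node_id, (0 if node_id == source else inf, adj_list))
                  for node_id, adj_list in graph_pairs]

print(augmented_list[:2])
# [('A', (0, [('B', 3), ('C', 5), ('D', 1), ('F', 8)])), ('B', (inf, [('C', 1), ('G', 2)]))]
```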

Now, what does one iteration of pBFS look like? Consider an arbitrary node $i$, with $n_i$ out-neighbours. During each iteration, every node $i$ outputs $n_i + 1$ key-value pairs:

* 1 key-value pair corresponds to: ($i$, distance from $i$ to source node)
* For each out-neighbour of $i$, the algorithm emits: (out-neighbour of $i$, distance from $i$ to source **plus** the weight of the edge from $i$ to the out-neighbour in question)
* A total of $n_i + 1$ key-value pairs are emitted

For example, consider the node labelled $B$ in the code above. In the first iteration, $B$'s distance to the source is still infinite, so the algorithm emits:
* (B, inf)
* (C, inf + 1), (G, inf + 2)
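The emission rule is short enough to check by hand. The sketch below uses a hypothetical helper, `emit`, that mirrors the rule for a single node, and applies it to $A$ and $B$ in the first iteration. Note that Python evaluates `inf + 1` to `inf`, so $B$'s messages carry no useful information yet.

```python
from math import inf

def emit(node_id, distance, adj_list):
    """Hypothetical helper: own distance plus one candidate per out-edge."""
    return [(node_id, distance)] + [(nbr, distance + w) for nbr, w in adj_list]

# First iteration: A is at distance 0, every other node at infinity
print(emit('A', 0, [('B', 3), ('C', 5), ('D', 1), ('F', 8)]))
# [('A', 0), ('B', 3), ('C', 5), ('D', 1), ('F', 8)]
print(emit('B', inf, [('C', 1), ('G', 2)]))
# [('B', inf), ('C', inf), ('G', inf)]
```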

Translating code into words only goes so far, so let's look at the code itself. The helper function below is applied to one tuple of the augmented graph at a time (in parallel, via `flatMap`).

In [None]:
def process_node(node):
    '''
    process_node returns a list of key-value pairs corresponding to nodes
        and their distances from the source.

    Inputs:
        node: A tuple (key-value pair). The key should be a string corresponding
            to a unique node ID. The value should be a tuple. Its first element
            is a float or integer indicating the distance between the node in
            question and the source. Its second element is the node's list of
            out-neighbours, stored as (out-neighbour ID, edge weight) tuples.

    Outputs:
        A list of key-value pairs. Each key-value pair stores a node ID as the
            key, and a candidate distance from the source as the value.
    '''
    node_id, (distance_to_node, adj_list) = node
    # The node reports its own current distance ...
    node_distance_to_source = [(node_id, distance_to_node)]
    # ... and offers each out-neighbour a candidate distance through this node
    neighbours_distance_to_source = [(neighbour, edge_weight + distance_to_node)
                                     for neighbour, edge_weight in adj_list]

    return node_distance_to_source + neighbours_distance_to_source

The function above is a *mapper*. We now require a *reducer*. Recall that each node outputs tuples containing node IDs and candidate distances. The reducer is a one-line operation: for each unique key, it takes the minimum of all the corresponding values (distances).

Once we have the updated distances (via the minimum operation), we rebuild the augmented graph by joining the updated distances to the original, non-augmented graph.
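Putting the mapper, the reducer and the join together, one iteration can be simulated in plain Python. This is a sketch for intuition only: `one_iteration` is a hypothetical helper, and the dictionary-based reduce stands in for Spark's `reduceByKey(min)`.

```python
from collections import defaultdict
from math import inf

# Toy graph as a plain list of (node ID, adjacency list) pairs
graph_pairs = [('A', [('B', 3), ('C', 5), ('D', 1), ('F', 8)]),
               ('B', [('C', 1), ('G', 2)]),
               ('C', [('A', 5), ('B', 1)]),
               ('D', [('E', 2)]),
               ('E', [('F', 3), ('I', 3)]),
               ('F', []),
               ('G', [('H', 5), ('I', 10)]),
               ('H', []),
               ('I', [('G', 10), ('H', 2)])]

def one_iteration(augmented, graph_pairs):
    """Hypothetical helper: one map / reduce / join round, without Spark."""
    # Map: each node emits its own distance plus one candidate per out-edge
    messages = []
    for node_id, (distance, adj_list) in augmented:
        messages.append((node_id, distance))
        messages.extend((nbr, distance + w) for nbr, w in adj_list)

    # Reduce: keep the smallest candidate distance for each node ID
    best = defaultdict(lambda: inf)
    for node_id, distance in messages:
        best[node_id] = min(best[node_id], distance)

    # Join: reattach the original adjacency lists to the new distances
    return [(node_id, (best[node_id], adj_list)) for node_id, adj_list in graph_pairs]

source = 'A'
augmented = [(n, (0 if n == source else inf, adj)) for n, adj in graph_pairs]
after_one = one_iteration(augmented, graph_pairs)
print({n: d for n, (d, _) in after_one})
# {'A': 0, 'B': 3, 'C': 5, 'D': 1, 'E': inf, 'F': 8, 'G': inf, 'H': inf, 'I': inf}
after_two = one_iteration(after_one, graph_pairs)
```

After one iteration only $A$'s direct out-neighbours have finite distances, and $C$ still holds the suboptimal value 5; the second call lowers it to 4 via $B$.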

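Finally, we stop the algorithm when the largest change in any node's distance between two consecutive iterations drops below a specified tolerance. Since a node's distance can keep shrinking after the node is first reached, iterating until convergence, rather than finalizing each node on its first visit, is what allows the algorithm to handle weighted edges. The main function is given below.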
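One subtlety deserves attention: a node that cannot be reached from the source keeps an infinite distance forever, so a convergence test must count "infinite in both iterations" as no change, or the loop will never terminate. A minimal plain-Python sketch of such a test, using a hypothetical helper `max_change` over dictionaries of distances:

```python
from math import inf

def max_change(new, prev):
    """Largest change in any node's distance between two iterations."""
    # Equal values (including inf == inf for unreachable nodes) count as 0;
    # otherwise abs() also yields inf when a node has just become reachable.
    return max((0 if new[k] == prev[k] else abs(new[k] - prev[k]) for k in new),
               default=0)

tol = 1e-5
# 'Z' is a hypothetical node with no path from the source
print(max_change({'A': 0, 'Z': inf}, {'A': 0, 'Z': inf}) < tol)  # True
print(max_change({'A': 0, 'C': 4}, {'A': 0, 'C': 5}))           # 1
```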

In [None]:
def pBFS(graph, source_node_id, partition_count = 10, tol = 1e-5):
    '''
    pBFS executes parallel breadth-first search on a given graph.

    Inputs:
        graph: An RDD storing the graph as an adjacency list.
        source_node_id: A string specifying the source node's ID. The source node
            must be a node in the given graph.
        partition_count: An integer specifying the number of RDD partitions (default = 10).
        tol: A float specifying the tolerance for the convergence test (default = 1e-5).

    Outputs:
        distances_new: A pair RDD specifying the distance between the source node
            and all other nodes.
    '''
    # Initial distances: 0 for the source node, infinity for every other node
    distances_prev = graph.map(lambda x: (x[0], 0 if x[0] == source_node_id else inf))
    augmented_graph = distances_prev.join(graph, partition_count)

    converged = False
    while not converged:

        distances_new = augmented_graph.flatMap(process_node).reduceByKey(min, partition_count)
        augmented_graph = distances_new.join(graph, partition_count)

        # Convergence test: an unchanged distance (including inf in both
        # iterations, i.e. a node still unreachable from the source) counts as 0
        distances_diff = distances_new.join(distances_prev).values().map(lambda x: 0 if x[0] == x[1] else abs(x[0] - x[1]))
        diff_max = distances_diff.max()
        converged = diff_max < tol

        distances_prev = distances_new

    return distances_new

In [None]:
distances = pBFS(graph = toy_graph, source_node_id = 'A')
distances.collect()

[('A', 0),
 ('H', 8),
 ('I', 6),
 ('F', 6),
 ('B', 3),
 ('D', 1),
 ('G', 5),
 ('C', 4),
 ('E', 3)]

Inspection reveals that these distances are correct. Notice, though, that the algorithm is slow on a graph this small: Spark's scheduling and shuffle overheads dominate, and plain Python would finish the same computation much faster. Real-world graphs, on the other hand, can be massive, and then parallel processing quickly becomes essential.
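For a graph of this size, a sequential algorithm is the natural alternative. As an illustration, the sketch below implements Dijkstra's algorithm with Python's standard `heapq` module; it assumes non-negative edge weights and doubles as an independent check of the distances above. The dict-based graph and the function name `dijkstra` are our own choices.

```python
import heapq
from math import inf

# Example graph from this notebook: node -> [(out-neighbour, edge weight), ...]
example_graph = {
    'A': [('B', 3), ('C', 5), ('D', 1), ('F', 8)],
    'B': [('C', 1), ('G', 2)],
    'C': [('A', 5), ('B', 1)],
    'D': [('E', 2)],
    'E': [('F', 3), ('I', 3)],
    'F': [],
    'G': [('H', 5), ('I', 10)],
    'H': [],
    'I': [('G', 10), ('H', 2)],
}

def dijkstra(graph, source):
    """Sequential Dijkstra with a binary heap; assumes non-negative weights."""
    dist = {node: inf for node in graph}
    dist[source] = 0
    heap = [(0, source)]
    while heap:
        d, node = heapq.heappop(heap)
        if d > dist[node]:
            continue  # stale heap entry: a shorter path was already found
        for nbr, w in graph[node]:
            if d + w < dist[nbr]:
                dist[nbr] = d + w
                heapq.heappush(heap, (d + w, nbr))
    return dist

print(dijkstra(example_graph, 'A'))
# {'A': 0, 'B': 3, 'C': 4, 'D': 1, 'E': 3, 'F': 6, 'G': 5, 'H': 8, 'I': 6}
```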

This style of algorithm (message-passing) can be extended to suit other needs. For instance, the original PageRank algorithm can be implemented this way.

## The Shortest Path

Now, what if we want the paths themselves, and not just their total lengths? Let's modify the algorithm above to return the actual paths. Only the following stages need to change.

**Preprocessing**

In the original algorithm, the augmented graph stores the following as *values*: <br> (distance to source node, list of out-neighbours)

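The augmented graph now stores: <br> ((distance to source node, path from source node), list of out-neighbours) <br> Each path is a string of the IDs of the nodes that precede the node in question on its path from the source; the node itself is appended only when the results are formatted. Each path is initially set to the empty string. Because paths are stored as concatenated strings, this representation assumes single-character node IDs.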
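If node IDs may be longer than one character, one option (a sketch, not this notebook's implementation) is to store each path as a tuple of IDs instead of a string. The hypothetical mapper below mirrors the path-tracking mapper described next, with the initial path set to `()` rather than `''`; the IDs `'A1'`, `'B2'` and `'C3'` are made up for illustration.

```python
def process_node_with_tuple_path(node):
    """Hypothetical variant of the path-tracking mapper using tuple paths.

    node: (node_id, ((distance, path_tuple), [(neighbour, weight), ...]))
    """
    node_id, ((distance, path), adj_list) = node
    own = [(node_id, (distance, path))]
    # Extend the path with this node before passing it to each out-neighbour
    extended = path + (node_id,)
    messages = [(nbr, (distance + w, extended)) for nbr, w in adj_list]
    return own + messages

node = ('B2', ((3, ('A1',)), [('C3', 1)]))
print(process_node_with_tuple_path(node))
# [('B2', (3, ('A1',))), ('C3', (4, ('A1', 'B2')))]
print(' -> '.join(('A1', 'B2') + ('C3',)))  # A1 -> B2 -> C3
```

Formatting then becomes `' -> '.join(path + (node_id,))`, which works for IDs of any length.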


**Mapper**

Recall the original mapper. For each node $i$: <br>
* The mapper outputs 1 key-value pair corresponding to: ($i$, distance from $i$ to source node)
* For each out-neighbour of $i$, the algorithm emits: (out-neighbour of $i$, distance from $i$ to source **plus** the weight of the edge from $i$ to the out-neighbour in question)
* A total of $n_i + 1$ key-value pairs are emitted

Instead, we now have: <br>
* The mapper outputs 1 key-value pair corresponding to: ($i$, (distance from $i$ to source node, path from source to $i$))
* For each out-neighbour of $i$, the algorithm emits: (out-neighbour of $i$, (distance from $i$ to source **plus** the weight of the edge from $i$ to the out-neighbour in question, path from source to $i$ extended by $i$))
* A total of $n_i + 1$ key-value pairs are emitted

**Reducer**

The change here is minor. The reducer now receives a compound value, namely: (distance from source, path from source). The reducer simply picks the tuple whose distance from the source is smallest.
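To see the compound reducer in action, consider the candidates for $C$ in the second iteration: its own value (5, 'A'), the message from $A$ (5, 'A'), and the message from $B$, which by then holds (3, 'A') and therefore sends (4, 'AB'). A minimal sketch, with `keep_shorter` as a hypothetical name for the same comparison the code below passes to `reduceByKey`:

```python
from functools import reduce

def keep_shorter(x, y):
    """Hypothetical name for the reducer: compare (distance, path) on distance only."""
    return min(x, y, key=lambda z: z[0])

# Candidate (distance, path) values for node C in the second iteration
candidates_for_c = [(5, 'A'), (5, 'A'), (4, 'AB')]
print(reduce(keep_shorter, candidates_for_c))  # (4, 'AB')
```

Because `min` returns the first of several equal minima, ties between equally short paths are broken by the order in which Spark combines values, so which of them is reported is not guaranteed.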

The rest of the algorithm is nearly identical. The mapper is shown below.

In [None]:
def process_node_with_path(node):
    '''
    process_node_with_path returns a list of key-value pairs corresponding
        to nodes and their distances from the source, along with paths from
        the source.

    Inputs:
        node: A tuple (key-value pair). The key should be a string corresponding
            to a unique node ID. The value should be a tuple. Its first element
            is another tuple storing a float or integer (the distance between
            the node in question and the source) and a string (the IDs of the
            nodes preceding it on its path from the source). Its second element
            is the node's list of out-neighbours, stored as
            (out-neighbour ID, edge weight) tuples.

    Outputs:
        A list of key-value pairs. Each key-value pair stores a node ID as the
            key. The value is a tuple holding a candidate distance from the
            source and the corresponding path.
    '''
    node_id, ((distance_to_node, path), adj_list) = node
    # The node reports its own current distance and path ...
    node_distance_to_source = [(node_id, (distance_to_node, path))]
    # ... and offers each out-neighbour a candidate distance, with the path extended by node_id
    neighbours_distance_to_source = [(neighbour, (edge_weight + distance_to_node, path + node_id))
                                     for neighbour, edge_weight in adj_list]

    return node_distance_to_source + neighbours_distance_to_source

The helper function below formats the results, and is not essential to understanding the algorithm.

In [None]:
def format_results(node_id, distance_to_source, path):
    '''
    format_results returns the results of the pBFS algorithm in a readable
        format.

    Inputs:
        node_id: A string corresponding to a node in the input graph.
        distance_to_source: A float or int corresponding to the minimum
            distance between the node bearing ID node_id and the source node.
        path: A string corresponding to the path of minimum distance between
            the source node and the node bearing ID node_id (excluding node_id).

    Outputs:
        node_id: A string corresponding to a node in the input graph.
        distance_string: A string reporting the minimum distance between the
            node bearing ID node_id and the source node.
        path_string: A string reporting the path of minimum distance from the
            source node to the node bearing ID node_id.
    '''
    distance_string = f'Path length: {distance_to_source}'
    # Each character of path is one node ID; the destination node goes last
    path_string = ' -> '.join(list(path) + [node_id])
    path_string = f'Path: {path_string}'

    return node_id, distance_string, path_string

In [None]:
def pBFS_with_path(graph, source_node_id, partition_count = 10, tol = 1e-5):
    '''
    pBFS_with_path executes parallel breadth-first search on a given graph, and
        returns the paths corresponding to a shortest-path tree rooted at the
        source node.

    Inputs:
        graph: An RDD storing the graph as an adjacency list.
        source_node_id: A string specifying the source node's ID. The source node
            must be a node in the given graph.
        partition_count: An integer specifying the number of RDD partitions (default = 10).
        tol: A float specifying the tolerance for the convergence test (default = 1e-5).

    Outputs:
        An RDD of (node ID, distance string, path string) triples, as produced
            by format_results, giving the minimum distance between the source
            node and every other node, together with the corresponding path.
    '''

    # Initial (distance, path) values: (0, '') for the source, (inf, '') otherwise
    distances_prev = graph.map(lambda x: (x[0], (0 if x[0] == source_node_id else inf, '')))
    augmented_graph = distances_prev.join(graph, partition_count)

    converged = False
    while not converged:

        distances_new = augmented_graph.flatMap(process_node_with_path).reduceByKey(lambda x, y: min(x, y, key = lambda z: z[0]), partition_count)
        augmented_graph = distances_new.join(graph, partition_count)

        # Convergence test: an unchanged distance (including inf in both
        # iterations, i.e. a node still unreachable from the source) counts as 0
        distances_diff = distances_new.join(distances_prev).values().map(lambda x: 0 if x[0][0] == x[1][0] else abs(x[0][0] - x[1][0]))
        diff_max = distances_diff.max()
        converged = diff_max < tol

        distances_prev = distances_new

    return distances_new.map(lambda x: format_results(x[0], *x[1]))

In [None]:
distances_and_paths = pBFS_with_path(graph = toy_graph, source_node_id = 'A')
distances_and_paths.collect()

[('A', 'Path length: 0', 'Path: A'),
 ('H', 'Path length: 8', 'Path: A -> D -> E -> I -> H'),
 ('I', 'Path length: 6', 'Path: A -> D -> E -> I'),
 ('F', 'Path length: 6', 'Path: A -> D -> E -> F'),
 ('B', 'Path length: 3', 'Path: A -> B'),
 ('D', 'Path length: 1', 'Path: A -> D'),
 ('G', 'Path length: 5', 'Path: A -> B -> G'),
 ('C', 'Path length: 4', 'Path: A -> B -> C'),
 ('E', 'Path length: 3', 'Path: A -> D -> E')]

Once again, inspection reveals that these paths are correct.