Assignment 6: Single-Source Shortest Path (SSSP) due 10am December 2

In this assignment you will implement single-source short path (SSSP) with weighted paths in PySpark (with RDDs) and using DuckDB. This will give you a sense of how graph algorithms "look" in different approaches.

Downloading Data

Download graph-data.tar.gz. The file is 29 MB and has an MD5 checksum of 122b471c1aad2eb7afd09be35bc16f7e.

Uncompress the data into graph-data/, and inside the directory you will see ny and ne. These graphs correspond to the road networks of New York City and Northeast USA from here, respectively.

The edges.csv file contains (src, dst, weight). For the ny graph:

% head graph-data/ny/edges.csv
src,dst,weight
1,2,803
2,1,803
3,4,158
4,3,158
5,6,774
6,5,774
7,8,1531
8,7,1531
9,10,1673

The above says you can get from node 1 to node 2 with a weight (i.e., cost) of 803, and so on. This should be pretty straightforward to understand.

Data Science with PySpark

Let us begin first with some basic analysis of the graphs. A Jupyter notebook that contains some basic questions is here. Start by answering those question.

SSSP with PySpark

Now, on to implementations.

Your first task is to implement Single-Source Shortest Path (SSSP) with weighted paths using RDDs. We provide a stub run_sssp_rdd.py for you to get started. To guide you through the implementation and to make sure you remain on the right track, you must use this stub code. Please do not change anything other than the parts specified in the stub code.

Here is how we will run run_sssp_rdd.py:

mkdir -p reachable/ny-src-6-iter000
echo "6,0" > reachable/ny-src-6-iter000/source.csv

spark-submit run_sssp_rdd.py --edges graph-data/ny/edges.csv --source reachable/ny-src-6 --iterations 20

We start by initializing reachable/ny-src-6-iter000 with a single entry: This says that node 6 is the source node, with a distance of 0. When you run the Spark job, you should see something like this (all the way to iteration 20):

Iteration 1: reading from reachable/ny-src-6-iter000, writing to reachable/ny-src-6-iter001
Iteration 2: reading from reachable/ny-src-6-iter001, writing to reachable/ny-src-6-iter002
...

We start with just the source node 6 in iter000. For each iteration, we expand the frontier by one hop, and keep track of all the reachable nodes as (source, distance) pairs. This is accomplished by the function expand_frontier, which you have to implement. In the Python stub, we provide code for loading edges_rdd.

Once you have written the implementation, run it on the ne graph with source node 6, as follows:

mkdir -p reachable/ne-src-6-iter000
echo "6,0" > reachable/ne-src-6-iter000/source.csv

spark-submit run_sssp_rdd.py --edges graph-data/ne/edges.csv --source reachable/ne-src-6 --iterations 20

You can then collect the reachable nodes at iteration 20, as follows:

cat reachable/ny-src-6-iter020/* | sort -t ',' -k 2,2n -k 1,1 > spark.ny-src-6.iter020.out
cat reachable/ne-src-6-iter020/* | sort -t ',' -k 2,2n -k 1,1 > spark.ne-src-6.iter020.out

Now start from source node 1024 on the two graphs, as follows:

mkdir -p reachable/ny-src-1024-iter000
echo "1024,0" > reachable/ny-src-1024-iter000/source.csv

spark-submit run_sssp_rdd.py --edges graph-data/ny/edges.csv --source reachable/ny-src-1024 --iterations 20

cat reachable/ny-src-1024-iter020/* | sort -t ',' -k 2,2n -k 1,1 > spark.ny-src-1024.iter020.out

mkdir -p reachable/ne-src-1024-iter000
echo "1024,0" > reachable/ne-src-1024-iter000/source.csv

spark-submit run_sssp_rdd.py --edges graph-data/ne/edges.csv --source reachable/ne-src-1024 --iterations 20

cat reachable/ne-src-1024-iter020/* | sort -t ',' -k 2,2n -k 1,1 > spark.ne-src-1024.iter020.out

To recap, by this point you should have:

SSSP with DuckDB

Next, implement Single-Source Shortest Path (SSSP) with weighted paths using DuckDB. We provide a stub run_sssp_duckdb.py for you to get started. To guide you through the implementation and to make sure you remain on the right track, you must use this stub code. Please do not change anything other than the parts specified in the stub code.

The DuckDB implementation has the same general structure, expect with tables holding the reachable nodes and distances. You should be able to issue the following command to run with source node 6 on the ny graph:

python run_sssp_duckdb.py --edges graph-data/ny/edges.csv --source 6 --output duckdb.ny-src-6.iter020.out --iterations 20

You should see something like this (all the way to iteration 20):

Iteration 1: reading from reachable000, writing to reachable001
Iteration 2: reading from reachable001, writing to reachable002
...
Iteration 19: reading from reachable018, writing to reachable019
Iteration 20: reading from reachable019, writing to reachable020
-> wrote output to duckdb.ny-src-6.iter020.out

Most of the work is done by the expand_frontier function, which parallels the identically named function in the RDD implementation. Implement this function. Note that expansion of the frontier at each hop must be performed with a single SQL query.

Run the following, so you have DuckDB counterparts to the runs with the RDD implementation above.

python run_sssp_duckdb.py --edges graph-data/ne/edges.csv --source 6 --output duckdb.ne-src-6.iter020.out --iterations 20

python run_sssp_duckdb.py --edges graph-data/ny/edges.csv --source 1024 --output duckdb.ny-src-1024.iter020.out --iterations 20
python run_sssp_duckdb.py --edges graph-data/ne/edges.csv --source 1024 --output duckdb.ne-src-1024.iter020.out --iterations 20

Now you will be able to compare the output of the PySpark implementation with the DuckDB implementation.

Assignment Submission

Procedure is the same as previous assignments. Use this link to create an assignment repo for submission.

In the assignment repo, for the "Data Science with PySpark" part, enter your answers in assignment6.ipynb. Make sure the notebook is committed to the repo (with the answers filled out).

In addition, make sure the following (from above) are committed into your repo:

Submit the assignment by committing all requested files and pushing your repo (with the answers filled out in the notebook) back to origin.

Grading Scheme

What does "following instructions" mean? These are "free points" if you follow the instructions provided in this assignment. These points are to handle the scenario where all your answers are correct, but you did not follow the instructions and that caused us to go out of our way to fix your submission so that it conforms to the instructions. (For example, you removed the ids that we used for tracking, which would make it much more difficult to grade.) In these and other related cases, we will dock points from this category.

Total: 75 points

Back to top