Distributed MapReduce Algorithm and Its Go Implementation
What is Distributed Computing?
“Distributed computing is a field of computer science that studies distributed systems. A distributed system is a system whose components are located on different networked computers, which communicate and coordinate their actions by passing messages to one another from any system. The components interact with one another in order to achieve a common goal.”[1]
In the 1970s and 1980s, distributed systems were mainly a research topic at universities. After very large web systems became popular, many practitioners in the software industry started working on the topic.
There are some advantages and disadvantages of distributed computing.
Advantages
- Reliability, high fault tolerance
- Scalability
- Flexibility
Disadvantages
- Difficult troubleshooting
- Partial failure
- Concurrency
Making your system fault-tolerant is far more complicated than the phrase suggests. You have to think about and handle very different cases, because partial failure can occur at any time, and partial failure makes the set of possible errors enormously bigger than normal. When you need concurrent programming, debugging becomes painful: there are more possible bugs, and each bug takes longer to track down. So if your system does not require this approach, a centralized system is enough for you. On the other hand, distributed systems are inevitable under conditions such as:
- No tolerance for downtime
- Very high load
- Huge data traffic
- Highly variable activity
In this article, I will demonstrate one distributed-system use case called MapReduce. I will briefly describe MapReduce and then show how I implemented it with Go.
MapReduce
“MapReduce is a programming model and an associated implementation for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key”[3]. Many real-world use cases can be solved with this model; you can find them in the referenced paper. Basically, it depends on two functions:
map (k1,v1) → list(k2,v2)
reduce (k2,list(v2)) → list(v2)
Let me explain with the best-known use case of MapReduce: a word counting system. Input files are split up. The master node creates multiple worker nodes to map these files, and the same words are put into the same intermediate file. After the mapping phase, the master node creates multiple reduce workers. The final output is the count of each word across the input files. I will show the sequential approach's code below and finally how I implemented it in a distributed way.
Developing distributed MapReduce is part of the lab projects of MIT 6.824: Distributed Systems, so the base code was taken from that lab and I completed the project. I am also a learner; the part below shows what I have learned so far.
PS: I am not an MIT student; I found the course while searching. I recommend it to everybody who wants to learn this topic.
MrWorker, MrMaster, and MrSequential are in the /src/main folder.
The distributed part, which we need to implement, is located in the /src/mr folder.
The src/mrapps folder also contains different map and reduce functions.
For example, wc.go contains the map and reduce functions for a word counter.
Sequential MapReduce works as sketched below: the whole mapping and reducing process is done inside one program. But if this single node stops working, everything stops and the job never completes.
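Here is a minimal sketch of that sequential flow, simplified from what mrsequential.go does (argument parsing and most error handling are trimmed, and the word-count functions in main are only toy stand-ins for the plugin-injected ones):

package main

import (
    "fmt"
    "log"
    "os"
    "sort"
    "strconv"
    "strings"
)

// KeyValue mirrors the key/value pair type used by the lab code.
type KeyValue struct {
    Key   string
    Value string
}

// runSequential maps every input file, sorts the intermediate pairs so
// that equal keys sit next to each other, then calls reducef once per key.
func runSequential(files []string,
    mapf func(string, string) []KeyValue,
    reducef func(string, []string) string) {

    intermediate := []KeyValue{}
    for _, filename := range files {
        content, err := os.ReadFile(filename)
        if err != nil {
            log.Fatalf("cannot read %v", filename)
        }
        intermediate = append(intermediate, mapf(filename, string(content))...)
    }

    // Sorting groups all values of the same key together.
    sort.Slice(intermediate, func(i, j int) bool {
        return intermediate[i].Key < intermediate[j].Key
    })

    out, err := os.Create("mr-out-0")
    if err != nil {
        log.Fatalf("cannot create output file: %v", err)
    }
    defer out.Close()

    // Reduce each group of equal keys and write one line per key.
    for i := 0; i < len(intermediate); {
        j := i
        values := []string{}
        for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
            values = append(values, intermediate[j].Value)
            j++
        }
        fmt.Fprintf(out, "%v %v\n", intermediate[i].Key, reducef(intermediate[i].Key, values))
        i = j
    }
}

func main() {
    // Toy word count over the files given on the command line.
    mapf := func(filename, contents string) []KeyValue {
        kva := []KeyValue{}
        for _, w := range strings.Fields(contents) {
            kva = append(kva, KeyValue{Key: w, Value: "1"})
        }
        return kva
    }
    reducef := func(key string, values []string) string {
        return strconv.Itoa(len(values))
    }
    runSequential(os.Args[1:], mapf, reducef)
}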
Let’s start with how to implement distributed MapReduce with Go.
We need to implement the Master, RPC, and Worker files inside the src/mr folder.
We already have an MrMaster file that creates the Master node, whose role is the master described in the MapReduce architecture above. The MrWorker file creates Worker nodes. The testing scripts use the MrMaster and MrWorker files to create the required nodes.
While writing a fault-tolerant, distributed MapReduce algorithm with Go, we will learn many important Go concepts such as channels, goroutines, and mutexes.
Predefined conditions: The master node gives tasks to workers and waits 10 seconds for each worker. If a worker cannot finish its job in time, the master reassigns the job to another worker, so the job completes even if workers crash.
NotStarted, Started, and Finished are used for task status. A MapTask consists of a filename and an index. We have two channels to control tasks, and the Master struct holds information about task status, files, and so on, as sketched below.
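The actual definitions live in master.go; the sketch below shows one way they could look. Only mapTaskStatus, RWMutexLock, and reduceFinished are names that appear later in this article, the other field names are illustrative:

package mr

import "sync"

// KeyValue is the intermediate key/value pair produced by map functions.
type KeyValue struct {
    Key   string
    Value string
}

// Task status values used by the master.
type TaskStatus int

const (
    NotStarted TaskStatus = iota
    Started
    Finished
)

// MapTask consists of the input file name and its index.
type MapTask struct {
    filename string
    index    int
}

// Master keeps task bookkeeping and feeds tasks to workers through two channels.
type Master struct {
    files   []string
    nReduce int

    mapTaskStatus     map[string]TaskStatus
    reduceTaskStatus  map[int]TaskStatus
    intermediateFiles map[int][]string // reduce index -> intermediate file names

    mapFinished    bool
    reduceFinished bool

    RWMutexLock *sync.RWMutex

    // Two channels control task distribution.
    mapTasks    chan MapTask
    reduceTasks chan int
}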
The existing MrMaster calls the MakeMaster function to create the Master node. I initialize the status maps and other fields inside this function. I also send each file to our maptasks channel together with its index; all map tasks will be assigned from this channel. Channels are used for concurrent programming in Go.[4]
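A sketch of MakeMaster under the same assumptions as the struct above (the server() call that starts listening for worker RPCs comes from the lab skeleton):

// MakeMaster initializes the status maps, queues one map task per input
// file, and starts the RPC server.
func MakeMaster(files []string, nReduce int) *Master {
    m := Master{
        files:             files,
        nReduce:           nReduce,
        mapTaskStatus:     make(map[string]TaskStatus),
        reduceTaskStatus:  make(map[int]TaskStatus),
        intermediateFiles: make(map[int][]string),
        RWMutexLock:       &sync.RWMutex{},
        mapTasks:          make(chan MapTask, len(files)),
        reduceTasks:       make(chan int, nReduce),
    }

    // Every input file becomes one map task waiting on the channel.
    for i, f := range files {
        m.mapTaskStatus[f] = NotStarted
        m.mapTasks <- MapTask{filename: f, index: i}
    }
    for i := 0; i < nReduce; i++ {
        m.reduceTaskStatus[i] = NotStarted
    }

    m.server() // start listening for worker RPCs (provided by the lab skeleton)
    return &m
}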
In our system, all communication between master and worker is handled with RPC.[5]
After seeing how the master is created, let's look at how a worker is created and how it gets a task from the master.
Worker.go
MrWorker calls the worker function of the Worker struct. The worker keeps asking the Master for the current task until the master returns no task; the Master returns either a map task or a reduce task. CallMaster is a simple RPC call. I will not describe the whole RPC process: in the code base there is a file called rpc.go that contains the args and reply structs, and a call function inside worker.go is responsible for the RPC calls. You can find these details at the GitHub link.
Also, while creating a worker node, the map and reduce functions are injected via Go's plugin mechanism. For example, the commands below build the word counter as a plugin and start a worker with it, so inside our worker code we can use the mapf and reducef functions injected from another file. There are some ready-made files inside the mrapps folder to test the project.
$go build -buildmode=plugin ../mrapps/wc.go
$go run mrworker.go wc.so
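Inside mrworker.go the plugin is opened and its Map and Reduce symbols are looked up. The sketch below mirrors the loadPlugin helper that ships with the lab skeleton; the mr import path depends on your module setup, and mrworker.go's main function is omitted here:

package main

import (
    "log"
    "plugin"

    "github.com/yunuskilicdev/distributedsystems/src/mr" // the package in src/mr
)

// loadPlugin opens the compiled plugin (e.g. wc.so) and looks up its
// exported Map and Reduce functions.
func loadPlugin(filename string) (func(string, string) []mr.KeyValue,
    func(string, []string) string) {
    p, err := plugin.Open(filename)
    if err != nil {
        log.Fatalf("cannot load plugin %v", filename)
    }
    xmapf, err := p.Lookup("Map")
    if err != nil {
        log.Fatalf("cannot find Map in %v", filename)
    }
    mapf := xmapf.(func(string, string) []mr.KeyValue)
    xreducef, err := p.Lookup("Reduce")
    if err != nil {
        log.Fatalf("cannot find Reduce in %v", filename)
    }
    reducef := xreducef.(func(string, []string) string)
    return mapf, reducef
}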
crash.go is an important file: it adds random crashes and delays to check fault tolerance.
Worker’s mapping code
It maps the input with the injected map function. For example, the map function inside wc.go extracts the words from the content; the worker receives it as mapf through the plugin injection described above.
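Roughly, wc.go looks like the sketch below (it follows the lab's word-count plugin; treat it as illustrative rather than the exact file):

package main

import (
    "strconv"
    "strings"
    "unicode"

    "github.com/yunuskilicdev/distributedsystems/src/mr"
)

// Map splits the file contents into words and emits ("word", "1") pairs.
func Map(filename string, contents string) []mr.KeyValue {
    // A word is any maximal run of letters.
    ff := func(r rune) bool { return !unicode.IsLetter(r) }
    words := strings.FieldsFunc(contents, ff)

    kva := []mr.KeyValue{}
    for _, w := range words {
        kva = append(kva, mr.KeyValue{Key: w, Value: "1"})
    }
    return kva
}

// Reduce receives all values for one word; the count is simply how many
// "1"s were emitted for it.
func Reduce(key string, values []string) string {
    return strconv.Itoa(len(values))
}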
When all mappings are ready, we need to divide these words into nReduce files per map task, so there is more than one intermediate file. Each reduce task then collects its set of files and finishes the reducing part. The index of these files is chosen with a hash function, so the correct index is always the same for the same word.
For map task index 0 (file0), if nReduce = 3, the files mr-0-0, mr-0-1, and mr-0-2 will be created. Then, for example, "apple" is put into mr-0-0 and "banana" into mr-0-1. Map task index 1 does the same for file1: "apple" goes into mr-1-0 and "banana" into mr-1-1. All apples end up in mr-*-0 and all bananas in mr-*-1, so counting works correctly.
All intermediate file names are reported to the master with the NotifyMaster function.
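A sketch of that map-side partitioning: these functions would sit in the mr package next to the types sketched earlier and use encoding/json, fmt, hash/fnv, and os from the standard library. The helper name writeIntermediate is mine; the returned file names are what gets reported to the master.

// ihash picks the reduce bucket for a key.
func ihash(key string) int {
    h := fnv.New32a()
    h.Write([]byte(key))
    return int(h.Sum32() & 0x7fffffff)
}

// writeIntermediate splits the map output into nReduce files named
// mr-<mapIndex>-<reduceIndex>, JSON-encoding one KeyValue per line.
// It returns the created file names so they can be reported to the master.
func writeIntermediate(mapIndex, nReduce int, kva []KeyValue) ([]string, error) {
    names := make([]string, nReduce)
    encoders := make([]*json.Encoder, nReduce)
    for r := 0; r < nReduce; r++ {
        names[r] = fmt.Sprintf("mr-%d-%d", mapIndex, r)
        f, err := os.Create(names[r])
        if err != nil {
            return nil, err
        }
        defer f.Close()
        encoders[r] = json.NewEncoder(f)
    }

    // The hash of the key decides which intermediate file it lands in,
    // so the same word always gets the same reduce index.
    for _, kv := range kva {
        r := ihash(kv.Key) % nReduce
        if err := encoders[r].Encode(&kv); err != nil {
            return nil, err
        }
    }
    return names, nil
}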
Worker’s reducing code
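A sketch of the reduce side (it lives next to the map code above and additionally uses sort; nMap is the number of map tasks, and doReduce is an illustrative name):

// doReduce reads every mr-<mapIndex>-<reduceIndex> file produced by the
// map phase, groups values by key, and writes mr-out-<reduceIndex>.
func doReduce(reduceIndex, nMap int, reducef func(string, []string) string) error {
    kva := []KeyValue{}
    for m := 0; m < nMap; m++ {
        f, err := os.Open(fmt.Sprintf("mr-%d-%d", m, reduceIndex))
        if err != nil {
            return err
        }
        dec := json.NewDecoder(f)
        for {
            var kv KeyValue
            if err := dec.Decode(&kv); err != nil {
                break // io.EOF: this intermediate file is fully read
            }
            kva = append(kva, kv)
        }
        f.Close()
    }

    // Group equal keys together, as in the sequential version.
    sort.Slice(kva, func(i, j int) bool { return kva[i].Key < kva[j].Key })

    out, err := os.Create(fmt.Sprintf("mr-out-%d", reduceIndex))
    if err != nil {
        return err
    }
    defer out.Close()
    for i := 0; i < len(kva); {
        j := i
        values := []string{}
        for j < len(kva) && kva[j].Key == kva[i].Key {
            values = append(values, kva[j].Value)
            j++
        }
        fmt.Fprintf(out, "%v %v\n", kva[i].Key, reducef(kva[i].Key, values))
        i = j
    }
    return nil
}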
Let's look at how the Master handles task distribution.
If any map task exists inside the maptasks channel, the master takes it from the channel and assigns it to a worker.
While changing the status, I used a mutex lock:
m.RWMutexLock.Lock()
m.mapTaskStatus[mapTask.filename] = Started
m.RWMutexLock.Unlock()
Before returning to the worker, I start a goroutine that monitors whether the worker finishes its job within 10 seconds: go m.watchWorkerMap(mapTask)
When a worker finishes its task, it sends a notification to the master (the NotifyMapSuccess call shown in the worker's code), and the map task status of that file is marked Finished. If after 10 seconds the task status is still not Finished, the task is put into maptasks again and the related file's status becomes NotStarted.
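A sketch of that watchdog goroutine, assuming the Master fields from the earlier sketch and the standard time package:

// watchWorkerMap is started with "go m.watchWorkerMap(mapTask)".
// If the success notification never arrives within 10 seconds, the worker
// is assumed lost: the task is requeued and its status reset.
func (m *Master) watchWorkerMap(task MapTask) {
    time.Sleep(10 * time.Second)

    m.RWMutexLock.Lock()
    defer m.RWMutexLock.Unlock()
    if m.mapTaskStatus[task.filename] != Finished {
        m.mapTaskStatus[task.filename] = NotStarted
        m.mapTasks <- task
    }
}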
Once all map tasks are finished, the reduce tasks are added to the reducetasks channel. From that point on, the master assigns reduce tasks to workers.
Reduce tasks are monitored in the same way: if a reduce task is not completed within 10 seconds, it is put back into the channel and its status becomes NotStarted again.
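The reduce watchdog mirrors the map one; again a sketch:

// watchWorkerReduce requeues an unfinished reduce task after 10 seconds.
func (m *Master) watchWorkerReduce(reduceIndex int) {
    time.Sleep(10 * time.Second)

    m.RWMutexLock.Lock()
    defer m.RWMutexLock.Unlock()
    if m.reduceTaskStatus[reduceIndex] != Finished {
        m.reduceTaskStatus[reduceIndex] = NotStarted
        m.reduceTasks <- reduceIndex
    }
}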
When m.reduceFinished becomes true, MrMaster finishes the whole process, because it keeps polling the master's Done function.
How to test it
At the src/main location:
$sh ./test-mr.sh
*** Starting wc test.
2021/03/09 15:34:19 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- wc test: PASS
*** Starting indexer test.
2021/03/09 15:34:24 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- indexer test: PASS
*** Starting map parallelism test.
2021/03/09 15:34:27 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- map parallelism test: PASS
*** Starting reduce parallelism test.
2021/03/09 15:34:34 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- reduce parallelism test: PASS
*** Starting crash test.
2021/03/09 15:34:42 rpc.Register: method "Done" has 1 input parameters; needs exactly three
--- crash test: PASS
*** PASSED ALL TESTS
The crash test uses the crash.go file, which includes some random crashes and delays. You can add some logs to follow the status maps: some indexes become Started and then NotStarted again.
My code passed all tests, but there was still a problem inside the code: race conditions. There were some places where shared variables might be accessed without locking. With Go, detecting race conditions is easy: simply build with the -race flag. There is an option inside the test-mr.sh file to enable race detection, so I turned it on:
RACE=-race
All tests still passed, but there were some warnings:
==================
WARNING: DATA RACE
Read at 0x00c000220060 by goroutine 67:
github.com/yunuskilicdev/distributedsystems/src/mr.(*Master).NotifyIntermediateFile()
/Users/ykdev/go/src/github.com/yunuskilicdev/distributedsystems/src/mr/master.go:110 +0xf8
Previous write at 0x00c000220060 by goroutine 65:
github.com/yunuskilicdev/distributedsystems/src/mr.(*Master).NotifyIntermediateFile()
/Users/ykdev/go/src/github.com/yunuskilicdev/distributedsystems/src/mr/master.go:110 +0x1f5
==================
The reported function is NotifyIntermediateFile, where the intermediate-file bookkeeping was read and written from different goroutines without locking. After adding the lock, the warning disappears.
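A sketch of NotifyIntermediateFile after the fix. The argument and reply types and the exact bookkeeping are guesses; the point is that the shared state is only touched while the mutex is held:

// NotifyIntermediateFile records an intermediate file reported by a worker.
// IntermediateFileArgs/Reply are illustrative names for the rpc.go structs.
func (m *Master) NotifyIntermediateFile(args *IntermediateFileArgs, reply *IntermediateFileReply) error {
    m.RWMutexLock.Lock()
    defer m.RWMutexLock.Unlock()

    m.intermediateFiles[args.ReduceIndex] =
        append(m.intermediateFiles[args.ReduceIndex], args.Filename)
    return nil
}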
Another race condition warning:
==================
WARNING: DATA RACE
Write at 0x00c0001ce1b9 by goroutine 50:
github.com/yunuskilicdev/distributedsystems/src/mr.(*Master).NotifyReduceSuccess()
/Users/ykdev/go/src/github.com/yunuskilicdev/distributedsystems/src/mr/master.go:146 +0x244
...
Previous read at 0x00c0001ce1b9 by main goroutine:
github.com/yunuskilicdev/distributedsystems/src/mr.(*Master).Done()
/Users/ykdev/go/src/github.com/yunuskilicdev/distributedsystems/src/mr/master.go:171 +0xdc
==================
This time the reported functions are NotifyReduceSuccess, which writes the reduce bookkeeping, and Done, which the main goroutine keeps reading. After guarding both with locks, the warning disappears.
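A sketch of the fix: the write in NotifyReduceSuccess and the read in Done now share the same mutex (argument and reply types are again illustrative):

// NotifyReduceSuccess marks a reduce task as finished and, once every
// reduce task is done, sets reduceFinished under the lock.
func (m *Master) NotifyReduceSuccess(args *ReduceSuccessArgs, reply *ReduceSuccessReply) error {
    m.RWMutexLock.Lock()
    defer m.RWMutexLock.Unlock()

    m.reduceTaskStatus[args.ReduceIndex] = Finished

    finished := true
    for _, s := range m.reduceTaskStatus {
        if s != Finished {
            finished = false
            break
        }
    }
    m.reduceFinished = finished
    return nil
}

// Done is polled by mrmaster.go; it only reads, so a read lock is enough.
func (m *Master) Done() bool {
    m.RWMutexLock.RLock()
    defer m.RWMutexLock.RUnlock()
    return m.reduceFinished
}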
Now the race conditions are also fixed.
That's all for now. As I learn more, I will try to publish it.
Resources:
[1] — https://en.wikipedia.org/wiki/Distributed_computing
[2] — https://computernetworktopology.com/distributed-computing/
[3] — http://nil.csail.mit.edu/6.824/2020/papers/mapreduce.pdf