I skipped over “Chapter 2: Semaphores” because it doesn’t contain exercises. Looking back at it, though, I think it’s worth revisiting with Go in mind.
The Little Book of Semaphores offers the following properties as a definition for a semaphore:
When you create the semaphore, you can initialize its value to any integer, but after that the only operations you are allowed to perform are increment (increase by one) and decrement (decrease by one). You cannot read the current value of the semaphore.
When a thread decrements the semaphore, if the result is negative, the thread blocks itself and cannot continue until another thread increments the semaphore.
When a thread increments the semaphore, if there are other threads waiting, one of the waiting threads gets unblocked.
In Go terms, a semaphore is a type satisfying this interface:
type Semaphore interface {
    decrement()
    increment()
}
plus some way to initialize the semaphore to some specific value.
It’s tempting to try this:
type Semaphore int
func NewSemaphore(n int) *Semaphore {
    // ...
}

func (s *Semaphore) decrement() {
    // ...
}

func (s *Semaphore) increment() {
    // ...
}
The counter must be manipulated atomically, and sync/atomic provides such operations.
The increment method is superficially simple: increment the counter and signal the blocked goroutines. The decrement method is simple as well: check the counter’s value and, if decrementing it would cause it to be negative, block until it becomes positive.
The signaling and blocking are the tricky parts, as you want to avoid a busy loop. The standard library offers sync.Cond, described as:
Cond implements a condition variable, a rendezvous point for goroutines waiting for or announcing the occurrence of an event.
Cond has a Wait() method that causes the calling goroutine to suspend execution until another goroutine calls the Signal() method, which wakes up one of the waiting goroutines. This does the trick!
An implementation based on this might look like the following:
type Semaphore struct {
    n int32
    c *sync.Cond
}

func NewSemaphore(N int32) *Semaphore {
    return &Semaphore{
        n: N,
        c: sync.NewCond(new(sync.Mutex)),
    }
}
func (s *Semaphore) decrement() {
    s.c.L.Lock()
    for s.n <= 0 {
        s.c.Wait()
    }
    s.n--
    s.c.L.Unlock()
}

func (s *Semaphore) increment() {
    s.c.L.Lock()
    s.n++
    s.c.L.Unlock()
    s.c.Signal()
}
This does everything we want: it signals blocked goroutines when a resource becomes available and blocks when the resources are exhausted.
There’s an alternative and popular implementation, based on channels:
type Semaphore chan struct{}

func NewSemaphore(N int32) Semaphore {
    s := Semaphore(make(chan struct{}, N))
    for i := N; i > 0; i-- {
        s.increment()
    }
    return s
}

func (s Semaphore) decrement() {
    <-s
}

func (s Semaphore) increment() {
    s <- struct{}{}
}
With this approach the implementation is trivial: decrementing the semaphore only needs to take one element from the channel (wait for a signal), and incrementing only needs to put one element in it (send a signal). Note the lack of locks. This is because the mechanism that implements channels uses locks and semaphores underneath, so you are leveraging the internal implementation for something that it already does well.
The channel must be buffered to prevent increment() from blocking if another goroutine hasn’t called decrement() yet. The channel is seeded with N elements to make all the resources protected by the semaphore available from the start. This is of course defined by the specific problem that you are trying to solve, and the behavior can be extended easily by changing the NewSemaphore signature.
Note that, with this implementation, after calling increment() N times without an accompanying decrement(), further calls to increment() will block, which should never happen. This means, for example, that you cannot dynamically add resources.
Of the two, which implementation should you prefer?
The one that solves the problem you are trying to solve.
In principle, a semaphore is a good match for a problem where there’s a limited amount of resources and a larger number of agents that can do work. Imagine a situation where you want to fetch URLs using a limited number N of simultaneous fetch operations, there’s some kind of postprocessing that you want to perform on the content, and you have M workers available for this problem, with M > N. This can be solved with a semaphore initialized to N, allowing at most that number of goroutines to be in the fetching state at any time. The remaining M-N goroutines can be in the postprocessing stage. Something like this:
for {
    s.decrement()
    url := getUrl()
    content := fetchUrl(url)
    s.increment()
    processContent(content)
}
This solution might work, but there’s a better, more Go-like one: start N goroutines for fetching and M-N for postprocessing, communicating between them using channels. Instead of using the channel as a signaling device to implement a semaphore, use the channel to communicate. As you can see from the second solution, the channel is a semaphore.