### Concurrent Digraph Processing in Go

So today I decided to take on a fairly complex design pattern in the hopes of demonstrating some of Go's more advanced concurrency features, or at least ruffling a few feathers about how to approach digraph processing.

Wikipedia offers a great explanation of graph theory in case you need to brush up!

The goal of this post is to offer a design approach for processing graphs concurrently, where each vertex in the graph represents a processable unit of work.

The graph will be processed concurrently via a `Crawl()` operation backed by N concurrent processors. Each processor works through a stream of vertices by calling a `Process()` function through a function pointer.

## The Graph

The graph is composed of edges and vertices. We start the example off by building a complex graph with an arbitrary child count. Each of the children may or may not have their own children. A child is represented by a vertex.

## The Vertices

A vertex represents an intersection of the graph. Where two edges meet, there will be a vertex. The only way to get from one vertex to another is to traverse the edge connecting them.

## The Edges

Edges are what connect vertices. Every edge has a `To` and a `From` pointer to the two vertices it connects. Because each edge implies direction, and only connects in a single direction, the graph is a directed graph, or digraph.

## The Processors

Processors process vertices. In this example we simulate some arbitrary work by injecting a random sleep. In a real implementation, a processor would accomplish whatever work a vertex requires. We have N processors so they can operate on vertices concurrently: the more processors, the more vertices we can process at the same time.

## The Crawler

The crawler traverses edges to find vertices. As the crawler finds each vertex, it hands it to a processor so the vertices are processed concurrently. Processors are called cyclically and in order.

For example, if we had 10 vertices and 3 processors, the call pattern would look like this:

```
V  -- P
------
1  -- 1
2  -- 2
3  -- 3
4  -- 1
5  -- 2
6  -- 3
7  -- 1
8  -- 2
9  -- 3
10 -- 1
```

The vertices are processed in **unique** goroutines, but on **shared** channels. If vertices arrive faster than the processors can keep up, the pending sends block and effectively form a queue.

## The win

The huge win here is that the graph stays constructed in its original form. The crawler can iterate through its many (and complex) layers quickly because of the concurrent processing design. A processor could be replaced with any implementation capable of running a workload. This lets the user structure complex data and operate on it without any overhead of understanding the data. The processor gets a vertex, and that's it. The processors have no concept of order, and they don't need to.

Notice how the program calculates the expected number of operations up front by counting the graph, and then processes the graph quickly with exactly that number of operations. Good. Clean. Concurrent processing.

Furthermore, a user can turn up the number of iterations to specify how many times to visit each vertex. This is useful in situations where a process is idempotent but could potentially fail: sending the same request N times makes sense and increases the probability of success.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

const (
	NumberOfConcurrentProcessors = 32
	NumberOfCrawlIterations      = 4
)

// DiGraph
//
// This is a directional graph where edges can point in any direction between vertices.
// The graph has 1 root vertex, which is where the Crawl() starts from.
type DiGraph struct {
	RootVertex       *Vertex      // The root vertex of the graph
	Processors       []*Processor // List of concurrent processors
	ProcessorIndex   int          // The current index of the next processor to use
	Edges            []*Edge      // All directional edges that make up the graph
	Iterations       int          // The total number of times to iterate over the graph
	TotalVertices    int          // Count of the total number of vertices that make up the graph
	ProcessedChannel chan int     // Channel to track processed vertices
	ProcessedCount   int          // Total number of processed vertices
	TotalOperations  int          // Total number of expected operations | [(TotalVertices * Iterations) - Iterations] + 1
	lock             sync.Mutex   // Guards ProcessorIndex and edge counters across crawler goroutines
}

// Vertex
//
// A single unit that composes the graph. Each vertex has relationships with other vertices,
// and should represent a single entity or unit of work.
type Vertex struct {
	Name   string // Unique name of this Vertex
	Edges  []*Edge
	Status int
}

// Edge
//
// Edges connect vertices together. Edges have a concept of how many times they have been
// processed, and a To and From direction.
type Edge struct {
	To             *Vertex
	From           *Vertex
	ProcessedCount int
}

// Processor
//
// This represents a single concurrent process that will operate on N number of vertices.
type Processor struct {
	Function func(*Vertex) int
	Channel  chan *Vertex
}

// Init the graph with a literal definition
var TheGraph *DiGraph = NewGraph()

func main() {
	TheGraph.Init(NumberOfConcurrentProcessors, NumberOfCrawlIterations)
	TheGraph.Crawl()
}

func (d *DiGraph) Init(n, i int) {
	noProcs := n
	d.TotalVertices = d.RootVertex.recursiveCount()
	d.Iterations = i
	for ; n > 0; n-- {
		p := Processor{Channel: make(chan *Vertex)}
		d.Processors = append(d.Processors, &p)
		p.Function = Process
		go p.Exec()
	}
	d.TotalOperations = (d.TotalVertices * d.Iterations) - d.Iterations + 1 // Math is hard
	fmt.Printf("Total Vertices              : %d\n", d.TotalVertices)
	fmt.Printf("Total Iterations            : %d\n", d.Iterations)
	fmt.Printf("Total Concurrent Processors : %d\n", noProcs)
	fmt.Printf("Total Assumed Operations    : %d\n", d.TotalOperations)
}

func (d *DiGraph) Crawl() {
	d.ProcessedChannel = make(chan int)
	go d.RootVertex.recursiveProcess(d.getProcessor().Channel)
	fmt.Printf("---\n")
	for d.ProcessedCount < d.TotalOperations {
		d.ProcessedCount += <-d.ProcessedChannel
		//o(fmt.Sprintf("%d ", d.ProcessedCount))
	}
	fmt.Printf("\n---\n")
	fmt.Printf("Total Completed Operations  : %d\n", d.ProcessedCount)
}

func (d *DiGraph) getProcessor() *Processor {
	d.lock.Lock()
	defer d.lock.Unlock()
	// Cycle through processors in order, wrapping back to the start.
	d.ProcessorIndex = (d.ProcessorIndex + 1) % len(d.Processors)
	return d.Processors[d.ProcessorIndex]
}

func Process(v *Vertex) int {
	// Simulate some work with a random sleep between 100 and 199 milliseconds
	sleep := rand.Intn(100) + 100
	time.Sleep(time.Millisecond * time.Duration(sleep))
	o(fmt.Sprintf("Processing: %s", v.Name))
	// Return a status code
	return 1
}

func (v *Vertex) recursiveProcess(ch chan *Vertex) {
	ch <- v
	for _, e := range v.Edges {
		// Guard the edge counter; many crawler goroutines touch it concurrently.
		TheGraph.lock.Lock()
		proceed := e.ProcessedCount < TheGraph.Iterations
		if proceed {
			e.ProcessedCount += 1
		}
		TheGraph.lock.Unlock()
		if proceed {
			go e.To.recursiveProcess(TheGraph.getProcessor().Channel)
		}
	}
}

func (v *Vertex) recursiveCount() int {
	i := 1
	for _, e := range v.Edges {
		// New edges start at -1; zeroing them marks them visited for the count.
		if e.ProcessedCount != 0 {
			e.ProcessedCount = 0
			i += e.To.recursiveCount()
		}
	}
	return i
}

func (v *Vertex) AddVertex(name string) *Vertex {
	newVertex := &Vertex{Name: name}
	newEdge := &Edge{To: newVertex, From: v, ProcessedCount: -1}
	newVertex.Edges = append(newVertex.Edges, newEdge)
	v.Edges = append(v.Edges, newEdge)
	return newVertex
}

func (p *Processor) Exec() {
	for {
		v := <-p.Channel
		v.Status = p.Function(v)
		TheGraph.ProcessedChannel <- 1
	}
}

func NewGraph() *DiGraph {
	rootVertex := &Vertex{Name: "0"}
	v1 := rootVertex.AddVertex("1")
	rootVertex.AddVertex("2")
	rootVertex.AddVertex("3")
	v1.AddVertex("1-1")
	v1.AddVertex("1-2")
	v1_3 := v1.AddVertex("1-3")
	v1_3.AddVertex("1-3-1")
	v1_3.AddVertex("1-3-2")
	v1_3_3 := v1_3.AddVertex("1-3-3")
	v1_3_3.AddVertex("1-3-3-1")
	v1_3_3.AddVertex("1-3-3-2")
	v1_3_3.AddVertex("1-3-3-3")
	v1_3_3.AddVertex("1-3-3-4")
	v1_3_3.AddVertex("1-3-3-5")
	v1_3_3.AddVertex("1-3-3-6")
	v1_3_3.AddVertex("1-3-3-7")
	graph := &DiGraph{}
	graph.RootVertex = rootVertex
	return graph
}

func o(str string) {
	fmt.Println(str)
}
```

#### Try it out

You can run the code yourself in the Go playground.

#### Thanks!

Thank you for reading my article. As always, I appreciate any feedback, so let me know how it could be better.
