Concurrent Digraph Processing in Go

So today I decided to take on a fairly complex design pattern, in the hopes of demonstrating some of the more advanced concurrency features in Go, or at least ruffling a few feathers about how to approach digraph processing.

Wikipedia offers a great explanation of graph theory in case you need to brush up!

The goal of this post is to offer a design approach to processing graphs concurrently, where each vertex in the graph represents a processable unit of work.

The graph will be processed concurrently via a `Crawl()` operation, with N concurrent processors. Each processor works through its stream of vertices sequentially, calling a function value (Go's equivalent of a function pointer) that refers to a `Process()` function.

The Graph

The graph is composed of edges and vertices. We start the example off by building a complex graph with an arbitrary child count. Each of the children may or may not have their own children. A child is represented by a vertex.

The Vertices

A vertex represents an intersection in the graph: wherever edges meet, there is a vertex. The only way to get from one vertex to another is to traverse the edge that connects them.
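To make "the only way to get from one vertex to another is to traverse edges" concrete, here is a minimal reachability check. It assumes a simple adjacency map from a vertex name to the names it points to (a hypothetical representation, not the article's pointer-based one) and runs an iterative depth-first search.

```go
package main

import "fmt"

// reachable reports whether target can be reached from start by following
// directed edges in adj (vertex name -> names of the vertices it points to).
func reachable(adj map[string][]string, start, target string) bool {
	seen := map[string]bool{}
	stack := []string{start}
	for len(stack) > 0 {
		v := stack[len(stack)-1]
		stack = stack[:len(stack)-1]
		if v == target {
			return true
		}
		if seen[v] {
			continue
		}
		seen[v] = true
		stack = append(stack, adj[v]...) // only edges leaving v can be followed
	}
	return false
}

func main() {
	// A small slice of the article's graph: 0 -> 1, 0 -> 2, 1 -> 1-3, 1-3 -> 1-3-3.
	adj := map[string][]string{
		"0":   {"1", "2"},
		"1":   {"1-3"},
		"1-3": {"1-3-3"},
	}
	fmt.Println(reachable(adj, "0", "1-3-3")) // true: 0 -> 1 -> 1-3 -> 1-3-3
	fmt.Println(reachable(adj, "1-3-3", "0")) // false: no edge leads back
	fmt.Println(reachable(adj, "2", "1"))     // false: 2 has no outgoing edges
}
```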

The Edges

Edges are what connect vertices. Every edge has a `To` and a `From` pointer that point to the two vertices it connects. Because an edge implies a direction, and only connects in that single direction, the graph is a directed graph, or digraph.
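A minimal sketch of that directionality, using `To`/`From` pointers like the article's `Edge` type but trimmed to just those fields. The `outgoing` helper is hypothetical: it scans an edge list and only follows an edge from its `From` end.

```go
package main

import "fmt"

// Vertex is a simplified stand-in for the article's Vertex type.
type Vertex struct {
	Name string
}

// Edge points one way: it can be followed from From to To, never backwards.
type Edge struct {
	From, To *Vertex
}

// outgoing returns the vertices reachable from v in a single hop.
func outgoing(edges []*Edge, v *Vertex) []*Vertex {
	var out []*Vertex
	for _, e := range edges {
		if e.From == v {
			out = append(out, e.To)
		}
	}
	return out
}

func main() {
	a, b, c := &Vertex{Name: "a"}, &Vertex{Name: "b"}, &Vertex{Name: "c"}
	edges := []*Edge{{From: a, To: b}, {From: a, To: c}, {From: b, To: c}}

	for _, v := range []*Vertex{a, b, c} {
		fmt.Printf("%s out-degree: %d\n", v.Name, len(outgoing(edges, v)))
	}
}
```

`c` has two incoming edges but no outgoing ones, so a crawl that reaches `c` stops there.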

The Processors

Processors process vertices. In this example we simulate some arbitrary work by injecting a random sleep. In a real implementation a processor would perform whatever work a vertex requires. We have N processors so they can operate on vertices concurrently: the more processors, the more vertices we can process at the same time.

The Crawler

The crawler traverses edges to find vertices. As it finds each vertex, it passes the vertex to a processor, so vertices are processed concurrently. Processors are chosen cyclically and in order.

For example, if we had 10 vertices and 3 processors, the call pattern would look like this:

| Vertex | Processor |
|--------|-----------|
| 1      | 1         |
| 2      | 2         |
| 3      | 3         |
| 4      | 1         |
| 5      | 2         |
| 6      | 3         |
| 7      | 1         |
| 8      | 2         |
| 9      | 3         |
| 10     | 1         |

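Each vertex is dispatched from its own goroutine, but onto a channel shared with every other vertex routed to the same processor. The channels are unbuffered, so when vertices arrive faster than the processors can keep up, the sending goroutines block and effectively form a queue.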

The win

The huge win here is that the graph stays in its original form. The crawler can move through its many (and complex) layers quickly because of the concurrent processing design. A processor could be replaced with any implementation capable of running a workload. This lets the user structure complex data while operating on it without the overhead of understanding the data: the processor gets a vertex, and that's it. The processors have no concept of order, and they don't need one.
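One way to make "any implementation capable of running a workload" explicit in Go is an interface. The program itself stores a `func(*Vertex) int` in `Processor.Function`; the `Worker` interface and the `Printer` and `Recorder` types below are hypothetical alternatives showing that the caller never needs to know what a worker does with a vertex.

```go
package main

import (
	"fmt"
	"strings"
)

// Vertex is a simplified stand-in for the article's Vertex.
type Vertex struct{ Name string }

// Worker is a hypothetical interface: anything that can process a vertex and
// return a status code could be plugged into the crawler.
type Worker interface {
	Process(v *Vertex) int
}

// Printer logs the vertex name, like the article's Process function.
type Printer struct{}

func (Printer) Process(v *Vertex) int {
	fmt.Println("Processing:", v.Name)
	return 1
}

// Recorder remembers the names it has seen, which is handy in tests.
// It is not safe for concurrent use as written.
type Recorder struct{ Seen []string }

func (r *Recorder) Process(v *Vertex) int {
	r.Seen = append(r.Seen, v.Name)
	return 1
}

// processAll feeds every vertex to w without knowing what w does with it.
func processAll(w Worker, vs []*Vertex) int {
	total := 0
	for _, v := range vs {
		total += w.Process(v)
	}
	return total
}

func main() {
	vs := []*Vertex{{Name: "0"}, {Name: "1"}, {Name: "1-1"}}
	processAll(Printer{}, vs)

	rec := &Recorder{}
	processAll(rec, vs)
	fmt.Println("recorded:", strings.Join(rec.Seen, ","))
}
```

A plain function value, as the program uses, is lighter weight; an interface becomes attractive once a processor needs its own state or several methods.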

Notice that the program calculates the expected number of operations up front by counting the graph, and the crawl then completes exactly that many operations, quickly. Good. Clean. Concurrent processing.
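The expected count follows from how the crawl works: the root is processed once, and every other vertex is processed once per iteration. Here is a worked sketch with the sample graph's 17 vertices (0; 1 to 3; 1-1 to 1-3; 1-3-1 to 1-3-3; 1-3-3-1 to 1-3-3-7) and 4 iterations. `expectedOperations` is a hypothetical helper that rearranges the formula used in `Init()`.

```go
package main

import "fmt"

// expectedOperations rearranges the article's formula: the root is processed once,
// and each of the other vertices is processed once per iteration.
//
//	(vertices * iterations) - iterations + 1 == 1 + (vertices-1)*iterations
func expectedOperations(vertices, iterations int) int {
	return 1 + (vertices-1)*iterations
}

func main() {
	// 17 vertices crawled 4 times: 1 + 16*4 = 65.
	fmt.Println(expectedOperations(17, 4)) // 65
}
```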

Furthermore, a user can turn up the number of iterations to specify how many times each vertex is visited. This is useful when a process is idempotent but could potentially fail: sending the same request N times is safe and increases the probability of success.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

const (
	NumberOfConcurrentProcessors = 32
	NumberOfCrawlIterations      = 4
)

// DiGraph
//
// This is a directed graph: every edge points one way, from one vertex to another.
// The graph has 1 root vertex, which is where the Crawl() starts from.
type DiGraph struct {
	RootVertex       *Vertex      // The root vertex of the graph
	Processors       []*Processor // List of concurrent processors
	ProcessorIndex   int          // The index of the next processor to use
	Edges            []*Edge      // All directional edges that make up the graph
	Iterations       int          // The total number of times to iterate over the graph
	TotalVertices    int          // Count of the total number of vertices that make up the graph
	ProcessedChannel chan int     // Channel to track processed vertices
	ProcessedCount   int          // Total number of processed vertices
	TotalOperations  int          // Total number of expected operations | [(TotalVertices * Iterations) - Iterations] + 1

	mu sync.Mutex // Guards ProcessorIndex, every Edge.ProcessedCount and every Vertex.Status during Crawl()
}

// Vertex
//
// A single unit that composes the graph. Each vertex has relationships with other vertices,
// and should represent a single entity or unit of work.
type Vertex struct {
	Name   string  // Unique name of this Vertex
	Edges  []*Edge // Edges touching this vertex; AddVertex stores each edge on both endpoints
	Status int     // Status code returned by the most recent Process() call
}

// Edge
//
// Edges connect vertices together. Edges have a concept of how many times they have been processed
// and a To and From direction.
type Edge struct {
	To             *Vertex
	From           *Vertex
	ProcessedCount int // -1 means "not yet counted"; recursiveCount() resets it to 0
}

// Processor
//
// This represents a single concurrent process that will operate on N number of vertices
type Processor struct {
	Function func(*Vertex) int
	Channel  chan *Vertex
}

// Init the graph with a literal definition
var TheGraph *DiGraph = NewGraph()

func main() {
	TheGraph.Init(NumberOfConcurrentProcessors, NumberOfCrawlIterations)
	TheGraph.Crawl()
}

func (d *DiGraph) Init(n, i int) {
	noProcs := n
	d.TotalVertices = d.RootVertex.recursiveCount()
	d.Iterations = i
	d.ProcessedChannel = make(chan int) // created before any processor can report
	for ; n > 0; n-- {
		p := &Processor{Channel: make(chan *Vertex), Function: Process}
		d.Processors = append(d.Processors, p)
		go p.Exec()
	}
	// The root is processed once; every other vertex is reached through its single
	// incoming edge, which is traversed once per iteration.
	d.TotalOperations = (d.TotalVertices * d.Iterations) - d.Iterations + 1
	fmt.Printf("Total Vertices              : %d\n", d.TotalVertices)
	fmt.Printf("Total Iterations            : %d\n", d.Iterations)
	fmt.Printf("Total Concurrent Processors : %d\n", noProcs)
	fmt.Printf("Total Assumed Operations    : %d\n", d.TotalOperations)
}

func (d *DiGraph) Crawl() {
	go d.RootVertex.recursiveProcess(d.getProcessor().Channel)
	fmt.Printf("---\n")
	for d.ProcessedCount < d.TotalOperations {
		d.ProcessedCount += <-d.ProcessedChannel
	}
	fmt.Printf("\n---\n")
	fmt.Printf("Total Completed Operations  : %d\n", d.ProcessedCount)
}

// getProcessor returns processors in round-robin order. It is called from many
// goroutines at once, so the index is protected by d.mu.
func (d *DiGraph) getProcessor() *Processor {
	d.mu.Lock()
	defer d.mu.Unlock()
	p := d.Processors[d.ProcessorIndex]
	d.ProcessorIndex = (d.ProcessorIndex + 1) % len(d.Processors)
	return p
}

func Process(v *Vertex) int {
	// Simulate some work with a random sleep of 100-199ms. The global math/rand
	// source is safe for concurrent use and does not need reseeding on every call.
	sleep := rand.Intn(100) + 100
	time.Sleep(time.Millisecond * time.Duration(sleep))

	o(fmt.Sprintf("Processing: %s", v.Name))
	// Return a status code
	return 1
}

// recursiveProcess hands v to a processor, then follows every edge of v that has
// been traversed fewer than Iterations times. Because AddVertex stores each edge on
// both of its endpoints, a child also walks its own incoming edge, which is how
// every non-root vertex ends up processed exactly Iterations times.
func (v *Vertex) recursiveProcess(ch chan *Vertex) {
	ch <- v
	for _, e := range v.Edges {
		// Check and bump the shared counter under the lock so two goroutines
		// cannot both claim the last traversal of the same edge.
		TheGraph.mu.Lock()
		traverse := e.ProcessedCount < TheGraph.Iterations
		if traverse {
			e.ProcessedCount++
		}
		TheGraph.mu.Unlock()
		if traverse {
			go e.To.recursiveProcess(TheGraph.getProcessor().Channel)
		}
	}
}

// recursiveCount counts the vertices reachable from v. Edges start at -1 and are
// reset to 0 the first time they are seen, so each vertex is counted once even
// though every child also holds a reference to its incoming edge.
func (v *Vertex) recursiveCount() int {
	i := 1
	for _, e := range v.Edges {
		if e.ProcessedCount != 0 {
			e.ProcessedCount = 0
			i += e.To.recursiveCount()
		}
	}
	return i
}

// AddVertex creates a child of v. The new edge is appended to both the parent and
// the child, so the child can later walk its own incoming edge during Crawl().
func (v *Vertex) AddVertex(name string) *Vertex {
	newVertex := &Vertex{Name: name}
	newEdge := &Edge{To: newVertex, From: v, ProcessedCount: -1}
	newVertex.Edges = append(newVertex.Edges, newEdge)
	v.Edges = append(v.Edges, newEdge)
	return newVertex
}

func (p *Processor) Exec() {
	for v := range p.Channel {
		status := p.Function(v)
		// The same vertex can be processed by several processors at once,
		// so writes to Status are serialized with the graph lock.
		TheGraph.mu.Lock()
		v.Status = status
		TheGraph.mu.Unlock()
		TheGraph.ProcessedChannel <- 1
	}
}

func NewGraph() *DiGraph {
	rootVertex := &Vertex{Name: "0"}
	v1 := rootVertex.AddVertex("1")
	rootVertex.AddVertex("2")
	rootVertex.AddVertex("3")
	v1.AddVertex("1-1")
	v1.AddVertex("1-2")
	v1_3 := v1.AddVertex("1-3")
	v1_3.AddVertex("1-3-1")
	v1_3.AddVertex("1-3-2")
	v1_3_3 := v1_3.AddVertex("1-3-3")
	v1_3_3.AddVertex("1-3-3-1")
	v1_3_3.AddVertex("1-3-3-2")
	v1_3_3.AddVertex("1-3-3-3")
	v1_3_3.AddVertex("1-3-3-4")
	v1_3_3.AddVertex("1-3-3-5")
	v1_3_3.AddVertex("1-3-3-6")
	v1_3_3.AddVertex("1-3-3-7")
	graph := &DiGraph{}
	graph.RootVertex = rootVertex
	return graph
}

func o(str string) {
	fmt.Println(str)
}
```

Try it out

You can run the code yourself in the Go playground.

Thanks!

Thank you for reading my article. As always, I appreciate any feedback from readers, so let me know how it could be better.

Follow @kris-nova
