random worker pool exercise
This commit is contained in:
parent
12f61545ac
commit
624e3328ff
|
@ -0,0 +1,136 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"runtime"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type JobResult struct {
|
||||
Success bool
|
||||
Output string
|
||||
}
|
||||
|
||||
type Job interface {
|
||||
Execute()
|
||||
Result() JobResult
|
||||
}
|
||||
|
||||
type PrintJob struct {
|
||||
result *JobResult
|
||||
}
|
||||
|
||||
func (j *PrintJob) Execute() {
|
||||
j.result = &JobResult{
|
||||
Success: true,
|
||||
Output: time.Now().String(),
|
||||
}
|
||||
}
|
||||
func (j *PrintJob) Result() JobResult {
|
||||
return *j.result
|
||||
}
|
||||
|
||||
type Worker struct {
|
||||
id string
|
||||
incomingChan chan Job
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
func (w *Worker) Start(ctx context.Context) {
|
||||
defer w.wg.Done()
|
||||
for {
|
||||
select {
|
||||
case job, ok := <-w.incomingChan:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
job.Execute()
|
||||
log.Printf("worker=%s result=%s", w.id, job.Result().Output)
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func NewWorker(id string, wg *sync.WaitGroup, incomingChan chan Job) *Worker {
|
||||
return &Worker{
|
||||
id: id,
|
||||
wg: wg,
|
||||
incomingChan: incomingChan,
|
||||
}
|
||||
}
|
||||
|
||||
type WorkerPool struct {
|
||||
size int
|
||||
wg *sync.WaitGroup
|
||||
workerJobChan chan Job
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (wp *WorkerPool) Start(ctx context.Context) {
|
||||
var wpCtx context.Context
|
||||
wpCtx, wp.cancel = context.WithCancel(ctx)
|
||||
|
||||
for i := 0; i <= wp.size; i++ {
|
||||
wp.wg.Add(1)
|
||||
worker := NewWorker(strconv.Itoa(i), wp.wg, wp.workerJobChan)
|
||||
go worker.Start(wpCtx)
|
||||
}
|
||||
}
|
||||
|
||||
func (wp *WorkerPool) Stop() {
|
||||
close(wp.workerJobChan)
|
||||
wp.wg.Add(1)
|
||||
go wp.flush()
|
||||
wp.wg.Wait()
|
||||
}
|
||||
|
||||
func (wp *WorkerPool) Wait() {
|
||||
wp.wg.Wait()
|
||||
}
|
||||
|
||||
func (wp *WorkerPool) flush() {
|
||||
defer wp.wg.Done()
|
||||
for job := range wp.workerJobChan {
|
||||
job.Execute()
|
||||
log.Printf("worker=%s result=%s", "main", job.Result().Output)
|
||||
}
|
||||
}
|
||||
|
||||
func (wp *WorkerPool) AddJob(job Job) {
|
||||
wp.workerJobChan <- job
|
||||
}
|
||||
|
||||
func NewWorkerPool(size, jobBufferSize int) *WorkerPool {
|
||||
return &WorkerPool{
|
||||
size: size,
|
||||
wg: &sync.WaitGroup{},
|
||||
workerJobChan: make(chan Job, jobBufferSize),
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
ctx := context.Background()
|
||||
wp := NewWorkerPool(runtime.GOMAXPROCS(0), 2>>8)
|
||||
wp.Start(ctx)
|
||||
|
||||
// Populate jobs for some time, then stop workerpool
|
||||
populateCtx, cancelPopulate := context.WithTimeout(ctx, time.Millisecond*300)
|
||||
defer cancelPopulate()
|
||||
for {
|
||||
select {
|
||||
case <-populateCtx.Done():
|
||||
log.Println("STOPPING!!")
|
||||
wp.Stop()
|
||||
return
|
||||
|
||||
default:
|
||||
job := PrintJob{}
|
||||
wp.AddJob(&job)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue