/usr/share/gocode/src/gopkg.in/eapache/go-resiliency.v1/batcher/batcher.go is in golang-gopkg-eapache-go-resiliency.v1-dev 0.0~git20150213.0.6800482-1.
This file is owned by root:root, with mode 0o644.
The actual contents of the file can be viewed below.
// Package batcher implements the batching resiliency pattern for Go.
package batcher
import (
"sync"
"time"
)
// work is one pending call to Run: the caller's parameter together with the
// channel on which that caller waits for the outcome of its batch.
type work struct {
	// param is the value given to Run; it becomes one element of the slice
	// passed to the work function.
	param interface{}
	// future carries the batch's error back to the waiting Run call.
	future chan error
}
// Batcher implements the batching resiliency pattern: calls to Run that arrive
// within the same timeout window are gathered and handed to the work function
// in a single invocation.
type Batcher struct {
	// Configuration: timeout and doWork are set by New, prefilter by Prefilter.
	timeout   time.Duration
	doWork    func([]interface{}) error
	prefilter func(interface{}) error

	// lock guards submit.
	lock sync.Mutex
	// submit is the input channel of the batch currently being collected, or
	// nil when no batch is open.
	submit chan *work
}
// New returns a Batcher that gathers every call to Run made within a window of
// length `timeout` and then invokes doWork once for the whole group. Batches
// can overlap in time, particularly when the timeout is short, so doWork must
// be safe to run concurrently with itself.
func New(timeout time.Duration, doWork func([]interface{}) error) *Batcher {
	b := new(Batcher)
	b.timeout = timeout
	b.doWork = doWork
	return b
}
// Run executes the work function for param and returns the resulting error.
// The call may be merged into a batch together with other Run calls made inside
// the same timeout window, in which case every caller in that batch receives
// the same error. Run may be called concurrently on a single Batcher.
func (b *Batcher) Run(param interface{}) error {
	// Let the optional prefilter reject the parameter before it reaches a batch.
	if b.prefilter != nil {
		err := b.prefilter(param)
		if err != nil {
			return err
		}
	}

	// A zero timeout leaves no window to batch over: run the work inline as a
	// batch of one.
	if b.timeout == 0 {
		return b.doWork([]interface{}{param})
	}

	// Capacity of one lets the batch goroutine deliver the result without
	// having to wait for this caller to be receiving.
	result := make(chan error, 1)
	b.submitWork(&work{param: param, future: result})
	return <-result
}
// Prefilter installs an optional check that Run applies to each parameter
// before adding it to a batch. When the filter returns a non-nil error, Run
// returns that error at once and the work function is not invoked for that
// parameter. The field is written without synchronization, so it is not safe to
// set a prefilter once Run has been called on this batcher. The filter itself
// must be safe for concurrent use.
func (b *Batcher) Prefilter(filter func(interface{}) error) {
	b.prefilter = filter
}
// submitWork hands w to the batch currently being collected, opening a new
// batch and starting its collector goroutine when none is open.
func (b *Batcher) submitWork(w *work) {
	b.lock.Lock()
	defer b.lock.Unlock()

	queue := b.submit
	if queue == nil {
		// No open batch: publish a fresh channel before starting the
		// collector, which reads it from b.submit.
		queue = make(chan *work, 4)
		b.submit = queue
		go b.batch()
	}

	// The send stays under the lock: timer takes the same lock before closing
	// the channel, so it cannot close it between the check above and this send.
	queue <- w
}
// batch collects one batch: it drains the submit channel until timer closes it,
// runs the work function once on everything gathered, and delivers the same
// result to every waiting caller.
func (b *Batcher) batch() {
	// Take a local copy of the channel before the timer is started, because
	// timer resets b.submit to nil when the window ends.
	pending := b.submit
	go b.timer()

	var (
		args    []interface{}
		waiters []chan error
	)
	for item := range pending {
		args = append(args, item.param)
		waiters = append(waiters, item.future)
	}

	err := b.doWork(args)
	for i := range waiters {
		waiters[i] <- err
		close(waiters[i])
	}
}
// timer ends the current batch window: after sleeping for the configured
// timeout it closes the submit channel, which stops the collector's range loop,
// and clears the field so that the next submission opens a new batch.
func (b *Batcher) timer() {
	time.Sleep(b.timeout)

	// Close and clear under the lock so no submitWork call can be between its
	// nil check and its send while the channel is being closed.
	b.lock.Lock()
	close(b.submit)
	b.submit = nil
	b.lock.Unlock()
}
|