"fan in" - one "branched" behavior

Say we have three ways of implementing "fan-in" behavior:

// MakeChannel returns an unbuffered channel fed by a background goroutine.
// The goroutine sends the integers 0, 1, ..., tries-1 in order and then
// closes the channel; when tries <= 0 the channel is closed without any send.
func MakeChannel(tries int) chan int {
    out := make(chan int)

    produce := func() {
        // Closing tells the consumer that the sequence is complete.
        defer close(out)
        for n := 0; n < tries; n++ {
            out <- n
        }
    }
    go produce()

    return out
}

// MergeByReflection fans in the given channels using reflect.Select.
// It builds one receive case per input channel and forwards every received
// value to the returned channel. The returned channel is closed once all
// inputs have been closed.
func MergeByReflection(channels ...chan int) chan int {
    out := make(chan int)

    cases := make([]reflect.SelectCase, 0, len(channels))
    for _, ch := range channels {
        cases = append(cases, reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(ch),
        })
    }

    go func() {
        defer close(out)
        for remaining := len(cases); remaining > 0; {
            idx, val, ok := reflect.Select(cases)
            if ok {
                out <- int(val.Int())
                continue
            }
            // The input was closed: a zero Value makes reflect.Select
            // ignore this case from now on.
            cases[idx].Chan = reflect.Value{}
            remaining--
        }
    }()

    return out
}

// MergeByCode fans in the given channels using a hand-written, fixed-width
// select statement. Every value received from an input is forwarded to the
// returned channel, which is closed once all inputs have been closed.
//
// Any number of inputs is accepted: with fewer inputs than select cases the
// spare cases stay nil, and with more inputs the overflow is merged
// recursively and fed into the last case. The caller's slice is never
// modified.
func MergeByCode(channels ...chan int) chan int {
    const width = 5 // number of cases in the select statement below

    // Work on a private fixed-size copy. Unused slots remain nil, and a
    // receive from a nil channel is never ready, so those cases are inert.
    var in [width]chan int
    open := len(channels)
    if open > width {
        // Too many inputs for one select: keep width-1 of them and fold the
        // rest into a single channel occupying the last slot.
        copy(in[:width-1], channels)
        in[width-1] = MergeByCode(channels[width-1:]...)
        open = width
    } else {
        copy(in[:], channels)
    }

    out := make(chan int)
    go func() {
        defer close(out)

        for open > 0 {
            var (
                v   int
                ok  bool
                idx int
            )
            select {
            case v, ok = <-in[0]:
                idx = 0
            case v, ok = <-in[1]:
                idx = 1
            case v, ok = <-in[2]:
                idx = 2
            case v, ok = <-in[3]:
                idx = 3
            case v, ok = <-in[4]:
                idx = 4
            }

            if !ok {
                // The input was closed: disable its case and do not forward
                // the zero value produced by the closed-channel receive.
                in[idx] = nil
                open--
                continue
            }
            out <- v
        }
    }()
    return out
}

// MergeByGoRoutines fans in the given channels by starting one forwarding
// goroutine per input. Each goroutine copies its input to the returned
// channel until that input is closed; the returned channel is closed after
// every forwarder has finished.
func MergeByGoRoutines(channels ...chan int) chan int {
    var group sync.WaitGroup

    out := make(chan int)

    // Register all forwarders before starting any of them. If Add ran after
    // the goroutines were launched, a forwarder whose input is already closed
    // could call Done first and drive the counter negative, which panics.
    group.Add(len(channels))
    for _, ch := range channels {
        go func(ch chan int) {
            defer group.Done()
            for i := range ch {
                out <- i
            }
        }(ch)
    }

    // Close out only after every forwarder is done sending on it.
    go func() {
        group.Wait()
        close(out)
    }()
    return out
}

// MergeFn is the common signature of the merge implementations: it accepts
// any number of int channels and returns a single int channel.
type MergeFn func(...chan int) chan int

// main runs each merge implementation over a fresh set of producer channels
// and prints, for each one, the elapsed time followed by the sum of all
// values received.
func main() {
    const (
        producers = 5
        tries     = 1000000
    )

    inputs := make([]chan int, producers)
    merges := []MergeFn{MergeByReflection, MergeByCode, MergeByGoRoutines}

    for _, merge := range merges {
        total := 0
        // The timer starts before the producers are created, so channel
        // setup is part of the measured time.
        start := time.Now()

        for idx := range inputs {
            inputs[idx] = MakeChannel(tries)
        }
        for v := range merge(inputs...) {
            total += v
        }

        fmt.Println(time.Since(start))
        fmt.Println(total)
    }
}

Results (with 1 CPU; I used runtime.GOMAXPROCS(1)):
19.869s (MergeByReflection)
2499997500000
8.483s (MergeByCode)
2499997500000
4.977s (MergeByGoRoutines)
2499997500000

Results (with 2 CPUs; I used runtime.GOMAXPROCS(2)):
44.94s
2499997500000
10.853s
2499997500000
3.728s
2499997500000

  • I understand the reason MergeByReflection is the slowest, but what is the difference between MergeByCode and MergeByGoRoutines?
  • Why, when the number of CPUs is increased, do the "select"-based variants (MergeByReflection and MergeByCode) become slower?
+4
1

. , , , , .

, . . , .

MergeByReflection MergeByCode , ( , ),

MergeByGoRoutines : , . , .

MergeByReflection , , .

MergeByGoRoutines , ( ), . ( ).

MergeByReflection MergeByCode, . , .

+3

Source: https://habr.com/ru/post/1584309/


All Articles