How to do parallel downloads in Go

We have a process in which users request files that we need to fetch from our source. This source is not very reliable, so we implemented a queue using Amazon SQS. We put the download URL on the queue and poll it with a small application we wrote in Go. This application retrieves the messages, downloads each file, and uploads it to S3, where we store it. When all of this is done, it calls a service that sends the user an email saying the file is ready.
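The per-message steps described above (download, upload to S3, notify) can be sketched as one function. This is a minimal sketch under stated assumptions: `Downloader`, `Uploader`, `Notifier`, `handleMessage` and the fakes in `main` are hypothetical names, not taken from the original application, and they stand in for the real source, S3 and email service.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical interfaces for the three steps; the original post does not show them.
type Downloader interface {
	Download(url string) ([]byte, error)
}

type Uploader interface {
	Upload(key string, data []byte) error
}

type Notifier interface {
	FileReady(key string) error
}

// handleMessage runs the steps in order and stops at the first failure.
// Returning an error lets the caller skip deleting the SQS message, so it
// becomes visible again after the visibility timeout and is retried.
func handleMessage(url string, d Downloader, u Uploader, n Notifier) error {
	data, err := d.Download(url)
	if err != nil {
		return fmt.Errorf("download %s: %w", url, err)
	}
	key := url // hypothetical: derive the S3 key from the URL
	if err := u.Upload(key, data); err != nil {
		return fmt.Errorf("upload %s: %w", key, err)
	}
	if err := n.FileReady(key); err != nil {
		return fmt.Errorf("notify %s: %w", key, err)
	}
	return nil
}

// fakeSource simulates the unreliable source.
type fakeSource struct{ fail bool }

func (f fakeSource) Download(url string) ([]byte, error) {
	if f.fail {
		return nil, errors.New("source unavailable")
	}
	return []byte("contents of " + url), nil
}

// memStore simulates S3 with an in-memory map.
type memStore map[string][]byte

func (m memStore) Upload(key string, data []byte) error { m[key] = data; return nil }

// recorder simulates the email service by recording notified keys.
type recorder struct{ keys []string }

func (r *recorder) FileReady(key string) error { r.keys = append(r.keys, key); return nil }

func main() {
	store := memStore{}
	rec := &recorder{}
	err := handleMessage("https://example.com/a.zip", fakeSource{}, store, rec)
	fmt.Println(err, len(store), rec.keys)
	err = handleMessage("https://example.com/b.zip", fakeSource{fail: true}, store, rec)
	fmt.Println(err != nil, len(store))
}
```

Because nothing flows back from a finished download, each worker only needs this function plus the final `DeleteMessage` call.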

I originally wrote this to create n channels, attach one goroutine to each, and run each goroutine in an infinite loop. That way, I could make sure that I only handled a fixed number of downloads at a time.

I realized that this is not how channels are meant to be used. If I understand correctly now, there should be a single channel with n goroutines receiving from it. Each goroutine runs an infinite loop, waiting for a message; when it receives one, it processes it, does everything it needs to do, and then waits for the next message. This guarantees that I only ever process n files at a time. I think this is the right way to do it. Is it idiomatic?

What I do not need to do is bring the results of these goroutines back together. Once a download is complete, the goroutine calls the remote service, which handles the rest of the process. There is nothing more to do.

OK, so some code:

func main() {
    queue, err := ConnectToQueue() // This works fine...
    if err != nil {
        log.Fatalf("Could not connect to queue: %s\n", err)
    }

    msgChannel := make(chan sqs.Message, 10)

    for i := 0; i < MAX_CONCURRENT_ROUTINES; i++ {
        go processMessage(msgChannel, queue)
    }

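    for {
        response, err := queue.ReceiveMessage(MAX_SQS_MESSAGES)
        if err != nil {
            // response may be nil on error; skip this poll instead of panicking
            log.Printf("Could not receive messages: %s\n", err)
            continue
        }

        for _, m := range response.Messages {
            msgChannel <- m
        }
    }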
}

func processMessage(ch <-chan sqs.Message, queue *sqs.Queue) {
    // range keeps receiving until ch is closed, then the goroutine returns
    for m := range ch {
        // Do something with message m

        // Delete message from queue when we're done
        queue.DeleteMessage(&m)
    }
}

Am I anywhere close? I have n goroutines running (where MAX_CONCURRENT_ROUTINES = n), and a loop that sends messages into a single channel. Is this correct? Do I need to close anything, or can I just leave it running indefinitely?

One thing I notice is that SQS returns messages, but as soon as 10 messages have been sent to processMessage() (10 is the size of the channel buffer), no further messages are actually processed.
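That symptom is what a full buffer looks like: when no goroutine is receiving, a buffered channel accepts exactly as many values as its capacity, and the next send blocks the sender. The sketch below shows the simplest case, with no receiver at all; `countSendsUntilBlocked` is a hypothetical helper, and `select` with `default` makes the would-block case visible instead of hanging. If every worker has taken one message and then got stuck, the main loop can push n + 10 messages before `msgChannel <- m` blocks.

```go
package main

import "fmt"

// countSendsUntilBlocked sends to a buffered channel that nobody reads from
// and reports how many sends succeed before the next one would block.
func countSendsUntilBlocked(bufSize, attempts int) int {
	ch := make(chan int, bufSize)
	sent := 0
	for i := 0; i < attempts; i++ {
		select {
		case ch <- i:
			sent++
		default:
			return sent // buffer full: a plain send would block here forever
		}
	}
	return sent
}

func main() {
	fmt.Println(countSendsUntilBlocked(10, 100)) // prints 10
}
```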

Thank you all

A few points:

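  • You limit parallelism through the number of goroutines you start, which works. A simpler alternative is to start one goroutine per message and let a buffered channel act as a semaphore that caps how many of them work at the same time. Something like this: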

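    sem := make(chan struct{}, n) // counting semaphore with n slots
    work := func(m sqs.Message) {
        defer func() { <-sem }() // free a slot in the channel when done
        // do the work, then queue.DeleteMessage(&m)
    }
    for {
        response, err := queue.ReceiveMessage(MAX_SQS_MESSAGES)
        if err != nil {
            log.Printf("Could not receive messages: %s\n", err)
            continue
        }
        for _, m := range response.Messages {
            sem <- struct{}{} // wait until there is room, then proceed
            go work(m)
        }
    }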
    
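  • The buffer size of 10 is not the problem in itself. If nothing more is processed once 10 messages are in the channel, nothing is receiving from it any more: most likely every processMessage goroutine is blocked somewhere in its work (for example on a download that never finishes), so the buffer fills up and the send in the main loop blocks. Check what the workers are waiting on.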

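  • You do not have to close anything; the goroutines can run for as long as the program does. If you want a clean shutdown, use a sync.WaitGroup: stop receiving from SQS, close msgChannel, and wait for the workers to finish the messages they already hold. (See also point 2.)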

Source: https://habr.com/ru/post/1606250/