Error handling with Clojure core.async pipeline

I am trying to figure out how to handle errors correctly when using core.async's pipeline. My pipeline looks like this:

    input --> xf-run-computation --> first-out
    first-out --> xf-run-computation --> last-out

Here xf-run-computation executes HTTP requests and returns the responses. However, some of these requests will return an error. What is the best way to handle these errors? My solution is to split each output channel into success-values and error-values, and then merge the error channels into a single channel:

    (let [[success-values1 error-values1] (split fn-to-split first-out)
          [success-values2 error-values2] (split fn-to-split last-out)
          errors (merge [error-values1 error-values2])]
      (pipeline 4 first-out xf-run-computation input)
      (pipeline 4 last-out xf-run-computation success-values1)
      [last-out errors])

This way, my function returns the final results and the errors.

2 answers

Generally speaking, what is "correct" probably depends on the needs of your application, but given your description of the problem, I think you need to consider three things:

  1. xf-run-computation returns data that your business logic treats as an error,
  2. xf-run-computation throws an exception, and
  3. given that HTTP calls are involved, some runs of xf-run-computation may never finish (or not finish in time).

With respect to point 3, the first thing you should consider is using pipeline-blocking instead of pipeline, since pipeline-blocking is intended for blocking operations such as synchronous HTTP calls.
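As a minimal sketch of that switch: pipeline-blocking takes the same arguments as pipeline. The function fetch-blocking below is a hypothetical stand-in for a synchronous HTTP call and is not part of the question.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical stand-in for a blocking HTTP request.
(defn fetch-blocking [id]
  (Thread/sleep 50)
  {:id id :status "ok"})

(defn run-blocking-stage
  "Runs fetch-blocking over ids on n dedicated threads and
  returns the results, in input order, as a vector."
  [n ids]
  (let [in  (async/to-chan ids)
        out (async/chan 16)]
    ;; Same argument order as pipeline: n, to, xf, from.
    (async/pipeline-blocking n out (map fetch-blocking) in)
    (async/<!! (async/into [] out))))

(run-blocking-stage 4 (range 5))
```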

I think your question is mostly about point 1. The main idea is that xf-run-computation should return a data structure (such as a map or a record) that clearly marks the result as an error or a success, e.g. {:title nil :body nil :status "error"}. This gives you several ways of dealing with the situation:

  • all of your later code simply ignores input with :status "error", i.e. your xf-run-computation would contain a line like (when (not (= (:status input) "error")) (run-computation input)),

  • you can run a filter on the results between the pipeline calls and filter as needed (note that filter can also be used as a transducer in a pipeline, which supersedes the old filter> and filter< functions of core.async),

  • you use async/split, as you suggested and as Alan Thompson shows in his answer, to filter the error values out onto a separate error channel. There is no real need for a second error channel for your second pipeline: if you are going to merge the values anyway, you can simply reuse your error channel.
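The first and third options can be combined, roughly as in the sketch below. The names fetch-step, upcase-step, skip-errors and run-two-stages are hypothetical and chosen only for illustration. Results marked as errors pass through the second stage untouched, so a single async/split at the end is enough.

```clojure
(require '[clojure.core.async :as async]
         '[clojure.string :as str])

;; Hypothetical first stage: odd ids stand in for failed HTTP requests.
(defn fetch-step [{:keys [id] :as input}]
  (if (odd? id)
    (assoc input :status "error")
    (assoc input :status "ok" :body (str "body-" id))))

;; Hypothetical second stage.
(defn upcase-step [input]
  (update input :body str/upper-case))

(defn error? [result]
  (= "error" (:status result)))

(defn skip-errors
  "Wraps f so that results already marked as errors pass through untouched."
  [f]
  (fn [input]
    (if (error? input) input (f input))))

(defn run-two-stages [ids]
  (let [in        (async/to-chan (map (fn [id] {:id id}) ids))
        first-out (async/chan 16)
        last-out  (async/chan 16)
        _         (async/pipeline 4 first-out (map fetch-step) in)
        _         (async/pipeline 4 last-out (map (skip-errors upcase-step)) first-out)
        ;; split returns [channel-for-true channel-for-false].
        [errors successes] (async/split error? last-out 16 16)
        ;; Start draining both channels before blocking on either one.
        errors-vec    (async/into [] errors)
        successes-vec (async/into [] successes)]
    {:errors    (async/<!! errors-vec)
     :successes (async/<!! successes-vec)}))

(run-two-stages (range 4))
```

Since every error ends up on one channel, no merge step is needed. The trade-off is that error values travel through the later stages instead of leaving the pipeline early.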

For point 2, the problem is that any exception in xf-run-computation occurs on another thread and will not simply propagate back to your calling code. You can, however, use the ex-handler argument of pipeline (and pipeline-blocking). You can simply filter out all exceptions, put them on a separate exception channel, or catch them and turn them into error values (which you then return as part of the results or put on another error channel). The latter makes sense if the exception gives you enough information, for example an identifier or something else that allows you to associate the exception with the input that caused it. You can also arrange this inside xf-run-computation itself (i.e. catch any exception thrown by a third-party library, such as an HTTP call).
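A minimal sketch of the ex-handler approach, assuming a hypothetical function risky that throws for one particular input. The handler turns the exception into an error map; any non-nil value it returns is put on the output channel in place of the result.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical computation that throws for the input 2.
(defn risky [x]
  (if (= x 2)
    (throw (ex-info "boom" {:input x}))
    (* 10 x)))

(defn run-with-ex-handler [xs]
  (let [in  (async/to-chan xs)
        out (async/chan 16)
        ;; Called with the Throwable; ex-data carries the input that
        ;; caused the failure because risky attached it via ex-info.
        ex-handler (fn [^Throwable t]
                     {:status  "error"
                      :message (.getMessage t)
                      :data    (ex-data t)})]
    ;; Full arity: n, to, xf, from, close?, ex-handler.
    (async/pipeline 2 out (map risky) in true ex-handler)
    (async/<!! (async/into [] out))))

(run-with-ex-handler [1 2 3])
```

An ex-handler that returns nil would drop the failed item instead. Exceptions thrown by a third-party library usually do not carry the input, which is why catching them inside the computation and re-throwing or returning an error map with the input attached can be the more informative choice.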

For point 3, the canonical answer in core.async would be to point to a timeout channel, but this is not particularly relevant to pipeline. The better idea is to make sure that your HTTP calls have a timeout set, for example the :timeout option of http-kit or the :socket-timeout and :conn-timeout options of clj-http. Note that these options usually result in an exception when the timeout is hit.
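For reference, the timeout channel approach looks roughly like the sketch below when used on the consuming side. The helper take-with-timeout and the :timed-out marker are hypothetical names. This only bounds how long the consumer waits; it does not cancel the HTTP call itself, which is why setting a timeout in the HTTP client remains the better option.

```clojure
(require '[clojure.core.async :as async])

(defn take-with-timeout
  "Takes one value from ch, or returns :timed-out if nothing arrives
  within ms milliseconds. Returns nil if ch is closed and empty."
  [ch ms]
  (let [[v port] (async/alts!! [ch (async/timeout ms)])]
    (if (= port ch)
      v
      :timed-out)))

;; A channel that never receives a value times out.
(take-with-timeout (async/chan) 20)
```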


Here is an example that does what you suggest. Starting with (range 10), it first filters out the multiples of 5, then the multiples of 3.

    (ns tst.clj.core
      (:use clj.core
            clojure.test)
      (:require [clojure.core.async :as async]
                [clojure.string :as str]))

    (defn err-3
      "'Fails' for multiples of 3."
      [x]
      (if (zero? (mod x 3))
        (+ x 300) ; error case
        x))       ; non-error

    (defn err-5
      "'Fails' for multiples of 5."
      [x]
      (if (zero? (mod x 5))
        (+ x 500) ; error case
        x))       ; non-error

    (defn is-ok?
      "Returns true if the value is not 'in error' (error values are >= 100)."
      [x]
      (< x 100))

    (def ch-0 (async/to-chan (range 10)))
    (def ch-1 (async/chan 99))
    (def ch-2 (async/chan 99))

    (deftest t-2
      (let [_                       (async/pipeline 1 ch-1 (map err-5) ch-0)
            [ok-chan-1 fail-chan-1] (async/split is-ok? ch-1 99 99)
            _                       (async/pipeline 1 ch-2 (map err-3) ok-chan-1)
            [ok-chan-2 fail-chan-2] (async/split is-ok? ch-2 99 99)

            ok-vec-2   (async/<!! (async/into [] ok-chan-2))
            fail-vec-1 (async/<!! (async/into [] fail-chan-1))
            fail-vec-2 (async/<!! (async/into [] fail-chan-2))]
        (is (= ok-vec-2 [1 2 4 7 8]))
        (is (= fail-vec-1 [500 505]))
        (is (= fail-vec-2 [303 306 309]))))

Instead of returning the errors, I would probably just log them as soon as they are detected and then forget about them.
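A rough sketch of that idea, with println standing in for a real logging library. The function name log-errors! is hypothetical. A go-loop drains the error channel and logs each value as it arrives.

```clojure
(require '[clojure.core.async :as async])

(defn log-errors!
  "Logs every value taken from err-ch until it closes. Returns a
  channel that yields the number of errors logged."
  [err-ch]
  (async/go-loop [n 0]
    (if-some [e (async/<! err-ch)]
      (do (println "error:" e)
          (recur (inc n)))
      n)))

;; Example: log the two failures from the first stage above.
(async/<!! (log-errors! (async/to-chan [500 505])))
```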


Source: https://habr.com/ru/post/1261995/

