Generally speaking, what is "correct" probably depends on the needs of your application, but given your description of the problem, I think you need to consider three things:
1. xf-run-computation returns data that your business logic will see as errors,
2. xf-run-computation throws an exception, and
3. given that HTTP calls are involved, some xf-run-computation runs may never finish (or not finish in time).
Regarding point 3, the first thing to consider is using pipeline-blocking instead of pipeline.
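As a minimal sketch of that switch (run-computation here is a hypothetical stand-in for your own function, and Thread/sleep merely simulates a blocking HTTP call), the call has the same shape as pipeline:

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical stand-in for a computation that performs blocking I/O.
(defn run-computation [input]
  (Thread/sleep 50)                 ; simulates a blocking HTTP call
  (assoc input :body "ok"))

(def in  (a/to-chan! [{:title "a"} {:title "b"} {:title "c"}]))
(def out (a/chan 10))

;; Same arguments as `pipeline`, but intended for blocking operations.
(a/pipeline-blocking 4 out (map run-computation) in)

;; `out` is closed once `in` is exhausted, so this collects every result.
(def results (a/<!! (a/into [] out)))
```

Results arrive in input order, regardless of which worker finished first.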
I think your question is mostly about point 1. The main idea is that xf-run-computation should return a data structure (such as a map or record) that clearly marks the result as an error or a success, like {:title nil :body nil :status "error"}. This gives you several ways to deal with the problem:
- all of your later code simply ignores inputs with :status "error", i.e. your xf-run-computation would contain a line like (when (not (= (:status input) "error")) (run-computation input)),
- you can run a filter on the results between the pipeline calls and filter as needed (note that filter can also be used as a transducer in a pipeline, which makes the old filter> and filter< functions of core.async obsolete),
- you use async/split, as you suggested and as Alan Thompson shows in his answer, to filter the error values out to a separate error channel. There is no real need for a second error channel for your second pipeline; if you are going to merge the values anyway, you can simply reuse your error channel.
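The three options above could be sketched roughly as follows. The names run-computation, second-step, error? and inputs are hypothetical, and the rule "no :title means error" is only an assumption made for illustration. The pass-through step uses if rather than when so that the error map is handed on instead of nil, since nil cannot be put on a channel.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical computation: inputs without a :title become error results.
(defn run-computation [input]
  (if (:title input)
    (assoc input :body "computed" :status "ok")
    {:title nil :body nil :status "error"}))

(defn error? [result] (= "error" (:status result)))

;; Option 1: a later step passes error results through untouched.
(defn second-step [result]
  (if (error? result)
    result
    (update result :body str "!")))

(defn inputs [] (a/to-chan! [{:title "a"} {} {:title "b"}]))

;; Option 2: drop errors inside the pipeline with a filtering transducer.
(def only-ok
  (let [out (a/chan 10)]
    (a/pipeline 2 out (comp (map run-computation) (remove error?)) (inputs))
    (a/<!! (a/into [] out))))

;; Option 3: split routes errors to one channel and successes to another.
(def split-result
  (let [mid (a/chan 10)
        _   (a/pipeline 2 mid (map run-computation) (inputs))
        [error-ch ok-ch] (a/split error? mid 10 10)]
    {:errors (a/<!! (a/into [] error-ch))
     :ok     (a/<!! (a/into [] ok-ch))}))
```

The buffers passed to split are sized so that this small demo can drain one channel before the other; in real code both channels would normally have their own consumers.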
For point 2, the problem is that any exception in xf-run-computation is thrown on another thread and will not simply propagate back to your calling code. You can, however, use the ex-handler argument of pipeline (and pipeline-blocking). You can either simply filter out all exceptions, put the result on a separate exception channel, or try to catch them and turn them into errors (potentially putting them back on the result channel or on another error channel). The latter only makes sense if the exception gives you enough information, for example an ID or something else that lets you tie the exception back to the input that caused it. You can arrange for that inside xf-run-computation (i.e. catch any exception thrown by a third-party library such as the HTTP call).
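A minimal sketch of both approaches, assuming a hypothetical run-computation that throws an ex-info carrying its input. The ex-handler is a function of one argument (the Throwable); a non-nil return value is placed on the output channel, while returning nil drops the item.

```clojure
(require '[clojure.core.async :as a])

;; Hypothetical computation that throws for some inputs.
(defn run-computation [input]
  (if (:fail? input)
    (throw (ex-info "computation failed" {:input input}))
    (assoc input :status "ok")))

;; ex-handler: turn the exception into an error result.
;; Returning nil instead would silently drop the failed item.
(defn exception->error [^Throwable ex]
  {:status  "error"
   :message (.getMessage ex)
   :input   (:input (ex-data ex))})

(def in  (a/to-chan! [{:id 1} {:id 2 :fail? true} {:id 3}]))
(def out (a/chan 10))

;; Arguments: n, to, xf, from, close?, ex-handler
(a/pipeline-blocking 2 out (map run-computation) in true exception->error)

(def results (a/<!! (a/into [] out)))

;; Alternative: catch inside the function, where the input is still at hand.
(defn safe-run-computation [input]
  (try
    (run-computation input)
    (catch Exception e
      {:status "error" :message (.getMessage e) :input input})))
```

Catching inside the function has the advantage that the offending input is always available, even when the exception itself carries no identifying data.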
For point 3, the canonical answer in core.async would be to point to a timeout channel, but that does not make much sense in combination with pipeline. A better idea is to make sure your HTTP calls have a timeout set, e.g. the :timeout option of http-kit, or :socket-timeout and :conn-timeout of clj-http. Note that these options usually result in an exception on timeout.
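As a rough sketch, the option maps below use the keys named above with illustrative millisecond values, and fetch-or-error is a hypothetical wrapper that converts a timeout into an error result. How a timeout actually surfaces depends on the client and its version; this sketch assumes it is thrown as java.net.SocketTimeoutException, and fake-get only simulates that so the example runs without an HTTP library.

```clojure
;; Option maps with the keys mentioned above; values are illustrative (ms).
;; Check your clj-http version: newer releases document the connection
;; timeout as :connection-timeout.
(def http-kit-opts {:timeout 2000})
(def clj-http-opts {:socket-timeout 2000 :conn-timeout 2000})

;; Hypothetical wrapper: http-get is whatever blocking client function you
;; use, called as (http-get url opts).
(defn fetch-or-error [http-get url opts]
  (try
    {:status "ok" :response (http-get url opts)}
    (catch java.net.SocketTimeoutException _
      {:status "error" :reason :timeout :url url})))

;; Simulated client call that always times out (assumption for the demo).
(defn fake-get [_url _opts]
  (throw (java.net.SocketTimeoutException. "Read timed out")))

(def timed-out (fetch-or-error fake-get "http://example.invalid" clj-http-opts))
```

Because the timeout becomes an ordinary error result, it can be handled downstream like any other item marked with :status "error".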