If a step function returns a reduced value, the transducible process must not supply any more input to the step function. The reduced value must be unwrapped with deref before completion.
One example of this scenario is the take-while transducer:
    ;; pred is the predicate passed to take-while
    (fn [rf]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (if (pred input)
           (rf result input)
           ;; first failing input: signal early termination
           (reduced result)))))
As you can see, it may return a reduced value, which means there is no point in supplying more input to such a step function (in fact, it would be an error): we know that no more values can be produced.
For example, when processing the input collection (1 1 3 5 6 8 7) with the predicate odd?, once we reach the value 6 the step function created by the (take-while odd?) transducer will return no more values.
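A minimal REPL sketch of this example, driving the step function by hand with conj as the reducing function. The names step and ret are only for illustration:

```clojure
;; Build the step function by calling the transducer with a reducing function.
(def step ((take-while odd?) conj))

;; Odd inputs are passed through to conj.
(step [1 1 3] 5)            ; => [1 1 3 5]

;; The first even input makes the step function return a reduced value.
(def ret (step [1 1 3 5] 6))
(reduced? ret)              ; => true
@ret                        ; => [1 1 3 5]

;; A built-in process such as `into` stops at that point,
;; so 8 and 7 are never supplied to the step function.
(into [] (take-while odd?) [1 1 3 5 6 8 7])   ; => [1 1 3 5]
```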
A completing process must call the completion operation on the final accumulated value exactly once.
This matters when a transducer returns a stateful step function. A good example is the partition-by transducer. For example, when (partition-by odd?) is used by a transducible process to process (1 3 2 4 5 6 8), it produces ((1 3) (2 4) (5) (6 8)).
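    ;; f is the function passed to partition-by
    (fn [rf]
      (let [a (java.util.ArrayList.)   ; buffer for the current partition
            pv (volatile! ::none)]     ; previous result of (f input)
        (fn
          ([] (rf))
          ([result]
           ;; completion: flush whatever is still buffered
           (let [result (if (.isEmpty a)
                          result
                          (let [v (vec (.toArray a))]
                            ;;clear first!
                            (.clear a)
                            (unreduced (rf result v))))]
             (rf result)))
          ([result input]
           (let [pval @pv
                 val (f input)]
             (vreset! pv val)
             (if (or (identical? pval ::none)
                     (= val pval))
               ;; same partition: keep buffering
               (do
                 (.add a input)
                 result)
               ;; result of f changed: emit the buffered partition
               (let [v (vec (.toArray a))]
                 (.clear a)
                 (let [ret (rf result v)]
                   (when-not (reduced? ret)
                     (.add a input))
                   ret))))))))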
If you look at the implementation, you will notice that the step function does not pass on the values it has accumulated (stored in the ArrayList a) until the function f returns a different result (for example, after a run of odd numbers it receives an even number, and only then does it emit the accumulated odd numbers). The problem is that when the end of the source data is reached, there is no further change in the result of f to observe, so the accumulated values would never be emitted. The transducible process must therefore call the step function's completion operation (arity 1) so that it can emit whatever it has accumulated (in our case (6 8)).
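The effect of skipping the completion call can be seen at the REPL. This is only a sketch: the names coll, step, without-completion and completed are illustrative, and plain reduce is used here precisely because it does not call the completion arity:

```clojure
(def coll [1 3 2 4 5 6 8])

;; A fresh, stateful step function built from partition-by and conj.
(def step ((partition-by odd?) conj))

;; Feed every element through the step function, but skip completion.
(def without-completion (reduce step [] coll))
without-completion   ; => [[1 3] [2 4] [5]]  (6 and 8 are still buffered)

;; Calling the completion arity (arity 1) once flushes the buffer.
(def completed (step without-completion))
completed            ; => [[1 3] [2 4] [5] [6 8]]

;; transduce performs the completion call itself.
(transduce (partition-by odd?) conj [] coll)
;; => [[1 3] [2 4] [5] [6 8]]
```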
A transducible process must encapsulate references to the function returned by invoking a transducer: these may be stateful and unsafe for use across threads.
When a transducible process is run with the source data and a transducer instance, it first calls the transducer function to create a step function. A transducer is a function of the following form:
    (fn [xf]
      (fn
        ([] ...)
        ([result] ...)
        ([result input] ...)))
Thus, the transducible process calls this top-level function (which accepts the reducing function xf) to obtain the actual step function used to process the data elements. The point is that the transducible process must hold on to this step function and use the same instance to process all the elements from a given data source (for example, the step function instance created by the partition-by transducer must be used to process the entire input sequence, because it keeps internal state, as you saw above). Using different instances to process a single data source would produce incorrect results.
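To make these rules concrete, here is a deliberately simplified transducible process. The name my-transduce is hypothetical, and this is a sketch under the assumption of a seqable input, not how clojure.core implements transduce:

```clojure
(defn my-transduce
  "Hypothetical, simplified transducible process over a seqable collection."
  [xform rf init coll]
  (let [step (xform rf)]               ; one fresh step function per run, kept local
    (loop [acc init
           xs  (seq coll)]
      (cond
        ;; early termination: unwrap the reduced value, then complete once
        (reduced? acc) (step @acc)
        ;; more input: same step function instance for every element
        xs             (recur (step acc (first xs)) (next xs))
        ;; input exhausted: complete once
        :else          (step acc)))))

(my-transduce (partition-by odd?) conj [] [1 3 2 4 5 6 8])
;; => [[1 3] [2 4] [5] [6 8]]

(my-transduce (take-while odd?) conj [] [1 1 3 5 6 8 7])
;; => [1 1 3 5]
```

The step function never escapes the let, so it cannot be shared with another run or another thread.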
Similarly, a transducible process cannot reuse a step function instance to process multiple data sources, for the same reason: a step function instance may be stateful and hold internal state for one particular data source. That state will be corrupted if the step function is used to process another data source.
There is also no guarantee that a step function implementation is thread-safe.
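A small sketch of what goes wrong when one instance is shared between two data sources, using the stateful distinct transducer (shared-step, first-run and second-run are illustrative names):

```clojure
;; Wrong: one step function instance shared by two runs.
(def shared-step ((distinct) conj))

(def first-run  (reduce shared-step [] [1 2 2]))
first-run    ; => [1 2]

(def second-run (reduce shared-step [] [2 3]))
second-run   ; => [3]  (the 2 is dropped because it was seen in the first run)

;; Right: a fresh instance per data source, which is what built-in
;; processes such as `into` create internally on every call.
(into [] (distinct) [2 3])   ; => [2 3]
```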
What does context mean in this context?
"A new context for the use of transducers" means implementing a new kind of transducible process. Clojure provides transducible processes that work with collections (e.g. into, sequence). The core.async chan function (in one of its arities) accepts a transducer as an argument, which creates an asynchronous process that produces values (which can be consumed from the channel) by applying the transducer to the values put on it.
You can, for example, create a transducible process for handling data received on a socket, or your own implementation of observables.
Such processes can use transducers to transform data, because transducers are agnostic about where the data comes from (socket, stream, collection, event source, etc.): a step function is just a function that is called with individual elements.
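As a rough illustration of such a new context, here is a push-style process that is fed one value at a time, the way a socket handler or an observable might be. Everything here (make-sink, :put!, :close!, on-value) is a hypothetical design, not an existing API, and it makes no attempt at thread safety:

```clojure
(defn make-sink
  "Hypothetical push-style process: `xform` is applied to every value handed
  to :put!, and each resulting value is passed to the `on-value` callback."
  [xform on-value]
  (let [rf      (fn
                  ([acc] acc)                   ; completion: nothing to finalize
                  ([acc v] (on-value v) acc))   ; step: deliver the value downstream
        step    (xform rf)                      ; private to this sink instance
        done?   (volatile! false)
        finish! (fn [acc]
                  (vreset! done? true)          ; guarantees completion runs only once
                  (step acc)
                  nil)]
    {:put!   (fn [v]
               (when-not @done?
                 (let [ret (step nil v)]
                   (when (reduced? ret)         ; early termination requested
                     (finish! @ret))))
               nil)
     :close! (fn []
               (when-not @done?
                 (finish! nil)))}))

;; Usage: collect delivered values in an atom.
(def received (atom []))
(def sink (make-sink (comp (filter odd?) (map inc) (take 2))
                     #(swap! received conj %)))

(doseq [v [1 2 3 4 5]]
  ((:put! sink) v))
((:close! sink))

@received   ; => [2 4]
```

The accumulator is always nil in this sketch because the results are delivered through a side effect rather than collected.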
Transducers also do not care (and do not know) what should be done with the results they produce (for example, should a result be appended to a result sequence with conj? Sent over the network? Inserted into a database?). This is abstracted away by the reducing function, which is captured by the step function (the rf argument above).
Therefore, instead of creating a step function that simply uses conj or stores elements in a database, we pass in a function that provides the concrete implementation of this operation. It is your transducible process that determines what that operation is.
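A short sketch of the same transducer used with different reducing functions. The append-line function is a made-up stand-in for an operation such as writing to a network connection:

```clojure
;; The transducer knows nothing about the source or the destination.
(def xf (comp (filter odd?) (map inc)))

(transduce xf + 0 [1 2 3 4 5])   ; => 12       results are summed
(into [] xf [1 2 3 4 5])         ; => [2 4 6]  results are conj'ed onto a vector
(sequence xf [1 2 3 4 5])        ; => (2 4 6)  results form a sequence

;; A hand-written reducing function (purely illustrative) that appends
;; each result to a StringBuilder instead of building a collection.
(defn append-line
  ([] (StringBuilder.))                                ; init
  ([sb] (str sb))                                      ; completion: builder -> string
  ([sb x] (.append ^StringBuilder sb (str x "\n"))))   ; step

(transduce xf append-line [1 2 3 4 5])   ; => "2\n4\n6\n"
```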