How does broadcast work in Spark?

This is a very simple question: in Spark, broadcast can be used to send variables to executors efficiently. How does it work?

More precisely:

  • When are the values sent: as soon as I call broadcast, or when the values are used?
  • Where exactly is the data sent: to all executors, or only to the ones that need it?
  • Where is the data stored? In memory or on disk?
  • Is there a difference in how simple variables and broadcast variables are accessed? What happens under the hood when I call the .value method?
2 answers

Short answer

  • Values are sent the first time they are needed on an executor. Nothing is sent when sc.broadcast(variable) is called.
  • The data is sent only to the nodes that contain an executor that needs it.
  • The data is stored in memory. If there is not enough memory, the disk is used.
  • Yes, there is a big difference between accessing a local variable and a broadcast variable. Broadcast variables have to be downloaded on first access.

Long answer

The answer is in Spark's source code, in TorrentBroadcast.scala.

  • When sc.broadcast is called, a new TorrentBroadcast object is instantiated from BroadcastFactory.scala. The following happens in writeBlocks(), which is called when the TorrentBroadcast object is initialized:

    • The object is cached unserialized locally using the MEMORY_AND_DISK policy.
    • It is serialized.
    • The serialized version is split into 4 MB blocks, which are compressed [0] and stored locally [1].
  • When new executors are created, they only have the lightweight TorrentBroadcast object, which contains only the broadcast object's identifier and its number of blocks.

  • The TorrentBroadcast object has a lazy property [2] that contains its value. When the value method is called, this lazy property is returned. So the first time value is called on an executor, the value is loaded as follows:

    • In random order, blocks are retrieved from the local block manager and uncompressed.
    • If they are not present in the local block manager, getRemoteBytes is called in the block manager to retrieve them. Network traffic occurs only at this time.
    • If a block was not present locally, it is then cached locally using MEMORY_AND_DISK_SER.
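
The write and read paths described above can be modelled in a few lines. This is only a toy sketch in plain Python, not Spark's code: BlockManager, write_blocks and ToyBroadcast are hypothetical names, pickle stands in for Spark's serializer, zlib stands in for the lz4 codec, and a second in-memory store stands in for the remote node. It shows that creating the broadcast moves nothing to the executor, and that blocks are pulled only on the first access to value.

```python
import pickle
import random
import zlib

BLOCK_SIZE = 4 * 1024 * 1024  # 4 MB, the default block size named above


class BlockManager:
    """Toy stand-in for a per-node block store (hypothetical, not Spark's API)."""

    def __init__(self):
        self.blocks = {}

    def put(self, block_id, data):
        self.blocks[block_id] = data

    def get_local(self, block_id):
        return self.blocks.get(block_id)


def write_blocks(broadcast_id, value, driver_store, block_size=BLOCK_SIZE):
    """Driver side: serialize, split into blocks, compress, store locally."""
    payload = pickle.dumps(value)
    pieces = [payload[i:i + block_size] for i in range(0, len(payload), block_size)]
    for index, piece in enumerate(pieces):
        # zlib is an assumption standing in for the lz4 codec of footnote [0].
        driver_store.put((broadcast_id, index), zlib.compress(piece))
    return len(pieces)


class ToyBroadcast:
    """What an executor holds at first: only the id and the number of blocks."""

    def __init__(self, broadcast_id, num_blocks, local_store, remote_store):
        self.broadcast_id = broadcast_id
        self.num_blocks = num_blocks
        self.local_store = local_store
        self.remote_store = remote_store
        self.remote_fetches = 0  # counts simulated network transfers
        self._loaded = False
        self._value = None

    @property
    def value(self):
        # Load on first access only, then reuse the cached result.
        if not self._loaded:
            self._value = self._read_blocks()
            self._loaded = True
        return self._value

    def _read_blocks(self):
        pieces = [None] * self.num_blocks
        order = list(range(self.num_blocks))
        random.shuffle(order)  # blocks are fetched in random order
        for index in order:
            block_id = (self.broadcast_id, index)
            data = self.local_store.get_local(block_id)
            if data is None:
                # Only this branch corresponds to network traffic.
                data = self.remote_store.get_local(block_id)
                self.remote_fetches += 1
                self.local_store.put(block_id, data)  # cache for later readers
            pieces[index] = zlib.decompress(data)
        return pickle.loads(b"".join(pieces))


driver = BlockManager()
lookup = {i: str(i) for i in range(1000)}
n = write_blocks(0, lookup, driver, block_size=1024)  # small blocks for the demo
executor = BlockManager()
bc = ToyBroadcast(0, n, executor, driver)
```

Reading bc.value once fetches all n blocks from the simulated remote store; reading it again touches neither store. A second ToyBroadcast sharing the same executor store would find every block locally, which is the point of caching fetched blocks.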

[0] Compressed with lz4 by default. This can be tuned.

[1] The blocks are stored in the local block manager using MEMORY_AND_DISK_SER, which means that partitions that do not fit in memory are spilled to disk. Each block has a unique identifier computed from the identifier of the broadcast variable and its offset. The block size can be configured; the default is 4 MB.
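
As a rough illustration of how the identifier and the block count relate, the sketch below derives one name per block from the broadcast id and the piece index. The function piece_ids is hypothetical; the broadcast_<id>_piece<n> pattern mirrors the block names that show up in Spark logs, and the 4 MB default corresponds to the spark.broadcast.blockSize setting. Treat it as an illustration of the arithmetic rather than Spark's implementation.

```python
import math

DEFAULT_BLOCK_SIZE = 4 * 1024 * 1024  # 4 MB default from the footnote above


def piece_ids(broadcast_id, serialized_size, block_size=DEFAULT_BLOCK_SIZE):
    """Return one block name per piece of a serialized broadcast value."""
    num_blocks = math.ceil(serialized_size / block_size)
    # The name combines the broadcast id with the index of the piece.
    return [f"broadcast_{broadcast_id}_piece{i}" for i in range(num_blocks)]


# A 10 MB serialized value needs three 4 MB blocks (the last one is partial).
ids = piece_ids(broadcast_id=3, serialized_size=10 * 1024 * 1024)
```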

[2] A lazy val in Scala is a variable whose value is evaluated on first access and then cached. See the documentation.
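
For readers who do not know Scala, the closest Python analogue is functools.cached_property. The sketch below is only an analogy, with a hypothetical LazyHolder class: the body runs on first access and the result is reused afterwards. One difference worth noting is that a Scala lazy val is initialized in a thread-safe way, which cached_property does not guarantee in every Python version.

```python
from functools import cached_property


class LazyHolder:
    """Python analogue of a class with a Scala `lazy val` (illustrative only)."""

    def __init__(self):
        self.evaluations = 0  # how many times the body below has run

    @cached_property
    def value(self):
        # Runs on first access only; the result is then stored on the instance.
        self.evaluations += 1
        return [1, 2, 3]


holder = LazyHolder()
```

Accessing holder.value any number of times leaves holder.evaluations at 1, and every access returns the same object.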

  • The values are sent as soon as the variable is broadcast.
  • The data is sent to all executors using the torrent protocol, but is loaded only when needed.
  • Once loaded, variables are stored deserialized in memory.
  • Calling .value:
    • validates that the broadcast has not been destroyed
    • lazily loads the variable from the blockManager

Source: https://habr.com/ru/post/1259991/

