How to rename a huge number of files in Hadoop / Spark?

I have an input folder containing more than 100,000 files.

I would like to do a batch operation on them, e.g. rename them all in a specific way or move them to a new path based on the information in each file name.

I would like to use Spark for this, but unfortunately when I tried the following piece of code:

    final org.apache.hadoop.fs.FileSystem ghfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI(args[0]), new org.apache.hadoop.conf.Configuration());
    org.apache.hadoop.fs.FileStatus[] paths = ghfs.listStatus(new org.apache.hadoop.fs.Path(args[0]));
    List<String> pathsList = new ArrayList<>();
    for (FileStatus path : paths) {
        pathsList.add(path.getPath().toString());
    }

    JavaRDD<String> rddPaths = sc.parallelize(pathsList);

    rddPaths.foreach(new VoidFunction<String>() {
        @Override
        public void call(String path) throws Exception {
            Path origPath = new Path(path);
            Path newPath = new Path(path.replace("taboola", "customer"));
            ghfs.rename(origPath, newPath);
        }
    });

I get an error that hadoop.fs.FileSystem is not Serializable (and therefore probably cannot be used in parallel operations).

Any idea on how I can get around this or do it differently?

+6
4 answers

The problem is that you are trying to serialize the ghfs object. If you use mapPartitions and recreate the ghfs object in each partition, you can make your code work with just a couple of minor changes.
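
For example, a minimal sketch of that change against the code in the question, assuming the same Spark 1.x Java API the question uses (where FlatMapFunction.call returns an Iterable) and that args[0] is the URI the driver listed:

    // sketch only -- assumes the Hadoop/Spark classes from the question plus
    // org.apache.spark.api.java.function.FlatMapFunction, java.util.Iterator and java.net.URI
    final String fsUri = args[0];

    JavaRDD<String> renamed = rddPaths.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
        @Override
        public Iterable<String> call(Iterator<String> paths) throws Exception {
            // Recreate the FileSystem on the worker instead of capturing the driver's ghfs
            FileSystem fs = FileSystem.get(new URI(fsUri), new Configuration());
            List<String> done = new ArrayList<>();
            while (paths.hasNext()) {
                String path = paths.next();
                fs.rename(new Path(path), new Path(path.replace("taboola", "customer")));
                done.add(path);
            }
            return done;
        }
    });
    renamed.count(); // mapPartitions is lazy, so an action is needed to actually run the renames

Returning the processed paths also leaves you with a record of what was actually renamed.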

+4

You need to do FileSystem.get inside the VoidFunction.

The driver needs a FileSystem to get the list of files, but every worker also needs a FileSystem to do the renames. The driver cannot send its FileSystem to the workers, because it is not Serializable. But the workers can get their own FileSystem just fine.

In the Scala API, you can use RDD.foreachPartition to write the code so that FileSystem.get runs only once per partition instead of once per line. It is probably also available in the Java API.
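
In the Java API that would look roughly like the sketch below, assuming foreachPartition is available in the Spark version at hand (it takes a VoidFunction over an Iterator; same classes as the earlier sketch, plus org.apache.spark.api.java.function.VoidFunction):

    final String fsUri = args[0]; // the same URI the driver used for listing

    rddPaths.foreachPartition(new VoidFunction<Iterator<String>>() {
        @Override
        public void call(Iterator<String> paths) throws Exception {
            // One FileSystem.get per partition, executed on the worker itself
            FileSystem fs = FileSystem.get(new URI(fsUri), new Configuration());
            while (paths.hasNext()) {
                String path = paths.next();
                fs.rename(new Path(path), new Path(path.replace("taboola", "customer")));
            }
        }
    });

Since foreachPartition is an action, no extra count() is needed to trigger it.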

+4

I would recommend just renaming them the way you normally would, with the FileSystem class, in the driver context without any map (driver only); renaming 100k files is not a big deal. If that turns out to be too slow, you can try multithreading it (a rough sketch of that follows after the snippet below). Just do something like

    // driver-only rename: list the input directory and rename each entry in a plain loop
    FileSystem fileSystem = FileSystem.get(new Configuration());
    FileStatus[] files = fileSystem.listStatus(new Path(args[0]));
    for (FileStatus file : files) {
        Path orig = file.getPath();
        fileSystem.rename(orig, new Path(orig.toString().replace("taboola", "customer")));
    }
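
And if that single loop is too slow, a multithreaded variant along the same lines (a sketch only; the pool size is arbitrary, it assumes the surrounding method declares throws Exception, and it relies on Hadoop's FileSystem being generally safe to share between threads, with FileSystem.get caching one instance per URI anyway):

    // needs java.io.IOException and java.util.concurrent.{ExecutorService, Executors, TimeUnit}
    final FileSystem fs = FileSystem.get(new Configuration());
    FileStatus[] files = fs.listStatus(new Path(args[0]));

    ExecutorService pool = Executors.newFixedThreadPool(16); // thread count is arbitrary
    for (final FileStatus file : files) {
        pool.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    Path orig = file.getPath();
                    fs.rename(orig, new Path(orig.toString().replace("taboola", "customer")));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.HOURS);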

BTW, the error you get in Spark is because Spark requires the objects it uses to implement Serializable, which FileSystem does not.


I cannot confirm this, but it would seem that every rename in HDFS involves the NameNode, since it keeps track of the full directory structure and block locations (confirmation), which would mean it cannot be done efficiently in parallel. According to this answer, though, a rename is a metadata-only operation, so it should run very quickly.

+3

I ran into a similar problem when my HDFS archive directory hit the maximum number of items per directory:

    Request error: org.apache.hadoop.hdfs.protocol.FSLimitException$MaxDirectoryItemsExceededException
    The directory item limit of /my/archive is exceeded: limit=1048576 items=1048576

I decided to move all items from the previous year (2015) into a separate subfolder. Here is a pure shell solution:

    export HADOOP_CLIENT_OPTS="-XX:-UseGCOverheadLimit -Xmx4096m"

    hdfs dfs -ls /my/archive \
        | grep 2015- \
        | awk '{print $8}' \
        | gnu-parallel -X -s 131000 hdfs dfs -mv {} /my/archive/2015

Notes:

  • Client overrides are required for hdfs dfs -ls due to the large number of files. See here for more details.
  • hdfs dfs client has a limit on the length of the argument list: about 131000 ( 2^17 ) characters.
  • It took several minutes to move the 420k files.
0
