PySpark and HDFS commands

I would like to do some cleanup at the beginning of my Spark (PySpark) program. For example, I would like to delete data left in HDFS by a previous run. In Pig this can be done with commands such as

fs -copyFromLocal ....

rmf /path/to/hdfs

or locally using the sh command.

I was wondering how to do the same with PySpark.

+6
2 answers

You can execute an arbitrary shell command using, for example, subprocess.call or the sh library, so something like this should work fine:

import subprocess

some_path = ...
# -f keeps the command from failing if the path does not already exist
subprocess.call(["hadoop", "fs", "-rm", "-f", some_path])
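
The same approach covers the fs -copyFromLocal part of the question; here is a minimal sketch, where local_path and hdfs_path are placeholders for your own locations:

import subprocess

local_path = ...  # placeholder: file on the driver's local disk
hdfs_path = ...   # placeholder: destination directory in HDFS
# hadoop fs -copyFromLocal <localsrc> <dst> uploads a local file into HDFS
subprocess.call(["hadoop", "fs", "-copyFromLocal", local_path, hdfs_path])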

If you are using Python 2.x, you can try spotify/snakebite:

from snakebite.client import Client

host = ...
port = ...
client = Client(host, port)  # host and RPC port of the HDFS NameNode
client.delete(some_path, recurse=True)

hdfs3 is another library that can be used to do the same thing:

from hdfs3 import HDFileSystem

hdfs = HDFileSystem(host=host, port=port)
hdfs.rm(some_path)

The Apache Arrow Python bindings are yet another option (and are often already available on a Spark cluster, since they are required for pandas_udf):

from pyarrow import hdfs

fs = hdfs.connect(host, port)  # uses libhdfs, so the Hadoop client libraries must be available
fs.delete(some_path, recursive=True)
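
Since the cleanup runs at the start of the job, the path left by a previous run may not exist yet; a minimal sketch that guards the delete, assuming the fs object from above and that the legacy pyarrow HDFS filesystem exposes an exists() check:

# Only attempt the delete when a leftover path is actually there
if fs.exists(some_path):
    fs.delete(some_path, recursive=True)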
+12

You can also delete an HDFS path from PySpark without any third-party dependencies, using the Hadoop FileSystem API through the JVM gateway:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('abc').getOrCreate()
sc = spark.sparkContext
# Obtain the Hadoop FileSystem configured for this Spark application
fs = (sc._jvm.org
      .apache.hadoop
      .fs.FileSystem
      .get(sc._jsc.hadoopConfiguration())
      )
path = "Your/hdfs/path"
# The second argument (True) makes the delete recursive
fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)

Better yet, wrap it in a function so you can reuse it across jobs/packages:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('abc').getOrCreate()

def delete_path(spark, path):
    # Recursively delete the given HDFS path using this application's Hadoop configuration
    sc = spark.sparkContext
    fs = (sc._jvm.org
          .apache.hadoop
          .fs.FileSystem
          .get(sc._jsc.hadoopConfiguration())
          )
    fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)

delete_path(spark, "Your/hdfs/path")
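
The same JVM gateway also covers the fs -copyFromLocal part of the question; a sketch with hypothetical helper and path names (FileSystem.copyFromLocalFile itself is part of the standard Hadoop API):

def copy_from_local(spark, local_file, hdfs_dir):
    # Upload a file from the driver's local disk into HDFS,
    # mirroring `hadoop fs -copyFromLocal`
    sc = spark.sparkContext
    Path = sc._jvm.org.apache.hadoop.fs.Path
    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
    fs.copyFromLocalFile(Path(local_file), Path(hdfs_dir))

copy_from_local(spark, "/tmp/some_local_file.csv", "Your/hdfs/path")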
+3

Source: https://habr.com/ru/post/1618058/

