How to read partitioned parquet files from S3 using pyarrow in python

I am looking for ways to read data from multiple partitioned directories on S3 using Python.

    data_folder/time_in_numeric_format=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
    data_folder/time_in_numeric_format=2/cur_date=27-12-2012/asdsdfsd0324324.snappy.parquet

pyarrow's ParquetDataset module has the ability to read from partitioned directories, so I tried the following code:

    >>> import pandas as pd
    >>> import pyarrow.parquet as pq
    >>> import s3fs
    >>> a = "s3://my_bucker/path/to/data_folder/"
    >>> dataset = pq.ParquetDataset(a)

Threw the following error:

    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
        self.metadata_path) = _make_manifest(path_or_paths, self.fs)
      File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 601, in _make_manifest
        .format(path))
    OSError: Passed non-file path: s3://my_bucker/path/to/data_folder/

Based on the pyarrow documentation, I tried using s3fs as a file system, i.e.:

    >>> dataset = pq.ParquetDataset(a, filesystem=s3fs)

Which causes the following error:

    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
        self.metadata_path) = _make_manifest(path_or_paths, self.fs)
      File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 583, in _make_manifest
        if is_string(path_or_paths) and fs.isdir(path_or_paths):
    AttributeError: module 's3fs' has no attribute 'isdir'

I am limited to using an ECS cluster, so Spark/PySpark is not an option.

Is there a way to easily read parquet files in Python from such partitioned directories in S3? I believe that listing all the directories and then reading each file is not good practice, as suggested in this link. I need to convert the data into a pandas DataFrame for further processing, so I would prefer options based on fastparquet or pyarrow, but I am also open to other options in Python.

4 answers

I managed to get this working with the latest versions of fastparquet and s3fs. Below is the code:

    import s3fs
    import fastparquet as fp

    s3 = s3fs.S3FileSystem()
    fs = s3fs.core.S3FileSystem()

    # mybucket/data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
    s3_path = "mybucket/data_folder/*/*/*.parquet"
    all_paths_from_s3 = fs.glob(path=s3_path)

    myopen = s3.open
    # use s3fs as the filesystem
    fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen)
    # convert to pandas dataframe
    df = fp_obj.to_pandas()
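
If only a few columns are needed downstream, fastparquet's to_pandas also accepts a columns argument, which avoids materializing the whole dataset. A minimal sketch (the column names here are placeholders, not from the original data):

    # read only the columns you actually need (placeholder names)
    df = fp_obj.to_pandas(columns=['col_a', 'col_b'])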

Credit to Martin for pointing me in the right direction through our conversation.

NB: This will be slower than using pyarrow, based on the benchmark below. I will update my answer once s3fs support is implemented in pyarrow via ARROW-1213.

I did a quick benchmark of individual iterations with pyarrow, and of a list of files sent as a glob to fastparquet. fastparquet with s3fs is faster than pyarrow plus my hackish code, but I believe pyarrow + s3fs will be faster once that support is implemented.

Code and tests below:

    >>> # assumes list_parquet_files (the individual parquet paths), an empty list list_,
    >>> # and the partition key names date_partition ('cur_date') and dma_partition
    >>> # ('serial_number') were defined earlier in the session
    >>> def test_pq():
    ...     for current_file in list_parquet_files:
    ...         f = fs.open(current_file)
    ...         df = pq.read_table(f).to_pandas()
    ...         # following code is to extract the serial_number & cur_date values
    ...         # so that we can add them to the dataframe
    ...         # probably not the best way to split :)
    ...         elements_list = current_file.split('/')
    ...         for item in elements_list:
    ...             if item.find(date_partition) != -1:
    ...                 current_date = item.split('=')[1]
    ...             elif item.find(dma_partition) != -1:
    ...                 current_dma = item.split('=')[1]
    ...         df['serial_number'] = current_dma
    ...         df['cur_date'] = current_date
    ...         list_.append(df)
    ...     frame = pd.concat(list_)
    ...
    >>> timeit.timeit('test_pq()', number=10, globals=globals())
    12.078817503992468

    >>> def test_fp():
    ...     fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen)
    ...     df = fp_obj.to_pandas()
    ...
    >>> timeit.timeit('test_fp()', number=10, globals=globals())
    2.961556333000317
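
As the comment inside test_pq() admits, splitting the path by hand is fragile. A small helper that parses the hive-style key=value segments out of a path is one possible cleanup (a sketch, not part of the original answer):

    def partition_values(path):
        """Extract hive-style key=value partition segments from an S3 path."""
        values = {}
        for segment in path.split('/'):
            if '=' in segment:
                key, _, value = segment.partition('=')
                values[key] = value
        return values

    # partition_values("mybucket/data_folder/serial_number=1/cur_date=20-12-2012/part.parquet")
    # -> {'serial_number': '1', 'cur_date': '20-12-2012'}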

Update 2019

Since then, individual PRs have resolved issues such as ARROW-2038 and fastparquet PR #182.

Reading parquet files with Pyarrow

    # pip install pyarrow
    # pip install s3fs

    >>> import s3fs
    >>> import pyarrow.parquet as pq

    >>> fs = s3fs.S3FileSystem()
    >>> bucket = 'your-bucket-name'
    >>> path = 'directory_name'  # if it's a directory, omit the trailing /
    >>> bucket_uri = f's3://{bucket}/{path}'
    # 's3://your-bucket-name/directory_name'

    >>> dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
    >>> table = dataset.read()
    >>> df = table.to_pandas()
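
If only certain partitions are needed, recent pyarrow versions also let ParquetDataset take a filters argument that prunes hive-style partition directories before reading. A minimal sketch, reusing the cur_date partition key from the question (the value is just an example):

    >>> dataset = pq.ParquetDataset(
    ...     bucket_uri,
    ...     filesystem=fs,
    ...     filters=[('cur_date', '=', '20-12-2012')]  # keep only this partition
    ... )
    >>> df = dataset.read().to_pandas()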

Reading parquet files with fastparquet

    # pip install s3fs
    # pip install fastparquet

    >>> import s3fs
    >>> import fastparquet as fp

    >>> fs = s3fs.S3FileSystem()
    >>> myopen = fs.open

    >>> bucket = 'your-bucket-name'
    >>> path = 'directory_name'
    >>> root_dir_path = f'{bucket}/{path}'
    # the first two wildcards represent the 1st and 2nd partition columns of your data, and so forth
    >>> s3_path = f"{root_dir_path}/*/*/*.parquet"
    >>> all_paths_from_s3 = fs.glob(path=s3_path)

    >>> fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen, root=root_dir_path)
    >>> df = fp_obj.to_pandas()
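
With the glob approach, partitions can also be pruned by writing the partition value straight into the glob pattern. A sketch assuming the cur_date layout from the question:

    >>> s3_path = f"{root_dir_path}/*/cur_date=20-12-2012/*.parquet"  # only one date partition
    >>> some_paths = fs.glob(path=s3_path)
    >>> df = fp.ParquetFile(some_paths, open_with=myopen, root=root_dir_path).to_pandas()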

Quick tests

This is probably not the best way to benchmark; please read the blog post for an end-to-end comparison.

    # pyarrow
    >>> import timeit
    >>> def test_pq():
    ...     dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
    ...     table = dataset.read()
    ...     df = table.to_pandas()
    ...
    >>> timeit.timeit('test_pq()', number=10, globals=globals())
    1.2677053569998407

    # fastparquet
    >>> def test_fp():
    ...     fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen, root=root_dir_path)
    ...     df = fp_obj.to_pandas()
    ...
    >>> timeit.timeit('test_fp()', number=10, globals=globals())
    2.931876824000028

Further reading regarding Pyarrow speed


This is being discussed in https://issues.apache.org/jira/browse/ARROW-1213 and https://issues.apache.org/jira/browse/ARROW-1119. We need to add some code that allows pyarrow to recognize an s3fs filesystem, plus a shim/compatibility class to adapt S3FS's slightly different filesystem API to pyarrow's.
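
To illustrate the idea (a rough sketch only, not pyarrow's actual wrapper code): such a compatibility class mostly has to translate the few calls pyarrow makes (open, isdir, isfile) onto the s3fs API, keeping in mind that S3 has no real directories:

    import s3fs

    class S3FSCompat:
        """Illustrative shim adapting an s3fs.S3FileSystem instance to the small
        filesystem interface pyarrow's ParquetDataset expects. Sketch only."""

        def __init__(self, fs):
            self.fs = fs

        def open(self, path, mode='rb'):
            # pyarrow opens the individual parquet files through the filesystem
            return self.fs.open(path, mode=mode)

        def isdir(self, path):
            # S3 has no real directories: treat a prefix with children as one
            path = path.rstrip('/')
            try:
                listing = self.fs.ls(path)
            except FileNotFoundError:
                return False
            return len(listing) > 0 and listing != [path]

        def isfile(self, path):
            return self.fs.exists(path) and not self.isdir(path)

    # usage idea (hypothetical): pq.ParquetDataset(a, filesystem=S3FSCompat(s3fs.S3FileSystem()))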


This issue was resolved in that pull request in 2017.

For those who want to read parquet from S3 using only that pull request, here is an example:

    import s3fs
    import pyarrow.parquet as pq
    from pyarrow.filesystem import S3FSWrapper

    fs = s3fs.S3FileSystem()
    bucket = "your-bucket"
    path = "your-path"

    # Python 3.6 or later
    p_dataset = pq.ParquetDataset(
        f"s3://{bucket}/{path}",
        filesystem=fs
    )
    df = p_dataset.read().to_pandas()

    # Pre-Python 3.6
    p_dataset = pq.ParquetDataset(
        "s3://{0}/{1}".format(bucket, path),
        filesystem=fs
    )
    df = p_dataset.read().to_pandas()

For those who want to read only parts of a partitioned parquet dataset: pyarrow accepts either a list of keys or just a partial directory path, in which case it reads all parts of the partition. This is especially useful for organizations that have partitioned their parquet datasets by meaningful values, for example by year or country, since users can then specify exactly which parts they need. This reduces costs in the long run, as AWS charges per byte when reading datasets.

    # Read in user-specified partitions of a partitioned parquet file
    import s3fs
    import pyarrow.parquet as pq

    s3 = s3fs.S3FileSystem()

    keys = ['keyname/blah_blah/part-00000-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet',
            'keyname/blah_blah/part-00001-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet',
            'keyname/blah_blah/part-00002-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet',
            'keyname/blah_blah/part-00003-cc2c2113-3985-46ac-9b50-987e9463390e-c000.snappy.parquet']

    bucket = 'bucket_yada_yada_yada'

    # Add s3 prefix and bucket name to all keys in list
    parq_list = []
    for key in keys:
        parq_list.append('s3://' + bucket + '/' + key)

    # Create your dataframe
    df = pq.ParquetDataset(parq_list, filesystem=s3).read_pandas(columns=['Var1', 'Var2', 'Var3']).to_pandas()
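
Instead of hardcoding the key names, one variation is to build the key list with an s3fs glob over just the partition prefix you care about (a sketch; the partition layout and column names below are placeholders):

    import s3fs
    import pyarrow.parquet as pq

    s3 = s3fs.S3FileSystem()

    # glob only the partition(s) you want, e.g. a single year (placeholder layout)
    keys = s3.glob('bucket_yada_yada_yada/dataset/year=2019/*/*.snappy.parquet')
    parq_list = ['s3://' + key for key in keys]

    df = pq.ParquetDataset(parq_list, filesystem=s3).read_pandas(columns=['Var1', 'Var2']).to_pandas()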
