Timestamp from Spark Python to Pandas and vice versa

How do you convert timestamp data from Spark (Python) to Pandas and back? I am reading data from a Hive table in Spark, doing some calculations in Pandas, and writing the results back to Hive. Only the last part does not work: converting the Pandas timestamp back to a Spark DataFrame timestamp.

import datetime
import pandas as pd

dates = [
    ('today', '2017-03-03 11:30:00')
  , ('tomorrow', '2017-03-04 08:00:00')
  , ('next Thursday', '2017-03-09 20:00:00')
]
string_date_rdd = sc.parallelize(dates)
timestamp_date_rdd = string_date_rdd.map(lambda t: (t[0], datetime.datetime.strptime(t[1], "%Y-%m-%d %H:%M:%S")))
timestamp_df = sqlContext.createDataFrame(timestamp_date_rdd, ['Day', 'Date'])
timestamp_pandas_df = timestamp_df.toPandas()
roundtrip_df = sqlContext.createDataFrame(timestamp_pandas_df)
roundtrip_df.printSchema()
roundtrip_df.show()

root
 |-- Day: string (nullable = true)
 |-- Date: long (nullable = true)

+-------------+-------------------+
|          Day|               Date|
+-------------+-------------------+
|        today|1488540600000000000|
|     tomorrow|1488614400000000000|
|next Thursday|1489089600000000000|
+-------------+-------------------+

At this point, the round-tripped Spark DataFrame has a Date column of type long. In PySpark, a single value can easily be converted back to a datetime object, for example with datetime.datetime.fromtimestamp(1489089600000000000 / 1000000000), although the time of day is off by a few hours. How can I convert the data type of the whole Spark DataFrame column?

Python 3.4.5, Spark 1.6.0

Thanks, John


Convert the datetime64 columns to Python datetime objects before creating the Spark DataFrame.

from pandas import Series

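def convert_to_python_datetime(df):
    # Work on a copy so the caller's DataFrame is left unchanged
    df_copy = df.copy()
    # items() replaces iteritems(), which was removed in pandas 2.0
    for column_name, column in df_copy.items():
        # dtype.kind 'M' marks NumPy datetime64 columns
        if column.dtype.kind == 'M':
            # dtype=object keeps the values as datetime.datetime; index= keeps rows aligned
            df_copy[column_name] = Series(column.dt.to_pydatetime(), index=column.index, dtype=object)
    return df_copy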


tmp = convert_to_python_datetime(timestamp_pandas_df)
roundtrip_df = sqlContext.createDataFrame(tmp)
roundtrip_df.printSchema()
roundtrip_df.show()

This gives:

root
 |-- Day: string (nullable = true)
 |-- Date: timestamp (nullable = true)

+-------------+--------------------+
|          Day|                Date|
+-------------+--------------------+
|        today|2017-03-03 11:30:...|
|     tomorrow|2017-03-04 08:00:...|
|next Thursday|2017-03-09 20:00:...|
+-------------+--------------------+
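
To see what the function above changes, it can help to look at the pandas side alone. The sketch below uses a small hand-built frame (the name `pdf` and its two rows are illustrative, not from the post) and assumes, as the outputs in this thread suggest, that Spark 1.6 infers long for datetime64 values and timestamp for plain datetime.datetime objects:

```python
import datetime
import pandas as pd

# Illustrative frame; in the thread this would be timestamp_pandas_df.
pdf = pd.DataFrame({
    "Day": ["today", "tomorrow"],
    "Date": pd.to_datetime(["2017-03-03 11:30:00", "2017-03-04 08:00:00"]),
})

# dtype.kind is 'M' for NumPy datetime64 columns, which is what the
# answer's function checks for.
date_kind = pdf["Date"].dtype.kind

# Convert each pandas Timestamp to a plain datetime.datetime and store the
# results in an object column so pandas does not turn them back into datetime64.
as_python = pd.Series(
    [ts.to_pydatetime() for ts in pdf["Date"]],
    index=pdf.index,
    dtype=object,
)

print(date_kind)
print(as_python.dtype, type(as_python.iloc[0]))
```

The object column holds ordinary Python datetime values, which is the form the answer passes to createDataFrame.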

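Another option is to do the conversion in Spark itself, starting from the roundtrip_df in the question (the one with the long Date column): divide by 1000000000 to get seconds, then cast to a timestamp: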

from pyspark.sql.types import TimestampType
extra_column_df = roundtrip_df.select(roundtrip_df.Day, roundtrip_df.Date).withColumn('new_date', roundtrip_df.Date / 1000000000)
roundtrip_timestamp_df = extra_column_df.select(extra_column_df.Day, extra_column_df.new_date.cast(TimestampType()).alias('Date'))

Output:

root
 |-- Day: string (nullable = true)
 |-- Date: timestamp (nullable = true)

+-------------+--------------------+
|          Day|                Date|
+-------------+--------------------+
|        today|2017-03-03 11:30:...|
|     tomorrow|2017-03-04 08:00:...|
|next Thursday|2017-03-09 20:00:...|
+-------------+--------------------+

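Note that time zones matter here: the long values are relative to UTC, so the displayed time of day can shift with the local time zone and DST.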
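
The long values in the question are nanoseconds since the Unix epoch, with the original wall-clock times stored as if they were UTC. datetime.datetime.fromtimestamp converts to the machine's local time zone, which would explain the shift of a few hours mentioned in the question. A minimal sketch in plain Python, where `ns_to_utc_datetime` is a hypothetical helper name and not part of Spark or pandas:

```python
import datetime

# Reference point for epoch arithmetic (naive, interpreted as UTC).
EPOCH = datetime.datetime(1970, 1, 1)

def ns_to_utc_datetime(ns):
    """Hypothetical helper: nanoseconds since the epoch -> naive UTC datetime."""
    # timedelta has microsecond resolution, so sub-microsecond digits are dropped.
    return EPOCH + datetime.timedelta(microseconds=ns // 1000)

# The three long values printed by roundtrip_df.show() in the question.
values = [
    ("today", 1488540600000000000),
    ("tomorrow", 1488614400000000000),
    ("next Thursday", 1489089600000000000),
]

for day, ns in values:
    print(day, ns_to_utc_datetime(ns))
```

Using integer arithmetic from a fixed epoch avoids any dependence on the local time zone, so the results match the original strings regardless of where the code runs.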


Source: https://habr.com/ru/post/1671383/

