SparkSQL-Lag function?

I see in this Databricks post that there is support for window functions in Spark SQL; in particular, I'm trying to use the lag() window function.

I have rows of credit card transactions, and I have sorted them. Now I want to iterate over the rows and, for each row, display the transaction amount along with the difference between the current row's amount and the previous row's amount.
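For clarity, the result I'm after can be sketched in plain Python (hypothetical sample data; the column names cc_num, trans_date, trans_time and amt mirror my table):

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical sample transactions: (cc_num, trans_date, trans_time, amt)
rows = [
    ("1111", "2015-01-01", "10:00", 50.0),
    ("1111", "2015-01-01", "12:30", 20.0),
    ("1111", "2015-01-02", "09:15", 80.0),
    ("2222", "2015-01-01", "11:00", 10.0),
    ("2222", "2015-01-03", "14:45", 40.0),
]

def with_prev_diff(rows):
    """For each card (the partition), sort by date/time and pair every
    transaction with amt - previous amt (None for the first row,
    like lag() returning NULL at the start of a partition)."""
    out = []
    for cc_num, group in groupby(sorted(rows), key=itemgetter(0)):
        prev = None  # lag(amt) starts as NULL within each partition
        for _, date, time, amt in group:
            out.append((cc_num, date, time, amt,
                        None if prev is None else amt - prev))
            prev = amt
    return out

for row in with_prev_diff(rows):
    print(row)
```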

Following the Databricks post, I came up with this query, but it throws an exception and I can't understand why...

This is in PySpark. tx is my data frame, already registered as a temporary table.

test =sqlContext.sql("SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, (lag(tx.amt) OVER (PARTITION BY tx.cc_num ORDER BY  tx.trans_date,tx.trans_time ROW BETWEEN PRECEDING AND CURRENT ROW)) as prev_amt from tx")

and the exception (truncated):

py4j.protocol.Py4JJavaError: An error occurred while calling o76.sql.
: java.lang.RuntimeException: [1.67] failure: ``)'' expected but identifier OVER found

I would really appreciate any insight. This functionality is fairly new, and there aren't many existing examples or related posts to go on.

Edit

As suggested, I tried to do this without raw SQL, using the DataFrame API instead, but I still get an error. I am using a SQLContext, not a HiveContext.

from pyspark.sql.window import Window
from pyspark.sql.functions import lag

windowSpec = \
    Window \
        .partitionBy(h_tx_df_ordered['cc_num']) \
        .orderBy(h_tx_df_ordered['cc_num'], h_tx_df_ordered['trans_date'], h_tx_df_ordered['trans_time'])

windowSpec.rowsBetween(-1, 0)

lag_amt = \
    (lag(h_tx_df_ordered['amt']).over(windowSpec) - h_tx_df_ordered['amt'])

h_tx_df_ordered.select(
    h_tx_df_ordered['cc_num'],
    h_tx_df_ordered['trans_date'],
    h_tx_df_ordered['trans_time'],
    h_tx_df_ordered['amt'],
    lag_amt.alias("prev_amt")).show()

Traceback (most recent call last):
  File "rdd_raw_data.py", line 116, in <module>
    lag_amt.alias("prev_amt")).show()
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 721, in select
    jdf = self._jdf.select(self._jcols(*cols))
  File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/brandon/anaconda/lib/python2.7/site-packages/py4j/protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o152.select.
: org.apache.spark.sql.AnalysisException: Could not resolve window function 'lag'. Note that, using window functions currently requires a HiveContext;
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
  • The frame specification should start with the keyword ROWS, not ROW.

  • The frame specification requires either a lower bound value:

    ROWS BETWEEN 1 PRECEDING AND CURRENT ROW

    or the UNBOUNDED keyword:

    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW

  • The LAG function doesn't accept a frame specification at all, so a correct SQL query with lag can look like this:

    SELECT tx.cc_num,tx.trans_date,tx.trans_time,tx.amt, LAG(tx.amt) OVER (
         PARTITION BY tx.cc_num ORDER BY  tx.trans_date,tx.trans_time
    ) as prev_amt from tx
    
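The two frame clauses above behave differently: 1 PRECEDING gives a two-row sliding window, while UNBOUNDED PRECEDING gives a running total. A plain-Python sketch of what each frame would sum over (assuming an aggregate such as SUM, since LAG takes no frame):

```python
def framed_sums(amts, lower):
    """Sum each row's frame: rows (i - lower) .. i, clipped at the start.
    lower=1 models ROWS BETWEEN 1 PRECEDING AND CURRENT ROW;
    lower=None models ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW."""
    out = []
    for i in range(len(amts)):
        start = 0 if lower is None else max(0, i - lower)
        out.append(sum(amts[start:i + 1]))
    return out

amts = [50.0, 20.0, 80.0]
print(framed_sums(amts, 1))     # sliding window: [50.0, 70.0, 100.0]
print(framed_sums(amts, None))  # running total:  [50.0, 70.0, 150.0]
```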

Edit

Regarding the DataFrame DSL version:

  • As the error message says, window functions currently require a HiveContext. Make sure sqlContext is initialized as a HiveContext, not a SQLContext.

  • windowSpec.rowsBetween(-1, 0) does nothing, but the frame specification is not supported by LAG anyway.


Source: https://habr.com/ru/post/1607296/
