25 OCT 2015 · 3 MIN READ · SETTLEMENTS

Parsing AEMO data with Clojure

We're going to use Clojure to quickly create a utility for parsing an RM16 file from AEMO and displaying the volume in MWh per profile per state. If you are reading this post, I'm going to assume you are familiar with both AEMO and the RM16 file. The solution makes use of Prismatic's schema library, as it adds a little more documentation and formality to the code base. All resulting figures are obfuscated, as they are commercially sensitive.


the data types

One of the biggest issues I find with Clojure is that it's hard to revisit code and work out what it is doing. The main reasons I find for this are:

  • The lack of type annotations and the inherent documentation they provide.
  • Deeply nested function calls, meaning you need to read the code right to left.
  • It's a type of Lisp.

The last one I can't change but the first two I will look to address.

  • Prismatic's Schema library is not a type system, but it does let you define the shape of a data structure and then validate data against that shape.
  • There has been a bit written in the Clojure community lately about writing readable Clojure and it has reinforced my practice of writing short functions using the threading macros.
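To make the readability point concrete, here is a minimal sketch comparing a nested expression with the same logic written using the `->>` threading macro. The sample row and the function names are invented for illustration and are not part of the parser.

```clojure
(require '[clojure.string :as string])

;; Hypothetical comma-separated row, invented for illustration only.
(def sample-row "NABC,10.5,20.25,30,END")

;; Nested form: evaluation starts at the innermost call,
;; so the code has to be read inside-out (right to left).
(defn sum-readings-nested
  [row]
  (reduce + (map bigdec (drop-last (rest (string/split row #","))))))

;; Threaded form: each step sits on its own line, in execution order.
(defn sum-readings-threaded
  [row]
  (->> (string/split row #",")
       (rest)         ; skip the leading identifier
       (drop-last)    ; skip the trailing marker
       (map bigdec)   ; convert the remaining strings to BigDecimal
       (reduce +)))   ; add them up
```

Both functions return `60.75M` for `sample-row`; only the reading order differs.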

For this parser, we define two record types using schema. The first, RM16Row, represents a typed row of the CSV data from the RM16 file. The second, RM16Summary, is our result type, which stores the per-jurisdiction, per-profile results.

;; =====================
;; Constants and Helpers
;; =====================

(defn string-reader
  [s]
  (StringReader. s))

(defn file-exists?
  [path]
  (.exists (File. path)))

(def AEMO-RM16-DATE-FORMAT     (format/formatter "yyyy/MM/dd"))
(def AEMO-RM16-DATETIME-FORMAT (format/formatter "yyyy/MM/dd HH:mm:ss"))

;; ==========
;; Data Types
;; ==========

;; This record represents the typed representation of a CSV row from the AEMO file.
(s/defrecord RM16Row
  [tni             :- String
   data-type       :- String
   frmp            :- String
   lr              :- String
   mdp             :- String
   profile-name    :- String
   creation-dt     :- LocalDateTime
   settlement-date :- LocalDate
   hhr-data        :- [s/Num]])

;; The summary data to be printed.
(s/defrecord RM16Summary
  [jurisdiction      :- String
   profile           :- String
   volume            :- BigDecimal])

;; Comparison function used for a custom sort.
(defn rm16-summary-compare
  [rm16-summary-a rm16-summary-b]
  (compare
    (str (:jurisdiction rm16-summary-a) (:profile rm16-summary-a))
    (str (:jurisdiction rm16-summary-b) (:profile rm16-summary-b))))

(defn seq-of-bigdec-from-csv-chunk
  [row-of-strings start-pos]
  (->> (nthrest row-of-strings start-pos)
       (drop-last)
       (map bigdec)))

(defn csvdata->rm16row
  [row-of-strings]
  ;; Validate will return the object or throw an exception if the
  ;; item doesn't validate
  (s/validate RM16Row
    (->RM16Row
      (nth row-of-strings 0)
      (nth row-of-strings 1)
      (nth row-of-strings 2)
      (nth row-of-strings 3)
      (nth row-of-strings 4)
      (nth row-of-strings 5)
      (format/parse-local           AEMO-RM16-DATETIME-FORMAT (nth row-of-strings 6))
      (format/parse-local-date      AEMO-RM16-DATE-FORMAT     (nth row-of-strings 7))
      (seq-of-bigdec-from-csv-chunk row-of-strings 8))))

In addition to the normal behaviour of defrecord, the schema.core version creates a schema that can be used to validate entities at any point in the program. We can see the validation being called in csvdata->rm16row at the end of the above snippet: validate returns the record unchanged, or throws an exception if the provided data doesn't adhere to the schema.
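As a rough illustration of that behaviour, the sketch below defines a much smaller record and runs it through `s/check` and `s/validate`. The `Reading` record and its fields are hypothetical, and the snippet assumes prismatic/schema is on the classpath.

```clojure
(require '[schema.core :as s])

;; Hypothetical two-field record, far smaller than RM16Row.
(s/defrecord Reading
  [tni    :- String
   volume :- s/Num])

;; s/check returns nil when the value matches the schema,
;; and a description of the mismatch otherwise.
(def good-result (s/check Reading (->Reading "NABC" 1.5M)))
(def bad-result  (s/check Reading (->Reading "NABC" "not a number")))

;; s/validate returns the value itself on success and throws on failure.
(def validated (s/validate Reading (->Reading "NABC" 1.5M)))

(def threw?
  (try
    (s/validate Reading (->Reading "NABC" "not a number"))
    false
    (catch clojure.lang.ExceptionInfo _ true)))
```

Note that the positional constructor `->Reading` does not validate anything by itself, which is why the parser wraps it in an explicit `s/validate` call.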

So far I have found the little bit of additional formality provided by schema greatly enhances the readability of my code base.


extracting the demand data

The file we are looking to parse is an XML file containing a CSV block of data. Each line of the CSV block will be mapped into an RM16Row record. These will act as the base data for further transformations and calculations. To help with the XML parsing, we are using the clj-xpath library. It has some helpful overloaded conversion functions like xml->doc and jQuery-style selectors like $x:text that make it easy to extract the CSV block from the file.

;; =============
;; Parsing Logic
;; =============

(defn parse-rm16-doc
  [file-path]
  (if (file-exists? file-path)
    (xml->doc (slurp file-path))
    (do
      (println (str "File " file-path " not found."))
      nil)))

(defn extract-csv-payload
  [rm16-doc]
  (line-seq
    (io/reader
      (string-reader ($x:text "//CSVData" rm16-doc)))))

(defn construct-rm16-data
  [csv-payload]
  (->>
    (map #(string/split % #",") csv-payload)
    (rest)
    (map csvdata->rm16row)))
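The step from a CSV block to rows of strings can be tried in isolation. The sketch below uses only clojure.string; the payload, its column names and the `payload->rows` helper are invented for illustration and do not reflect the real RM16 layout.

```clojure
(require '[clojure.string :as string])

;; Hypothetical payload: a header line followed by two data lines.
;; The second data line ends with an empty field.
(def sample-payload
  "TNI,PROFILE,P1,P2\nNABC,NSLP,1.5,2.5\nVXYZ,NSLP,3.0,")

(defn payload->rows
  "Splits a CSV block into vectors of strings, skipping the header line."
  [payload]
  (->> (string/split-lines payload)
       (rest)                          ; drop the header
       (map #(string/split % #","))))  ; one vector of fields per line

(def rows (payload->rows sample-payload))

;; Passing a negative limit keeps trailing empty fields.
(def last-row-with-empties
  (string/split "VXYZ,NSLP,3.0," #"," -1))
```

One thing worth checking against a real file: `string/split` without a limit drops trailing empty strings, so the second row above comes back with three fields rather than four. Any code that indexes into a row with `nth`, or trims it with `drop-last`, depends on that behaviour.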

transforming the data

Now that we have the data as a sequence of RM16Row records, we can transform it into RM16Summary records using Clojure's standard functions. The block below shows the functions used to perform the transformations. It also represents a nice example of my preferred style of Clojure at the moment.

Specifically:

  • Short functions are made up of let bindings to provide meaningful parameters to a result expression. Even short functions in Clojure are dense so I try and minimize the magic. Each function does a single task. It could be argued that some of the reduce operations in this snippet should themselves be extracted into functions with more descriptive names.
  • Thread these functions together in a pipeline that gives a descriptive step-by-step outline of what you are trying to achieve. I have been finding the some->> threading macro useful in these scenarios. It will short-circuit the operation if any function within the pipeline returns a nil value.
;; ====================
;; Transformation Logic
;; ====================

(defn sum-volume-for-jurisdiction-profile
  [rm16-summary-to-row-seq]
  (let [sum-volume   (/ (reduce +
                                (mapcat :hhr-data
                                        (val rm16-summary-to-row-seq)))
                        1000)]
    (s/validate RM16Summary
      (->RM16Summary (:jurisdiction (key rm16-summary-to-row-seq))
                     (:profile      (key rm16-summary-to-row-seq))
                     sum-volume))))

(defn jurisdiction-profile-key
  [rm16-row]
  (let [tni-lead-char (str (first (:tni rm16-row)))
        profile       (:profile-name rm16-row)
        jurisdiction (case tni-lead-char
                       "V" "VIC"
                       "N" "NSW"
                       "A" "ACT"
                       "Q" "QLD"
                       "S" "SA")]
    (->RM16Summary jurisdiction profile 0)))

(defn construct-mwh-per-jurisdiction-per-profile
  [seq-of-rm16row]
  (->>
    (group-by jurisdiction-profile-key seq-of-rm16row)
    (map      sum-volume-for-jurisdiction-profile)))

(defn print-summary
  [seq-of-rm16-summary]
  (let [total-volume (reduce + (map :volume seq-of-rm16-summary))]
    (doseq [summary seq-of-rm16-summary]
      (printf "Jurisdiction: %5s   Profile: %12s   Volume MWh: %13s \n"
              (:jurisdiction  summary)
              (:profile       summary)
              (:volume        summary)))
    (printf "Total MWh: %10s" total-volume)
    (flush)))

(defn process-file
  [file-path]
  ;; some->> keeps threading only while each step returns a non-nil value
  (some->>
    (parse-rm16-doc file-path)
    (extract-csv-payload)
    (construct-rm16-data)
    (construct-mwh-per-jurisdiction-per-profile)
    (sort rm16-summary-compare)
    (print-summary)))
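
The grouping, summing and short-circuiting ideas above can be seen on a smaller scale in the following sketch. It is not the parser's code: plain maps stand in for RM16Row records, the values are invented, `juxt` is used as the grouping key instead of a record, and the division by 1000 simply mirrors the conversion in the code above.

```clojure
;; Hypothetical rows: plain maps with invented values.
(def sample-rows
  [{:jurisdiction "NSW" :profile "NSLP" :hhr-data [500M 1000M]}
   {:jurisdiction "NSW" :profile "NSLP" :hhr-data [1500M]}
   {:jurisdiction "VIC" :profile "NSLP" :hhr-data [250M 250M]}])

(defn total-per-group
  "Groups rows by [jurisdiction profile] and sums their readings."
  [rows]
  (->> (group-by (juxt :jurisdiction :profile) rows)
       (map (fn [[[jurisdiction profile] group]]
              {:jurisdiction jurisdiction
               :profile      profile
               :volume       (/ (reduce + (mapcat :hhr-data group)) 1000)}))
       (sort-by (juxt :jurisdiction :profile))))

(defn summarise
  "Returns nil as soon as a step yields nil, otherwise the sorted totals."
  [rows]
  (some->> rows
           (seq)               ; nil for an empty collection, which stops the pipeline
           (total-per-group)))
```

Calling `(summarise sample-rows)` yields one map per jurisdiction and profile, with volumes `3M` for NSW and `0.5M` for VIC, while `(summarise nil)` and `(summarise [])` both return nil without ever calling `total-per-group`.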

the output

Below is the obfuscated output of the code after the jar has been created using lein uberjar. It was reasonably quick to get a solution working. It took me longer to write than it would have in Groovy, but I think the solution in Clojure lends itself to better abstractions.

I do like the interactive workflow in Clojure; it helps me get to the essence of my problem more quickly. As in any coding session, you need to spend some time building momentum when working with the REPL, but once you do, it really does provide an immersive development experience.

> java -jar rm16viewer-standalone.jar resources/mdmtl_#######.xml
Jurisdiction:   ACT   Profile:   CLOADNSWCE   Volume MWh:     ##.######
Jurisdiction:   ACT   Profile:       NOPROF   Volume MWh:     ##.######
Jurisdiction:   ACT   Profile:         NSLP   Volume MWh:     ##.######
Jurisdiction:   NSW   Profile:   CLOADNSWCE   Volume MWh:     ##.######
Jurisdiction:   NSW   Profile:   CLOADNSWEA   Volume MWh:     ##.######
Jurisdiction:   NSW   Profile:   CLOADNSWIE   Volume MWh:     ##.######
Jurisdiction:   NSW   Profile:       NOPROF   Volume MWh:     ##.######
Jurisdiction:   NSW   Profile:         NSLP   Volume MWh:     ##.######
Jurisdiction:   QLD   Profile:       NOPROF   Volume MWh:     ##.######
Jurisdiction:   QLD   Profile:         NSLP   Volume MWh:     ##.######
Jurisdiction:   QLD   Profile:   QLDEGXCL33   Volume MWh:     ##.######
Jurisdiction:    SA   Profile:       NOPROF   Volume MWh:     ##.######
Jurisdiction:    SA   Profile:         NSLP   Volume MWh:     ##.######
Jurisdiction:    SA   Profile:      SACLOAD   Volume MWh:     ##.######
Jurisdiction:   VIC   Profile:   CLOADNSWCE   Volume MWh:     ##.######
Jurisdiction:   VIC   Profile:       NOPROF   Volume MWh:     ##.######
Jurisdiction:   VIC   Profile:         NSLP   Volume MWh:     ##.######
Total MWh: ###.###

complete code

(defproject rm16viewer "0.1.0-SNAPSHOT"
  :description "AEMO RM16 Parser"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure              "1.6.0"]
                 [com.github.kyleburton/clj-xpath  "1.4.3"]
                 [prismatic/schema                 "1.0.1"]
                 [org.clojure/tools.cli            "0.3.3"]
                 [clj-time                         "0.10.0"]]

  :main ^:skip-aot rm16viewer.core
  :target-path "target/%s"
  :profiles {:uberjar {:aot :all}})
(ns rm16viewer.core
  (:require [clojure.string    :as string]
            [clojure.java.io   :as io]
            [clj-time.format   :as format]
            [schema.core       :as s]
            [clojure.tools.cli :refer [parse-opts]]
            [clj-xpath.core    :refer [xml->doc $x:text]])
  (:import (java.io StringReader File)
           (org.joda.time LocalDateTime LocalDate))
  (:gen-class))

;; =====================
;; Constants and Helpers
;; =====================

(defn string-reader
  [s]
  (StringReader. s))

(defn file-exists?
  [path]
  (.exists (File. path)))

(def AEMO-RM16-DATE-FORMAT     (format/formatter "yyyy/MM/dd"))
(def AEMO-RM16-DATETIME-FORMAT (format/formatter "yyyy/MM/dd HH:mm:ss"))

;; ==========
;; Data Types
;; ==========

;; This record represents the typed representation of a CSV row from the AEMO file.
(s/defrecord RM16Row
  [tni             :- String
   data-type       :- String
   frmp            :- String
   lr              :- String
   mdp             :- String
   profile-name    :- String
   creation-dt     :- LocalDateTime
   settlement-date :- LocalDate
   hhr-data        :- [s/Num]])

;; The summary data to be printed.
(s/defrecord RM16Summary
  [jurisdiction      :- String
   profile           :- String
   volume            :- BigDecimal])

(defn rm16-summary-compare
  [rm16-summary-a rm16-summary-b]
  (compare
    (str (:jurisdiction rm16-summary-a) (:profile rm16-summary-a))
    (str (:jurisdiction rm16-summary-b) (:profile rm16-summary-b))))

(defn seq-of-bigdec-from-csv-chunk
  [row-of-strings start-pos]
  (->> (nthrest row-of-strings start-pos)
       (drop-last)
       (map bigdec)))

(defn csvdata->rm16row
  [row-of-strings]
  ;; Validate will return the object or throw an exception if the
  ;; item doesn't validate
  (s/validate RM16Row
    (->RM16Row
      (nth row-of-strings 0)
      (nth row-of-strings 1)
      (nth row-of-strings 2)
      (nth row-of-strings 3)
      (nth row-of-strings 4)
      (nth row-of-strings 5)
      (format/parse-local           AEMO-RM16-DATETIME-FORMAT (nth row-of-strings 6))
      (format/parse-local-date      AEMO-RM16-DATE-FORMAT     (nth row-of-strings 7))
      (seq-of-bigdec-from-csv-chunk row-of-strings 8))))

;; =============
;; Parsing Logic
;; =============

(defn parse-rm16-doc
  [file-path]
  (if (file-exists? file-path)
    (xml->doc (slurp file-path))
    (do
      (println (str "File " file-path " not found."))
      nil)))

(defn extract-csv-payload
  [rm16-doc]
  (line-seq
    (io/reader
      (string-reader ($x:text "//CSVData" rm16-doc)))))

(defn construct-rm16-data
  [csv-payload]
  (->>
    (map #(string/split % #",") csv-payload)
    (rest)
    (map csvdata->rm16row)))

;; ====================
;; Transformation Logic
;; ====================

(defn sum-volume-for-jurisdiction-profile
  [rm16-summary-to-row-seq]
  (let [sum-volume   (/ (reduce +
                                (mapcat :hhr-data
                                        (val rm16-summary-to-row-seq)))
                        1000)]
    (s/validate RM16Summary
      (->RM16Summary (:jurisdiction (key rm16-summary-to-row-seq))
                     (:profile      (key rm16-summary-to-row-seq))
                     sum-volume))))

(defn jurisdiction-profile-key
  [rm16-row]
  (let [tni-lead-char (str (first (:tni rm16-row)))
        profile       (:profile-name rm16-row)
        jurisdiction (case tni-lead-char
                       "V" "VIC"
                       "N" "NSW"
                       "A" "ACT"
                       "Q" "QLD"
                       "S" "SA")]
    (->RM16Summary jurisdiction profile 0)))

(defn construct-mwh-per-jurisdiction-per-profile
  [seq-of-rm16row]
  (->>
    (group-by jurisdiction-profile-key seq-of-rm16row)
    (map      sum-volume-for-jurisdiction-profile)))

(defn print-summary
  [seq-of-rm16-summary]
  (let [total-volume (reduce + (map :volume seq-of-rm16-summary))]
    (doseq [summary seq-of-rm16-summary]
      (printf "Jurisdiction: %5s   Profile: %12s   Volume MWh: %13s \n"
              (:jurisdiction  summary)
              (:profile       summary)
              (:volume        summary)))
    (printf "Total MWh: %10s" total-volume)
    (flush)))

(defn process-file
  [file-path]
  ;; some->> keeps threading only while each step returns a non-nil value
  (some->>
    (parse-rm16-doc file-path)
    (extract-csv-payload)
    (construct-rm16-data)
    (construct-mwh-per-jurisdiction-per-profile)
    (sort rm16-summary-compare)
    (print-summary)))

;; =====================
;; Command Line and Main
;; =====================

(defn exit [status msg]
  (println msg)
  (System/exit status))

(def command-line-schema
  [["-h" "--help"]])

(defn usage
  [options-summary]
  (->> ["Summarize the Volume in an AEMO RM16 File."
        ""
        "Usage: rm16viewer fileName"
        ""
        "Options:"
        options-summary]
       (string/join \newline)))

(defn -main
  [& args]
  (let [cli-params (parse-opts args command-line-schema)
        options    (:options cli-params)
        summary    (:summary cli-params)
        arguments  (:arguments cli-params)]
    (cond
      (:help options)            (exit 0 (usage summary))
      (not= (count arguments) 1) (exit 1 (usage summary))
      :else                      (process-file (first arguments)))))