Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 33 additions & 63 deletions impl/v5/src/house/jux__/prevayler_impl5__.clj
Original file line number Diff line number Diff line change
@@ -1,50 +1,10 @@
; TODO: Cooperative Consistency Across Multiple Instances Sharing the Same Network Dir
; ====================================================================================
;
; Init
; Loop until successful:
; Delete all files inside the "lock" folder, if any.
; Delete the "lock" folder, if it exists.
; Create "lock" folder. This is important: I must create it, it can't already exist.
; Create owner-{uuid}.a file.
;
; Wait 1min, so the old owner, if any, can yield.
; Loop every 30s, to see if I still am the owner: (lease mechanism)
; Rename lock/owner-{uuid}.a file to lock/owner-{uuid}.b or vice-versa.
; If success: lastSuccessfulLeaseCheck atom = currentTimeMillis; error atom = nil.
; If neither file (a nor b) exist: deposed atom = true
; If some other error: error atom = error message
;
; Check before every write: Prevayler is free to write if:
; Not deposed
; AND Not error
; AND lastSuccessfulLeaseCheck newer than 40s.
;
;
; TODO: Bonus: Extra Resilience Against Zombie Journal Writes
; -----------------------------------------------------------
;
; Zombie writes are writes arriving late at the server from some old dead owner that were buffered on a temporarily disconnected network client.
; To ignore zombie journal appends:
;
; For each journal to read:
; Does a file called {journal-number}.journal-metadata5 exist?
; No: read the whole journal and write this metadata file with the number of events I just read.
; Yes: read only as many events as the amount recorded in the metadata file.
;
; To avoid late zombie journal file creation from overriding existing journals:
; Before creating a new journal file:
; Create a lock/{journal-number}.journal-lock directory (MKDIR the only atomic mutually-exclusive operation we can trust. Even file renames will override each other.)
; If it already exists, but the journal file does not, treat the same as an empty journal.



(ns house.jux--.prevayler-impl5--
(:require
[clojure.java.io :as io]
[house.jux--.prevayler-- :as api]
[house.jux--.prevayler-impl5--.util :refer [check data-input-stream data-output-stream filename-number journal-ending journals part-file-ending rename! root-cause snapshot-ending snapshots]]
[house.jux--.prevayler-impl5--.cleanup :as cleanup]
[house.jux--.prevayler-impl5--.write-lease :as write-lease]
[taoensso.nippy :as nippy])
(:import
[clojure.lang IDeref]
Expand Down Expand Up @@ -102,53 +62,62 @@
(nippy/freeze-to-out! data-out value)
(.flush data-out))

(defn- write-snaphot! [{:keys [state journal-index]} dir]
(defn- write-snaphot! [{:keys [state journal-index]} dir-lease]
(let [snapshot-name (format (str filename-number-mask snapshot-ending) journal-index)
part-file (io/file dir (str snapshot-name part-file-ending))]
part-file (io/file (write-lease/dir dir-lease) (str snapshot-name part-file-ending))]
(println "Writing snapshot" snapshot-name)
(with-open [^Closeable out (data-output-stream part-file)] ; Overrides old .part file if any.
(write-with-flush! out state))
(rename! part-file (io/file dir snapshot-name))))
(write-lease/check! dir-lease)
(rename! part-file (io/file (write-lease/dir dir-lease) snapshot-name))))

(defn last-snapshot-file
  "Returns the final element of `(snapshots dir)`, or nil when there is none."
  [dir]
  (-> dir snapshots last))

(defn- restore-snapshot-if-necessary! [initial-state-envelope dir]
(if-some [snapshot-file (last-snapshot-file dir)]
(defn- restore-snapshot-if-necessary! [initial-state-envelope dir-lease]
(if-some [snapshot-file (last-snapshot-file (write-lease/dir dir-lease))]

{:state (restore-snapshot! snapshot-file)
:journal-index (filename-number snapshot-file)}

(do
(write-snaphot! initial-state-envelope dir)
(write-snaphot! initial-state-envelope dir-lease)
initial-state-envelope)))

(defn- restore! [dir handler initial-state]
(defn- restore! [handler initial-state dir-lease]
(-> {:state initial-state, :journal-index 0}
(restore-snapshot-if-necessary! dir)
(restore-journals-if-necessary! dir handler)))
(restore-snapshot-if-necessary! dir-lease)
(restore-journals-if-necessary! (write-lease/dir dir-lease) handler)))

(defn- start-new-journal! [dir journal-index]
(let [file (io/file dir (format (str filename-number-mask journal-ending) journal-index))]
(defn- start-new-journal! [dir-lease journal-index]
(write-lease/check! dir-lease)
(let [file (io/file (write-lease/dir dir-lease) (format (str filename-number-mask journal-ending) journal-index))]
(check (not (.exists file)) (str "journal file already exists, index: " journal-index))
(-> file data-output-stream)))

;; Re-exports cleanup/delete-old-snapshots! under this namespace, so callers can reach it without requiring the cleanup namespace themselves.
(def delete-old-snapshots! cleanup/delete-old-snapshots!)

(defn prevayler! [{:keys [dir initial-state business-fn timestamp-fn]
(defn prevayler! [{:keys [dir initial-state business-fn timestamp-fn sleep-interval]
:or {initial-state {}
timestamp-fn #(System/currentTimeMillis)}}]

timestamp-fn #(System/currentTimeMillis)
sleep-interval 30000}}]

(let [^File dir (io/file dir)
state-envelope-atom (atom (restore! dir business-fn initial-state))
journal-out-atom (atom (start-new-journal! dir (:journal-index @state-envelope-atom)))
journal-out-atom (atom nil)
close-journal! #(when-let [journal-out @journal-out-atom]
(.close ^Closeable journal-out)) ; TODO: Call .getFD().sync() on the underlying FileOutputStream to minimize zombie writes (writes that arrive late at the server because they were buffered at the client during a network hiccup)
dir-lease (write-lease/acquire-for! dir sleep-interval close-journal!)
state-envelope-atom (atom (restore! business-fn initial-state dir-lease))
snapshot-monitor (Object.)]


(reset! journal-out-atom (start-new-journal! dir-lease (:journal-index @state-envelope-atom)))

(reify
api/Prevayler

(handle! [this event]
(locking journal-out-atom ; (I)solation: strict serializability.
(write-lease/check! dir-lease)
(let [{:keys [state]} @state-envelope-atom
timestamp (timestamp-fn)
new-state (business-fn state event timestamp)] ; (C)onsistency: must be guaranteed by the handler. The event won't be journalled when the handler throws an exception.
Expand All @@ -164,15 +133,16 @@
(snapshot! [_]
(locking snapshot-monitor
(let [envelope (locking journal-out-atom
(.close ^Closeable @journal-out-atom)
(write-lease/check! dir-lease)
(close-journal!)
(let [envelope (swap! state-envelope-atom update :journal-index inc)
new-journal (start-new-journal! dir (:journal-index envelope))] ; Prevayler remains closed if this throws Exception. TODO: Recover from what might have been just a network volume hiccup.
(reset! journal-out-atom new-journal)
new-journal (start-new-journal! dir-lease (:journal-index envelope))] ; Prevayler remains closed if this throws Exception. TODO: Recover from what might have been just a network volume hiccup.
(reset! journal-out-atom new-journal)
envelope))]
(write-snaphot! envelope dir))))
(write-snaphot! envelope dir-lease))))

(timestamp [_] (timestamp-fn))

IDeref (deref [_] (:state @state-envelope-atom))

Closeable (close [_] (.close ^Closeable @journal-out-atom)))))
Closeable (close [_] (close-journal!)))))
97 changes: 97 additions & 0 deletions impl/v5/src/house/jux__/prevayler_impl5__/write_lease.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
; TODO: Cooperative Consistency Across Multiple Instances Sharing the Same Network Dir
; ====================================================================================
;
; Init
; Loop until successful:
; Delete all files inside the "lock" folder, if any.
; Delete the "lock" folder, if it exists.
; Create "lock" folder. This is important: I must create it, it can't already exist.
; Create owner-{uuid}.a file.
;
; Wait 1min, so the old owner, if any, can yield.
; Loop every 30s, to see if I still am the owner: (lease mechanism)
; Rename lock/owner-{uuid}.a file to lock/owner-{uuid}.b or vice-versa.
; If success: lastSuccessfulLeaseCheck atom = currentTimeMillis; error atom = nil.
; If neither file (a nor b) exists: deposed atom = true
; If some other error: error atom = error message
;
; Check before every write: Prevayler is free to write if:
; Not deposed
; AND Not error
; AND lastSuccessfulLeaseCheck newer than 40s.
;
;
; TODO: Bonus: Extra Resilience Against Zombie Journal Writes
; -----------------------------------------------------------
;
; Zombie writes are writes arriving late at the server from some old dead owner that were buffered on a temporarily disconnected network client.
; To ignore zombie journal appends:
;
; For each journal to read:
; Does a file called {journal-number}.journal-metadata5 exist?
; No: read the whole journal and write this metadata file with the number of events I just read.
; Yes: read only as many events as the amount recorded in the metadata file.
;
; To avoid late zombie journal file creation from overriding existing journals:
; Before creating a new journal file:
; Create a lock/{journal-number}.journal-lock directory (MKDIR is the only atomic mutually-exclusive operation we can trust. Even file renames will override each other.)
; If it already exists, but the journal file does not, treat the same as an empty journal.

(ns house.jux--.prevayler-impl5--.write-lease
(:require [clojure.java.io :as io]))

(set! *warn-on-reflection* true)

(defn dir
  "Returns the value stored under :dir in the given lease (a derefable holding a map)."
  [lease]
  (:dir @lease))

(defn- try-to-replace-lock-dir
  "Deletes the entries directly inside lock-dir and then lock-dir itself (results of these
  deletions are ignored), then renames new-lock-dir to lock-dir.
  Returns the result of that rename: true when it succeeded, false otherwise."
  [^java.io.File lock-dir ^java.io.File new-lock-dir]
  ;; .listFiles yields nil when lock-dir is not a directory; run! over nil does nothing.
  (run! (fn [^java.io.File entry] (.delete entry))
        (.listFiles lock-dir))
  (.delete lock-dir)
  (.renameTo new-lock-dir lock-dir))

;; NOTE(review): this looks like leftover debugging code. It runs every time this namespace is
;; loaded and attempts to delete a file (or empty directory) named "xyz", resolved against the
;; process working directory. Nothing else in this file refers to "xyz" - confirm and remove.
(.delete (io/file "xyz"))

(defn- renew-lease!
  "Tries to prove we still own the lease by renaming our lock file from its '.a' name to its
  '.b' name, or vice-versa, and records the outcome in the lease atom:
   - A rename succeeded: stamps :last-successful-lease-check and clears :error-message.
   - Both renames failed but one of the files is still there: sets :error-message.
   - Neither file exists: sets :deposed to true.
  Returns false once deposed, true otherwise."
  [lease ^java.io.File file-a ^java.io.File file-b]
  (cond
    ;; Only one of the two names exists at any time, so at most one of these renames can succeed.
    (or (.renameTo file-a file-b)
        (.renameTo file-b file-a))
    (do (swap! lease assoc
               :last-successful-lease-check (System/currentTimeMillis)
               :error-message nil)
        true)

    (or (.exists file-a)
        (.exists file-b))
    (do (swap! lease assoc :error-message "Unable to rename the lock file to refresh the lease")
        true)

    :else
    (do (swap! lease assoc :deposed true)
        false)))

(defn acquire-for!
  "Acquires a virtual, exclusive lease to write on dir. Implementation:
   - Deletes the lock file of the previous owner, if any. Deletes the 'lock' directory in dir.
   - Creates the 'lock' directory atomically (via rename) in dir with our own lock file in it.
   - Starts to periodically refresh the lease (rename our lock file to make sure it still exists).
  Cooperatively and definitively yields the lease when some other owner deletes our lock file.
  Returns the lease (an opaque handle) to be used with the other fns in this namespace.
  Calls on-deposed when we gain knowledge that some other owner (normally a different machine) has taken over the lease.
  sleep-interval is the pause, in milliseconds, between two lease refreshes.
  Throws if our own lock directory cannot be created inside dir."
  ;; This "opaque handle + fns" smells like a protocol.
  ;; NOTE: there is no waiting period after acquiring, so a previous owner may still write until
  ;; its own next lease refresh notices that its lock file is gone.
  [dir sleep-interval on-deposed]
  (let [uuid           (str (java.util.UUID/randomUUID))
        lock-dir       (io/file dir "lock")
        new-lock-dir   (io/file dir (str "lock-" uuid))
        lock-file-name (str uuid ".a")
        ;; Where our lock file lives once new-lock-dir has been renamed to lock-dir.
        file-a         (io/file lock-dir lock-file-name)
        file-b         (io/file lock-dir (str uuid ".b"))
        lease          (atom {:dir dir
                              :last-successful-lease-check (System/currentTimeMillis)
                              :sleep-interval sleep-interval})]
    (when-not (.mkdir new-lock-dir)
      ;; Without this directory the rename below can never succeed and we would retry forever.
      (throw (ex-info "Unable to create lock directory" {:lock-dir (str new-lock-dir)})))
    ;; The lock file must be created inside OUR directory, before the rename, so that the 'lock'
    ;; directory appears atomically with our lock file already in it.
    (spit (io/file new-lock-dir lock-file-name) "")
    (loop []
      (when-not (try-to-replace-lock-dir lock-dir new-lock-dir)
        (Thread/sleep 50) ; Back off briefly instead of spinning at full speed.
        (recur)))
    (swap! lease assoc :last-successful-lease-check (System/currentTimeMillis))
    (future
      (loop []
        (if (renew-lease! lease file-a file-b)
          (do
            (Thread/sleep (long sleep-interval))
            (recur))
          ;; Deposed: the lease atom is already flagged, so writers are refused from now on.
          (when on-deposed
            (on-deposed)))))
    lease))

(defn check!
  "Throws an exception with a helpful message in these situations:
   - The lock file has been deleted (normally by another owner taking over).
   - The last attempt to refresh the lease (rename our lock file) has failed and we cannot be certain whether the file still exists or has been deleted.
   - The last successful lease refresh is older than 1.2 times the lease's sleep-interval.
  Returns nil when none of the above applies."
  [my-lease]
  (let [{:keys [deposed error-message last-successful-lease-check sleep-interval]} @my-lease]
    (when deposed
      (throw (ex-info "This instance is unable to write" {})))
    (when error-message
      (throw (ex-info "This instance is temporarily unable to write" {:error-message error-message})))
    ;; The 1.2 factor gives the periodic refresh some slack before the lease is considered stale.
    (when (> (- (System/currentTimeMillis) last-successful-lease-check) (* sleep-interval 1.2))
      (throw (ex-info "This instance is temporarily unable to write"
                      {:last-successful-lease-check last-successful-lease-check})))))
16 changes: 10 additions & 6 deletions impl/v5/test/prevayler_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@
(apply +)
(check-positive "Total file length should be positive.")))

(defn counter [start] ; Deterministic timestamps.
(let [counter-atom (atom start)]
#(swap! counter-atom inc)))

(def t0 1600000000000) ; System/currentTimeMillis at some arbitrary moment in the past.

(defn- file-names [dir ending]
Expand All @@ -58,9 +54,11 @@
(is (.delete (File. prevayler-dir "000000000.journal5")))))

(let [prevayler-dir (tmp-dir)
counter-atom (atom t0)
options {:initial-state initial-state
:business-fn contact-list
:timestamp-fn (counter t0)
:timestamp-fn #(swap! counter-atom inc)
:sleep-period 10000 ; TODO reduce this number when we fix infinite loop
:dir prevayler-dir}
prev! #(prevayler! options)]

Expand Down Expand Up @@ -159,4 +157,10 @@
(delete-old-snapshots! prevayler-dir {:keep 0}))) ; At least one must be kept

(with-open [p (prev!)]
(is (= ["Ann" "Bob" "Cid" "Dan" "Edd"] (:contacts @p)))))))
(is (= ["Ann" "Bob" "Cid" "Dan" "Edd"] (:contacts @p)))))

#_(testing ""
(with-open [p1 (prev!)]
(with-open [_ (prev!)]
(is (thrown? Exception
(handle! p1 "Boom"))))))))