diff --git a/README.md b/README.md index 733b79c..1b1600e 100644 --- a/README.md +++ b/README.md @@ -17,24 +17,25 @@ The goal is to provide: - A composable pipeline for creating ReQL statements: ```fsharp -/// string -> (IConnection -> Async) +/// string -> (IConnection -> Task) let fetchPost (postId : string) = - fromDb "Blog" - |> table "Post" - |> get postId - |> asyncResult + fromDb "Blog" + |> table "Post" + |> get postId + |> runResult + |> withRetryDefault ``` - An F# domain-specific language (DSL) using a `rethink` computation expression: ```fsharp -/// string -> (IConnection -> Async) +/// string -> (IConnection -> Task) let fetchPost (postId : string) = - rethink { - fromDb "Blog" - table "Post" - get postId - asyncResult + rethink { + withTableInDb "Post" "Blog" + get postId + result + withRetryDefault } ``` diff --git a/src/RethinkDb.Driver.FSharp/Builder.fs b/src/RethinkDb.Driver.FSharp/Builder.fs index a3cab4d..dfc34c4 100644 --- a/src/RethinkDb.Driver.FSharp/Builder.fs +++ b/src/RethinkDb.Driver.FSharp/Builder.fs @@ -1,13 +1,19 @@ [] module RethinkDb.Driver.FSharp.RethinkBuilder -open Polly open RethinkDb.Driver open RethinkDb.Driver.Ast open RethinkDb.Driver.Net -open System open System.Threading.Tasks +/// Options for RethinkDB indexes +type IndexOption = + /// Index multiple values in the given field + | Multi + /// Create a geospatial index + | Geospatial + + /// Computation Expression builder for RethinkDB queries type RethinkBuilder<'T> () = @@ -19,20 +25,6 @@ type RethinkBuilder<'T> () = fields |> List.fold (fun (m : Model.MapObject) item -> m.With (fst item, snd item)) (RethinkDB.R.HashMap ()) - /// Create a retry policy that attempts to reconnect to RethinkDB on each retry - let retryPolicy (intervals : float seq) (conn : IConnection) = - Policy - .Handle() - .WaitAndRetryAsync( - intervals |> Seq.map TimeSpan.FromSeconds, - System.Action (fun ex _ _ _ -> - printf $"Encountered RethinkDB exception: {ex.Message}" - match ex.Message.Contains "socket" with - | true -> - printf "Reconnecting to RethinkDB" - (conn :?> Connection).Reconnect false - | false -> ())) - member _.Bind (expr : ReqlExpr, f : ReqlExpr -> ReqlExpr) = f expr member this.For (expr, f) = this.Bind (expr, f) @@ -77,6 +69,11 @@ type RethinkBuilder<'T> () = [] member _.IndexCreate (tbl : Table, index : string, f : ReqlExpr -> obj) = tbl.IndexCreate (index, ReqlFunction1 f) + /// Specify options for certain types of indexes + [] + member _.IndexOption (idx : IndexCreate, opt : IndexOption) = + idx.OptArg ((match opt with Multi -> "multi" | Geospatial -> "geo"), true) + // database/table identification /// Specify a database for further commands @@ -189,7 +186,7 @@ type RethinkBuilder<'T> () = /// Execute the query, returning the result of the type specified [] member _.Result (expr : ReqlExpr) : IConnection -> Task<'T> = - fun conn -> task { + fun conn -> backgroundTask { return! expr.RunResultAsync<'T> conn } @@ -201,7 +198,7 @@ type RethinkBuilder<'T> () = /// Execute the query, returning the result of the type specified, or None if no result is found [] member _.ResultOption (expr : ReqlExpr) : IConnection -> Task<'T option> = - fun conn -> task { + fun conn -> backgroundTask { let! result = expr.RunResultAsync<'T> conn return match (box >> isNull) result with true -> None | false -> Some result } @@ -227,9 +224,9 @@ type RethinkBuilder<'T> () = /// Ignore the result of an operation [] - member _.IgnoreResult (f : IConnection -> Task<'T>) = + member _.IgnoreResult<'T> (f : IConnection -> Task<'T>) = fun conn -> task { - let! _ = f conn + let! _ = (f conn).ConfigureAwait false () } @@ -237,7 +234,7 @@ type RethinkBuilder<'T> () = [] member _.IgnoreResult (f : IConnection -> Task<'T option>) = fun conn -> task { - let! _ = f conn + let! _ = (f conn).ConfigureAwait false () } @@ -256,16 +253,12 @@ type RethinkBuilder<'T> () = /// Retries a variable number of times, waiting each time for the seconds specified [] member _.WithRetry (f : IConnection -> Task<'T>, retries) = - fun conn -> task { - return! (retryPolicy retries conn).ExecuteAsync(fun () -> f conn) - } + Retry.withRetry f retries /// Retries a variable number of times, waiting each time for the seconds specified [] member _.WithRetry (f : IConnection -> Task<'T option>, retries) = - fun conn -> task { - return! (retryPolicy retries conn).ExecuteAsync(fun () -> f conn) - } + Retry.withRetry f retries /// Retries a variable number of times, waiting each time for the seconds specified [] @@ -279,43 +272,43 @@ type RethinkBuilder<'T> () = /// Retries at 200ms, 500ms, and 1s [] - member this.WithRetryDefault (f : IConnection -> Task<'T>) = - this.WithRetry (f, [ 0.2; 0.5; 1.0 ]) + member _.WithRetryDefault (f : IConnection -> Task<'T>) = + Retry.withRetryDefault f /// Retries at 200ms, 500ms, and 1s [] - member this.WithRetryDefault (f : IConnection -> Task<'T option>) = - this.WithRetry (f, [ 0.2; 0.5; 1.0 ]) + member _.WithRetryDefault (f : IConnection -> Task<'T option>) = + Retry.withRetryDefault f /// Retries at 200ms, 500ms, and 1s [] member this.WithRetryDefault (f : IConnection -> Task<'T>, conn) = - this.WithRetry (f, [ 0.2; 0.5; 1.0 ]) conn + this.WithRetryDefault f conn /// Retries at 200ms, 500ms, and 1s [] member this.WithRetryDefault (f : IConnection -> Task<'T option>, conn) = - this.WithRetry (f, [ 0.2; 0.5; 1.0 ]) conn + this.WithRetryDefault f conn /// Retries once immediately [] - member this.WithRetryOnce (f : IConnection -> Task<'T>) = - this.WithRetry (f, [ 0.0 ]) + member _.WithRetryOnce (f : IConnection -> Task<'T>) = + Retry.withRetryOnce f /// Retries once immediately [] - member this.WithRetryOnce (f : IConnection -> Task<'T option>) = - this.WithRetry (f, [ 0.0 ]) + member _.WithRetryOnce (f : IConnection -> Task<'T option>) = + Retry.withRetryOnce f /// Retries once immediately [] member this.WithRetryOnce (f : IConnection -> Task<'T>, conn) = - this.WithRetry (f, [ 0.0 ]) conn + this.WithRetryOnce f conn /// Retries once immediately [] member this.WithRetryOnce (f : IConnection -> Task<'T option>, conn) = - this.WithRetry (f, [ 0.0 ]) conn + this.WithRetryOnce f conn /// RethinkDB computation expression diff --git a/src/RethinkDb.Driver.FSharp/Functions.fs b/src/RethinkDb.Driver.FSharp/Functions.fs index df7f390..a6224f8 100644 --- a/src/RethinkDb.Driver.FSharp/Functions.fs +++ b/src/RethinkDb.Driver.FSharp/Functions.fs @@ -4,330 +4,343 @@ module RethinkDb.Driver.FSharp.Functions open RethinkDb.Driver open RethinkDb.Driver.Ast -let private r = RethinkDB.R +[] +module private Helpers = + /// Shorthand for the starting point for ReQL commands + let r = RethinkDB.R + + /// Create a Javascript object from a string (used mostly for type inference) + let toJS (js : string) = Javascript js /// Get a cursor with the results of an expression let asyncCursor<'T> conn (expr : ReqlExpr) = - expr.RunCursorAsync<'T> conn - |> Async.AwaitTask + expr.RunCursorAsync<'T> conn + |> Async.AwaitTask /// Get the result of a non-select ReQL expression let asyncReqlResult conn (expr : ReqlExpr) = - expr.RunWriteAsync conn - |> Async.AwaitTask + expr.RunWriteAsync conn + |> Async.AwaitTask +/// Write a ReQL command, always returning a result +let runWriteResult (expr : ReqlExpr) = + expr.RunWriteAsync + +/// Write a ReQL command, raising an exception if an error occurs +let runWrite (expr : ReqlExpr) = fun conn -> backgroundTask { + let! result = expr.RunWriteAsync conn + if result.Errors > 0UL then raise <| ReqlRuntimeError result.FirstError + return result +} + /// Get the results of an expression let asyncResult<'T> conn (expr : ReqlExpr) = - expr.RunResultAsync<'T> conn - |> Async.AwaitTask + expr.RunResultAsync<'T> conn + |> Async.AwaitTask + +/// Run the ReQL command, returning the result as the type specified +let runResult<'T> (expr : ReqlExpr) = expr.RunResultAsync<'T> /// Get documents between a lower bound and an upper bound based on a primary key -let between lowerKey upperKey (expr : ReqlExpr) = - expr.Between (lowerKey, upperKey) +let between (lowerKey : obj) (upperKey : obj) (expr : ReqlExpr) = + expr.Between (lowerKey, upperKey) /// Get document between a lower bound and an upper bound, specifying one or more optional arguments -let betweenWithOptArgs lowerKey upperKey (args : (string * _) seq) (expr : ReqlExpr) = - args - |> List.ofSeq - |> List.fold (fun (btw : Between) arg -> btw.OptArg (fst arg, snd arg)) (between lowerKey upperKey expr) +let betweenWithOptArgs (lowerKey : obj) (upperKey : obj) (args : (string * obj) seq) (expr : ReqlExpr) = + args + |> Seq.fold (fun (btw : Between) arg -> btw.OptArg (fst arg, snd arg)) (between lowerKey upperKey expr) /// Get documents between a lower bound and an upper bound based on an index -let betweenIndex lowerKey upperKey (index : string) (expr : ReqlExpr) = - betweenWithOptArgs lowerKey upperKey [ "index", index ] expr +let betweenIndex (lowerKey : obj) (upperKey : obj) (index : string) (expr : ReqlExpr) = + betweenWithOptArgs lowerKey upperKey [ "index", index ] expr /// Get a connection builder that can be used to create one RethinkDB connection let connection () = - r.Connection () + r.Connection () /// Reference a database let db dbName = - r.Db dbName + r.Db dbName /// Create a database -let dbCreate dbName conn = - r.DbCreate dbName - |> asyncReqlResult conn +let dbCreate (dbName : string) = + r.DbCreate dbName /// Drop a database -let dbDrop dbName conn = - r.DbDrop dbName - |> asyncReqlResult conn +let dbDrop (dbName : string) = + r.DbDrop dbName /// Get a list of databases -let dbList conn = - r.DbList () - |> asyncResult conn +let dbList () = + r.DbList () /// Delete documents let delete (expr : ReqlExpr) = - expr.Delete () + expr.Delete () /// Delete documents, providing optional arguments -let deleteWithOptArgs args expr = - args - |> List.ofSeq - |> List.fold (fun (del : Delete) arg -> del.OptArg (fst arg, snd arg)) (delete expr) +let deleteWithOptArgs (args : (string * obj) seq) expr = + args |> Seq.fold (fun (del : Delete) arg -> del.OptArg (fst arg, snd arg)) (delete expr) /// EqJoin the left field on the right-hand table using its primary key -let eqJoin field (table : Table) (expr : ReqlExpr) = - expr.EqJoin (field :> obj, table) +let eqJoin (field : string) (table : Table) (expr : ReqlExpr) = + expr.EqJoin (field, table) /// EqJoin the left function on the right-hand table using its primary key -let eqJoinFunc (f : ReqlExpr -> 'T) (table : Table) (expr : ReqlExpr) = - expr.EqJoin (ReqlFunction1 (fun row -> upcast f row), table) +let eqJoinFunc<'T> (f : ReqlExpr -> 'T) (table : Table) (expr : ReqlExpr) = + expr.EqJoin (ReqlFunction1 (fun row -> f row :> obj), table) /// EqJoin the left function on the right-hand table using the specified index -let eqJoinFuncIndex f table (indexName : string) expr = - (eqJoinFunc f table expr).OptArg ("index", indexName) +let eqJoinFuncIndex<'T> (f : ReqlExpr -> 'T) table (indexName : string) expr = + (eqJoinFunc f table expr).OptArg ("index", indexName) /// EqJoin the left field on the right-hand table using the specified index let eqJoinIndex field table (indexName : string) expr = - (eqJoin field table expr).OptArg ("index", indexName) + (eqJoin field table expr).OptArg ("index", indexName) /// EqJoin the left JavaScript on the right-hand table using its primary key -let eqJoinJS (jsString : string) (table : Table) (expr : ReqlExpr) = - expr.EqJoin (Javascript (jsString :> obj), table) +let eqJoinJS js (table : Table) (expr : ReqlExpr) = + expr.EqJoin (toJS js, table) /// EqJoin the left JavaScript on the right-hand table using the specified index -let eqJoinJSIndex jsString table (indexName : string) expr = - (eqJoinJS jsString table expr).OptArg ("index", indexName) +let eqJoinJSIndex js table (indexName : string) expr = + (eqJoinJS js table expr).OptArg ("index", indexName) /// Filter documents -let filter filterSpec (expr : ReqlExpr) = - expr.Filter (filterSpec :> obj) - +let filter (filterSpec : obj) (expr : ReqlExpr) = + expr.Filter filterSpec + +/// Apply optional arguments to a filter +let private optArgsFilter (args : (string * obj) seq) filter = + args |> Seq.fold (fun (fil : Filter) arg -> fil.OptArg (fst arg, snd arg)) filter + /// Filter documents, providing optional arguments -let filterWithOptArgs filterSpec args expr = - args - |> List.ofSeq - |> List.fold (fun (fil : Filter) arg -> fil.OptArg (fst arg, snd arg)) (filter filterSpec expr) +let filterWithOptArgs (filterSpec : obj) args expr = + filter filterSpec expr |> optArgsFilter args /// Filter documents using a function -let filterFunc (f : ReqlExpr -> 'T) (expr : ReqlExpr) = - expr.Filter (ReqlFunction1 (fun row -> upcast f row)) +let filterFunc<'T> (f : ReqlExpr -> 'T) (expr : ReqlExpr) = + expr.Filter (ReqlFunction1 (fun row -> f row :> obj)) /// Filter documents using a function, providing optional arguments -let filterFuncWithOptArgs f args expr = - args - |> List.ofSeq - |> List.fold (fun (fil : Filter) arg -> fil.OptArg (fst arg, snd arg)) (filterFunc f expr) +let filterFuncWithOptArgs<'T> (f : ReqlExpr -> 'T) args expr = + filterFunc f expr |> optArgsFilter args /// Filter documents using JavaScript -let filterJS jsString (expr : ReqlExpr) = - expr.Filter (Javascript (jsString :> obj)) +let filterJS js (expr : ReqlExpr) = + expr.Filter (toJS js) /// Filter documents using JavaScript, providing optional arguments -let filterJSWithOptArgs jsString args expr = - args - |> List.ofSeq - |> List.fold (fun (fil : Filter) arg -> fil.OptArg (fst arg, snd arg)) (filterJS jsString expr) +let filterJSWithOptArgs js args expr = + filterJS js expr |> optArgsFilter args /// Get a document by its primary key -let get documentId (table : Table) = - table.Get documentId +let get (documentId : obj) (table : Table) = + table.Get documentId /// Get all documents matching keys in the given index -let getAll (ids : 'T seq) indexName (table : Table) = - table.GetAll(Array.ofSeq ids).OptArg ("index", indexName) +let getAll (ids : obj seq) (indexName : string) (table : Table) = + table.GetAll(Array.ofSeq ids).OptArg ("index", indexName) /// Create an index on the given table -let indexCreate indexName conn (table : Table) = - table.IndexCreate indexName - |> asyncReqlResult conn +let indexCreate (indexName : string) (table : Table) = + table.IndexCreate indexName /// Create an index on the given table using a function -let indexCreateFunc indexName (f : ReqlExpr -> 'T) conn (table : Table) = - table.IndexCreate (indexName, ReqlFunction1 (fun row -> upcast f row)) - |> asyncReqlResult conn +let indexCreateFunc<'T> (indexName : string) (f : ReqlExpr -> 'T) (table : Table) = + table.IndexCreate (indexName, ReqlFunction1 (fun row -> f row :> obj)) /// Create an index on the given table using JavaScript -let indexCreateJS indexName jsString conn (table : Table) = - table.IndexCreate (indexName, Javascript (jsString :> obj)) - |> asyncReqlResult conn +let indexCreateJS (indexName : string) js (table : Table) = + table.IndexCreate (indexName, toJS js) /// Drop an index -let indexDrop indexName conn (table : Table) = - table.IndexDrop indexName - |> asyncReqlResult conn +let indexDrop (indexName : string) (table : Table) = + table.IndexDrop indexName /// Get a list of indexes for the given table -let indexList conn (table : Table) = - table.IndexList () - |> asyncResult conn +let indexList (table : Table) = + table.IndexList () /// Rename an index (overwrite will fail) -let indexRename oldName newName conn (table : Table) = - table.IndexRename (oldName, newName) - |> asyncReqlResult conn +let indexRename (oldName : string) (newName : string) (table : Table) = + table.IndexRename (oldName, newName) /// Rename an index (overwrite will succeed) -let indexRenameWithOverwrite oldName newName conn (table : Table) = - table.IndexRename(oldName, newName).OptArg ("overwrite", true) - |> asyncReqlResult conn +let indexRenameWithOverwrite (oldName : string) (newName : string) (table : Table) = + table.IndexRename(oldName, newName).OptArg ("overwrite", true) /// Create an inner join between two sequences, specifying the join condition with a function -let innerJoinFunc otherSeq (f : ReqlExpr -> ReqlExpr -> 'T) (expr : ReqlExpr) = - expr.InnerJoin (otherSeq, ReqlFunction2 (fun leftRow rightRow -> upcast f leftRow rightRow)) +let innerJoinFunc<'T> (otherSeq : obj) (f : ReqlExpr -> ReqlExpr -> 'T) (expr : ReqlExpr) = + expr.InnerJoin (otherSeq, ReqlFunction2 (fun f1 f2 -> f f1 f2 :> obj)) /// Create an inner join between two sequences, specifying the join condition with JavaScript -let innerJoinJS otherSeq jsString (expr : ReqlExpr) = - expr.InnerJoin (otherSeq, Javascript (jsString :> obj)) +let innerJoinJS (otherSeq : obj) js (expr : ReqlExpr) = + expr.InnerJoin (otherSeq, toJS js) + +/// Apply optional arguments to an insert +let private optArgsInsert (args : (string * obj) seq) ins = + args |> Seq.fold (fun (ins : Insert) arg -> ins.OptArg (fst arg, snd arg)) ins /// Insert a single document (use insertMany for multiple) -let insert doc (table : Table) = - table.Insert doc +let insert<'T> (doc : 'T) (table : Table) = + table.Insert doc /// Insert multiple documents -let insertMany docs (table : Table) = - table.Insert <| Array.ofSeq docs +let insertMany<'T> (docs : 'T seq) (table : Table) = + table.Insert (Array.ofSeq docs) /// Insert a single document, providing optional arguments (use insertManyWithOptArgs for multiple) -let insertWithOptArgs doc (args : (string * _) seq) (table : Table) = - args - |> List.ofSeq - |> List.fold (fun (ins : Insert) arg -> ins.OptArg (fst arg, snd arg)) (insert doc table) +let insertWithOptArgs<'T> (doc : 'T) args table = + insert doc table |> optArgsInsert args /// Insert multiple documents, providing optional arguments -let insertManyWithOptArgs docs (args : (string * _) seq) (table : Table) = - args - |> List.ofSeq - |> List.fold (fun (ins : Insert) arg -> ins.OptArg (fst arg, snd arg)) (insertMany docs table) +let insertManyWithOptArgs<'T> (docs : 'T seq) args table = + insertMany docs table |> optArgsInsert args /// Test whether a sequence is empty let isEmpty (expr : ReqlExpr) = - expr.IsEmpty () + expr.IsEmpty () /// End a sequence after a given number of elements let limit n (expr : ReqlExpr) = - expr.Limit n + expr.Limit n /// Retrieve the nth element in a sequence let nth n (expr : ReqlExpr) = - expr.Nth n + expr.Nth n /// Create an outer join between two sequences, specifying the join condition with a function -let outerJoinFunc otherSeq (f : ReqlExpr -> ReqlExpr -> 'T) (expr : ReqlExpr) = - expr.OuterJoin (otherSeq, ReqlFunction2 (fun leftRow rightRow -> upcast f leftRow rightRow)) +let outerJoinFunc<'T> (otherSeq : obj) (f : ReqlExpr -> ReqlExpr -> 'T) (expr : ReqlExpr) = + expr.OuterJoin (otherSeq, ReqlFunction2 (fun f1 f2 -> f f1 f2 :> obj)) /// Create an outer join between two sequences, specifying the join condition with JavaScript -let outerJoinJS otherSeq jsString (expr : ReqlExpr) = - expr.OuterJoin (otherSeq, Javascript (jsString :> obj)) +let outerJoinJS (otherSeq : obj) js (expr : ReqlExpr) = + expr.OuterJoin (otherSeq, toJS js) /// Select one or more attributes from an object or sequence let pluck (fields : string seq) (expr : ReqlExpr) = - expr.Pluck (Array.ofSeq fields) + expr.Pluck (Array.ofSeq fields) + +/// Apply optional arguments to a replace +let private optArgsReplace (args : (string * obj) seq) repl = + args |> Seq.fold (fun (rep : Replace) arg -> rep.OptArg (fst arg, snd arg)) repl /// Replace documents -let replace replaceSpec (expr : ReqlExpr) = - expr.Replace (replaceSpec :> obj) +let replace<'T> (replaceSpec : 'T) (expr : ReqlExpr) = + expr.Replace replaceSpec /// Replace documents, providing optional arguments -let replaceWithOptArgs replaceSpec args expr = - args - |> List.ofSeq - |> List.fold (fun (rep : Replace) arg -> rep.OptArg (fst arg, snd arg)) (replace replaceSpec expr) +let replaceWithOptArgs<'T> (replaceSpec : 'T) args expr = + replace replaceSpec expr |> optArgsReplace args /// Replace documents using a function -let replaceFunc (f : ReqlExpr -> 'T) (expr : ReqlExpr) = - expr.Replace (ReqlFunction1 (fun row -> upcast f row)) +let replaceFunc<'T> (f : ReqlExpr -> 'T) (expr : ReqlExpr) = + expr.Replace (ReqlFunction1 (fun row -> f row :> obj)) /// Replace documents using a function, providing optional arguments -let replaceFuncWithOptArgs f args expr = - args - |> List.ofSeq - |> List.fold (fun (rep : Replace) arg -> rep.OptArg (fst arg, snd arg)) (replaceFunc f expr) +let replaceFuncWithOptArgs<'T> (f : ReqlExpr -> 'T) args expr = + replaceFunc f expr |> optArgsReplace args /// Replace documents using JavaScript -let replaceJS jsString (expr : ReqlExpr) = - expr.Replace (Ast.Javascript (jsString :> obj)) +let replaceJS js (expr : ReqlExpr) = + expr.Replace (toJS js) /// Replace documents using JavaScript, providing optional arguments -let replaceJSWithOptArgs jsString args expr = - args - |> List.ofSeq - |> List.fold (fun (rep : Replace) arg -> rep.OptArg (fst arg, snd arg)) (replaceJS jsString expr) +let replaceJSWithOptArgs js args expr = + replaceJS js expr |> optArgsReplace args /// Skip a number of elements from the head of a sequence let skip n (expr : ReqlExpr) = - expr.Skip n + expr.Skip n /// Ensure changes to a table are written to permanent storage let sync (table : Table) = - table.Sync () + table.Sync () /// Return all documents in a table (may be further refined) let table tableName (db : Db) = - db.Table tableName + db.Table tableName /// Return all documents in a table from the default database (may be further refined) let fromTable tableName = - r.Table tableName + r.Table tableName /// Create a table in the given database -let tableCreate tableName conn (db : Db) = - db.TableCreate tableName - |> asyncReqlResult conn +let tableCreate tableName (db : Db) = + db.TableCreate tableName /// Create a table in the connection-default database -let tableCreateInDefault tableName conn = - r.TableCreate tableName - |> asyncReqlResult conn +let tableCreateInDefault tableName = + r.TableCreate tableName /// Drop a table in the given database -let tableDrop tableName conn (db : Db) = - db.TableDrop tableName - |> asyncReqlResult conn +let tableDrop tableName (db : Db) = + db.TableDrop tableName /// Drop a table from the connection-default database -let tableDropFromDefault tableName conn = - r.TableDrop tableName - |> asyncReqlResult conn +let tableDropFromDefault tableName = + r.TableDrop tableName /// Get a list of tables for the given database -let tableList conn (db : Db) = - db.TableList () - |> asyncResult conn +let tableList (db : Db) = + db.TableList () /// Get a list of tables from the connection-default database -let tableListFromDefault conn = - r.TableList () - |> asyncResult conn +let tableListFromDefault () = + r.TableList () + +/// Apply optional arguments to an update +let private optArgsUpdate (args : (string * obj) seq) upd = + args |> Seq.fold (fun (upd : Update) arg -> upd.OptArg (fst arg, snd arg)) upd /// Update documents -let update updateSpec (expr : ReqlExpr) = - expr.Update (updateSpec :> obj) +let update<'T> (updateSpec : 'T) (expr : ReqlExpr) = + expr.Update updateSpec /// Update documents, providing optional arguments -let updateWithOptArgs updateSpec args expr = - args - |> List.ofSeq - |> List.fold (fun (upd : Update) arg -> upd.OptArg (fst arg, snd arg)) (update updateSpec expr) +let updateWithOptArgs<'T> (updateSpec : 'T) args expr = + update updateSpec expr |> optArgsUpdate args /// Update documents using a function -let updateFunc (f : ReqlExpr -> 'T) (expr : ReqlExpr) = - expr.Update (ReqlFunction1 (fun row -> upcast f row)) +let updateFunc<'T> (f : ReqlExpr -> 'T) (expr : ReqlExpr) = + expr.Update (ReqlFunction1 (fun row -> f row :> obj)) /// Update documents using a function, providing optional arguments -let updateFuncWithOptArgs f args expr = - args - |> List.ofSeq - |> List.fold (fun (upd : Update) arg -> upd.OptArg (fst arg, snd arg)) (updateFunc f expr) +let updateFuncWithOptArgs<'T> (f : ReqlExpr -> 'T) args expr = + updateFunc f expr |> optArgsUpdate args /// Update documents using JavaScript -let updateJS jsString (expr : ReqlExpr) = - expr.Update (Ast.Javascript (jsString :> obj)) +let updateJS js (expr : ReqlExpr) = + expr.Update (toJS js) /// Update documents using JavaScript, providing optional arguments -let updateJSWithOptArgs jsString args expr = - args - |> List.ofSeq - |> List.fold (fun (upd : Update) arg -> upd.OptArg (fst arg, snd arg)) (updateJS jsString expr) +let updateJSWithOptArgs js args expr = + updateJS js expr |> optArgsUpdate args /// Exclude fields from the result let without (columns : string seq) (expr : ReqlExpr) = - expr.Without (Array.ofSeq columns) + expr.Without (Array.ofSeq columns) /// Merge the right-hand fields into the left-hand document of a sequence let zip (expr : ReqlExpr) = - expr.Zip () + expr.Zip () + + +// ~~ RETRY ~~ + +open RethinkDb.Driver.Net +open System.Threading.Tasks + +/// Retry, delaying for each the seconds provided (if required) +let withRetry intervals (f : IConnection -> Task<'T>) = + Retry.withRetry f intervals + +/// Retry failed commands with 200ms, 500ms, and 1 second delays +let withRetryDefault (f : IConnection -> Task<'T>) = + Retry.withRetryDefault f + +/// Retry failed commands one time with no delay +let withRetryOnce (f : IConnection -> Task<'T>) = + Retry.withRetryOnce f diff --git a/src/RethinkDb.Driver.FSharp/RethinkDb.Driver.FSharp.fsproj b/src/RethinkDb.Driver.FSharp/RethinkDb.Driver.FSharp.fsproj index 8c1466d..18962e0 100644 --- a/src/RethinkDb.Driver.FSharp/RethinkDb.Driver.FSharp.fsproj +++ b/src/RethinkDb.Driver.FSharp/RethinkDb.Driver.FSharp.fsproj @@ -1,21 +1,22 @@  - net6.0 - Idiomatic F# extentions to the official RethinkDB C# driver + net6.0;netstandard2.0 + Idiomatic F# extensions on the official RethinkDB C# driver Daniel J. Summers - https://github.com/danieljsummers/RethinkDb.Driver.FSharp/blob/master/LICENSE + Apache-2.0 https://github.com/danieljsummers/RethinkDb.Driver.FSharp false Alpha; use at your own risk See LICENSE RethinkDB document F# - 0.7.1 + 0.8.0 alpha-0001 + diff --git a/src/RethinkDb.Driver.FSharp/Retry.fs b/src/RethinkDb.Driver.FSharp/Retry.fs new file mode 100644 index 0000000..3495d3f --- /dev/null +++ b/src/RethinkDb.Driver.FSharp/Retry.fs @@ -0,0 +1,35 @@ +module RethinkDb.Driver.FSharp.Retry + +open System +open System.Threading.Tasks +open Polly +open RethinkDb.Driver +open RethinkDb.Driver.Net + +/// Create a retry policy that attempts to reconnect to RethinkDB on each retry +let retryPolicy (intervals : float seq) (conn : IConnection) = + Policy + .Handle() + .WaitAndRetryAsync( + intervals |> Seq.map TimeSpan.FromSeconds, + System.Action (fun ex _ _ _ -> + printf $"Encountered RethinkDB exception: {ex.Message}" + match ex.Message.Contains "socket" with + | true -> + printf "Reconnecting to RethinkDB" + (conn :?> Connection).Reconnect false + | false -> ())) + +/// Perform a query, retrying after each delay specified +let withRetry (f : IConnection -> Task<'T>) retries = + fun conn -> backgroundTask { + return! (retryPolicy retries conn).ExecuteAsync(fun () -> f conn) + } + +/// Retry three times, after 200ms, 500ms, and 1 second +let withRetryDefault f = + withRetry f [ 0.2; 0.5; 1.0 ] + +/// Retry one time immediately +let withRetryOnce f = + withRetry f [ 0.0 ]