Prep for 0.8.0

- Functions now prefer tasks
- Put retry logic in its own module; both CE and functions use it
- Annotated types where necessary for smooth F# 6 implicit casting
This commit is contained in:
Daniel J. Summers 2022-04-18 22:31:48 -04:00
parent 20a9ea461c
commit 2e749e1a27
5 changed files with 267 additions and 224 deletions

View File

@ -17,24 +17,25 @@ The goal is to provide:
- A composable pipeline for creating ReQL statements: - A composable pipeline for creating ReQL statements:
```fsharp ```fsharp
/// string -> (IConnection -> Async<Post>) /// string -> (IConnection -> Task<Post>)
let fetchPost (postId : string) = let fetchPost (postId : string) =
fromDb "Blog" fromDb "Blog"
|> table "Post" |> table "Post"
|> get postId |> get postId
|> asyncResult<Post> |> runResult<Post>
|> withRetryDefault
``` ```
- An F# domain-specific language (DSL) using a `rethink` computation expression: - An F# domain-specific language (DSL) using a `rethink` computation expression:
```fsharp ```fsharp
/// string -> (IConnection -> Async<Post>) /// string -> (IConnection -> Task<Post>)
let fetchPost (postId : string) = let fetchPost (postId : string) =
rethink { rethink<Post> {
fromDb "Blog" withTableInDb "Post" "Blog"
table "Post"
get postId get postId
asyncResult<Post> result
withRetryDefault
} }
``` ```

View File

@ -1,13 +1,19 @@
[<AutoOpen>] [<AutoOpen>]
module RethinkDb.Driver.FSharp.RethinkBuilder module RethinkDb.Driver.FSharp.RethinkBuilder
open Polly
open RethinkDb.Driver open RethinkDb.Driver
open RethinkDb.Driver.Ast open RethinkDb.Driver.Ast
open RethinkDb.Driver.Net open RethinkDb.Driver.Net
open System
open System.Threading.Tasks open System.Threading.Tasks
/// Options for RethinkDB indexes
type IndexOption =
/// Index multiple values in the given field
| Multi
/// Create a geospatial index
| Geospatial
/// Computation Expression builder for RethinkDB queries /// Computation Expression builder for RethinkDB queries
type RethinkBuilder<'T> () = type RethinkBuilder<'T> () =
@ -19,20 +25,6 @@ type RethinkBuilder<'T> () =
fields fields
|> List.fold (fun (m : Model.MapObject) item -> m.With (fst item, snd item)) (RethinkDB.R.HashMap ()) |> List.fold (fun (m : Model.MapObject) item -> m.With (fst item, snd item)) (RethinkDB.R.HashMap ())
/// Create a retry policy that attempts to reconnect to RethinkDB on each retry
let retryPolicy (intervals : float seq) (conn : IConnection) =
Policy
.Handle<ReqlDriverError>()
.WaitAndRetryAsync(
intervals |> Seq.map TimeSpan.FromSeconds,
System.Action<exn, TimeSpan, int, Context> (fun ex _ _ _ ->
printf $"Encountered RethinkDB exception: {ex.Message}"
match ex.Message.Contains "socket" with
| true ->
printf "Reconnecting to RethinkDB"
(conn :?> Connection).Reconnect false
| false -> ()))
member _.Bind (expr : ReqlExpr, f : ReqlExpr -> ReqlExpr) = f expr member _.Bind (expr : ReqlExpr, f : ReqlExpr -> ReqlExpr) = f expr
member this.For (expr, f) = this.Bind (expr, f) member this.For (expr, f) = this.Bind (expr, f)
@ -77,6 +69,11 @@ type RethinkBuilder<'T> () =
[<CustomOperation "indexCreate">] [<CustomOperation "indexCreate">]
member _.IndexCreate (tbl : Table, index : string, f : ReqlExpr -> obj) = tbl.IndexCreate (index, ReqlFunction1 f) member _.IndexCreate (tbl : Table, index : string, f : ReqlExpr -> obj) = tbl.IndexCreate (index, ReqlFunction1 f)
/// Specify options for certain types of indexes
[<CustomOperation "indexOption">]
member _.IndexOption (idx : IndexCreate, opt : IndexOption) =
idx.OptArg ((match opt with Multi -> "multi" | Geospatial -> "geo"), true)
// database/table identification // database/table identification
/// Specify a database for further commands /// Specify a database for further commands
@ -189,7 +186,7 @@ type RethinkBuilder<'T> () =
/// Execute the query, returning the result of the type specified /// Execute the query, returning the result of the type specified
[<CustomOperation "result">] [<CustomOperation "result">]
member _.Result (expr : ReqlExpr) : IConnection -> Task<'T> = member _.Result (expr : ReqlExpr) : IConnection -> Task<'T> =
fun conn -> task { fun conn -> backgroundTask {
return! expr.RunResultAsync<'T> conn return! expr.RunResultAsync<'T> conn
} }
@ -201,7 +198,7 @@ type RethinkBuilder<'T> () =
/// Execute the query, returning the result of the type specified, or None if no result is found /// Execute the query, returning the result of the type specified, or None if no result is found
[<CustomOperation "resultOption">] [<CustomOperation "resultOption">]
member _.ResultOption (expr : ReqlExpr) : IConnection -> Task<'T option> = member _.ResultOption (expr : ReqlExpr) : IConnection -> Task<'T option> =
fun conn -> task { fun conn -> backgroundTask {
let! result = expr.RunResultAsync<'T> conn let! result = expr.RunResultAsync<'T> conn
return match (box >> isNull) result with true -> None | false -> Some result return match (box >> isNull) result with true -> None | false -> Some result
} }
@ -227,9 +224,9 @@ type RethinkBuilder<'T> () =
/// Ignore the result of an operation /// Ignore the result of an operation
[<CustomOperation "ignoreResult">] [<CustomOperation "ignoreResult">]
member _.IgnoreResult (f : IConnection -> Task<'T>) = member _.IgnoreResult<'T> (f : IConnection -> Task<'T>) =
fun conn -> task { fun conn -> task {
let! _ = f conn let! _ = (f conn).ConfigureAwait false
() ()
} }
@ -237,7 +234,7 @@ type RethinkBuilder<'T> () =
[<CustomOperation "ignoreResult">] [<CustomOperation "ignoreResult">]
member _.IgnoreResult (f : IConnection -> Task<'T option>) = member _.IgnoreResult (f : IConnection -> Task<'T option>) =
fun conn -> task { fun conn -> task {
let! _ = f conn let! _ = (f conn).ConfigureAwait false
() ()
} }
@ -256,16 +253,12 @@ type RethinkBuilder<'T> () =
/// Retries a variable number of times, waiting each time for the seconds specified /// Retries a variable number of times, waiting each time for the seconds specified
[<CustomOperation "withRetry">] [<CustomOperation "withRetry">]
member _.WithRetry (f : IConnection -> Task<'T>, retries) = member _.WithRetry (f : IConnection -> Task<'T>, retries) =
fun conn -> task { Retry.withRetry f retries
return! (retryPolicy retries conn).ExecuteAsync(fun () -> f conn)
}
/// Retries a variable number of times, waiting each time for the seconds specified /// Retries a variable number of times, waiting each time for the seconds specified
[<CustomOperation "withRetry">] [<CustomOperation "withRetry">]
member _.WithRetry (f : IConnection -> Task<'T option>, retries) = member _.WithRetry (f : IConnection -> Task<'T option>, retries) =
fun conn -> task { Retry.withRetry f retries
return! (retryPolicy retries conn).ExecuteAsync(fun () -> f conn)
}
/// Retries a variable number of times, waiting each time for the seconds specified /// Retries a variable number of times, waiting each time for the seconds specified
[<CustomOperation "withRetry">] [<CustomOperation "withRetry">]
@ -279,43 +272,43 @@ type RethinkBuilder<'T> () =
/// Retries at 200ms, 500ms, and 1s /// Retries at 200ms, 500ms, and 1s
[<CustomOperation "withRetryDefault">] [<CustomOperation "withRetryDefault">]
member this.WithRetryDefault (f : IConnection -> Task<'T>) = member _.WithRetryDefault (f : IConnection -> Task<'T>) =
this.WithRetry (f, [ 0.2; 0.5; 1.0 ]) Retry.withRetryDefault f
/// Retries at 200ms, 500ms, and 1s /// Retries at 200ms, 500ms, and 1s
[<CustomOperation "withRetryDefault">] [<CustomOperation "withRetryDefault">]
member this.WithRetryDefault (f : IConnection -> Task<'T option>) = member _.WithRetryDefault (f : IConnection -> Task<'T option>) =
this.WithRetry (f, [ 0.2; 0.5; 1.0 ]) Retry.withRetryDefault f
/// Retries at 200ms, 500ms, and 1s /// Retries at 200ms, 500ms, and 1s
[<CustomOperation "withRetryDefault">] [<CustomOperation "withRetryDefault">]
member this.WithRetryDefault (f : IConnection -> Task<'T>, conn) = member this.WithRetryDefault (f : IConnection -> Task<'T>, conn) =
this.WithRetry (f, [ 0.2; 0.5; 1.0 ]) conn this.WithRetryDefault f conn
/// Retries at 200ms, 500ms, and 1s /// Retries at 200ms, 500ms, and 1s
[<CustomOperation "withRetryDefault">] [<CustomOperation "withRetryDefault">]
member this.WithRetryDefault (f : IConnection -> Task<'T option>, conn) = member this.WithRetryDefault (f : IConnection -> Task<'T option>, conn) =
this.WithRetry (f, [ 0.2; 0.5; 1.0 ]) conn this.WithRetryDefault f conn
/// Retries once immediately /// Retries once immediately
[<CustomOperation "withRetryOnce">] [<CustomOperation "withRetryOnce">]
member this.WithRetryOnce (f : IConnection -> Task<'T>) = member _.WithRetryOnce (f : IConnection -> Task<'T>) =
this.WithRetry (f, [ 0.0 ]) Retry.withRetryOnce f
/// Retries once immediately /// Retries once immediately
[<CustomOperation "withRetryOnce">] [<CustomOperation "withRetryOnce">]
member this.WithRetryOnce (f : IConnection -> Task<'T option>) = member _.WithRetryOnce (f : IConnection -> Task<'T option>) =
this.WithRetry (f, [ 0.0 ]) Retry.withRetryOnce f
/// Retries once immediately /// Retries once immediately
[<CustomOperation "withRetryOnce">] [<CustomOperation "withRetryOnce">]
member this.WithRetryOnce (f : IConnection -> Task<'T>, conn) = member this.WithRetryOnce (f : IConnection -> Task<'T>, conn) =
this.WithRetry (f, [ 0.0 ]) conn this.WithRetryOnce f conn
/// Retries once immediately /// Retries once immediately
[<CustomOperation "withRetryOnce">] [<CustomOperation "withRetryOnce">]
member this.WithRetryOnce (f : IConnection -> Task<'T option>, conn) = member this.WithRetryOnce (f : IConnection -> Task<'T option>, conn) =
this.WithRetry (f, [ 0.0 ]) conn this.WithRetryOnce f conn
/// RethinkDB computation expression /// RethinkDB computation expression

View File

@ -4,7 +4,13 @@ module RethinkDb.Driver.FSharp.Functions
open RethinkDb.Driver open RethinkDb.Driver
open RethinkDb.Driver.Ast open RethinkDb.Driver.Ast
let private r = RethinkDB.R [<AutoOpen>]
module private Helpers =
/// Shorthand for the starting point for ReQL commands
let r = RethinkDB.R
/// Create a Javascript object from a string (used mostly for type inference)
let toJS (js : string) = Javascript js
/// Get a cursor with the results of an expression /// Get a cursor with the results of an expression
@ -17,23 +23,36 @@ let asyncReqlResult conn (expr : ReqlExpr) =
expr.RunWriteAsync conn expr.RunWriteAsync conn
|> Async.AwaitTask |> Async.AwaitTask
/// Write a ReQL command, always returning a result
let runWriteResult (expr : ReqlExpr) =
expr.RunWriteAsync
/// Write a ReQL command, raising an exception if an error occurs
let runWrite (expr : ReqlExpr) = fun conn -> backgroundTask {
let! result = expr.RunWriteAsync conn
if result.Errors > 0UL then raise <| ReqlRuntimeError result.FirstError
return result
}
/// Get the results of an expression /// Get the results of an expression
let asyncResult<'T> conn (expr : ReqlExpr) = let asyncResult<'T> conn (expr : ReqlExpr) =
expr.RunResultAsync<'T> conn expr.RunResultAsync<'T> conn
|> Async.AwaitTask |> Async.AwaitTask
/// Run the ReQL command, returning the result as the type specified
let runResult<'T> (expr : ReqlExpr) = expr.RunResultAsync<'T>
/// Get documents between a lower bound and an upper bound based on a primary key /// Get documents between a lower bound and an upper bound based on a primary key
let between lowerKey upperKey (expr : ReqlExpr) = let between (lowerKey : obj) (upperKey : obj) (expr : ReqlExpr) =
expr.Between (lowerKey, upperKey) expr.Between (lowerKey, upperKey)
/// Get document between a lower bound and an upper bound, specifying one or more optional arguments /// Get document between a lower bound and an upper bound, specifying one or more optional arguments
let betweenWithOptArgs lowerKey upperKey (args : (string * _) seq) (expr : ReqlExpr) = let betweenWithOptArgs (lowerKey : obj) (upperKey : obj) (args : (string * obj) seq) (expr : ReqlExpr) =
args args
|> List.ofSeq |> Seq.fold (fun (btw : Between) arg -> btw.OptArg (fst arg, snd arg)) (between lowerKey upperKey expr)
|> List.fold (fun (btw : Between) arg -> btw.OptArg (fst arg, snd arg)) (between lowerKey upperKey expr)
/// Get documents between a lower bound and an upper bound based on an index /// Get documents between a lower bound and an upper bound based on an index
let betweenIndex lowerKey upperKey (index : string) (expr : ReqlExpr) = let betweenIndex (lowerKey : obj) (upperKey : obj) (index : string) (expr : ReqlExpr) =
betweenWithOptArgs lowerKey upperKey [ "index", index ] expr betweenWithOptArgs lowerKey upperKey [ "index", index ] expr
/// Get a connection builder that can be used to create one RethinkDB connection /// Get a connection builder that can be used to create one RethinkDB connection
@ -45,40 +64,35 @@ let db dbName =
r.Db dbName r.Db dbName
/// Create a database /// Create a database
let dbCreate dbName conn = let dbCreate (dbName : string) =
r.DbCreate dbName r.DbCreate dbName
|> asyncReqlResult conn
/// Drop a database /// Drop a database
let dbDrop dbName conn = let dbDrop (dbName : string) =
r.DbDrop dbName r.DbDrop dbName
|> asyncReqlResult conn
/// Get a list of databases /// Get a list of databases
let dbList conn = let dbList () =
r.DbList () r.DbList ()
|> asyncResult<string list> conn
/// Delete documents /// Delete documents
let delete (expr : ReqlExpr) = let delete (expr : ReqlExpr) =
expr.Delete () expr.Delete ()
/// Delete documents, providing optional arguments /// Delete documents, providing optional arguments
let deleteWithOptArgs args expr = let deleteWithOptArgs (args : (string * obj) seq) expr =
args args |> Seq.fold (fun (del : Delete) arg -> del.OptArg (fst arg, snd arg)) (delete expr)
|> List.ofSeq
|> List.fold (fun (del : Delete) arg -> del.OptArg (fst arg, snd arg)) (delete expr)
/// EqJoin the left field on the right-hand table using its primary key /// EqJoin the left field on the right-hand table using its primary key
let eqJoin field (table : Table) (expr : ReqlExpr) = let eqJoin (field : string) (table : Table) (expr : ReqlExpr) =
expr.EqJoin (field :> obj, table) expr.EqJoin (field, table)
/// EqJoin the left function on the right-hand table using its primary key /// EqJoin the left function on the right-hand table using its primary key
let eqJoinFunc (f : ReqlExpr -> 'T) (table : Table) (expr : ReqlExpr) = let eqJoinFunc<'T> (f : ReqlExpr -> 'T) (table : Table) (expr : ReqlExpr) =
expr.EqJoin (ReqlFunction1 (fun row -> upcast f row), table) expr.EqJoin (ReqlFunction1 (fun row -> f row :> obj), table)
/// EqJoin the left function on the right-hand table using the specified index /// EqJoin the left function on the right-hand table using the specified index
let eqJoinFuncIndex f table (indexName : string) expr = let eqJoinFuncIndex<'T> (f : ReqlExpr -> 'T) table (indexName : string) expr =
(eqJoinFunc f table expr).OptArg ("index", indexName) (eqJoinFunc f table expr).OptArg ("index", indexName)
/// EqJoin the left field on the right-hand table using the specified index /// EqJoin the left field on the right-hand table using the specified index
@ -86,113 +100,104 @@ let eqJoinIndex field table (indexName : string) expr =
(eqJoin field table expr).OptArg ("index", indexName) (eqJoin field table expr).OptArg ("index", indexName)
/// EqJoin the left JavaScript on the right-hand table using its primary key /// EqJoin the left JavaScript on the right-hand table using its primary key
let eqJoinJS (jsString : string) (table : Table) (expr : ReqlExpr) = let eqJoinJS js (table : Table) (expr : ReqlExpr) =
expr.EqJoin (Javascript (jsString :> obj), table) expr.EqJoin (toJS js, table)
/// EqJoin the left JavaScript on the right-hand table using the specified index /// EqJoin the left JavaScript on the right-hand table using the specified index
let eqJoinJSIndex jsString table (indexName : string) expr = let eqJoinJSIndex js table (indexName : string) expr =
(eqJoinJS jsString table expr).OptArg ("index", indexName) (eqJoinJS js table expr).OptArg ("index", indexName)
/// Filter documents /// Filter documents
let filter filterSpec (expr : ReqlExpr) = let filter (filterSpec : obj) (expr : ReqlExpr) =
expr.Filter (filterSpec :> obj) expr.Filter filterSpec
/// Apply optional arguments to a filter
let private optArgsFilter (args : (string * obj) seq) filter =
args |> Seq.fold (fun (fil : Filter) arg -> fil.OptArg (fst arg, snd arg)) filter
/// Filter documents, providing optional arguments /// Filter documents, providing optional arguments
let filterWithOptArgs filterSpec args expr = let filterWithOptArgs (filterSpec : obj) args expr =
args filter filterSpec expr |> optArgsFilter args
|> List.ofSeq
|> List.fold (fun (fil : Filter) arg -> fil.OptArg (fst arg, snd arg)) (filter filterSpec expr)
/// Filter documents using a function /// Filter documents using a function
let filterFunc (f : ReqlExpr -> 'T) (expr : ReqlExpr) = let filterFunc<'T> (f : ReqlExpr -> 'T) (expr : ReqlExpr) =
expr.Filter (ReqlFunction1 (fun row -> upcast f row)) expr.Filter (ReqlFunction1 (fun row -> f row :> obj))
/// Filter documents using a function, providing optional arguments /// Filter documents using a function, providing optional arguments
let filterFuncWithOptArgs f args expr = let filterFuncWithOptArgs<'T> (f : ReqlExpr -> 'T) args expr =
args filterFunc f expr |> optArgsFilter args
|> List.ofSeq
|> List.fold (fun (fil : Filter) arg -> fil.OptArg (fst arg, snd arg)) (filterFunc f expr)
/// Filter documents using JavaScript /// Filter documents using JavaScript
let filterJS jsString (expr : ReqlExpr) = let filterJS js (expr : ReqlExpr) =
expr.Filter (Javascript (jsString :> obj)) expr.Filter (toJS js)
/// Filter documents using JavaScript, providing optional arguments /// Filter documents using JavaScript, providing optional arguments
let filterJSWithOptArgs jsString args expr = let filterJSWithOptArgs js args expr =
args filterJS js expr |> optArgsFilter args
|> List.ofSeq
|> List.fold (fun (fil : Filter) arg -> fil.OptArg (fst arg, snd arg)) (filterJS jsString expr)
/// Get a document by its primary key /// Get a document by its primary key
let get documentId (table : Table) = let get (documentId : obj) (table : Table) =
table.Get documentId table.Get documentId
/// Get all documents matching keys in the given index /// Get all documents matching keys in the given index
let getAll (ids : 'T seq) indexName (table : Table) = let getAll (ids : obj seq) (indexName : string) (table : Table) =
table.GetAll(Array.ofSeq ids).OptArg ("index", indexName) table.GetAll(Array.ofSeq ids).OptArg ("index", indexName)
/// Create an index on the given table /// Create an index on the given table
let indexCreate indexName conn (table : Table) = let indexCreate (indexName : string) (table : Table) =
table.IndexCreate indexName table.IndexCreate indexName
|> asyncReqlResult conn
/// Create an index on the given table using a function /// Create an index on the given table using a function
let indexCreateFunc indexName (f : ReqlExpr -> 'T) conn (table : Table) = let indexCreateFunc<'T> (indexName : string) (f : ReqlExpr -> 'T) (table : Table) =
table.IndexCreate (indexName, ReqlFunction1 (fun row -> upcast f row)) table.IndexCreate (indexName, ReqlFunction1 (fun row -> f row :> obj))
|> asyncReqlResult conn
/// Create an index on the given table using JavaScript /// Create an index on the given table using JavaScript
let indexCreateJS indexName jsString conn (table : Table) = let indexCreateJS (indexName : string) js (table : Table) =
table.IndexCreate (indexName, Javascript (jsString :> obj)) table.IndexCreate (indexName, toJS js)
|> asyncReqlResult conn
/// Drop an index /// Drop an index
let indexDrop indexName conn (table : Table) = let indexDrop (indexName : string) (table : Table) =
table.IndexDrop indexName table.IndexDrop indexName
|> asyncReqlResult conn
/// Get a list of indexes for the given table /// Get a list of indexes for the given table
let indexList conn (table : Table) = let indexList (table : Table) =
table.IndexList () table.IndexList ()
|> asyncResult<string list> conn
/// Rename an index (overwrite will fail) /// Rename an index (overwrite will fail)
let indexRename oldName newName conn (table : Table) = let indexRename (oldName : string) (newName : string) (table : Table) =
table.IndexRename (oldName, newName) table.IndexRename (oldName, newName)
|> asyncReqlResult conn
/// Rename an index (overwrite will succeed) /// Rename an index (overwrite will succeed)
let indexRenameWithOverwrite oldName newName conn (table : Table) = let indexRenameWithOverwrite (oldName : string) (newName : string) (table : Table) =
table.IndexRename(oldName, newName).OptArg ("overwrite", true) table.IndexRename(oldName, newName).OptArg ("overwrite", true)
|> asyncReqlResult conn
/// Create an inner join between two sequences, specifying the join condition with a function /// Create an inner join between two sequences, specifying the join condition with a function
let innerJoinFunc otherSeq (f : ReqlExpr -> ReqlExpr -> 'T) (expr : ReqlExpr) = let innerJoinFunc<'T> (otherSeq : obj) (f : ReqlExpr -> ReqlExpr -> 'T) (expr : ReqlExpr) =
expr.InnerJoin (otherSeq, ReqlFunction2 (fun leftRow rightRow -> upcast f leftRow rightRow)) expr.InnerJoin (otherSeq, ReqlFunction2 (fun f1 f2 -> f f1 f2 :> obj))
/// Create an inner join between two sequences, specifying the join condition with JavaScript /// Create an inner join between two sequences, specifying the join condition with JavaScript
let innerJoinJS otherSeq jsString (expr : ReqlExpr) = let innerJoinJS (otherSeq : obj) js (expr : ReqlExpr) =
expr.InnerJoin (otherSeq, Javascript (jsString :> obj)) expr.InnerJoin (otherSeq, toJS js)
/// Apply optional arguments to an insert
let private optArgsInsert (args : (string * obj) seq) ins =
args |> Seq.fold (fun (ins : Insert) arg -> ins.OptArg (fst arg, snd arg)) ins
/// Insert a single document (use insertMany for multiple) /// Insert a single document (use insertMany for multiple)
let insert doc (table : Table) = let insert<'T> (doc : 'T) (table : Table) =
table.Insert doc table.Insert doc
/// Insert multiple documents /// Insert multiple documents
let insertMany docs (table : Table) = let insertMany<'T> (docs : 'T seq) (table : Table) =
table.Insert <| Array.ofSeq docs table.Insert (Array.ofSeq docs)
/// Insert a single document, providing optional arguments (use insertManyWithOptArgs for multiple) /// Insert a single document, providing optional arguments (use insertManyWithOptArgs for multiple)
let insertWithOptArgs doc (args : (string * _) seq) (table : Table) = let insertWithOptArgs<'T> (doc : 'T) args table =
args insert doc table |> optArgsInsert args
|> List.ofSeq
|> List.fold (fun (ins : Insert) arg -> ins.OptArg (fst arg, snd arg)) (insert doc table)
/// Insert multiple documents, providing optional arguments /// Insert multiple documents, providing optional arguments
let insertManyWithOptArgs docs (args : (string * _) seq) (table : Table) = let insertManyWithOptArgs<'T> (docs : 'T seq) args table =
args insertMany docs table |> optArgsInsert args
|> List.ofSeq
|> List.fold (fun (ins : Insert) arg -> ins.OptArg (fst arg, snd arg)) (insertMany docs table)
/// Test whether a sequence is empty /// Test whether a sequence is empty
let isEmpty (expr : ReqlExpr) = let isEmpty (expr : ReqlExpr) =
@ -207,46 +212,44 @@ let nth n (expr : ReqlExpr) =
expr.Nth n expr.Nth n
/// Create an outer join between two sequences, specifying the join condition with a function /// Create an outer join between two sequences, specifying the join condition with a function
let outerJoinFunc otherSeq (f : ReqlExpr -> ReqlExpr -> 'T) (expr : ReqlExpr) = let outerJoinFunc<'T> (otherSeq : obj) (f : ReqlExpr -> ReqlExpr -> 'T) (expr : ReqlExpr) =
expr.OuterJoin (otherSeq, ReqlFunction2 (fun leftRow rightRow -> upcast f leftRow rightRow)) expr.OuterJoin (otherSeq, ReqlFunction2 (fun f1 f2 -> f f1 f2 :> obj))
/// Create an outer join between two sequences, specifying the join condition with JavaScript /// Create an outer join between two sequences, specifying the join condition with JavaScript
let outerJoinJS otherSeq jsString (expr : ReqlExpr) = let outerJoinJS (otherSeq : obj) js (expr : ReqlExpr) =
expr.OuterJoin (otherSeq, Javascript (jsString :> obj)) expr.OuterJoin (otherSeq, toJS js)
/// Select one or more attributes from an object or sequence /// Select one or more attributes from an object or sequence
let pluck (fields : string seq) (expr : ReqlExpr) = let pluck (fields : string seq) (expr : ReqlExpr) =
expr.Pluck (Array.ofSeq fields) expr.Pluck (Array.ofSeq fields)
/// Apply optional arguments to a replace
let private optArgsReplace (args : (string * obj) seq) repl =
args |> Seq.fold (fun (rep : Replace) arg -> rep.OptArg (fst arg, snd arg)) repl
/// Replace documents /// Replace documents
let replace replaceSpec (expr : ReqlExpr) = let replace<'T> (replaceSpec : 'T) (expr : ReqlExpr) =
expr.Replace (replaceSpec :> obj) expr.Replace replaceSpec
/// Replace documents, providing optional arguments /// Replace documents, providing optional arguments
let replaceWithOptArgs replaceSpec args expr = let replaceWithOptArgs<'T> (replaceSpec : 'T) args expr =
args replace replaceSpec expr |> optArgsReplace args
|> List.ofSeq
|> List.fold (fun (rep : Replace) arg -> rep.OptArg (fst arg, snd arg)) (replace replaceSpec expr)
/// Replace documents using a function /// Replace documents using a function
let replaceFunc (f : ReqlExpr -> 'T) (expr : ReqlExpr) = let replaceFunc<'T> (f : ReqlExpr -> 'T) (expr : ReqlExpr) =
expr.Replace (ReqlFunction1 (fun row -> upcast f row)) expr.Replace (ReqlFunction1 (fun row -> f row :> obj))
/// Replace documents using a function, providing optional arguments /// Replace documents using a function, providing optional arguments
let replaceFuncWithOptArgs f args expr = let replaceFuncWithOptArgs<'T> (f : ReqlExpr -> 'T) args expr =
args replaceFunc f expr |> optArgsReplace args
|> List.ofSeq
|> List.fold (fun (rep : Replace) arg -> rep.OptArg (fst arg, snd arg)) (replaceFunc f expr)
/// Replace documents using JavaScript /// Replace documents using JavaScript
let replaceJS jsString (expr : ReqlExpr) = let replaceJS js (expr : ReqlExpr) =
expr.Replace (Ast.Javascript (jsString :> obj)) expr.Replace (toJS js)
/// Replace documents using JavaScript, providing optional arguments /// Replace documents using JavaScript, providing optional arguments
let replaceJSWithOptArgs jsString args expr = let replaceJSWithOptArgs js args expr =
args replaceJS js expr |> optArgsReplace args
|> List.ofSeq
|> List.fold (fun (rep : Replace) arg -> rep.OptArg (fst arg, snd arg)) (replaceJS jsString expr)
/// Skip a number of elements from the head of a sequence /// Skip a number of elements from the head of a sequence
let skip n (expr : ReqlExpr) = let skip n (expr : ReqlExpr) =
@ -265,64 +268,56 @@ let fromTable tableName =
r.Table tableName r.Table tableName
/// Create a table in the given database /// Create a table in the given database
let tableCreate tableName conn (db : Db) = let tableCreate tableName (db : Db) =
db.TableCreate tableName db.TableCreate tableName
|> asyncReqlResult conn
/// Create a table in the connection-default database /// Create a table in the connection-default database
let tableCreateInDefault tableName conn = let tableCreateInDefault tableName =
r.TableCreate tableName r.TableCreate tableName
|> asyncReqlResult conn
/// Drop a table in the given database /// Drop a table in the given database
let tableDrop tableName conn (db : Db) = let tableDrop tableName (db : Db) =
db.TableDrop tableName db.TableDrop tableName
|> asyncReqlResult conn
/// Drop a table from the connection-default database /// Drop a table from the connection-default database
let tableDropFromDefault tableName conn = let tableDropFromDefault tableName =
r.TableDrop tableName r.TableDrop tableName
|> asyncReqlResult conn
/// Get a list of tables for the given database /// Get a list of tables for the given database
let tableList conn (db : Db) = let tableList (db : Db) =
db.TableList () db.TableList ()
|> asyncResult<string list> conn
/// Get a list of tables from the connection-default database /// Get a list of tables from the connection-default database
let tableListFromDefault conn = let tableListFromDefault () =
r.TableList () r.TableList ()
|> asyncResult<string list> conn
/// Apply optional arguments to an update
let private optArgsUpdate (args : (string * obj) seq) upd =
args |> Seq.fold (fun (upd : Update) arg -> upd.OptArg (fst arg, snd arg)) upd
/// Update documents /// Update documents
let update updateSpec (expr : ReqlExpr) = let update<'T> (updateSpec : 'T) (expr : ReqlExpr) =
expr.Update (updateSpec :> obj) expr.Update updateSpec
/// Update documents, providing optional arguments /// Update documents, providing optional arguments
let updateWithOptArgs updateSpec args expr = let updateWithOptArgs<'T> (updateSpec : 'T) args expr =
args update updateSpec expr |> optArgsUpdate args
|> List.ofSeq
|> List.fold (fun (upd : Update) arg -> upd.OptArg (fst arg, snd arg)) (update updateSpec expr)
/// Update documents using a function /// Update documents using a function
let updateFunc (f : ReqlExpr -> 'T) (expr : ReqlExpr) = let updateFunc<'T> (f : ReqlExpr -> 'T) (expr : ReqlExpr) =
expr.Update (ReqlFunction1 (fun row -> upcast f row)) expr.Update (ReqlFunction1 (fun row -> f row :> obj))
/// Update documents using a function, providing optional arguments /// Update documents using a function, providing optional arguments
let updateFuncWithOptArgs f args expr = let updateFuncWithOptArgs<'T> (f : ReqlExpr -> 'T) args expr =
args updateFunc f expr |> optArgsUpdate args
|> List.ofSeq
|> List.fold (fun (upd : Update) arg -> upd.OptArg (fst arg, snd arg)) (updateFunc f expr)
/// Update documents using JavaScript /// Update documents using JavaScript
let updateJS jsString (expr : ReqlExpr) = let updateJS js (expr : ReqlExpr) =
expr.Update (Ast.Javascript (jsString :> obj)) expr.Update (toJS js)
/// Update documents using JavaScript, providing optional arguments /// Update documents using JavaScript, providing optional arguments
let updateJSWithOptArgs jsString args expr = let updateJSWithOptArgs js args expr =
args updateJS js expr |> optArgsUpdate args
|> List.ofSeq
|> List.fold (fun (upd : Update) arg -> upd.OptArg (fst arg, snd arg)) (updateJS jsString expr)
/// Exclude fields from the result /// Exclude fields from the result
let without (columns : string seq) (expr : ReqlExpr) = let without (columns : string seq) (expr : ReqlExpr) =
@ -331,3 +326,21 @@ let without (columns : string seq) (expr : ReqlExpr) =
/// Merge the right-hand fields into the left-hand document of a sequence /// Merge the right-hand fields into the left-hand document of a sequence
let zip (expr : ReqlExpr) = let zip (expr : ReqlExpr) =
expr.Zip () expr.Zip ()
// ~~ RETRY ~~
open RethinkDb.Driver.Net
open System.Threading.Tasks
/// Retry, delaying for each the seconds provided (if required)
let withRetry intervals (f : IConnection -> Task<'T>) =
Retry.withRetry f intervals
/// Retry failed commands with 200ms, 500ms, and 1 second delays
let withRetryDefault (f : IConnection -> Task<'T>) =
Retry.withRetryDefault f
/// Retry failed commands one time with no delay
let withRetryOnce (f : IConnection -> Task<'T>) =
Retry.withRetryOnce f

View File

@ -1,21 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<TargetFrameworks>net6.0</TargetFrameworks> <TargetFrameworks>net6.0;netstandard2.0</TargetFrameworks>
<Description>Idiomatic F# extentions to the official RethinkDB C# driver</Description> <Description>Idiomatic F# extensions on the official RethinkDB C# driver</Description>
<Authors>Daniel J. Summers</Authors> <Authors>Daniel J. Summers</Authors>
<PackageLicenseUrl>https://github.com/danieljsummers/RethinkDb.Driver.FSharp/blob/master/LICENSE</PackageLicenseUrl> <PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
<PackageProjectUrl>https://github.com/danieljsummers/RethinkDb.Driver.FSharp</PackageProjectUrl> <PackageProjectUrl>https://github.com/danieljsummers/RethinkDb.Driver.FSharp</PackageProjectUrl>
<!-- PackageIconUrl>https://github.com/danieljsummers/RethinkDb.Driver.FSharp/raw/master/icon/icon.png</PackageIconUrl --> <!-- PackageIconUrl>https://github.com/danieljsummers/RethinkDb.Driver.FSharp/raw/master/icon/icon.png</PackageIconUrl -->
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance> <PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
<PackageReleaseNotes>Alpha; use at your own risk</PackageReleaseNotes> <PackageReleaseNotes>Alpha; use at your own risk</PackageReleaseNotes>
<Copyright>See LICENSE</Copyright> <Copyright>See LICENSE</Copyright>
<PackageTags>RethinkDB document F#</PackageTags> <PackageTags>RethinkDB document F#</PackageTags>
<VersionPrefix>0.7.1</VersionPrefix> <VersionPrefix>0.8.0</VersionPrefix>
<VersionSuffix>alpha-0001</VersionSuffix> <VersionSuffix>alpha-0001</VersionSuffix>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<Compile Include="Retry.fs" />
<Compile Include="Builder.fs" /> <Compile Include="Builder.fs" />
<Compile Include="Functions.fs" /> <Compile Include="Functions.fs" />
<Compile Include="Config.fs" /> <Compile Include="Config.fs" />

View File

@ -0,0 +1,35 @@
module RethinkDb.Driver.FSharp.Retry
open System
open System.Threading.Tasks
open Polly
open RethinkDb.Driver
open RethinkDb.Driver.Net
/// Create a retry policy that attempts to reconnect to RethinkDB on each retry
let retryPolicy (intervals : float seq) (conn : IConnection) =
Policy
.Handle<ReqlDriverError>()
.WaitAndRetryAsync(
intervals |> Seq.map TimeSpan.FromSeconds,
System.Action<exn, TimeSpan, int, Context> (fun ex _ _ _ ->
printf $"Encountered RethinkDB exception: {ex.Message}"
match ex.Message.Contains "socket" with
| true ->
printf "Reconnecting to RethinkDB"
(conn :?> Connection).Reconnect false
| false -> ()))
/// Perform a query, retrying after each delay specified
let withRetry (f : IConnection -> Task<'T>) retries =
fun conn -> backgroundTask {
return! (retryPolicy retries conn).ExecuteAsync(fun () -> f conn)
}
/// Retry three times, after 200ms, 500ms, and 1 second
let withRetryDefault f =
withRetry f [ 0.2; 0.5; 1.0 ]
/// Retry one time immediately
let withRetryOnce f =
withRetry f [ 0.0 ]