WIP toward completeness

- Add opt args for the run command
- Support Task/Async/sync for all run/write/result cmds
- Define builder using funcs from Functions module
- Functions module no longer auto-opened
- Update README with some of the above
This commit is contained in:
Daniel J. Summers 2022-04-21 20:36:30 -04:00
parent 78e5bbf77d
commit f23f7b90e9
6 changed files with 1004 additions and 227 deletions

View File

@ -13,6 +13,8 @@ The goal is to provide:
- A composable pipeline for creating ReQL statements:
```fsharp
open RethinkDb.Driver.FSharp.Functions
/// string -> (IConnection -> Task<Post>)
let fetchPost (postId : string) =
fromDb "Blog"
@ -25,6 +27,8 @@ let fetchPost (postId : string) =
- An F# domain-specific language (DSL) using a `rethink` computation expression (CE):
```fsharp
open RethinkDb.Driver.FSharp
/// string -> (IConnection -> Task<Post>)
let fetchPost (postId : string) =
rethink<Post> {
@ -59,19 +63,21 @@ The examples above both use the default retry logic.
- Only rename functions/methods where required
Within the CE, there are a few differing names, mostly notably at the start (selecting databases and tables); this is to allow for a more natural language flow. Its names may change in the 0.8.x series; it is the most alpha part of the project at this point.
Within the CE, there are a few differing names, mostly notably at the start (selecting databases and tables); this is to allow for a more natural language flow. Its names may change in the 0.8.x series; it is the most alpha part of the project at this point. Also, while CEs now support overloading (thank you F# 6 developers!), they do not detect if the first value in the tupled arguments is different. This is most noticeable once `result*` or `write*` commands have been issued; these support `Task<'T>`, `Async<'T>`, and synchronous `'T` operations, but the follow-on commands will be different (e.x. `withRetryDefault` (tasks) vs. `withAsyncRetryDefault` vs. `withSyncRetryDefault`). There are also versions of these that support optional arguments (for all) and cancellation tokens (for task/async).
The functions do have to change a bit, since they do not support overloading; an example for `filter` is below.
The functions show this pattern throughout, as functions in a module do not support overloading; an example for `filter` is below.
```fsharp
// Function names cannot be polymorphic the way object-oriented methods can, so filter's three overloads become
filter (r.HashMap ("age", 30))
// and
filterFunc (fun row -> row.["age"].Eq(30))
filterFunc (fun row -> row.G("age").Eq 30)
// and
filterJS "function (row) { return 30 == row['age'] }"
```
Functions that support optional arguments end with `WithOptArgs`; those that support cancellation tokens end with `WithCancel`; and, those that support both end with `WithOptArgsAndCancel`.
## Licensing
While no specific additional license restrictions exist for this project, there are modifications to the Apache v2

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,7 @@
[<AutoOpen>]
/// The function-based Domain-Specific Language (DSL) for RethinkDB
module RethinkDb.Driver.FSharp.Functions
open System.Threading
open RethinkDb.Driver
open RethinkDb.Driver.Ast
@ -18,29 +19,118 @@ let asyncCursor<'T> conn (expr : ReqlExpr) =
expr.RunCursorAsync<'T> conn
|> Async.AwaitTask
/// Get the result of a non-select ReQL expression
let asyncReqlResult conn (expr : ReqlExpr) =
expr.RunWriteAsync conn
|> Async.AwaitTask
/// Write a ReQL command, always returning a result
let runWriteResult (expr : ReqlExpr) =
expr.RunWriteAsync
/// Raise an exception if a write command encountered an error
let private raiseIfWriteError (result : Model.Result) =
match result.Errors with
| 0UL -> result
| _ -> raise <| ReqlRuntimeError result.FirstError
/// Write a ReQL command, raising an exception if an error occurs
let runWriteWithCancel cancelToken conn (expr : ReqlExpr) = backgroundTask {
let! result = expr.RunWriteAsync (conn, cancelToken)
return raiseIfWriteError result
}
/// Write a ReQL command, raising an exception if an error occurs
let runWrite (expr : ReqlExpr) = fun conn -> backgroundTask {
let! result = expr.RunWriteAsync conn
if result.Errors > 0UL then raise <| ReqlRuntimeError result.FirstError
return result
let runWrite conn expr = runWriteWithCancel CancellationToken.None conn expr
/// Write a ReQL command with optional arguments, raising an exception if an error occurs
let runWriteWithOptArgsAndCancel args cancelToken conn (expr : ReqlExpr) = backgroundTask {
let! result = expr.RunWriteAsync (conn, RunOptArg.create args, cancelToken)
return raiseIfWriteError result
}
/// Get the results of an expression
let asyncResult<'T> conn (expr : ReqlExpr) =
expr.RunResultAsync<'T> conn
|> Async.AwaitTask
/// Write a ReQL command with optional arguments, raising an exception if an error occurs
let runWriteWithOptArgs args conn expr = runWriteWithOptArgsAndCancel args CancellationToken.None conn expr
/// Write a ReQL command, raising an exception if an error occurs
let asyncWrite conn expr = runWrite conn expr |> Async.AwaitTask
/// Write a ReQL command with optional arguments, raising an exception if an error occurs
let asyncWriteWithOptArgs args conn expr = runWriteWithOptArgs args conn expr |> Async.AwaitTask
/// Write a ReQL command, raising an exception if an error occurs
let asyncWriteWithCancel cancelToken conn expr = runWriteWithCancel cancelToken conn expr |> Async.AwaitTask
/// Write a ReQL command with optional arguments, raising an exception if an error occurs
let asyncWriteWithOptArgsAndCancel args cancelToken conn expr =
runWriteWithOptArgsAndCancel args cancelToken conn expr |> Async.AwaitTask
/// Write a ReQL command synchronously, raising an exception if an error occurs
let syncWrite conn expr = asyncWrite conn expr |> Async.RunSynchronously
/// Write a ReQL command synchronously with optional arguments, raising an exception if an error occurs
let syncWriteWithOptArgs args conn expr = asyncWriteWithOptArgs args conn expr |> Async.RunSynchronously
/// Write a ReQL command with a cancellation token, always returning a result
let runWriteResultWithCancel cancelToken conn (expr : ReqlExpr) =
expr.RunWriteAsync (conn, cancelToken)
/// Write a ReQL command, always returning a result
let runWriteResult conn expr = runWriteResultWithCancel CancellationToken.None conn expr
/// Write a ReQL command with optional arguments and a cancellation token, always returning a result
let runWriteResultWithOptArgsAndCancel args cancelToken conn (expr : ReqlExpr) =
expr.RunWriteAsync (conn, RunOptArg.create args, cancelToken)
/// Write a ReQL command with optional arguments, always returning a result
let runWriteResultWithOptArgs args conn expr = runWriteResultWithOptArgsAndCancel args CancellationToken.None conn expr
/// Write a ReQL command, always returning a result
let asyncWriteResult conn expr = runWriteResult conn expr |> Async.AwaitTask
/// Write a ReQL command with optional arguments, always returning a result
let asyncWriteResultWithOptArgs args conn expr = runWriteResultWithOptArgs args conn expr |> Async.AwaitTask
/// Write a ReQL command with a cancellation token, always returning a result
let asyncWriteResultWithCancel cancelToken conn expr = runWriteResultWithCancel cancelToken conn expr |> Async.AwaitTask
/// Write a ReQL command with optional arguments and a cancellation token, always returning a result
let asyncWriteResultWithOptArgsAndCancel args cancelToken conn expr =
runWriteResultWithOptArgsAndCancel args cancelToken conn expr |> Async.AwaitTask
/// Write a ReQL command synchronously, always returning a result
let syncWriteResult conn expr = asyncWriteResult conn expr |> Async.RunSynchronously
/// Write a ReQL command synchronously with optional arguments, always returning a result
let syncWriteResultWithOptArgs args conn expr = asyncWriteResultWithOptArgs args conn expr |> Async.RunSynchronously
/// Run the ReQL command using a cancellation token, returning the result as the type specified
let runResultWithCancel<'T> cancelToken conn (expr : ReqlExpr) = expr.RunResultAsync<'T> (conn, cancelToken)
/// Run the ReQL command using optional arguments and a cancellation token, returning the result as the type specified
let runResultWithOptArgsAndCancel<'T> args cancelToken conn (expr : ReqlExpr) =
expr.RunResultAsync<'T> (conn, RunOptArg.create args, cancelToken)
/// Run the ReQL command, returning the result as the type specified
let runResult<'T> (expr : ReqlExpr) = expr.RunResultAsync<'T>
let runResult<'T> = runResultWithCancel<'T> CancellationToken.None
/// Run the ReQL command using optional arguments, returning the result as the type specified
let runResultWithOptArgs<'T> args = runResultWithOptArgsAndCancel<'T> args CancellationToken.None
/// Run the ReQL command, returning the result as the type specified
let asyncResult<'T> conn expr =
runResult<'T> expr conn |> Async.AwaitTask
/// Run the ReQL command using optional arguments, returning the result as the type specified
let asyncResultWithOptArgs<'T> args conn expr =
runResultWithOptArgs<'T> args conn expr |> Async.AwaitTask
/// Run the ReQL command using a cancellation token, returning the result as the type specified
let asyncResultWithCancel<'T> cancelToken conn (expr : ReqlExpr) =
runResultWithCancel<'T> cancelToken conn expr |> Async.AwaitTask
/// Run the ReQL command using optional arguments and a cancellation token, returning the result as the type specified
let asyncResultWithOptArgsAndCancel<'T> args cancelToken conn expr =
runResultWithOptArgsAndCancel<'T> args cancelToken conn expr |> Async.AwaitTask
/// Run the ReQL command, returning the result as the type specified
let syncResult<'T> conn expr =
asyncResult<'T> expr conn |> Async.RunSynchronously
/// Run the ReQL command using optional arguments, returning the result as the type specified
let syncResultWithOptArgs<'T> args conn expr =
asyncResultWithOptArgs<'T> args conn expr |> Async.RunSynchronously
/// Get documents between a lower bound and an upper bound based on a primary key
let between (lowerKey : obj) (upperKey : obj) (expr : ReqlExpr) =
@ -58,6 +148,18 @@ let betweenIndex (lowerKey : obj) (upperKey : obj) index expr =
let connection () =
r.Connection ()
/// Count the documents in this query
let count (expr : ReqlExpr) =
expr.Count ()
/// Count the documents in this query where the function returns true
let countFunc (f : ReqlExpr -> bool) (expr : ReqlExpr) =
expr.Count (ReqlFunction1 (fun row -> f row :> obj))
/// Count the documents in this query where the function returns true
let countJS js (expr : ReqlExpr) =
expr.Count (toJS js)
/// Reference a database
let db dbName =
match dbName with "" -> r.Db () | _ -> r.Db dbName
@ -110,21 +212,25 @@ let eqJoinJSIndex js table (indexName : string) expr =
let filter (filterSpec : obj) (expr : ReqlExpr) =
expr.Filter filterSpec
/// Apply optional argument to a filter
let private optArgFilter arg (filter : Filter) =
filter.OptArg (match arg with Default d -> d.reql)
/// Filter documents, providing optional arguments
let filterWithOptArgs (filterSpec : obj) arg expr =
filter filterSpec expr |> optArgFilter arg
filter filterSpec expr |> FilterOptArg.apply arg
/// Filter documents using a function
let filterFunc<'T> (f : ReqlExpr -> 'T) (expr : ReqlExpr) =
let filterFunc (f : ReqlExpr -> bool) (expr : ReqlExpr) =
expr.Filter (ReqlFunction1 (fun row -> f row :> obj))
/// Filter documents using a function, providing optional arguments
let filterFuncWithOptArgs<'T> (f : ReqlExpr -> 'T) arg expr =
filterFunc f expr |> optArgFilter arg
let filterFuncWithOptArgs f arg expr =
filterFunc f expr |> FilterOptArg.apply arg
/// Filter documents using multiple functions (has the effect of ANDing them)
let filterFuncAll (fs : (ReqlExpr -> bool) list) (expr : ReqlExpr) =
(fs |> List.fold (fun (e : ReqlExpr) f -> filterFunc f e) expr) :?> Filter
/// Filter documents using multiple functions (has the effect of ANDing them), providing optional arguments
let filterFuncAllWithOptArgs fs arg expr =
filterFuncAll fs expr |> FilterOptArg.apply arg
/// Filter documents using JavaScript
let filterJS js (expr : ReqlExpr) =
@ -132,7 +238,7 @@ let filterJS js (expr : ReqlExpr) =
/// Filter documents using JavaScript, providing optional arguments
let filterJSWithOptArgs js arg expr =
filterJS js expr |> optArgFilter arg
filterJS js expr |> FilterOptArg.apply arg
/// Get a document by its primary key
let get (documentId : obj) (table : Table) =
@ -182,12 +288,12 @@ let indexList (table : Table) =
let indexRename (oldName : string) (newName : string) (table : Table) =
table.IndexRename (oldName, newName)
/// Rename an index (overwrite will succeed)
let indexRenameWithOverwrite (oldName : string) (newName : string) (table : Table) =
indexRename oldName newName table |> IndexRenameOptArg.apply Overwrite
/// Rename an index (specifying overwrite action)
let indexRenameWithOptArg oldName newName arg table =
indexRename oldName newName table |> IndexRenameOptArg.apply arg
/// Get the status of specific indexes for the given table
let indexStatus (table : Table) (indexes : string list) =
let indexStatus (indexes : string list) (table : Table) =
table.IndexStatus (Array.ofList indexes)
/// Get the status of all indexes for the given table
@ -195,7 +301,7 @@ let indexStatusAll (table : Table) =
table.IndexStatus ()
/// Wait for specific indexes on the given table to become ready
let indexWait (table : Table) (indexes : string list) =
let indexWait (indexes : string list) (table : Table) =
table.IndexWait (Array.ofList indexes)
/// Wait for all indexes on the given table to become ready
@ -231,13 +337,65 @@ let isEmpty (expr : ReqlExpr) =
expr.IsEmpty ()
/// End a sequence after a given number of elements
let limit n (expr : ReqlExpr) =
let limit (n : int) (expr : ReqlExpr) =
expr.Limit n
/// Map the results using a function
let mapFunc f (expr : ReqlExpr) =
expr.Map (ReqlFunction1 f)
/// Map the results using a JavaScript function
let mapJS js (expr : ReqlExpr) =
expr.Map (toJS js)
/// Merge the current query with given document
let merge (doc : obj) (expr : ReqlExpr) =
expr.Merge doc
/// Merge the current query with the results of a function
let mergeFunc f (expr : ReqlExpr) =
expr.Merge (ReqlFunction1 f)
/// Merge the current query with the results of a JavaScript function
let mergeJS js (expr : ReqlExpr) =
expr.Merge (toJS js)
/// Retrieve the nth element in a sequence
let nth n (expr : ReqlExpr) =
expr.Nth n
/// Order a sequence by a given field
let orderBy (field : string) (expr : ReqlExpr) =
expr.OrderBy field
/// Order a sequence in descending order by a given field
let orderByDescending (field : string) (expr : ReqlExpr) =
expr.OrderBy (r.Desc field)
/// Order a sequence by a given function
let orderByFunc f (expr : ReqlExpr) =
expr.OrderBy (ReqlFunction1 f)
/// Order a sequence in descending order by a given function
let orderByFuncDescending f (expr : ReqlExpr) =
expr.OrderBy (r.Desc (ReqlFunction1 f))
/// Order a sequence by a given index
let orderByIndex (index : string) (expr : ReqlExpr) =
expr.OrderBy().OptArg("index", index)
/// Order a sequence in descending order by a given index
let orderByIndexDescending (index : string) (expr : ReqlExpr) =
expr.OrderBy().OptArg("index", r.Desc index)
/// Order a sequence by a given JavaScript function
let orderByJS js (expr : ReqlExpr) =
expr.OrderBy (toJS js)
/// Order a sequence in descending order by a given JavaScript function
let orderByJSDescending js (expr : ReqlExpr) =
expr.OrderBy (r.Desc (toJS js))
/// Create an outer join between two sequences, specifying the join condition with a function
let outerJoinFunc<'T> (otherSeq : obj) (f : ReqlExpr -> ReqlExpr -> 'T) (expr : ReqlExpr) =
expr.OuterJoin (otherSeq, ReqlFunction2 (fun f1 f2 -> f f1 f2 :> obj))
@ -275,7 +433,7 @@ let replaceJSWithOptArgs js args expr =
replaceJS js expr |> ReplaceOptArg.apply args
/// Skip a number of elements from the head of a sequence
let skip n (expr : ReqlExpr) =
let skip (n : int) (expr : ReqlExpr) =
expr.Skip n
/// Ensure changes to a table are written to permanent storage
@ -350,16 +508,43 @@ let zip (expr : ReqlExpr) =
// ~~ RETRY ~~
open RethinkDb.Driver.Net
open System.Threading.Tasks
/// Retry, delaying for each the seconds provided (if required)
let withRetry intervals (f : IConnection -> Task<'T>) =
Retry.withRetry f intervals
let withRetry<'T> intervals f =
Retry.withRetry<'T> f intervals
/// Convert an async function to a task function (Polly does not understand F# Async)
let private asyncFuncToTask<'T> (f : IConnection -> Async<'T>) =
fun conn -> f conn |> Async.StartAsTask
/// Retry, delaying for each the seconds provided (if required)
let withAsyncRetry<'T> intervals f = fun conn ->
withRetry<'T> intervals (asyncFuncToTask f) conn |> Async.AwaitTask
/// Retry, delaying for each the seconds provided (if required)
let withSyncRetry<'T> intervals f =
Retry.withRetrySync<'T> f intervals
/// Retry failed commands with 200ms, 500ms, and 1 second delays
let withRetryDefault (f : IConnection -> Task<'T>) =
Retry.withRetryDefault f
let withRetryDefault<'T> f =
Retry.withRetryDefault<'T> f
/// Retry, delaying for each the seconds provided (if required)
let withAsyncRetryDefault<'T> f = fun conn ->
withRetryDefault<'T> (asyncFuncToTask f) conn |> Async.AwaitTask
/// Retry, delaying for each the seconds provided (if required)
let withSyncRetryDefault<'T> f =
Retry.withRetrySyncDefault<'T> f
/// Retry failed commands one time with no delay
let withRetryOnce (f : IConnection -> Task<'T>) =
Retry.withRetryOnce f
let withRetryOnce<'T> f =
Retry.withRetryOnce<'T> f
/// Retry, delaying for each the seconds provided (if required)
let withAsyncRetryOnce<'T> f = fun conn ->
withRetryOnce<'T> (asyncFuncToTask f) conn |> Async.AwaitTask
/// Retry, delaying for each the seconds provided (if required)
let withSyncRetryOnce<'T> f =
Retry.withRetrySyncOnce<'T> f

View File

@ -215,6 +215,90 @@ module ReplaceOptArg =
r
/// How RethinkDB should read the data
type ReadMode =
/// Return values in memory from the primary replica (default)
| Single
/// Return values committed to a majority of replicas
| Majority
/// Return values in memory from an arbitrary replica
| Outdated
/// The ReQL representation of the read mode
member this.reql = match this with Single -> "single" | Majority -> "majority" | Outdated -> "outdated"
/// How the returned data should be formatted
type ReturnFormat =
/// The native representation for the calling environment
| Native
/// The raw representation
| Raw
/// The ReQL representation of the format
member this.reql = match this with Native -> "native" | Raw -> "raw"
/// Optional arguments for the `run` command
type RunOptArg =
/// How data should be read
| ReadMode of ReadMode
/// The time format (raw is JSON)
| TimeFormat of ReturnFormat
/// Whether to profile the query
| Profile of bool
/// The durability of the command
| Durability of Durability
/// The format of grouped data and streams
| GroupFormat of ReturnFormat
/// The database against which this run should be executed
| Db of string
/// The maximum size of a returned array (RethinkDB default is 100,000)
| ArrayLimit of int
/// The format of binary data
| BinaryFormat of ReturnFormat
/// Minimum number of rows to wait before batching results (RethinkDB default is 8)
| MinBatchRows of int
/// Maximum number of rows to wait before batching results (RethinkDB default is no upper bound)
| MaxBatchRows of int
/// Maximum number of bytes to wait before batching results (RethinkDB default is 1MB)
| MaxBatchBytes of int
/// Maximum number of seconds to wait before batching results (RethinkDB default is 0.5)
| MaxBatchSeconds of float
/// Factor to reduce other values for the first batch
| FirstBatchScaleDownFactor of int
/// The ReQL representation of the optional argument
member this.reql =
let pair =
match this with
| ReadMode mde -> "read_mode", mde.reql :> obj
| TimeFormat fmt -> "time_format", fmt.reql
| Profile prf -> "profile", prf
| Durability dur -> match dur.reql with k, v -> k, v
| GroupFormat fmt -> "group_format", fmt.reql
| Db db -> "db", db
| ArrayLimit lim -> "array_limit", lim
| BinaryFormat fmt -> "binary_format", fmt.reql
| MinBatchRows min -> "min_batch_rows", min
| MaxBatchRows max -> "max_batch_rows", max
| MaxBatchBytes max -> "max_batch_bytes", max
| MaxBatchSeconds max -> "max_batch_seconds", max
| FirstBatchScaleDownFactor fac -> "first_batch_scaledown_factor", fac
fst pair, RethinkDB.R.Expr (snd pair)
/// Function to support `run` optional arguments
module RunOptArg =
open RethinkDb.Driver.Model
/// Create optional argument for a run command
let create (opts : RunOptArg list) =
let args = OptArgs ()
opts |> List.iter (fun arg -> args.Add arg.reql)
args
/// Optional arguments for the `update` statement
type UpdateOptArg =
/// The durability of the command

View File

@ -8,18 +8,18 @@
<PackageProjectUrl>https://github.com/danieljsummers/RethinkDb.Driver.FSharp</PackageProjectUrl>
<!-- PackageIconUrl>https://github.com/danieljsummers/RethinkDb.Driver.FSharp/raw/master/icon/icon.png</PackageIconUrl -->
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
<PackageReleaseNotes>Alpha; use at your own risk</PackageReleaseNotes>
<Copyright>See LICENSE</Copyright>
<PackageTags>RethinkDB document F#</PackageTags>
<VersionPrefix>0.8.0</VersionPrefix>
<VersionSuffix>alpha-0005</VersionSuffix>
<VersionSuffix>alpha-0006</VersionSuffix>
<PackageReleaseNotes>Alpha; use at your own risk</PackageReleaseNotes>
</PropertyGroup>
<ItemGroup>
<Compile Include="Retry.fs" />
<Compile Include="OptArgs.fs" />
<Compile Include="Builder.fs" />
<Compile Include="Functions.fs" />
<Compile Include="Builder.fs" />
<Compile Include="Config.fs" />
</ItemGroup>

View File

@ -20,16 +20,42 @@ let retryPolicy (intervals : float seq) (conn : IConnection) =
(conn :?> Connection).Reconnect false
| false -> ()))
/// Create a retry policy that attempts to reconnect to RethinkDB when a synchronous operation encounters an error
let retryPolicySync (intervals : float seq) (conn : IConnection) =
Policy
.Handle<ReqlDriverError>()
.WaitAndRetry(
intervals |> Seq.map TimeSpan.FromSeconds,
System.Action<exn, TimeSpan, int, Context> (fun ex _ _ _ ->
printf $"Encountered RethinkDB exception: {ex.Message}"
match ex.Message.Contains "socket" with
| true ->
printf "Reconnecting to RethinkDB"
(conn :?> Connection).Reconnect false
| false -> ()))
/// Perform a query, retrying after each delay specified
let withRetry (f : IConnection -> Task<'T>) retries =
let withRetry<'T> (f : IConnection -> Task<'T>) retries =
fun conn -> backgroundTask {
return! (retryPolicy retries conn).ExecuteAsync(fun () -> f conn)
}
/// Perform a synchronous query, retrying after each delay specified
let withRetrySync<'T> (f : IConnection -> 'T) retries = fun conn ->
(retryPolicySync retries conn).Execute(fun () -> f conn)
/// Retry three times, after 200ms, 500ms, and 1 second
let withRetryDefault f =
withRetry f [ 0.2; 0.5; 1.0 ]
let withRetryDefault<'T> f =
withRetry<'T> f [ 0.2; 0.5; 1.0 ]
/// Retry three times, after 200ms, 500ms, and 1 second
let withRetrySyncDefault<'T> f =
withRetrySync<'T> f [ 0.2; 0.5; 1.0 ]
/// Retry one time immediately
let withRetryOnce f =
withRetry f [ 0.0 ]
let withRetryOnce<'T> f =
withRetry<'T> f [ 0.0 ]
/// Retry one time immediately
let withRetrySyncOnce<'T> f =
withRetrySync<'T> f [ 0.0 ]