Add skip, ignore, and retry logic

This commit is contained in:
Daniel J. Summers 2022-04-16 12:20:16 -04:00
parent e1dd4fe588
commit 711855037b
2 changed files with 129 additions and 0 deletions

View File

@ -1,9 +1,11 @@
[<AutoOpen>] [<AutoOpen>]
module RethinkDb.Driver.FSharp.RethinkBuilder module RethinkDb.Driver.FSharp.RethinkBuilder
open Polly
open RethinkDb.Driver open RethinkDb.Driver
open RethinkDb.Driver.Ast open RethinkDb.Driver.Ast
open RethinkDb.Driver.Net open RethinkDb.Driver.Net
open System
open System.Threading.Tasks open System.Threading.Tasks
/// Computation Expression builder for RethinkDB queries /// Computation Expression builder for RethinkDB queries
@ -14,6 +16,20 @@ type RethinkBuilder<'T> () =
fields fields
|> List.fold (fun (m : Model.MapObject) item -> m.With (fst item, snd item)) (RethinkDB.R.HashMap ()) |> List.fold (fun (m : Model.MapObject) item -> m.With (fst item, snd item)) (RethinkDB.R.HashMap ())
/// Create a retry policy that attempts to reconnect to RethinkDB on each retry
let retryPolicy (intervals : float seq) (conn : IConnection) =
Policy
.Handle<ReqlDriverError>()
.WaitAndRetryAsync(
intervals |> Seq.map TimeSpan.FromSeconds,
System.Action<exn, TimeSpan, int, Context> (fun ex _ _ _ ->
printf $"Encountered RethinkDB exception: {ex.Message}"
match ex.Message.Contains "socket" with
| true ->
printf "Reconnecting to RethinkDB"
(conn :?> Connection).Reconnect false
| false -> ()))
member _.Bind (expr : ReqlExpr, f : ReqlExpr -> ReqlExpr) = f expr member _.Bind (expr : ReqlExpr, f : ReqlExpr -> ReqlExpr) = f expr
member this.For (expr, f) = this.Bind (expr, f) member this.For (expr, f) = this.Bind (expr, f)
@ -88,6 +104,10 @@ type RethinkBuilder<'T> () =
member _.GetAll (tbl : Table, keys : obj list, index : string) = member _.GetAll (tbl : Table, keys : obj list, index : string) =
tbl.GetAll(Array.ofList keys).OptArg ("index", index) tbl.GetAll(Array.ofList keys).OptArg ("index", index)
/// Skip a certain number of results
[<CustomOperation "skip">]
member _.Skip (expr : ReqlExpr, toSkip : int) = expr.Skip toSkip
/// Limit the results of this query /// Limit the results of this query
[<CustomOperation "limit">] [<CustomOperation "limit">]
member _.Limit (expr : ReqlExpr, limit : int) = expr.Limit limit member _.Limit (expr : ReqlExpr, limit : int) = expr.Limit limit
@ -170,6 +190,11 @@ type RethinkBuilder<'T> () =
return! expr.RunResultAsync<'T> conn return! expr.RunResultAsync<'T> conn
} }
/// Execute the query, returning the result of the type specified
[<CustomOperation "result">]
member this.Result (expr, conn) =
this.Result expr conn
/// Execute the query, returning the result of the type specified, or None if no result is found /// Execute the query, returning the result of the type specified, or None if no result is found
[<CustomOperation "resultOption">] [<CustomOperation "resultOption">]
member _.ResultOption (expr : ReqlExpr) : IConnection -> Task<'T option> = member _.ResultOption (expr : ReqlExpr) : IConnection -> Task<'T option> =
@ -178,6 +203,11 @@ type RethinkBuilder<'T> () =
return match (box >> isNull) result with true -> None | false -> Some result return match (box >> isNull) result with true -> None | false -> Some result
} }
/// Execute the query, returning the result of the type specified, or None if no result is found
[<CustomOperation "resultOption">]
member this.ResultOption (expr, conn) =
this.ResultOption expr conn
/// Perform a write operation /// Perform a write operation
[<CustomOperation "write">] [<CustomOperation "write">]
member _.Write (expr : ReqlExpr) : IConnection -> Task<Model.Result> = member _.Write (expr : ReqlExpr) : IConnection -> Task<Model.Result> =
@ -185,5 +215,103 @@ type RethinkBuilder<'T> () =
return! expr.RunWriteAsync conn return! expr.RunWriteAsync conn
} }
/// Perform a write operation
[<CustomOperation "write">]
member this.Write (expr, conn) =
this.Write expr conn
/// Ignore the result of an operation
[<CustomOperation "ignoreResult">]
member _.IgnoreResult (f : IConnection -> Task<'T>) =
fun conn -> task {
let! _ = f conn
()
}
/// Ignore the result of an operation
[<CustomOperation "ignoreResult">]
member _.IgnoreResult (f : IConnection -> Task<'T option>) =
fun conn -> task {
let! _ = f conn
()
}
/// Ignore the result of an operation
[<CustomOperation "ignoreResult">]
member this.IgnoreResult (f : IConnection -> Task<'T>, conn) =
this.IgnoreResult f conn
/// Ignore the result of an operation
[<CustomOperation "ignoreResult">]
member this.IgnoreResult (f : IConnection -> Task<'T option>, conn) =
this.IgnoreResult f conn
// Reconnection
/// Retries a variable number of times, waiting each time for the seconds specified
[<CustomOperation "withRetry">]
member _.WithRetry (f : IConnection -> Task<'T>, retries) =
fun conn -> task {
return! (retryPolicy retries conn).ExecuteAsync(fun () -> f conn)
}
/// Retries a variable number of times, waiting each time for the seconds specified
[<CustomOperation "withRetry">]
member _.WithRetry (f : IConnection -> Task<'T option>, retries) =
fun conn -> task {
return! (retryPolicy retries conn).ExecuteAsync(fun () -> f conn)
}
/// Retries a variable number of times, waiting each time for the seconds specified
[<CustomOperation "withRetry">]
member this.WithRetry (f : IConnection -> Task<'T>, retries, conn) =
this.WithRetry (f, retries) conn
/// Retries a variable number of times, waiting each time for the seconds specified
[<CustomOperation "withRetry">]
member this.WithRetry (f : IConnection -> Task<'T option>, retries, conn) =
this.WithRetry (f, retries) conn
/// Retries at 200ms, 500ms, and 1s
[<CustomOperation "withRetryDefault">]
member this.WithRetryDefault (f : IConnection -> Task<'T>) =
this.WithRetry (f, [ 0.2; 0.5; 1.0 ])
/// Retries at 200ms, 500ms, and 1s
[<CustomOperation "withRetryDefault">]
member this.WithRetryDefault (f : IConnection -> Task<'T option>) =
this.WithRetry (f, [ 0.2; 0.5; 1.0 ])
/// Retries at 200ms, 500ms, and 1s
[<CustomOperation "withRetryDefault">]
member this.WithRetryDefault (f : IConnection -> Task<'T>, conn) =
this.WithRetry (f, [ 0.2; 0.5; 1.0 ]) conn
/// Retries at 200ms, 500ms, and 1s
[<CustomOperation "withRetryDefault">]
member this.WithRetryDefault (f : IConnection -> Task<'T option>, conn) =
this.WithRetry (f, [ 0.2; 0.5; 1.0 ]) conn
/// Retries once immediately
[<CustomOperation "withRetryOnce">]
member this.WithRetryOnce (f : IConnection -> Task<'T>) =
this.WithRetry (f, [ 0.0 ])
/// Retries once immediately
[<CustomOperation "withRetryOnce">]
member this.WithRetryOnce (f : IConnection -> Task<'T option>) =
this.WithRetry (f, [ 0.0 ])
/// Retries once immediately
[<CustomOperation "withRetryOnce">]
member this.WithRetryOnce (f : IConnection -> Task<'T>, conn) =
this.WithRetry (f, [ 0.0 ]) conn
/// Retries once immediately
[<CustomOperation "withRetryOnce">]
member this.WithRetryOnce (f : IConnection -> Task<'T option>, conn) =
this.WithRetry (f, [ 0.0 ]) conn
/// RethinkDB computation expression /// RethinkDB computation expression
let rethink<'T> = RethinkBuilder<'T> () let rethink<'T> = RethinkBuilder<'T> ()

View File

@ -24,6 +24,7 @@
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration" Version="6.0.1" /> <PackageReference Include="Microsoft.Extensions.Configuration" Version="6.0.1" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" /> <PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="Polly" Version="7.2.3" />
<PackageReference Include="RethinkDb.Driver" Version="2.*" /> <PackageReference Include="RethinkDb.Driver" Version="2.*" />
</ItemGroup> </ItemGroup>