From 711855037bc4e58e3265e330c643fcd2735c304d Mon Sep 17 00:00:00 2001 From: "Daniel J. Summers" Date: Sat, 16 Apr 2022 12:20:16 -0400 Subject: [PATCH] Add skip, ignore, and retry logic --- src/RethinkDb.Driver.FSharp/Builder.fs | 128 ++++++++++++++++++ .../RethinkDb.Driver.FSharp.fsproj | 1 + 2 files changed, 129 insertions(+) diff --git a/src/RethinkDb.Driver.FSharp/Builder.fs b/src/RethinkDb.Driver.FSharp/Builder.fs index 1776514..6e3a486 100644 --- a/src/RethinkDb.Driver.FSharp/Builder.fs +++ b/src/RethinkDb.Driver.FSharp/Builder.fs @@ -1,9 +1,11 @@ [] module RethinkDb.Driver.FSharp.RethinkBuilder +open Polly open RethinkDb.Driver open RethinkDb.Driver.Ast open RethinkDb.Driver.Net +open System open System.Threading.Tasks /// Computation Expression builder for RethinkDB queries @@ -14,6 +16,20 @@ type RethinkBuilder<'T> () = fields |> List.fold (fun (m : Model.MapObject) item -> m.With (fst item, snd item)) (RethinkDB.R.HashMap ()) + /// Create a retry policy that attempts to reconnect to RethinkDB on each retry + let retryPolicy (intervals : float seq) (conn : IConnection) = + Policy + .Handle() + .WaitAndRetryAsync( + intervals |> Seq.map TimeSpan.FromSeconds, + System.Action (fun ex _ _ _ -> + printf $"Encountered RethinkDB exception: {ex.Message}" + match ex.Message.Contains "socket" with + | true -> + printf "Reconnecting to RethinkDB" + (conn :?> Connection).Reconnect false + | false -> ())) + member _.Bind (expr : ReqlExpr, f : ReqlExpr -> ReqlExpr) = f expr member this.For (expr, f) = this.Bind (expr, f) @@ -88,6 +104,10 @@ type RethinkBuilder<'T> () = member _.GetAll (tbl : Table, keys : obj list, index : string) = tbl.GetAll(Array.ofList keys).OptArg ("index", index) + /// Skip a certain number of results + [] + member _.Skip (expr : ReqlExpr, toSkip : int) = expr.Skip toSkip + /// Limit the results of this query [] member _.Limit (expr : ReqlExpr, limit : int) = expr.Limit limit @@ -170,6 +190,11 @@ type RethinkBuilder<'T> () = return! expr.RunResultAsync<'T> conn } + /// Execute the query, returning the result of the type specified + [] + member this.Result (expr, conn) = + this.Result expr conn + /// Execute the query, returning the result of the type specified, or None if no result is found [] member _.ResultOption (expr : ReqlExpr) : IConnection -> Task<'T option> = @@ -178,6 +203,11 @@ type RethinkBuilder<'T> () = return match (box >> isNull) result with true -> None | false -> Some result } + /// Execute the query, returning the result of the type specified, or None if no result is found + [] + member this.ResultOption (expr, conn) = + this.ResultOption expr conn + /// Perform a write operation [] member _.Write (expr : ReqlExpr) : IConnection -> Task = @@ -185,5 +215,103 @@ type RethinkBuilder<'T> () = return! expr.RunWriteAsync conn } + /// Perform a write operation + [] + member this.Write (expr, conn) = + this.Write expr conn + + /// Ignore the result of an operation + [] + member _.IgnoreResult (f : IConnection -> Task<'T>) = + fun conn -> task { + let! _ = f conn + () + } + + /// Ignore the result of an operation + [] + member _.IgnoreResult (f : IConnection -> Task<'T option>) = + fun conn -> task { + let! _ = f conn + () + } + + /// Ignore the result of an operation + [] + member this.IgnoreResult (f : IConnection -> Task<'T>, conn) = + this.IgnoreResult f conn + + /// Ignore the result of an operation + [] + member this.IgnoreResult (f : IConnection -> Task<'T option>, conn) = + this.IgnoreResult f conn + + // Reconnection + + /// Retries a variable number of times, waiting each time for the seconds specified + [] + member _.WithRetry (f : IConnection -> Task<'T>, retries) = + fun conn -> task { + return! (retryPolicy retries conn).ExecuteAsync(fun () -> f conn) + } + + /// Retries a variable number of times, waiting each time for the seconds specified + [] + member _.WithRetry (f : IConnection -> Task<'T option>, retries) = + fun conn -> task { + return! (retryPolicy retries conn).ExecuteAsync(fun () -> f conn) + } + + /// Retries a variable number of times, waiting each time for the seconds specified + [] + member this.WithRetry (f : IConnection -> Task<'T>, retries, conn) = + this.WithRetry (f, retries) conn + + /// Retries a variable number of times, waiting each time for the seconds specified + [] + member this.WithRetry (f : IConnection -> Task<'T option>, retries, conn) = + this.WithRetry (f, retries) conn + + /// Retries at 200ms, 500ms, and 1s + [] + member this.WithRetryDefault (f : IConnection -> Task<'T>) = + this.WithRetry (f, [ 0.2; 0.5; 1.0 ]) + + /// Retries at 200ms, 500ms, and 1s + [] + member this.WithRetryDefault (f : IConnection -> Task<'T option>) = + this.WithRetry (f, [ 0.2; 0.5; 1.0 ]) + + /// Retries at 200ms, 500ms, and 1s + [] + member this.WithRetryDefault (f : IConnection -> Task<'T>, conn) = + this.WithRetry (f, [ 0.2; 0.5; 1.0 ]) conn + + /// Retries at 200ms, 500ms, and 1s + [] + member this.WithRetryDefault (f : IConnection -> Task<'T option>, conn) = + this.WithRetry (f, [ 0.2; 0.5; 1.0 ]) conn + + /// Retries once immediately + [] + member this.WithRetryOnce (f : IConnection -> Task<'T>) = + this.WithRetry (f, [ 0.0 ]) + + /// Retries once immediately + [] + member this.WithRetryOnce (f : IConnection -> Task<'T option>) = + this.WithRetry (f, [ 0.0 ]) + + /// Retries once immediately + [] + member this.WithRetryOnce (f : IConnection -> Task<'T>, conn) = + this.WithRetry (f, [ 0.0 ]) conn + + /// Retries once immediately + [] + member this.WithRetryOnce (f : IConnection -> Task<'T option>, conn) = + this.WithRetry (f, [ 0.0 ]) conn + + /// RethinkDB computation expression let rethink<'T> = RethinkBuilder<'T> () diff --git a/src/RethinkDb.Driver.FSharp/RethinkDb.Driver.FSharp.fsproj b/src/RethinkDb.Driver.FSharp/RethinkDb.Driver.FSharp.fsproj index ddd7a2f..8c1466d 100644 --- a/src/RethinkDb.Driver.FSharp/RethinkDb.Driver.FSharp.fsproj +++ b/src/RethinkDb.Driver.FSharp/RethinkDb.Driver.FSharp.fsproj @@ -24,6 +24,7 @@ +