5 Commits

Author SHA1 Message Date
1bfdcf165c Launch all tasks as background tasks
- Standardize on sync/async prefixes
  - affects toList, CreateConnection, ignore*
- Target netstandard-2.0 for max compatibility
- Update dependencies
2024-12-15 22:59:10 -05:00
a09181eee0 Add logging CreateConnection overloads 2022-07-19 09:12:57 -04:00
5d15de240d Add URI configuration option 2022-07-18 22:32:32 -04:00
61e17038f8 Add tableCreate opt arg functions 2022-07-18 08:52:26 -04:00
9281941603 Add retry for cursor DSL operations 2022-06-15 22:41:16 -04:00
9 changed files with 924 additions and 538 deletions

View File

@@ -41,13 +41,15 @@ let fetchPost (postId : string) =
}
```
### A standard way to translate JSON into a strongly-typed configuration
### A standard way to translate JSON or a URI into a strongly-typed configuration
```fsharp
/// type: DataConfig
let config = DataConfig.fromJsonFile "data-config.json"
// OR
let config = DataConfig.fromConfiguration (config.GetSection "RethinkDB")
// OR
let config = DataConfig.fromUri (config.GetConnectionString "RethinkDB")
/// type: IConnection
let conn = config.Connect ()

File diff suppressed because it is too large Load Diff

View File

@@ -1,10 +1,11 @@
namespace RethinkDb.Driver.FSharp
open System
open Microsoft.Extensions.Configuration
open Microsoft.Extensions.Logging
open Newtonsoft.Json.Linq
open RethinkDb.Driver
open RethinkDb.Driver.Net
open System.Threading.Tasks
/// Parameters for the RethinkDB configuration
type DataConfigParameter =
@@ -36,17 +37,42 @@ type DataConfig =
static member empty =
{ Parameters = [] }
/// Create a RethinkDB connection
member this.CreateConnection () : IConnection =
/// Build the connection from the given parameters
member private this.BuildConnection() =
this.Parameters
|> Seq.fold ConnectionBuilder.build (RethinkDB.R.Connection())
|> function builder -> builder.Connect ()
/// Create a RethinkDB connection
member this.CreateConnectionAsync () : Task<Connection> =
this.Parameters
|> Seq.fold ConnectionBuilder.build (RethinkDB.R.Connection ())
|> function builder -> builder.ConnectAsync ()
/// Create a RethinkDB connection (task async)
member this.CreateConnection() = backgroundTask {
let! conn = this.BuildConnection().ConnectAsync()
return conn :> IConnection
}
/// Create a RethinkDB connection, logging the connection settings (task async)
member this.CreateConnection(log: ILogger) = backgroundTask {
let builder = this.BuildConnection()
if not (isNull log) then log.LogInformation $"Connecting to {this.EffectiveUri}"
let! conn = builder.ConnectAsync()
return conn :> IConnection
}
/// Create a RethinkDB connection (F# async)
member this.CreateAsyncConnection() =
this.CreateConnection() |> Async.AwaitTask
/// Create a RethinkDB connection, logging the connection settings (F# async)
member this.CreateAsyncConnection(log: ILogger) =
this.CreateConnection(log) |> Async.AwaitTask
/// Create a RethinkDB connection (sync)
member this.CreateSyncConnection() : IConnection =
this.BuildConnection().Connect()
/// Create a RethinkDB connection, logging the connection settings (sync)
member this.CreateSyncConnection(log: ILogger) : IConnection =
let builder = this.BuildConnection()
if not (isNull log) then log.LogInformation $"Connecting to {this.EffectiveUri}"
builder.Connect()
/// The effective hostname
member this.Hostname =
@@ -72,6 +98,20 @@ type DataConfig =
| Some (Database x) -> x
| _ -> RethinkDBConstants.DefaultDbName
/// The effective configuration URI (excludes password / auth key)
member this.EffectiveUri =
seq {
"rethinkdb://"
match this.Parameters |> List.tryPick (fun x -> match x with User _ -> Some x | _ -> None) with
| Some (User(username, _)) -> $"{username}:***pw***@"
| _ ->
match this.Parameters |> List.tryPick (fun x -> match x with AuthKey _ -> Some x | _ -> None) with
| Some (AuthKey _) -> "****key****@"
| _ -> ()
$"{this.Hostname}:{this.Port}/{this.Database}?timeout={this.Timeout}"
}
|> String.concat ""
/// Parse settings from JSON
///
/// A sample JSON object with all the possible properties filled in:
@@ -116,3 +156,26 @@ type DataConfig =
match cfg["username"], cfg["password"] with null, _ | _, null -> () | user -> User user
]
}
/// Parse settings from a URI
///
/// rethinkdb://user:password@host:port/database?timeout=##
/// OR
/// rethinkdb://authkey@host:port/database?timeout=##
///
/// Scheme and host are required; all other settings optional
static member FromUri(uri: string) =
let it = Uri uri
if it.Scheme <> "rethinkdb" then invalidArg "Scheme" $"""URI scheme must be "rethinkdb" (was {it.Scheme})"""
{ Parameters =
[ Hostname it.Host
if it.Port <> -1 then Port it.Port
if it.UserInfo <> "" then
if it.UserInfo.Contains ":" then
let parts = it.UserInfo.Split ':' |> Array.truncate 2
User(parts[0], parts[1])
else AuthKey it.UserInfo
if it.Segments.Length > 1 then Database it.Segments[1]
if it.Query.Contains "?timeout=" then Timeout(int it.Query[9..])
]
}

View File

@@ -18,15 +18,17 @@ module private Helpers =
// ~~ WRITES ~~
/// Write a ReQL command with a cancellation token, always returning a result
let runWriteResultWithCancel cancelToken (expr : ReqlExpr) = fun conn ->
expr.RunWriteAsync (conn, cancelToken)
let runWriteResultWithCancel cancelToken (expr: ReqlExpr) = fun conn -> backgroundTask {
return! expr.RunWriteAsync(conn, cancelToken)
}
/// Write a ReQL command, always returning a result
let runWriteResult expr = runWriteResultWithCancel CancellationToken.None expr
/// Write a ReQL command with optional arguments and a cancellation token, always returning a result
let runWriteResultWithOptArgsAndCancel args cancelToken (expr : ReqlExpr) = fun conn ->
expr.RunWriteAsync (conn, RunOptArg.create args, cancelToken)
let runWriteResultWithOptArgsAndCancel args cancelToken (expr: ReqlExpr) = fun conn -> backgroundTask {
return! expr.RunWriteAsync(conn, RunOptArg.create args, cancelToken)
}
/// Write a ReQL command with optional arguments, always returning a result
let runWriteResultWithOptArgs args expr = runWriteResultWithOptArgsAndCancel args CancellationToken.None expr
@@ -108,12 +110,14 @@ let syncWriteWithOptArgs args expr = fun conn ->
// ~~ Full results (atom / sequence) ~~
/// Run the ReQL command using a cancellation token, returning the result as the type specified
let runResultWithCancel<'T> cancelToken (expr : ReqlExpr) = fun conn ->
expr.RunResultAsync<'T> (conn, cancelToken)
let runResultWithCancel<'T> cancelToken (expr: ReqlExpr) = fun conn -> backgroundTask {
return! expr.RunResultAsync<'T>(conn, cancelToken)
}
/// Run the ReQL command using optional arguments and a cancellation token, returning the result as the type specified
let runResultWithOptArgsAndCancel<'T> args cancelToken (expr : ReqlExpr) = fun conn ->
expr.RunResultAsync<'T> (conn, RunOptArg.create args, cancelToken)
let runResultWithOptArgsAndCancel<'T> args cancelToken (expr: ReqlExpr) = fun conn -> backgroundTask {
return! expr.RunResultAsync<'T>(conn, RunOptArg.create args, cancelToken)
}
/// Run the ReQL command, returning the result as the type specified
let runResult<'T> expr = runResultWithCancel<'T> CancellationToken.None expr
@@ -165,20 +169,22 @@ let asAsyncOption (f : IConnection -> Async<'T>) conn = async {
let asSyncOption (f: IConnection -> 'T) conn = nullToOption (f conn)
/// Ignore the result of a task-based query
let ignoreResult<'T> (f : IConnection -> Tasks.Task<'T>) conn = task {
let! _ = (f conn).ConfigureAwait false
let ignoreResult<'T> (f: IConnection -> Tasks.Task<'T>) conn = backgroundTask {
let! _ = f conn
()
}
// ~~ Cursors / partial results (sequence / partial) ~~
/// Run the ReQL command using a cancellation token, returning a cursor for the type specified
let runCursorWithCancel<'T> cancelToken (expr : ReqlExpr) = fun conn ->
expr.RunCursorAsync<'T> (conn, cancelToken)
let runCursorWithCancel<'T> cancelToken (expr: ReqlExpr) = fun conn -> backgroundTask {
return! expr.RunCursorAsync<'T>(conn, cancelToken)
}
/// Run the ReQL command using optional arguments and a cancellation token, returning a cursor for the type specified
let runCursorWithOptArgsAndCancel<'T> args cancelToken (expr : ReqlExpr) = fun conn ->
expr.RunCursorAsync<'T> (conn, RunOptArg.create args, cancelToken)
let runCursorWithOptArgsAndCancel<'T> args cancelToken (expr: ReqlExpr) = fun conn -> backgroundTask {
return! expr.RunCursorAsync<'T>(conn, RunOptArg.create args, cancelToken)
}
/// Run the ReQL command, returning a cursor for the type specified
let runCursor<'T> expr = runCursorWithCancel<'T> CancellationToken.None expr
@@ -229,13 +235,13 @@ let toList<'T> (f : IConnection -> Task<Cursor<'T>>) = fun conn -> backgroundTas
}
/// Convert a cursor to a list of items
let toListAsync<'T> (f : IConnection -> Async<Cursor<'T>>) = fun conn -> async {
let asyncToList<'T> (f: IConnection -> Async<Cursor<'T>>) = fun conn -> async {
use! cursor = f conn
return! cursorToList<'T> cursor |> Async.AwaitTask
}
/// Convert a cursor to a list of items
let toListSync<'T> (f : IConnection -> Cursor<'T>) = fun conn ->
let syncToList<'T> (f: IConnection -> Cursor<'T>) = fun conn ->
use cursor = f conn
cursorToList cursor |> Async.AwaitTask |> Async.RunSynchronously
@@ -576,6 +582,14 @@ let tableCreate (tableName : string) (db : Db) =
let tableCreateInDefault (tableName: string) =
r.TableCreate tableName
/// Create a table in the connection-default database, providing optional arguments
let tableCreateInDefaultWithOptArgs (tableName: string) args =
r.TableCreate tableName |> TableCreateOptArg.apply args
/// Create a table in the given database, providing optional arguments
let tableCreateWithOptArgs (tableName: string) args (db: Db) =
db.TableCreate tableName |> TableCreateOptArg.apply args
/// Drop a table in the given database
let tableDrop (tableName: string) (db: Db) =
db.TableDrop tableName
@@ -632,7 +646,9 @@ let withRetry<'T> intervals f =
/// Convert an async function to a task function (Polly does not understand F# Async)
let private asyncFuncToTask<'T> (f : IConnection -> Async<'T>) =
fun conn -> f conn |> Async.StartAsTask
fun conn -> backgroundTask {
return! f conn |> Async.StartAsTask
}
/// Retry, delaying for each the seconds provided (if required)
let withAsyncRetry<'T> intervals f = fun conn ->

View File

@@ -159,10 +159,10 @@ val syncCursorWithOptArgs<'T> : RunOptArg list -> ReqlExpr -> (IConnection -> Cu
val toList<'T> : (IConnection -> Task<Cursor<'T>>) -> IConnection -> Task<'T list>
/// Convert a cursor to a list of items
val toListAsync<'T> : (IConnection -> Async<Cursor<'T>>) -> IConnection -> Async<'T list>
val asyncToList<'T> : (IConnection -> Async<Cursor<'T>>) -> IConnection -> Async<'T list>
/// Convert a cursor to a list of items
val toListSync<'T> : (IConnection -> Cursor<'T>) -> IConnection -> 'T list
val syncToList<'T> : (IConnection -> Cursor<'T>) -> IConnection -> 'T list
/// Apply a connection to the query pipeline (typically the final step)
val withConn<'T> : IConnection -> (IConnection -> 'T) -> 'T
@@ -418,6 +418,12 @@ val tableCreate : string -> Db -> TableCreate
/// Create a table in the connection-default database
val tableCreateInDefault : string -> TableCreate
/// Create a table in the connection-default database, providing optional arguments
val tableCreateInDefaultWithOptArgs : string -> TableCreateOptArg list -> TableCreate
/// Create a table in the given database, providing optional arguments
val tableCreateWithOptArgs : string -> TableCreateOptArg list -> Db -> TableCreate
/// Drop a table in the given database
val tableDrop : string -> Db -> TableDrop

View File

@@ -299,6 +299,57 @@ module RunOptArg =
args
/// Definition of server tag/replica count
type ReplicaTag =
/// A tagged server replica, along with the number of replicas per shard for that server
| ReplicaTag of string * int
/// Definition of replicas per shard when creating a table
type ReplicaSpec =
/// Create this number of replicas per shard
| Number of int
/// Create the replicas across tagged servers, using the specified tag as the primary server
| WithTags of string * ReplicaTag list
/// Optional arguments for creating tables
type TableCreateOptArg =
/// The name of the primary key field (default is "id")
| PrimaryKey of string
/// The durability of the command
| Durability of Durability
/// The number of shards to create (1 to 64)
| Shards of int
/// The replicas per shard for this table
| Replicas of ReplicaSpec
/// Functions to support `tableCreate` optional arguments
module TableCreateOptArg =
/// Apply a list of optional arguments to a tableCreate statement
let apply opts (tc: TableCreate) =
opts
|> List.fold (fun (tc: TableCreate) arg ->
match arg with
| PrimaryKey pk -> tc.OptArg("primary_key", pk)
| Durability dur -> tc.OptArg dur.reql
| Shards sh -> tc.OptArg("shards", sh)
| Replicas rep ->
match rep with
| Number count -> tc.OptArg("replicas", count)
| WithTags (primary, all) ->
let (ReplicaTag (firstTag, firstCount)) = List.head all
let replica =
all
|> List.skip 1
|> List.fold (fun (h: Model.MapObject) arg ->
let (ReplicaTag (tag, count)) = arg
h.With (tag, count))
(RethinkDB.R.HashMap(firstTag, firstCount))
tc.OptArg("replicas", replica).OptArg("primary_replica_tag", primary)
)
tc
/// Optional arguments for the `update` statement
type UpdateOptArg =
/// The durability of the command
@@ -323,4 +374,3 @@ module UpdateOptArg =
| NonAtomic non -> upd.OptArg("non_atomic", non)
| IgnoreWriteHook ign -> upd.OptArg("ignore_write_hook", ign))
u

View File

@@ -10,6 +10,8 @@ open RethinkDb.Driver.FSharp
let dataCfg = DataConfig.fromJson "rethink-config.json"
// - or -
let dataCfg = DataConfig.fromConfiguration [config-section]
// - or -
let dataCfg = DataConfig.fromUri [connection-string]
let conn = dataCfg.CreateConnection () // IConnection
```

View File

@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>net6.0;netstandard2.0</TargetFrameworks>
<TargetFramework>netstandard2.0</TargetFramework>
<Description>Idiomatic F# extensions on the official RethinkDB C# driver</Description>
<Authors>Daniel J. Summers,Bit Badger Solutions</Authors>
<PackageLicenseExpression>Apache-2.0</PackageLicenseExpression>
@@ -13,10 +13,9 @@
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
<Copyright>See LICENSE</Copyright>
<PackageTags>RethinkDB document F#</PackageTags>
<VersionPrefix>0.9.0</VersionPrefix>
<VersionSuffix>beta-04</VersionSuffix>
<VersionPrefix>1.0.0</VersionPrefix>
<PackageReleaseNotes>
Add cursor / toList functions and DSL operations
Add URI config option and logging CreateConnection overloads
</PackageReleaseNotes>
</PropertyGroup>
@@ -32,11 +31,11 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration" Version="6.0.1" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
<PackageReference Include="Polly" Version="7.2.3" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="9.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
<PackageReference Include="Polly" Version="8.5.0" />
<PackageReference Include="RethinkDb.Driver" Version="2.*" />
<PackageReference Update="FSharp.Core" Version="6.0.3" />
<PackageReference Update="FSharp.Core" Version="9.0.100" />
</ItemGroup>
</Project>