Update to current project system

- Add F# driver project
- Replace async with task (both RethinkDB and ASP.NET Core are task-based)
- Add cancellation token support (F# RethinkDB driver does not support them yet)
This commit is contained in:
Daniel J. Summers 2022-04-19 12:12:27 -04:00
parent b46f2a83f0
commit 21ef9bac02
4 changed files with 242 additions and 228 deletions

View File

@ -3,212 +3,235 @@ namespace RethinkDB.DistributedCache
open Microsoft.Extensions.Caching.Distributed open Microsoft.Extensions.Caching.Distributed
open Microsoft.Extensions.Logging open Microsoft.Extensions.Logging
open Microsoft.Extensions.Options open Microsoft.Extensions.Options
open Newtonsoft.Json
open RethinkDb.Driver open RethinkDb.Driver
open RethinkDb.Driver.Net open RethinkDb.Driver.FSharp
open System open System
open System.Text open System.Text
open System.Threading
open System.Threading.Tasks open System.Threading.Tasks
// H/T: Suave
[<AutoOpen>]
module AsyncExtensions =
type Microsoft.FSharp.Control.AsyncBuilder with
/// An extension method that overloads the standard 'Bind' of the 'async' builder. The new overload awaits on
/// a standard .NET task
member x.Bind(t : Task<'T>, f:'T -> Async<'R>) : Async<'R> = async.Bind(Async.AwaitTask t, f)
/// An extension method that overloads the standard 'Bind' of the 'async' builder. The new overload awaits on
/// a standard .NET task which does not commpute a value
member x.Bind(t : Task, f : unit -> Async<'R>) : Async<'R> = async.Bind(Async.AwaitTask t, f)
/// Persistence object for a cache entry /// Persistence object for a cache entry
type CacheEntry = { [<CLIMutable; NoComparison; NoEquality>]
/// The Id for the cache entry type CacheEntry =
[<JsonProperty("id")>] { /// The ID for the cache entry
Id : string id : string
/// The payload for the cache entry (as a UTF-8 string)
Payload : string /// The payload for the cache entry (as a UTF-8 string)
/// The ticks at which this entry expires payload : string
ExpiresAt : int64
/// The number of seconds in the sliding expiration /// The ticks at which this entry expires
SlidingExpiration : int expiresAt : int64
}
/// The number of seconds in the sliding expiration
slidingExpiration : int
}
/// Record to update sliding expiration for an entry
type SlidingExpirationUpdate = { ExpiresAt : int64 }
/// IDistributedCache implementation utilizing RethinkDB /// IDistributedCache implementation utilizing RethinkDB
[<AllowNullLiteral>] [<AllowNullLiteral>]
type DistributedRethinkDBCache(options : IOptions<DistributedRethinkDBCacheOptions>, type DistributedRethinkDBCache (options : IOptions<DistributedRethinkDBCacheOptions>,
log : ILogger<DistributedRethinkDBCache>) = log : ILogger<DistributedRethinkDBCache>) =
/// RethinkDB /// RethinkDB
static let r = RethinkDB.R static let r = RethinkDB.R
/// Whether the environment has been checked to ensure that the database, table, and relevant indexes exist /// Whether the environment has been checked to ensure that the database, table, and relevant indexes exist
static let mutable environmentChecked = false static let mutable environmentChecked = false
do do
match options with match options with
| null | _ when isNull options.Value -> nullArg "options" | null | _ when isNull options.Value -> nullArg "options"
| _ when isNull options.Value.Connection -> nullArg "Connection" | _ when isNull options.Value.Connection -> nullArg "Connection"
| _ -> () | _ -> ()
/// Options /// Options
let opts = options.Value let opts = options.Value
/// Shorthand to get the database /// The database name (blank uses connection default)
let database = match String.IsNullOrEmpty opts.Database with true -> r.Db() | db -> r.Db(db) let db = defaultArg (Option.ofObj opts.Database) ""
/// The table name; default to "Cache" if not provided
let table = match defaultArg (Option.ofObj opts.TableName) "" with "" -> "Cache" | tbl -> tbl
/// The name of the cache
let cacheName =
seq {
match db with "" -> () | _ -> $"{db}."
table
}
|> Seq.reduce (+)
/// Debug message
let dbug text =
if log.IsEnabled LogLevel.Debug then log.LogDebug $"[{cacheName}] %s{text ()}"
/// Make sure the RethinkDB database, table, expiration index exist
let checkEnvironment (_ : CancellationToken) =
backgroundTask {
if environmentChecked then
dbug <| fun () -> "Skipping environment check because it has already been performed"
return ()
dbug <| fun () -> "|> Checking for proper RethinkDB cache environment"
// Database
match db with
| "" -> dbug <| fun () -> " Skipping database check because it was not specified"
| _ ->
dbug <| fun () -> $" Checking for database {db} existence..."
let! dbs = rethink<string list> { dbList; result; withRetryDefault opts.Connection }
if not (dbs |> List.contains db) then
dbug <| fun () -> sprintf $" ...creating database {db}..."
do! rethink { dbCreate db; write; withRetryDefault; ignoreResult opts.Connection }
dbug <| fun () -> " ...done"
// Table
dbug <| fun () -> sprintf $" Checking for table {table} existence..."
let! tables = rethink<string list> { tableList db; result; withRetryDefault opts.Connection }
if not (tables |> List.contains table) then
dbug <| fun () -> sprintf $" ...creating table {table}..."
do! rethink { withDb db; tableCreate table; write; withRetryDefault; ignoreResult opts.Connection }
dbug <| fun () -> " ...done"
// Index
dbug <| fun () -> sprintf $" Checking for index {table}.expiresAt..."
let! indexes = rethink<string list> {
withDb db; withTable table
indexList
result; withRetryDefault opts.Connection
}
if not (indexes |> List.contains "expiresAt") then
dbug <| fun () -> sprintf $" ...creating index expiresAt on table {table}..."
do! rethink {
withDb db; withTable table
indexCreate "expiresAt"
write; withRetryDefault; ignoreResult opts.Connection
}
dbug <| fun () -> " ...done"
dbug <| fun () -> "|> RethinkDB cache environment check complete. Carry on..."
environmentChecked <- true
}
/// Remove entries from the cache that are expired
let purgeExpired (_ : CancellationToken) =
backgroundTask {
let tix = DateTime.UtcNow.Ticks - 1L
dbug <| fun () -> $"Purging expired entries (<= %i{tix})"
do! rethink {
withDb db; withTable table
between (r.Minval ()) tix [ BetweenOptArg.Index "expiresAt" ]
delete
write; withRetryDefault; ignoreResult opts.Connection
}
}
/// Default the table name to "Cache" if it is not provided /// Calculate ticks from now for the given number of seconds
let tableName = match String.IsNullOrEmpty opts.Database with true -> "Cache" | _ -> opts.TableName let ticksFromNow seconds = DateTime.UtcNow.Ticks + int64 (seconds * 10000000)
/// Shorthand to get the table /// Get the cache entry specified
let table = database.Table tableName let getCacheEntry (key : string) (_ : CancellationToken) =
rethink<CacheEntry> {
withDb db; withTable table
get key
resultOption; withRetryDefault opts.Connection
}
/// The name of the cache /// Refresh (update expiration based on sliding expiration) the cache entry specified
let cacheName = let refreshCacheEntry (entry : CacheEntry) (_ : CancellationToken) =
seq { backgroundTask {
match String.IsNullOrEmpty opts.Database with true -> () | _ -> yield opts.Database; yield "." if entry.slidingExpiration > 0 then
yield tableName do! rethink {
} withDb db; withTable table
|> Seq.reduce (+) get entry.id
update [ "expiresAt", ticksFromNow entry.slidingExpiration :> obj ]
write; withRetryDefault; ignoreResult opts.Connection
}
}
/// Debug message /// Get the payload for the cache entry
let dbug text = let getEntry key (cnxToken : CancellationToken) =
match log.IsEnabled LogLevel.Debug with backgroundTask {
| true -> text () |> sprintf "[%s] %s" cacheName |> log.LogDebug cnxToken.ThrowIfCancellationRequested ()
| _ -> () do! checkEnvironment cnxToken
do! purgeExpired cnxToken
/// Make sure the RethinkDB database, table, expiration index exist match! getCacheEntry key cnxToken with
let checkEnvironment () = | None ->
async { dbug <| fun () -> $"Cache key {key} not found"
match environmentChecked with
| true -> dbug <| fun () -> "Skipping environment check because it has already been performed"
| _ ->
dbug <| fun () -> "|> Checking for proper RethinkDB cache environment"
// Database
match opts.Database with
| "" -> dbug <| fun () -> " Skipping database check because it was not specified"
| db -> dbug <| fun () -> sprintf " Checking for database %s existence..." db
let! dbs = r.DbList().RunResultAsync<string list>(opts.Connection)
match dbs |> List.contains db with
| true -> ()
| _ -> dbug <| fun () -> sprintf " ...creating database %s..." db
do! r.DbCreate(db).RunResultAsync(opts.Connection)
dbug <| fun () -> " ...done"
// Table
dbug <| fun () -> sprintf " Checking for table %s existence..." tableName
let! tables = database.TableList().RunResultAsync<string list>(opts.Connection)
match tables |> List.contains tableName with
| true -> ()
| _ -> dbug <| fun () -> sprintf " ...creating table %s..." tableName
do! database.TableCreate(tableName).RunResultAsync(opts.Connection)
dbug <| fun () -> " ...done"
// Index
dbug <| fun () -> sprintf " Checking for index %s.ExpiresAt..." tableName
let! indexes = table.IndexList().RunResultAsync<string list>(opts.Connection)
match indexes |> List.contains "ExpiresAt" with
| true -> ()
| _ -> dbug <| fun () -> sprintf " ...creating index ExpiresAt on table %s..." tableName
do! table.IndexCreate("ExpiresAt").RunResultAsync(opts.Connection)
dbug <| fun () -> " ...done"
dbug <| fun () -> "|> RethinkDB cache environment check complete. Carry on..."
environmentChecked <- true
}
/// Remove entries from the cache that are expired
let purgeExpired () =
async {
let tix = DateTime.UtcNow.Ticks - 1L
dbug <| fun () -> sprintf "Purging expired entries (<= %i)" tix
do! table.Between(r.Minval, tix).OptArg("index", "ExpiresAt").Delete().RunResultAsync(opts.Connection)
}
/// Calculate ticks from now for the given number of seconds
let ticksFromNow seconds = DateTime.UtcNow.Ticks + int64 (seconds * 10000000)
/// Get the cache entry specified
let getCacheEntry (key : string) =
async {
let! entry = table.Get(key).RunResultAsync<CacheEntry>(opts.Connection)
return entry
}
/// Refresh (update expiration based on sliding expiration) the cache entry specified
let refreshCacheEntry (entry : CacheEntry) =
async {
match entry.SlidingExpiration with
| 0 -> ()
| seconds -> do! table.Get(entry.Id)
.Update({ ExpiresAt = ticksFromNow seconds })
.RunResultAsync(opts.Connection)
}
/// Get the payload for the cache entry
let getEntry key =
async {
do! checkEnvironment ()
do! purgeExpired ()
let! entry = getCacheEntry key
match box entry with
| null -> dbug <| fun () -> sprintf "Cache key %s not found" key
return null return null
| _ -> dbug <| fun () -> sprintf "Cache key %s found" key | Some entry ->
do! refreshCacheEntry entry dbug <| fun () -> $"Cache key {key} found"
return UTF8Encoding.UTF8.GetBytes entry.Payload do! refreshCacheEntry entry cnxToken
} return UTF8Encoding.UTF8.GetBytes entry.payload
}
/// Update the sliding expiration for a cache entry /// Update the sliding expiration for a cache entry
let refreshEntry key = let refreshEntry key (cnxToken : CancellationToken) =
async { backgroundTask {
do! checkEnvironment () cnxToken.ThrowIfCancellationRequested ()
let! entry = getCacheEntry key do! checkEnvironment cnxToken
match box entry with null -> () | _ -> do! refreshCacheEntry entry match! getCacheEntry key cnxToken with None -> () | Some entry -> do! refreshCacheEntry entry cnxToken
do! purgeExpired () do! purgeExpired cnxToken
return () return ()
} }
/// Remove the specified cache entry /// Remove the specified cache entry
let removeEntry (key : string) = let removeEntry (key : string) (cnxToken : CancellationToken) =
async { backgroundTask {
do! checkEnvironment () cnxToken.ThrowIfCancellationRequested ()
do! table.Get(key).Delete().RunResultAsync(opts.Connection) do! checkEnvironment cnxToken
do! purgeExpired () do! rethink {
} withDb db; withTable table
get key
delete
write; withRetryDefault; ignoreResult opts.Connection
}
do! purgeExpired cnxToken
}
/// Set the value of a cache entry /// Set the value of a cache entry
let setEntry key payload (options : DistributedCacheEntryOptions) = let setEntry key (payload : byte[]) (options : DistributedCacheEntryOptions) (cnxToken : CancellationToken) =
async { backgroundTask {
do! checkEnvironment () cnxToken.ThrowIfCancellationRequested ()
do! purgeExpired () do! checkEnvironment cnxToken
let addExpiration entry = do! purgeExpired cnxToken
match true with let addExpiration entry =
| _ when options.SlidingExpiration.HasValue -> match true with
{ entry with ExpiresAt = ticksFromNow options.SlidingExpiration.Value.Seconds | _ when options.SlidingExpiration.HasValue ->
SlidingExpiration = options.SlidingExpiration.Value.Seconds } { entry with expiresAt = ticksFromNow options.SlidingExpiration.Value.Seconds
| _ when options.AbsoluteExpiration.HasValue -> slidingExpiration = options.SlidingExpiration.Value.Seconds }
{ entry with ExpiresAt = options.AbsoluteExpiration.Value.UtcTicks } | _ when options.AbsoluteExpiration.HasValue ->
| _ when options.AbsoluteExpirationRelativeToNow.HasValue -> { entry with expiresAt = options.AbsoluteExpiration.Value.UtcTicks }
{ entry with ExpiresAt = ticksFromNow options.AbsoluteExpirationRelativeToNow.Value.Seconds } | _ when options.AbsoluteExpirationRelativeToNow.HasValue ->
| _ -> entry { entry with expiresAt = ticksFromNow options.AbsoluteExpirationRelativeToNow.Value.Seconds }
let entry = { Id = key | _ -> entry
Payload = UTF8Encoding.UTF8.GetString payload let entry =
ExpiresAt = Int64.MaxValue { id = key
SlidingExpiration = 0 } payload = UTF8Encoding.UTF8.GetString payload
|> addExpiration expiresAt = Int64.MaxValue
do! match box (getCacheEntry key) with slidingExpiration = 0
| null -> table.Insert(entry).RunResultAsync(opts.Connection) }
| _ -> table.Get(key).Replace(entry).RunResultAsync(opts.Connection) |> addExpiration
return () match! getCacheEntry key cnxToken with
} | None ->
do! rethink {
interface IDistributedCache with withDb db; withTable table
member this.Get key = getEntry key |> Async.RunSynchronously insert entry
member this.GetAsync key = getEntry key |> Async.StartAsTask write; withRetryDefault; ignoreResult opts.Connection
member this.Refresh key = refreshEntry key |> Async.RunSynchronously }
member this.RefreshAsync key = refreshEntry key |> Async.StartAsTask :> Task | Some _ ->
member this.Remove key = removeEntry key |> Async.RunSynchronously do! rethink {
member this.RemoveAsync key = removeEntry key |> Async.StartAsTask :> Task withDb db; withTable table
member this.Set (key, value, options) = setEntry key value options |> Async.RunSynchronously get key
member this.SetAsync (key, value, options) = setEntry key value options |> Async.StartAsTask :> Task replace entry
write; withRetryDefault; ignoreResult opts.Connection
}
}
let runSync (task : CancellationToken -> Task<'T>) =
task CancellationToken.None |> (Async.AwaitTask >> Async.RunSynchronously)
interface IDistributedCache with
member this.Get key = getEntry key |> runSync
member this.GetAsync (key, cnxToken) = getEntry key cnxToken
member this.Refresh key = refreshEntry key |> runSync
member this.RefreshAsync (key, cnxToken) = refreshEntry key cnxToken
member this.Remove key = removeEntry key |> runSync
member this.RemoveAsync (key, cnxToken) = removeEntry key cnxToken
member this.Set (key, value, options) = setEntry key value options |> runSync
member this.SetAsync (key, value, options, cnxToken) = setEntry key value options cnxToken

View File

@ -5,17 +5,19 @@ open RethinkDb.Driver.Net
/// Options to use to configure the RethinkDB cache /// Options to use to configure the RethinkDB cache
[<AllowNullLiteral>] [<AllowNullLiteral>]
type DistributedRethinkDBCacheOptions() = type DistributedRethinkDBCacheOptions() =
/// The RethinkDB connection to use for caching operations
member val Connection : IConnection = null with get, set /// The RethinkDB connection to use for caching operations
member val Connection : IConnection = null with get, set
/// The RethinkDB database to use (leave blank for connection default) /// The RethinkDB database to use (leave blank for connection default)
member val Database = "" with get, set member val Database = "" with get, set
/// The RethinkDB table name to use for cache entries (defaults to "Cache") /// The RethinkDB table name to use for cache entries (defaults to "Cache")
member val TableName = "" with get, set member val TableName = "" with get, set
/// Whether this configuration is valid /// Whether this configuration is valid
member this.IsValid () = member this.IsValid () =
seq { seq {
match this.Connection with null -> yield "Connection cannot be null" | _ -> () match this.Connection with null -> yield "Connection cannot be null" | _ -> ()
} }

View File

@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>net6.0;netstandard2.0</TargetFrameworks>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="*" />
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
<PackageReference Include="RethinkDb.Driver.FSharp" Version="0.8.0-alpha-0002" />
</ItemGroup>
<ItemGroup>
<Compile Include="DistributedRethinkDBCacheOptions.fs" />
<Compile Include="DistributedRethinkDBCache.fs" />
<Compile Include="IServiceCollectionExtensions.fs" />
</ItemGroup>
</Project>

View File

@ -1,32 +0,0 @@
{
"buildOptions": {
"compile": {
"includeFiles": [
"DistributedRethinkDBCacheOptions.fs",
"DistributedRethinkDBCache.fs",
"IServiceCollectionExtensions.fs"
]
},
"compilerName": "fsc",
"debugType": "portable"
},
"dependencies": {
"Microsoft.Extensions.Caching.Abstractions": "1.0.0",
"Microsoft.Extensions.Logging": "1.0.0",
"Microsoft.Extensions.Options": "1.0.0",
"Newtonsoft.Json": "9.0.1",
"RethinkDb.Driver": "2.3.15"
},
"frameworks": {
"netstandard1.6": {
"dependencies": {
"Microsoft.FSharp.Core.netcore": "1.0.0-alpha-160831",
"NETStandard.Library": "1.6.0"
}
}
},
"tools": {
"dotnet-compile-fsc":"1.0.0-preview2-*"
},
"version": "0.9.0"
}