WIP on refreshing expired sessions

- Move RethinkDB operations to their own module
- Incorporated cancellation tokens throughout
This commit is contained in:
Daniel J. Summers 2022-04-21 21:38:51 -04:00
parent 9a6b04412a
commit c79306078c
6 changed files with 283 additions and 201 deletions

View File

@ -0,0 +1,176 @@
/// The implementation portion of this cache
module private RethinkDB.DistributedCache.Cache
open System
open System.Threading
open Microsoft.Extensions.Logging
open RethinkDB.DistributedCache
open RethinkDb.Driver.FSharp
/// The database name (blank uses connection default)
let db (cacheOpts : DistributedRethinkDBCacheOptions) = defaultArg (Option.ofObj cacheOpts.Database) ""
/// The table name; default to "Cache" if not provided
let tbl (cacheOpts : DistributedRethinkDBCacheOptions) =
match defaultArg (Option.ofObj cacheOpts.TableName) "" with "" -> "Cache" | tbl -> tbl
/// The name of the cache
let table cacheOpts = match db cacheOpts with "" -> tbl cacheOpts | d -> $"{d}.{tbl cacheOpts}"
/// Debug message
let debug cacheOpts (log : ILogger) text =
if log.IsEnabled LogLevel.Debug then log.LogDebug $"[{table cacheOpts}] %s{text ()}"
/// Convert seconds to .NET ticks
let secondsToTicks seconds = int64 (seconds * 10000000)
/// Calculate ticks from now for the given number of seconds
let ticksFromNow seconds = DateTime.UtcNow.Ticks + (secondsToTicks seconds)
/// Ensure that the necessary environment exists for this cache
module Environment =
/// Make sure the RethinkDB database, table, expiration index exist
let check cacheOpts log (cancelToken : CancellationToken) = backgroundTask {
let debug = debug cacheOpts log
debug <| fun () -> "|> Checking for proper RethinkDB cache environment"
// Database
let db = db cacheOpts
match db with
| "" -> debug <| fun () -> " Skipping database check; using connection default"
| _ ->
debug <| fun () -> $" Checking for database {db} existence..."
let! dbs = rethink<string list> { dbList; result cancelToken; withRetryDefault cacheOpts.Connection }
if not (dbs |> List.contains db) then
debug <| fun () -> sprintf $" ...creating database {db}..."
do! rethink { dbCreate db; write cancelToken; withRetryDefault; ignoreResult cacheOpts.Connection }
debug <| fun () -> " ...done"
// Table
let tbl = tbl cacheOpts
let table = table cacheOpts
debug <| fun () -> sprintf $" Checking for table {tbl} existence..."
let! tables = rethink<string list> { tableList db; result cancelToken; withRetryDefault cacheOpts.Connection }
if not (tables |> List.contains tbl) then
debug <| fun () -> sprintf $" ...creating table {tbl}..."
do! rethink { tableCreate table; write cancelToken; withRetryDefault; ignoreResult cacheOpts.Connection }
debug <| fun () -> " ...done"
// Index
debug <| fun () -> sprintf $" Checking for index {tbl}.expiresAt..."
let! indexes = rethink<string list> {
withTable table
indexList
result cancelToken; withRetryDefault cacheOpts.Connection
}
if not (indexes |> List.contains expiresAt) then
debug <| fun () -> sprintf $" ...creating index {expiresAt} on table {tbl}..."
do! rethink {
withTable table
indexCreate expiresAt
write cancelToken; withRetryDefault; ignoreResult cacheOpts.Connection
}
debug <| fun () -> " ...done"
debug <| fun () -> "|> RethinkDB cache environment check complete. Carry on..."
}
/// Cache entry manipulation functions
module Entry =
open System.Text
open Microsoft.Extensions.Caching.Distributed
open RethinkDb.Driver.Ast
open RethinkDb.Driver.Model
/// RethinkDB
let r = RethinkDb.Driver.RethinkDB.R
/// Remove entries from the cache that are expired
let purge cacheOpts log lastCheck (cancelToken : CancellationToken) = backgroundTask {
let table = table cacheOpts
match DateTime.UtcNow - lastCheck > cacheOpts.DeleteExpiredInterval with
| true ->
let tix = ticksFromNow 0
debug cacheOpts log <| fun () -> $"Purging expired entries (<= %i{tix})"
do! rethink {
withTable table
between (r.Minval ()) tix [ BetweenOptArg.Index expiresAt ]
delete
write cancelToken; withRetryDefault; ignoreResult cacheOpts.Connection
}
return DateTime.UtcNow
| false -> return lastCheck
}
/// Get the cache entry specified, refreshing sliding expiration then checking for expiration
let get cacheOpts (key : string) (cancelToken : CancellationToken) = backgroundTask {
let table = table cacheOpts
let now = ticksFromNow 0
let filters : (ReqlExpr -> obj) list = [
fun row -> row.G(expiresAt).Gt now
fun row -> row.G(slidingExp).Gt 0
fun row -> row.G(absoluteExp).Gt(0).Or(row.G(absoluteExp).Ne(row.G expiresAt))
]
let expiration (row : ReqlExpr) : obj =
r.HashMap(
expiresAt,
r.Branch(row.G(expiresAt).Add(row.G(slidingExp)).Gt(row.G(absoluteExp)), row.G(absoluteExp),
row.G(slidingExp).Add(now)))
let! result = rethink<Result> {
withTable table
get key
filter filters
update expiration [ ReturnChanges All ]
write cancelToken; withRetryDefault cacheOpts.Connection
}
match result.Changes.Count with
| 0 -> return None
| _ ->
match result.ChangesAs<CacheEntry>().[0].NewValue with
| entry when entry.expiresAt > now -> return Some entry
| _ -> return None
}
let remove cacheOpts (key : string) (cancelToken : CancellationToken) = backgroundTask {
let table = table cacheOpts
do! rethink {
withTable table
get key
delete
write cancelToken; withRetryDefault; ignoreResult cacheOpts.Connection
}
}
/// Set a cache entry
let set cacheOpts (entryOpts : DistributedCacheEntryOptions) key (payload : byte[])
(cancelToken : CancellationToken) =
backgroundTask {
let table = table cacheOpts
let addExpiration entry =
match true with
| _ when entryOpts.SlidingExpiration.HasValue ->
let expTicks = secondsToTicks entryOpts.SlidingExpiration.Value.Seconds
{ entry with expiresAt = ticksFromNow 0 + expTicks; slidingExp = expTicks }
| _ when entryOpts.AbsoluteExpiration.HasValue ->
let exp = entryOpts.AbsoluteExpiration.Value.UtcTicks
{ entry with expiresAt = exp; absoluteExp = exp }
| _ when entryOpts.AbsoluteExpirationRelativeToNow.HasValue ->
let exp = entryOpts.AbsoluteExpirationRelativeToNow.Value.Seconds
{ entry with expiresAt = exp; absoluteExp = exp }
| _ ->
let expTicks = secondsToTicks cacheOpts.DefaultSlidingExpiration.Seconds
{ entry with expiresAt = ticksFromNow 0 + expTicks; slidingExp = expTicks }
let entry =
{ id = key
payload = UTF8Encoding.UTF8.GetString payload
expiresAt = Int64.MinValue
slidingExp = 0L
absoluteExp = 0L
}
|> addExpiration
do! rethink {
withTable table
replace entry
write cancelToken; withRetryDefault; ignoreResult cacheOpts.Connection
}
}

View File

@ -0,0 +1,30 @@
namespace RethinkDB.DistributedCache
/// Persistence object for a cache entry
[<CLIMutable; NoComparison; NoEquality>]
type CacheEntry =
{ /// The ID for the cache entry
id : string
/// The payload for the cache entry (as a UTF-8 string)
payload : string
/// The ticks at which this entry expires
expiresAt : int64
/// The number of ticks in the sliding expiration
slidingExp : int64
/// The ticks for absolute expiration
absoluteExp : int64
}
/// Field names for the above
[<AutoOpen>]
module private CacheEntry =
[<Literal>]
let expiresAt = "expiresAt"
[<Literal>]
let slidingExp = "slidingExp"
[<Literal>]
let absoluteExp = "absoluteExp"

View File

@ -1,223 +1,83 @@
namespace RethinkDB.DistributedCache namespace RethinkDB.DistributedCache
open Microsoft.Extensions.Caching.Distributed
open Microsoft.Extensions.Logging
open Microsoft.Extensions.Options
open RethinkDb.Driver
open RethinkDb.Driver.FSharp
open System open System
open System.Text open System.Text
open System.Threading open System.Threading
open System.Threading.Tasks open System.Threading.Tasks
open Microsoft.Extensions.Caching.Distributed
open Microsoft.Extensions.Logging
/// Persistence object for a cache entry open Microsoft.Extensions.Options
[<CLIMutable; NoComparison; NoEquality>]
type CacheEntry =
{ /// The ID for the cache entry
id : string
/// The payload for the cache entry (as a UTF-8 string)
payload : string
/// The ticks at which this entry expires
expiresAt : int64
/// The number of seconds in the sliding expiration
slidingExpiration : int
}
/// IDistributedCache implementation utilizing RethinkDB /// IDistributedCache implementation utilizing RethinkDB
[<AllowNullLiteral>] [<AllowNullLiteral>]
type DistributedRethinkDBCache (options : IOptions<DistributedRethinkDBCacheOptions>, type DistributedRethinkDBCache (options : IOptions<DistributedRethinkDBCacheOptions>,
log : ILogger<DistributedRethinkDBCache>) = log : ILogger<DistributedRethinkDBCache>) =
/// RethinkDB
static let r = RethinkDB.R
/// Whether the environment has been checked to ensure that the database, table, and relevant indexes exist /// Whether the environment has been checked to ensure that the database, table, and relevant indexes exist
static let mutable environmentChecked = false let mutable environmentChecked = false
/// The last time expired entries were deleted
let mutable lastExpiredCheck = DateTime.UtcNow - TimeSpan.FromDays 365.0
do do
match options with if isNull options then nullArg "options"
| null | _ when isNull options.Value -> nullArg "options" if isNull options.Value then nullArg "options"
| _ when isNull options.Value.Connection -> nullArg "Connection" let validity = options.Value.IsValid () |> Seq.fold (fun it err -> $"{it}\n{err}") ""
| _ -> () if validity <> "" then invalidArg "options" $"Options are invalid:{validity}"
/// Options /// Options
let opts = options.Value let opts = options.Value
/// The database name (blank uses connection default)
let db = defaultArg (Option.ofObj opts.Database) ""
/// The table name; default to "Cache" if not provided
let tbl = match defaultArg (Option.ofObj opts.TableName) "" with "" -> "Cache" | tbl -> tbl
/// The name of the cache
let table =
seq {
match db with "" -> () | _ -> $"{db}."
tbl
}
|> Seq.reduce (+)
/// Debug message /// Debug message
let dbug text = let debug = Cache.debug opts log
if log.IsEnabled LogLevel.Debug then log.LogDebug $"[{table}] %s{text ()}"
/// Make sure the RethinkDB database, table, expiration index exist /// Make sure the RethinkDB database, table, expiration index exist
let environmentCheck (_ : CancellationToken) = let checkEnvironment cancelToken = backgroundTask {
backgroundTask { match environmentChecked with
dbug <| fun () -> "|> Checking for proper RethinkDB cache environment" | true -> debug <| fun () -> "Skipping environment check because it has already been performed"
// Database | false ->
match db with do! Cache.Environment.check opts log cancelToken
| "" -> dbug <| fun () -> " Skipping database check because it was not specified"
| _ ->
dbug <| fun () -> $" Checking for database {db} existence..."
let! dbs = rethink<string list> { dbList; result; withRetryDefault opts.Connection }
if not (dbs |> List.contains db) then
dbug <| fun () -> sprintf $" ...creating database {db}..."
do! rethink { dbCreate db; write; withRetryDefault; ignoreResult opts.Connection }
dbug <| fun () -> " ...done"
// Table
dbug <| fun () -> sprintf $" Checking for table {tbl} existence..."
let! tables = rethink<string list> { tableList db; result; withRetryDefault opts.Connection }
if not (tables |> List.contains tbl) then
dbug <| fun () -> sprintf $" ...creating table {tbl}..."
do! rethink { tableCreate table; write; withRetryDefault; ignoreResult opts.Connection }
dbug <| fun () -> " ...done"
// Index
dbug <| fun () -> sprintf $" Checking for index {tbl}.expiresAt..."
let! indexes = rethink<string list> {
withTable table
indexList
result; withRetryDefault opts.Connection
}
if not (indexes |> List.contains "expiresAt") then
dbug <| fun () -> sprintf $" ...creating index expiresAt on table {tbl}..."
do! rethink {
withTable table
indexCreate "expiresAt"
write; withRetryDefault; ignoreResult opts.Connection
}
dbug <| fun () -> " ...done"
dbug <| fun () -> "|> RethinkDB cache environment check complete. Carry on..."
environmentChecked <- true environmentChecked <- true
} }
/// Make sure the RethinkDB database, table, expiration index exist
let checkEnvironment (cnxToken : CancellationToken) =
backgroundTask {
match environmentChecked with
| true -> dbug <| fun () -> "Skipping environment check because it has already been performed"
| false -> do! environmentCheck cnxToken
}
/// Remove entries from the cache that are expired /// Remove entries from the cache that are expired
let purgeExpired (_ : CancellationToken) = let purgeExpired cancelToken = backgroundTask {
backgroundTask { let! lastCheck = Cache.Entry.purge opts log lastExpiredCheck cancelToken
let tix = DateTime.UtcNow.Ticks - 1L lastExpiredCheck <- lastCheck
dbug <| fun () -> $"Purging expired entries (<= %i{tix})"
do! rethink {
withTable table
between (r.Minval ()) tix [ BetweenOptArg.Index "expiresAt" ]
delete
write; withRetryDefault; ignoreResult opts.Connection
}
}
/// Calculate ticks from now for the given number of seconds
let ticksFromNow seconds = DateTime.UtcNow.Ticks + int64 (seconds * 10000000)
/// Get the cache entry specified
let getCacheEntry (key : string) (_ : CancellationToken) =
rethink<CacheEntry> {
withTable table
get key
resultOption; withRetryDefault opts.Connection
}
/// Refresh (update expiration based on sliding expiration) the cache entry specified
let refreshCacheEntry (entry : CacheEntry) (_ : CancellationToken) =
backgroundTask {
if entry.slidingExpiration > 0 then
do! rethink {
withTable table
get entry.id
update [ "expiresAt", ticksFromNow entry.slidingExpiration :> obj ]
write; withRetryDefault; ignoreResult opts.Connection
}
} }
/// Get the payload for the cache entry /// Get the payload for the cache entry
let getEntry key (cnxToken : CancellationToken) = let getEntry key cancelToken = backgroundTask {
backgroundTask { do! checkEnvironment cancelToken
cnxToken.ThrowIfCancellationRequested () let! result = Cache.Entry.get opts key cancelToken
do! checkEnvironment cnxToken do! purgeExpired cancelToken
do! purgeExpired cnxToken match result with
match! getCacheEntry key cnxToken with
| None -> | None ->
dbug <| fun () -> $"Cache key {key} not found" debug <| fun () -> $"Cache key {key} not found"
return null return null
| Some entry -> | Some entry ->
dbug <| fun () -> $"Cache key {key} found" debug <| fun () -> $"Cache key {key} found"
do! refreshCacheEntry entry cnxToken
return UTF8Encoding.UTF8.GetBytes entry.payload return UTF8Encoding.UTF8.GetBytes entry.payload
} }
/// Update the sliding expiration for a cache entry /// Update the sliding expiration for a cache entry
let refreshEntry key (cnxToken : CancellationToken) = let refreshEntry key cancelToken = backgroundTask {
backgroundTask { do! checkEnvironment cancelToken
cnxToken.ThrowIfCancellationRequested () let! _ = Cache.Entry.get opts key cancelToken
do! checkEnvironment cnxToken do! purgeExpired cancelToken
match! getCacheEntry key cnxToken with None -> () | Some entry -> do! refreshCacheEntry entry cnxToken
do! purgeExpired cnxToken
return ()
} }
/// Remove the specified cache entry /// Remove the specified cache entry
let removeEntry (key : string) (cnxToken : CancellationToken) = let removeEntry key cancelToken = backgroundTask {
backgroundTask { do! checkEnvironment cancelToken
cnxToken.ThrowIfCancellationRequested () do! Cache.Entry.remove opts key cancelToken
do! checkEnvironment cnxToken do! purgeExpired cancelToken
do! rethink {
withTable table
get key
delete
write; withRetryDefault; ignoreResult opts.Connection
}
do! purgeExpired cnxToken
} }
/// Set the value of a cache entry /// Set the value of a cache entry
let setEntry key (payload : byte[]) (options : DistributedCacheEntryOptions) (cnxToken : CancellationToken) = let setEntry key payload options cancelToken = backgroundTask {
backgroundTask { do! Cache.Entry.set opts options key payload cancelToken
cnxToken.ThrowIfCancellationRequested () do! purgeExpired cancelToken
do! checkEnvironment cnxToken
do! purgeExpired cnxToken
let addExpiration entry =
match true with
| _ when options.SlidingExpiration.HasValue ->
{ entry with expiresAt = ticksFromNow options.SlidingExpiration.Value.Seconds
slidingExpiration = options.SlidingExpiration.Value.Seconds }
| _ when options.AbsoluteExpiration.HasValue ->
{ entry with expiresAt = options.AbsoluteExpiration.Value.UtcTicks }
| _ when options.AbsoluteExpirationRelativeToNow.HasValue ->
{ entry with expiresAt = ticksFromNow options.AbsoluteExpirationRelativeToNow.Value.Seconds }
| _ -> entry
let entry =
{ id = key
payload = UTF8Encoding.UTF8.GetString payload
expiresAt = Int64.MaxValue
slidingExpiration = 0
}
|> addExpiration
do! rethink {
withTable table
replace entry
write; withRetryDefault; ignoreResult opts.Connection
}
} }
/// Execute a task synchronously /// Execute a task synchronously
@ -226,10 +86,10 @@ type DistributedRethinkDBCache (options : IOptions<DistributedRethinkDBCacheOpti
interface IDistributedCache with interface IDistributedCache with
member this.Get key = getEntry key |> runSync member this.Get key = getEntry key |> runSync
member this.GetAsync (key, cnxToken) = getEntry key cnxToken member this.GetAsync (key, cancelToken) = getEntry key cancelToken
member this.Refresh key = refreshEntry key |> runSync member this.Refresh key = refreshEntry key |> runSync
member this.RefreshAsync (key, cnxToken) = refreshEntry key cnxToken member this.RefreshAsync (key, cancelToken) = refreshEntry key cancelToken
member this.Remove key = removeEntry key |> runSync member this.Remove key = removeEntry key |> runSync
member this.RemoveAsync (key, cnxToken) = removeEntry key cnxToken member this.RemoveAsync (key, cancelToken) = removeEntry key cancelToken
member this.Set (key, value, options) = setEntry key value options |> runSync member this.Set (key, value, options) = setEntry key value options |> runSync
member this.SetAsync (key, value, options, cnxToken) = setEntry key value options cnxToken member this.SetAsync (key, value, options, cancelToken) = setEntry key value options cancelToken

View File

@ -1,5 +1,7 @@
namespace RethinkDB.DistributedCache namespace RethinkDB.DistributedCache
open System
open Microsoft.Extensions.Options
open RethinkDb.Driver.Net open RethinkDb.Driver.Net
/// Options to use to configure the RethinkDB cache /// Options to use to configure the RethinkDB cache
@ -9,15 +11,26 @@ type DistributedRethinkDBCacheOptions() =
/// The RethinkDB connection to use for caching operations /// The RethinkDB connection to use for caching operations
member val Connection : IConnection = null with get, set member val Connection : IConnection = null with get, set
/// The RethinkDB database to use (leave blank for connection default) /// The RethinkDB database to use; leave blank for connection default
member val Database = "" with get, set member val Database = "" with get, set
/// The RethinkDB table name to use for cache entries (defaults to "Cache") /// The RethinkDB table name to use for cache entries; defaults to "Cache"
member val TableName = "" with get, set member val TableName = "" with get, set
/// How frequently we will delete expired cache items; default is 30 minutes
member val DeleteExpiredInterval = TimeSpan.FromMinutes 30.0 with get, set
/// The default sliding expiration for items, if none is provided; default is 20 minutes
member val DefaultSlidingExpiration = TimeSpan.FromMinutes 20.0 with get, set
/// Whether this configuration is valid /// Whether this configuration is valid
member this.IsValid () = member this.IsValid () =
seq { seq {
match this.Connection with null -> yield "Connection cannot be null" | _ -> () if isNull this.Connection then "Connection cannot be null"
if this.DeleteExpiredInterval <= TimeSpan.Zero then "DeleteExpiredInterval must be positive"
if this.DefaultSlidingExpiration <= TimeSpan.Zero then "DefaultSlidingExpiration must be positive"
} }
interface IOptions<DistributedRethinkDBCacheOptions> with
member this.Value = this

View File

@ -13,7 +13,7 @@ type IServiceCollection with
if isNull options then nullArg "options" if isNull options then nullArg "options"
this.AddOptions () |> ignore this.AddOptions () |> ignore
this.Configure options |> ignore this.Configure options |> ignore
this.Add (ServiceDescriptor.Transient<IDistributedCache, DistributedRethinkDBCache> ()) this.Add (ServiceDescriptor.Singleton<IDistributedCache, DistributedRethinkDBCache> ())
this this
/// <summary> /// <summary>

View File

@ -4,7 +4,6 @@
<TargetFrameworks>net6.0;netstandard2.0</TargetFrameworks> <TargetFrameworks>net6.0;netstandard2.0</TargetFrameworks>
<GenerateDocumentationFile>true</GenerateDocumentationFile> <GenerateDocumentationFile>true</GenerateDocumentationFile>
<VersionPrefix>0.9.0</VersionPrefix> <VersionPrefix>0.9.0</VersionPrefix>
<VersionSuffix>alpha02</VersionSuffix>
<Authors>danieljsummers</Authors> <Authors>danieljsummers</Authors>
<PackageProjectUrl>https://github.com/danieljsummers/RethinkDB.DistributedCache</PackageProjectUrl> <PackageProjectUrl>https://github.com/danieljsummers/RethinkDB.DistributedCache</PackageProjectUrl>
<PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance> <PackageRequireLicenseAcceptance>false</PackageRequireLicenseAcceptance>
@ -14,18 +13,22 @@
<PackageLicenseExpression>MIT</PackageLicenseExpression> <PackageLicenseExpression>MIT</PackageLicenseExpression>
<PackageTags>RethinkDB IDistributedCache ASP.NET Core</PackageTags> <PackageTags>RethinkDB IDistributedCache ASP.NET Core</PackageTags>
<Description>An IDistributedCache implementation utilizing RethinkDB for storage</Description> <Description>An IDistributedCache implementation utilizing RethinkDB for storage</Description>
<PackageReleaseNotes>Updated to .NET 6</PackageReleaseNotes> <VersionSuffix>alpha04</VersionSuffix>
<PackageReleaseNotes>Work toward starting a new session when encountering an expired one</PackageReleaseNotes>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="6.0.0" /> <PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="*" /> <PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="*" />
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" /> <PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
<PackageReference Include="RethinkDb.Driver.FSharp" Version="0.8.0-alpha-0003" /> <PackageReference Include="RethinkDb.Driver.FSharp" Version="0.8.0-alpha-0007" />
<PackageReference Update="FSharp.Core" Version="6.0.3" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Compile Include="DistributedRethinkDBCacheOptions.fs" /> <Compile Include="DistributedRethinkDBCacheOptions.fs" />
<Compile Include="CacheEntry.fs" />
<Compile Include="Cache.fs" />
<Compile Include="DistributedRethinkDBCache.fs" /> <Compile Include="DistributedRethinkDBCache.fs" />
<Compile Include="IServiceCollectionExtensions.fs" /> <Compile Include="IServiceCollectionExtensions.fs" />
</ItemGroup> </ItemGroup>