diff --git a/src/RethinkDB.DistributedCache/Cache.fs b/src/RethinkDB.DistributedCache/Cache.fs new file mode 100644 index 0000000..bcbd2ba --- /dev/null +++ b/src/RethinkDB.DistributedCache/Cache.fs @@ -0,0 +1,176 @@ +/// The implementation portion of this cache +module private RethinkDB.DistributedCache.Cache + +open System +open System.Threading +open Microsoft.Extensions.Logging +open RethinkDB.DistributedCache +open RethinkDb.Driver.FSharp + +/// The database name (blank uses connection default) +let db (cacheOpts : DistributedRethinkDBCacheOptions) = defaultArg (Option.ofObj cacheOpts.Database) "" + +/// The table name; default to "Cache" if not provided +let tbl (cacheOpts : DistributedRethinkDBCacheOptions) = + match defaultArg (Option.ofObj cacheOpts.TableName) "" with "" -> "Cache" | tbl -> tbl + +/// The name of the cache +let table cacheOpts = match db cacheOpts with "" -> tbl cacheOpts | d -> $"{d}.{tbl cacheOpts}" + +/// Debug message +let debug cacheOpts (log : ILogger) text = + if log.IsEnabled LogLevel.Debug then log.LogDebug $"[{table cacheOpts}] %s{text ()}" + +/// Convert seconds to .NET ticks +let secondsToTicks seconds = int64 (seconds * 10000000) + +/// Calculate ticks from now for the given number of seconds +let ticksFromNow seconds = DateTime.UtcNow.Ticks + (secondsToTicks seconds) + + +/// Ensure that the necessary environment exists for this cache +module Environment = + + /// Make sure the RethinkDB database, table, expiration index exist + let check cacheOpts log (cancelToken : CancellationToken) = backgroundTask { + let debug = debug cacheOpts log + debug <| fun () -> "|> Checking for proper RethinkDB cache environment" + // Database + let db = db cacheOpts + match db with + | "" -> debug <| fun () -> " Skipping database check; using connection default" + | _ -> + debug <| fun () -> $" Checking for database {db} existence..." + let! dbs = rethink { dbList; result cancelToken; withRetryDefault cacheOpts.Connection } + if not (dbs |> List.contains db) then + debug <| fun () -> sprintf $" ...creating database {db}..." + do! rethink { dbCreate db; write cancelToken; withRetryDefault; ignoreResult cacheOpts.Connection } + debug <| fun () -> " ...done" + // Table + let tbl = tbl cacheOpts + let table = table cacheOpts + debug <| fun () -> sprintf $" Checking for table {tbl} existence..." + let! tables = rethink { tableList db; result cancelToken; withRetryDefault cacheOpts.Connection } + if not (tables |> List.contains tbl) then + debug <| fun () -> sprintf $" ...creating table {tbl}..." + do! rethink { tableCreate table; write cancelToken; withRetryDefault; ignoreResult cacheOpts.Connection } + debug <| fun () -> " ...done" + // Index + debug <| fun () -> sprintf $" Checking for index {tbl}.expiresAt..." + let! indexes = rethink { + withTable table + indexList + result cancelToken; withRetryDefault cacheOpts.Connection + } + if not (indexes |> List.contains expiresAt) then + debug <| fun () -> sprintf $" ...creating index {expiresAt} on table {tbl}..." + do! rethink { + withTable table + indexCreate expiresAt + write cancelToken; withRetryDefault; ignoreResult cacheOpts.Connection + } + debug <| fun () -> " ...done" + debug <| fun () -> "|> RethinkDB cache environment check complete. Carry on..." + } + + +/// Cache entry manipulation functions +module Entry = + + open System.Text + open Microsoft.Extensions.Caching.Distributed + open RethinkDb.Driver.Ast + open RethinkDb.Driver.Model + + /// RethinkDB + let r = RethinkDb.Driver.RethinkDB.R + + /// Remove entries from the cache that are expired + let purge cacheOpts log lastCheck (cancelToken : CancellationToken) = backgroundTask { + let table = table cacheOpts + match DateTime.UtcNow - lastCheck > cacheOpts.DeleteExpiredInterval with + | true -> + let tix = ticksFromNow 0 + debug cacheOpts log <| fun () -> $"Purging expired entries (<= %i{tix})" + do! rethink { + withTable table + between (r.Minval ()) tix [ BetweenOptArg.Index expiresAt ] + delete + write cancelToken; withRetryDefault; ignoreResult cacheOpts.Connection + } + return DateTime.UtcNow + | false -> return lastCheck + } + + /// Get the cache entry specified, refreshing sliding expiration then checking for expiration + let get cacheOpts (key : string) (cancelToken : CancellationToken) = backgroundTask { + let table = table cacheOpts + let now = ticksFromNow 0 + let filters : (ReqlExpr -> obj) list = [ + fun row -> row.G(expiresAt).Gt now + fun row -> row.G(slidingExp).Gt 0 + fun row -> row.G(absoluteExp).Gt(0).Or(row.G(absoluteExp).Ne(row.G expiresAt)) + ] + let expiration (row : ReqlExpr) : obj = + r.HashMap( + expiresAt, + r.Branch(row.G(expiresAt).Add(row.G(slidingExp)).Gt(row.G(absoluteExp)), row.G(absoluteExp), + row.G(slidingExp).Add(now))) + let! result = rethink { + withTable table + get key + filter filters + update expiration [ ReturnChanges All ] + write cancelToken; withRetryDefault cacheOpts.Connection + } + match result.Changes.Count with + | 0 -> return None + | _ -> + match result.ChangesAs().[0].NewValue with + | entry when entry.expiresAt > now -> return Some entry + | _ -> return None + } + + let remove cacheOpts (key : string) (cancelToken : CancellationToken) = backgroundTask { + let table = table cacheOpts + do! rethink { + withTable table + get key + delete + write cancelToken; withRetryDefault; ignoreResult cacheOpts.Connection + } + } + + /// Set a cache entry + let set cacheOpts (entryOpts : DistributedCacheEntryOptions) key (payload : byte[]) + (cancelToken : CancellationToken) = + backgroundTask { + let table = table cacheOpts + let addExpiration entry = + match true with + | _ when entryOpts.SlidingExpiration.HasValue -> + let expTicks = secondsToTicks entryOpts.SlidingExpiration.Value.Seconds + { entry with expiresAt = ticksFromNow 0 + expTicks; slidingExp = expTicks } + | _ when entryOpts.AbsoluteExpiration.HasValue -> + let exp = entryOpts.AbsoluteExpiration.Value.UtcTicks + { entry with expiresAt = exp; absoluteExp = exp } + | _ when entryOpts.AbsoluteExpirationRelativeToNow.HasValue -> + let exp = entryOpts.AbsoluteExpirationRelativeToNow.Value.Seconds + { entry with expiresAt = exp; absoluteExp = exp } + | _ -> + let expTicks = secondsToTicks cacheOpts.DefaultSlidingExpiration.Seconds + { entry with expiresAt = ticksFromNow 0 + expTicks; slidingExp = expTicks } + let entry = + { id = key + payload = UTF8Encoding.UTF8.GetString payload + expiresAt = Int64.MinValue + slidingExp = 0L + absoluteExp = 0L + } + |> addExpiration + do! rethink { + withTable table + replace entry + write cancelToken; withRetryDefault; ignoreResult cacheOpts.Connection + } + } diff --git a/src/RethinkDB.DistributedCache/CacheEntry.fs b/src/RethinkDB.DistributedCache/CacheEntry.fs new file mode 100644 index 0000000..9bfc770 --- /dev/null +++ b/src/RethinkDB.DistributedCache/CacheEntry.fs @@ -0,0 +1,30 @@ +namespace RethinkDB.DistributedCache + +/// Persistence object for a cache entry +[] +type CacheEntry = + { /// The ID for the cache entry + id : string + + /// The payload for the cache entry (as a UTF-8 string) + payload : string + + /// The ticks at which this entry expires + expiresAt : int64 + + /// The number of ticks in the sliding expiration + slidingExp : int64 + + /// The ticks for absolute expiration + absoluteExp : int64 + } + +/// Field names for the above +[] +module private CacheEntry = + [] + let expiresAt = "expiresAt" + [] + let slidingExp = "slidingExp" + [] + let absoluteExp = "absoluteExp" diff --git a/src/RethinkDB.DistributedCache/DistributedRethinkDBCache.fs b/src/RethinkDB.DistributedCache/DistributedRethinkDBCache.fs index b544536..8f84ca4 100644 --- a/src/RethinkDB.DistributedCache/DistributedRethinkDBCache.fs +++ b/src/RethinkDB.DistributedCache/DistributedRethinkDBCache.fs @@ -1,224 +1,84 @@ namespace RethinkDB.DistributedCache -open Microsoft.Extensions.Caching.Distributed -open Microsoft.Extensions.Logging -open Microsoft.Extensions.Options -open RethinkDb.Driver -open RethinkDb.Driver.FSharp open System open System.Text open System.Threading open System.Threading.Tasks - - -/// Persistence object for a cache entry -[] -type CacheEntry = - { /// The ID for the cache entry - id : string - - /// The payload for the cache entry (as a UTF-8 string) - payload : string - - /// The ticks at which this entry expires - expiresAt : int64 - - /// The number of seconds in the sliding expiration - slidingExpiration : int - } - +open Microsoft.Extensions.Caching.Distributed +open Microsoft.Extensions.Logging +open Microsoft.Extensions.Options /// IDistributedCache implementation utilizing RethinkDB [] type DistributedRethinkDBCache (options : IOptions, log : ILogger) = - /// RethinkDB - static let r = RethinkDB.R - /// Whether the environment has been checked to ensure that the database, table, and relevant indexes exist - static let mutable environmentChecked = false + let mutable environmentChecked = false + + /// The last time expired entries were deleted + let mutable lastExpiredCheck = DateTime.UtcNow - TimeSpan.FromDays 365.0 do - match options with - | null | _ when isNull options.Value -> nullArg "options" - | _ when isNull options.Value.Connection -> nullArg "Connection" - | _ -> () + if isNull options then nullArg "options" + if isNull options.Value then nullArg "options" + let validity = options.Value.IsValid () |> Seq.fold (fun it err -> $"{it}\n{err}") "" + if validity <> "" then invalidArg "options" $"Options are invalid:{validity}" /// Options let opts = options.Value - /// The database name (blank uses connection default) - let db = defaultArg (Option.ofObj opts.Database) "" - - /// The table name; default to "Cache" if not provided - let tbl = match defaultArg (Option.ofObj opts.TableName) "" with "" -> "Cache" | tbl -> tbl - - /// The name of the cache - let table = - seq { - match db with "" -> () | _ -> $"{db}." - tbl - } - |> Seq.reduce (+) - /// Debug message - let dbug text = - if log.IsEnabled LogLevel.Debug then log.LogDebug $"[{table}] %s{text ()}" + let debug = Cache.debug opts log /// Make sure the RethinkDB database, table, expiration index exist - let environmentCheck (_ : CancellationToken) = - backgroundTask { - dbug <| fun () -> "|> Checking for proper RethinkDB cache environment" - // Database - match db with - | "" -> dbug <| fun () -> " Skipping database check because it was not specified" - | _ -> - dbug <| fun () -> $" Checking for database {db} existence..." - let! dbs = rethink { dbList; result; withRetryDefault opts.Connection } - if not (dbs |> List.contains db) then - dbug <| fun () -> sprintf $" ...creating database {db}..." - do! rethink { dbCreate db; write; withRetryDefault; ignoreResult opts.Connection } - dbug <| fun () -> " ...done" - // Table - dbug <| fun () -> sprintf $" Checking for table {tbl} existence..." - let! tables = rethink { tableList db; result; withRetryDefault opts.Connection } - if not (tables |> List.contains tbl) then - dbug <| fun () -> sprintf $" ...creating table {tbl}..." - do! rethink { tableCreate table; write; withRetryDefault; ignoreResult opts.Connection } - dbug <| fun () -> " ...done" - // Index - dbug <| fun () -> sprintf $" Checking for index {tbl}.expiresAt..." - let! indexes = rethink { - withTable table - indexList - result; withRetryDefault opts.Connection - } - if not (indexes |> List.contains "expiresAt") then - dbug <| fun () -> sprintf $" ...creating index expiresAt on table {tbl}..." - do! rethink { - withTable table - indexCreate "expiresAt" - write; withRetryDefault; ignoreResult opts.Connection - } - dbug <| fun () -> " ...done" - dbug <| fun () -> "|> RethinkDB cache environment check complete. Carry on..." + let checkEnvironment cancelToken = backgroundTask { + match environmentChecked with + | true -> debug <| fun () -> "Skipping environment check because it has already been performed" + | false -> + do! Cache.Environment.check opts log cancelToken environmentChecked <- true - } - - /// Make sure the RethinkDB database, table, expiration index exist - let checkEnvironment (cnxToken : CancellationToken) = - backgroundTask { - match environmentChecked with - | true -> dbug <| fun () -> "Skipping environment check because it has already been performed" - | false -> do! environmentCheck cnxToken - } + } /// Remove entries from the cache that are expired - let purgeExpired (_ : CancellationToken) = - backgroundTask { - let tix = DateTime.UtcNow.Ticks - 1L - dbug <| fun () -> $"Purging expired entries (<= %i{tix})" - do! rethink { - withTable table - between (r.Minval ()) tix [ BetweenOptArg.Index "expiresAt" ] - delete - write; withRetryDefault; ignoreResult opts.Connection - } - } - - /// Calculate ticks from now for the given number of seconds - let ticksFromNow seconds = DateTime.UtcNow.Ticks + int64 (seconds * 10000000) - - /// Get the cache entry specified - let getCacheEntry (key : string) (_ : CancellationToken) = - rethink { - withTable table - get key - resultOption; withRetryDefault opts.Connection - } - - /// Refresh (update expiration based on sliding expiration) the cache entry specified - let refreshCacheEntry (entry : CacheEntry) (_ : CancellationToken) = - backgroundTask { - if entry.slidingExpiration > 0 then - do! rethink { - withTable table - get entry.id - update [ "expiresAt", ticksFromNow entry.slidingExpiration :> obj ] - write; withRetryDefault; ignoreResult opts.Connection - } - } - + let purgeExpired cancelToken = backgroundTask { + let! lastCheck = Cache.Entry.purge opts log lastExpiredCheck cancelToken + lastExpiredCheck <- lastCheck + } + /// Get the payload for the cache entry - let getEntry key (cnxToken : CancellationToken) = - backgroundTask { - cnxToken.ThrowIfCancellationRequested () - do! checkEnvironment cnxToken - do! purgeExpired cnxToken - match! getCacheEntry key cnxToken with - | None -> - dbug <| fun () -> $"Cache key {key} not found" - return null - | Some entry -> - dbug <| fun () -> $"Cache key {key} found" - do! refreshCacheEntry entry cnxToken - return UTF8Encoding.UTF8.GetBytes entry.payload - } + let getEntry key cancelToken = backgroundTask { + do! checkEnvironment cancelToken + let! result = Cache.Entry.get opts key cancelToken + do! purgeExpired cancelToken + match result with + | None -> + debug <| fun () -> $"Cache key {key} not found" + return null + | Some entry -> + debug <| fun () -> $"Cache key {key} found" + return UTF8Encoding.UTF8.GetBytes entry.payload + } /// Update the sliding expiration for a cache entry - let refreshEntry key (cnxToken : CancellationToken) = - backgroundTask { - cnxToken.ThrowIfCancellationRequested () - do! checkEnvironment cnxToken - match! getCacheEntry key cnxToken with None -> () | Some entry -> do! refreshCacheEntry entry cnxToken - do! purgeExpired cnxToken - return () - } + let refreshEntry key cancelToken = backgroundTask { + do! checkEnvironment cancelToken + let! _ = Cache.Entry.get opts key cancelToken + do! purgeExpired cancelToken + } /// Remove the specified cache entry - let removeEntry (key : string) (cnxToken : CancellationToken) = - backgroundTask { - cnxToken.ThrowIfCancellationRequested () - do! checkEnvironment cnxToken - do! rethink { - withTable table - get key - delete - write; withRetryDefault; ignoreResult opts.Connection - } - do! purgeExpired cnxToken - } + let removeEntry key cancelToken = backgroundTask { + do! checkEnvironment cancelToken + do! Cache.Entry.remove opts key cancelToken + do! purgeExpired cancelToken + } /// Set the value of a cache entry - let setEntry key (payload : byte[]) (options : DistributedCacheEntryOptions) (cnxToken : CancellationToken) = - backgroundTask { - cnxToken.ThrowIfCancellationRequested () - do! checkEnvironment cnxToken - do! purgeExpired cnxToken - let addExpiration entry = - match true with - | _ when options.SlidingExpiration.HasValue -> - { entry with expiresAt = ticksFromNow options.SlidingExpiration.Value.Seconds - slidingExpiration = options.SlidingExpiration.Value.Seconds } - | _ when options.AbsoluteExpiration.HasValue -> - { entry with expiresAt = options.AbsoluteExpiration.Value.UtcTicks } - | _ when options.AbsoluteExpirationRelativeToNow.HasValue -> - { entry with expiresAt = ticksFromNow options.AbsoluteExpirationRelativeToNow.Value.Seconds } - | _ -> entry - let entry = - { id = key - payload = UTF8Encoding.UTF8.GetString payload - expiresAt = Int64.MaxValue - slidingExpiration = 0 - } - |> addExpiration - do! rethink { - withTable table - replace entry - write; withRetryDefault; ignoreResult opts.Connection - } - } + let setEntry key payload options cancelToken = backgroundTask { + do! Cache.Entry.set opts options key payload cancelToken + do! purgeExpired cancelToken + } /// Execute a task synchronously let runSync (task : CancellationToken -> Task<'T>) = @@ -226,10 +86,10 @@ type DistributedRethinkDBCache (options : IOptions runSync - member this.GetAsync (key, cnxToken) = getEntry key cnxToken + member this.GetAsync (key, cancelToken) = getEntry key cancelToken member this.Refresh key = refreshEntry key |> runSync - member this.RefreshAsync (key, cnxToken) = refreshEntry key cnxToken + member this.RefreshAsync (key, cancelToken) = refreshEntry key cancelToken member this.Remove key = removeEntry key |> runSync - member this.RemoveAsync (key, cnxToken) = removeEntry key cnxToken + member this.RemoveAsync (key, cancelToken) = removeEntry key cancelToken member this.Set (key, value, options) = setEntry key value options |> runSync - member this.SetAsync (key, value, options, cnxToken) = setEntry key value options cnxToken + member this.SetAsync (key, value, options, cancelToken) = setEntry key value options cancelToken diff --git a/src/RethinkDB.DistributedCache/DistributedRethinkDBCacheOptions.fs b/src/RethinkDB.DistributedCache/DistributedRethinkDBCacheOptions.fs index 81b641b..ef2e4e6 100644 --- a/src/RethinkDB.DistributedCache/DistributedRethinkDBCacheOptions.fs +++ b/src/RethinkDB.DistributedCache/DistributedRethinkDBCacheOptions.fs @@ -1,23 +1,36 @@ namespace RethinkDB.DistributedCache +open System +open Microsoft.Extensions.Options open RethinkDb.Driver.Net /// Options to use to configure the RethinkDB cache [] -type DistributedRethinkDBCacheOptions() = +type DistributedRethinkDBCacheOptions () = /// The RethinkDB connection to use for caching operations member val Connection : IConnection = null with get, set - /// The RethinkDB database to use (leave blank for connection default) + /// The RethinkDB database to use; leave blank for connection default member val Database = "" with get, set - /// The RethinkDB table name to use for cache entries (defaults to "Cache") + /// The RethinkDB table name to use for cache entries; defaults to "Cache" member val TableName = "" with get, set + + /// How frequently we will delete expired cache items; default is 30 minutes + member val DeleteExpiredInterval = TimeSpan.FromMinutes 30.0 with get, set + + /// The default sliding expiration for items, if none is provided; default is 20 minutes + member val DefaultSlidingExpiration = TimeSpan.FromMinutes 20.0 with get, set /// Whether this configuration is valid member this.IsValid () = seq { - match this.Connection with null -> yield "Connection cannot be null" | _ -> () + if isNull this.Connection then "Connection cannot be null" + if this.DeleteExpiredInterval <= TimeSpan.Zero then "DeleteExpiredInterval must be positive" + if this.DefaultSlidingExpiration <= TimeSpan.Zero then "DefaultSlidingExpiration must be positive" } + + interface IOptions with + member this.Value = this \ No newline at end of file diff --git a/src/RethinkDB.DistributedCache/IServiceCollectionExtensions.fs b/src/RethinkDB.DistributedCache/IServiceCollectionExtensions.fs index 46d8ede..4afab36 100644 --- a/src/RethinkDB.DistributedCache/IServiceCollectionExtensions.fs +++ b/src/RethinkDB.DistributedCache/IServiceCollectionExtensions.fs @@ -13,7 +13,7 @@ type IServiceCollection with if isNull options then nullArg "options" this.AddOptions () |> ignore this.Configure options |> ignore - this.Add (ServiceDescriptor.Transient ()) + this.Add (ServiceDescriptor.Singleton ()) this /// diff --git a/src/RethinkDB.DistributedCache/RethinkDB.DistributedCache.fsproj b/src/RethinkDB.DistributedCache/RethinkDB.DistributedCache.fsproj index 15fed42..5225bdd 100644 --- a/src/RethinkDB.DistributedCache/RethinkDB.DistributedCache.fsproj +++ b/src/RethinkDB.DistributedCache/RethinkDB.DistributedCache.fsproj @@ -4,7 +4,6 @@ net6.0;netstandard2.0 true 0.9.0 - alpha02 danieljsummers https://github.com/danieljsummers/RethinkDB.DistributedCache false @@ -14,18 +13,22 @@ MIT RethinkDB IDistributedCache ASP.NET Core An IDistributedCache implementation utilizing RethinkDB for storage - Updated to .NET 6 + alpha04 + Work toward starting a new session when encountering an expired one - + + + +