diff --git a/src/RethinkDB.DistributedCache/DistributedRethinkDBCache.fs b/src/RethinkDB.DistributedCache/DistributedRethinkDBCache.fs index 9fd047e..f1b37ae 100644 --- a/src/RethinkDB.DistributedCache/DistributedRethinkDBCache.fs +++ b/src/RethinkDB.DistributedCache/DistributedRethinkDBCache.fs @@ -3,212 +3,235 @@ namespace RethinkDB.DistributedCache open Microsoft.Extensions.Caching.Distributed open Microsoft.Extensions.Logging open Microsoft.Extensions.Options -open Newtonsoft.Json open RethinkDb.Driver -open RethinkDb.Driver.Net +open RethinkDb.Driver.FSharp open System open System.Text +open System.Threading open System.Threading.Tasks -// H/T: Suave -[] -module AsyncExtensions = - type Microsoft.FSharp.Control.AsyncBuilder with - /// An extension method that overloads the standard 'Bind' of the 'async' builder. The new overload awaits on - /// a standard .NET task - member x.Bind(t : Task<'T>, f:'T -> Async<'R>) : Async<'R> = async.Bind(Async.AwaitTask t, f) - /// An extension method that overloads the standard 'Bind' of the 'async' builder. The new overload awaits on - /// a standard .NET task which does not commpute a value - member x.Bind(t : Task, f : unit -> Async<'R>) : Async<'R> = async.Bind(Async.AwaitTask t, f) /// Persistence object for a cache entry -type CacheEntry = { - /// The Id for the cache entry - [] - Id : string - /// The payload for the cache entry (as a UTF-8 string) - Payload : string - /// The ticks at which this entry expires - ExpiresAt : int64 - /// The number of seconds in the sliding expiration - SlidingExpiration : int -} +[] +type CacheEntry = + { /// The ID for the cache entry + id : string + + /// The payload for the cache entry (as a UTF-8 string) + payload : string + + /// The ticks at which this entry expires + expiresAt : int64 + + /// The number of seconds in the sliding expiration + slidingExpiration : int + } -/// Record to update sliding expiration for an entry -type SlidingExpirationUpdate = { ExpiresAt : int64 } /// IDistributedCache implementation utilizing RethinkDB [] -type DistributedRethinkDBCache(options : IOptions, - log : ILogger) = +type DistributedRethinkDBCache (options : IOptions, + log : ILogger) = - /// RethinkDB - static let r = RethinkDB.R + /// RethinkDB + static let r = RethinkDB.R - /// Whether the environment has been checked to ensure that the database, table, and relevant indexes exist - static let mutable environmentChecked = false + /// Whether the environment has been checked to ensure that the database, table, and relevant indexes exist + static let mutable environmentChecked = false - do - match options with - | null | _ when isNull options.Value -> nullArg "options" - | _ when isNull options.Value.Connection -> nullArg "Connection" - | _ -> () + do + match options with + | null | _ when isNull options.Value -> nullArg "options" + | _ when isNull options.Value.Connection -> nullArg "Connection" + | _ -> () - /// Options - let opts = options.Value + /// Options + let opts = options.Value - /// Shorthand to get the database - let database = match String.IsNullOrEmpty opts.Database with true -> r.Db() | db -> r.Db(db) + /// The database name (blank uses connection default) + let db = defaultArg (Option.ofObj opts.Database) "" + + /// The table name; default to "Cache" if not provided + let table = match defaultArg (Option.ofObj opts.TableName) "" with "" -> "Cache" | tbl -> tbl + + /// The name of the cache + let cacheName = + seq { + match db with "" -> () | _ -> $"{db}." + table + } + |> Seq.reduce (+) + + /// Debug message + let dbug text = + if log.IsEnabled LogLevel.Debug then log.LogDebug $"[{cacheName}] %s{text ()}" + + /// Make sure the RethinkDB database, table, expiration index exist + let checkEnvironment (_ : CancellationToken) = + backgroundTask { + if environmentChecked then + dbug <| fun () -> "Skipping environment check because it has already been performed" + return () + dbug <| fun () -> "|> Checking for proper RethinkDB cache environment" + // Database + match db with + | "" -> dbug <| fun () -> " Skipping database check because it was not specified" + | _ -> + dbug <| fun () -> $" Checking for database {db} existence..." + let! dbs = rethink { dbList; result; withRetryDefault opts.Connection } + if not (dbs |> List.contains db) then + dbug <| fun () -> sprintf $" ...creating database {db}..." + do! rethink { dbCreate db; write; withRetryDefault; ignoreResult opts.Connection } + dbug <| fun () -> " ...done" + // Table + dbug <| fun () -> sprintf $" Checking for table {table} existence..." + let! tables = rethink { tableList db; result; withRetryDefault opts.Connection } + if not (tables |> List.contains table) then + dbug <| fun () -> sprintf $" ...creating table {table}..." + do! rethink { withDb db; tableCreate table; write; withRetryDefault; ignoreResult opts.Connection } + dbug <| fun () -> " ...done" + // Index + dbug <| fun () -> sprintf $" Checking for index {table}.expiresAt..." + let! indexes = rethink { + withDb db; withTable table + indexList + result; withRetryDefault opts.Connection + } + if not (indexes |> List.contains "expiresAt") then + dbug <| fun () -> sprintf $" ...creating index expiresAt on table {table}..." + do! rethink { + withDb db; withTable table + indexCreate "expiresAt" + write; withRetryDefault; ignoreResult opts.Connection + } + dbug <| fun () -> " ...done" + dbug <| fun () -> "|> RethinkDB cache environment check complete. Carry on..." + environmentChecked <- true + } + + /// Remove entries from the cache that are expired + let purgeExpired (_ : CancellationToken) = + backgroundTask { + let tix = DateTime.UtcNow.Ticks - 1L + dbug <| fun () -> $"Purging expired entries (<= %i{tix})" + do! rethink { + withDb db; withTable table + between (r.Minval ()) tix [ BetweenOptArg.Index "expiresAt" ] + delete + write; withRetryDefault; ignoreResult opts.Connection + } + } - /// Default the table name to "Cache" if it is not provided - let tableName = match String.IsNullOrEmpty opts.Database with true -> "Cache" | _ -> opts.TableName + /// Calculate ticks from now for the given number of seconds + let ticksFromNow seconds = DateTime.UtcNow.Ticks + int64 (seconds * 10000000) - /// Shorthand to get the table - let table = database.Table tableName + /// Get the cache entry specified + let getCacheEntry (key : string) (_ : CancellationToken) = + rethink { + withDb db; withTable table + get key + resultOption; withRetryDefault opts.Connection + } - /// The name of the cache - let cacheName = - seq { - match String.IsNullOrEmpty opts.Database with true -> () | _ -> yield opts.Database; yield "." - yield tableName - } - |> Seq.reduce (+) + /// Refresh (update expiration based on sliding expiration) the cache entry specified + let refreshCacheEntry (entry : CacheEntry) (_ : CancellationToken) = + backgroundTask { + if entry.slidingExpiration > 0 then + do! rethink { + withDb db; withTable table + get entry.id + update [ "expiresAt", ticksFromNow entry.slidingExpiration :> obj ] + write; withRetryDefault; ignoreResult opts.Connection + } + } - /// Debug message - let dbug text = - match log.IsEnabled LogLevel.Debug with - | true -> text () |> sprintf "[%s] %s" cacheName |> log.LogDebug - | _ -> () - - /// Make sure the RethinkDB database, table, expiration index exist - let checkEnvironment () = - async { - match environmentChecked with - | true -> dbug <| fun () -> "Skipping environment check because it has already been performed" - | _ -> - dbug <| fun () -> "|> Checking for proper RethinkDB cache environment" - // Database - match opts.Database with - | "" -> dbug <| fun () -> " Skipping database check because it was not specified" - | db -> dbug <| fun () -> sprintf " Checking for database %s existence..." db - let! dbs = r.DbList().RunResultAsync(opts.Connection) - match dbs |> List.contains db with - | true -> () - | _ -> dbug <| fun () -> sprintf " ...creating database %s..." db - do! r.DbCreate(db).RunResultAsync(opts.Connection) - dbug <| fun () -> " ...done" - // Table - dbug <| fun () -> sprintf " Checking for table %s existence..." tableName - let! tables = database.TableList().RunResultAsync(opts.Connection) - match tables |> List.contains tableName with - | true -> () - | _ -> dbug <| fun () -> sprintf " ...creating table %s..." tableName - do! database.TableCreate(tableName).RunResultAsync(opts.Connection) - dbug <| fun () -> " ...done" - // Index - dbug <| fun () -> sprintf " Checking for index %s.ExpiresAt..." tableName - let! indexes = table.IndexList().RunResultAsync(opts.Connection) - match indexes |> List.contains "ExpiresAt" with - | true -> () - | _ -> dbug <| fun () -> sprintf " ...creating index ExpiresAt on table %s..." tableName - do! table.IndexCreate("ExpiresAt").RunResultAsync(opts.Connection) - dbug <| fun () -> " ...done" - dbug <| fun () -> "|> RethinkDB cache environment check complete. Carry on..." - environmentChecked <- true - } - - /// Remove entries from the cache that are expired - let purgeExpired () = - async { - let tix = DateTime.UtcNow.Ticks - 1L - dbug <| fun () -> sprintf "Purging expired entries (<= %i)" tix - do! table.Between(r.Minval, tix).OptArg("index", "ExpiresAt").Delete().RunResultAsync(opts.Connection) - } - - /// Calculate ticks from now for the given number of seconds - let ticksFromNow seconds = DateTime.UtcNow.Ticks + int64 (seconds * 10000000) - - /// Get the cache entry specified - let getCacheEntry (key : string) = - async { - let! entry = table.Get(key).RunResultAsync(opts.Connection) - return entry - } - - /// Refresh (update expiration based on sliding expiration) the cache entry specified - let refreshCacheEntry (entry : CacheEntry) = - async { - match entry.SlidingExpiration with - | 0 -> () - | seconds -> do! table.Get(entry.Id) - .Update({ ExpiresAt = ticksFromNow seconds }) - .RunResultAsync(opts.Connection) - } - - /// Get the payload for the cache entry - let getEntry key = - async { - do! checkEnvironment () - do! purgeExpired () - let! entry = getCacheEntry key - match box entry with - | null -> dbug <| fun () -> sprintf "Cache key %s not found" key + /// Get the payload for the cache entry + let getEntry key (cnxToken : CancellationToken) = + backgroundTask { + cnxToken.ThrowIfCancellationRequested () + do! checkEnvironment cnxToken + do! purgeExpired cnxToken + match! getCacheEntry key cnxToken with + | None -> + dbug <| fun () -> $"Cache key {key} not found" return null - | _ -> dbug <| fun () -> sprintf "Cache key %s found" key - do! refreshCacheEntry entry - return UTF8Encoding.UTF8.GetBytes entry.Payload - } + | Some entry -> + dbug <| fun () -> $"Cache key {key} found" + do! refreshCacheEntry entry cnxToken + return UTF8Encoding.UTF8.GetBytes entry.payload + } - /// Update the sliding expiration for a cache entry - let refreshEntry key = - async { - do! checkEnvironment () - let! entry = getCacheEntry key - match box entry with null -> () | _ -> do! refreshCacheEntry entry - do! purgeExpired () - return () - } + /// Update the sliding expiration for a cache entry + let refreshEntry key (cnxToken : CancellationToken) = + backgroundTask { + cnxToken.ThrowIfCancellationRequested () + do! checkEnvironment cnxToken + match! getCacheEntry key cnxToken with None -> () | Some entry -> do! refreshCacheEntry entry cnxToken + do! purgeExpired cnxToken + return () + } - /// Remove the specified cache entry - let removeEntry (key : string) = - async { - do! checkEnvironment () - do! table.Get(key).Delete().RunResultAsync(opts.Connection) - do! purgeExpired () - } + /// Remove the specified cache entry + let removeEntry (key : string) (cnxToken : CancellationToken) = + backgroundTask { + cnxToken.ThrowIfCancellationRequested () + do! checkEnvironment cnxToken + do! rethink { + withDb db; withTable table + get key + delete + write; withRetryDefault; ignoreResult opts.Connection + } + do! purgeExpired cnxToken + } - /// Set the value of a cache entry - let setEntry key payload (options : DistributedCacheEntryOptions) = - async { - do! checkEnvironment () - do! purgeExpired () - let addExpiration entry = - match true with - | _ when options.SlidingExpiration.HasValue -> - { entry with ExpiresAt = ticksFromNow options.SlidingExpiration.Value.Seconds - SlidingExpiration = options.SlidingExpiration.Value.Seconds } - | _ when options.AbsoluteExpiration.HasValue -> - { entry with ExpiresAt = options.AbsoluteExpiration.Value.UtcTicks } - | _ when options.AbsoluteExpirationRelativeToNow.HasValue -> - { entry with ExpiresAt = ticksFromNow options.AbsoluteExpirationRelativeToNow.Value.Seconds } - | _ -> entry - let entry = { Id = key - Payload = UTF8Encoding.UTF8.GetString payload - ExpiresAt = Int64.MaxValue - SlidingExpiration = 0 } - |> addExpiration - do! match box (getCacheEntry key) with - | null -> table.Insert(entry).RunResultAsync(opts.Connection) - | _ -> table.Get(key).Replace(entry).RunResultAsync(opts.Connection) - return () - } - - interface IDistributedCache with - member this.Get key = getEntry key |> Async.RunSynchronously - member this.GetAsync key = getEntry key |> Async.StartAsTask - member this.Refresh key = refreshEntry key |> Async.RunSynchronously - member this.RefreshAsync key = refreshEntry key |> Async.StartAsTask :> Task - member this.Remove key = removeEntry key |> Async.RunSynchronously - member this.RemoveAsync key = removeEntry key |> Async.StartAsTask :> Task - member this.Set (key, value, options) = setEntry key value options |> Async.RunSynchronously - member this.SetAsync (key, value, options) = setEntry key value options |> Async.StartAsTask :> Task + /// Set the value of a cache entry + let setEntry key (payload : byte[]) (options : DistributedCacheEntryOptions) (cnxToken : CancellationToken) = + backgroundTask { + cnxToken.ThrowIfCancellationRequested () + do! checkEnvironment cnxToken + do! purgeExpired cnxToken + let addExpiration entry = + match true with + | _ when options.SlidingExpiration.HasValue -> + { entry with expiresAt = ticksFromNow options.SlidingExpiration.Value.Seconds + slidingExpiration = options.SlidingExpiration.Value.Seconds } + | _ when options.AbsoluteExpiration.HasValue -> + { entry with expiresAt = options.AbsoluteExpiration.Value.UtcTicks } + | _ when options.AbsoluteExpirationRelativeToNow.HasValue -> + { entry with expiresAt = ticksFromNow options.AbsoluteExpirationRelativeToNow.Value.Seconds } + | _ -> entry + let entry = + { id = key + payload = UTF8Encoding.UTF8.GetString payload + expiresAt = Int64.MaxValue + slidingExpiration = 0 + } + |> addExpiration + match! getCacheEntry key cnxToken with + | None -> + do! rethink { + withDb db; withTable table + insert entry + write; withRetryDefault; ignoreResult opts.Connection + } + | Some _ -> + do! rethink { + withDb db; withTable table + get key + replace entry + write; withRetryDefault; ignoreResult opts.Connection + } + } + let runSync (task : CancellationToken -> Task<'T>) = + task CancellationToken.None |> (Async.AwaitTask >> Async.RunSynchronously) + + interface IDistributedCache with + member this.Get key = getEntry key |> runSync + member this.GetAsync (key, cnxToken) = getEntry key cnxToken + member this.Refresh key = refreshEntry key |> runSync + member this.RefreshAsync (key, cnxToken) = refreshEntry key cnxToken + member this.Remove key = removeEntry key |> runSync + member this.RemoveAsync (key, cnxToken) = removeEntry key cnxToken + member this.Set (key, value, options) = setEntry key value options |> runSync + member this.SetAsync (key, value, options, cnxToken) = setEntry key value options cnxToken diff --git a/src/RethinkDB.DistributedCache/DistributedRethinkDBCacheOptions.fs b/src/RethinkDB.DistributedCache/DistributedRethinkDBCacheOptions.fs index 1dc837e..81b641b 100644 --- a/src/RethinkDB.DistributedCache/DistributedRethinkDBCacheOptions.fs +++ b/src/RethinkDB.DistributedCache/DistributedRethinkDBCacheOptions.fs @@ -5,17 +5,19 @@ open RethinkDb.Driver.Net /// Options to use to configure the RethinkDB cache [] type DistributedRethinkDBCacheOptions() = - /// The RethinkDB connection to use for caching operations - member val Connection : IConnection = null with get, set + + /// The RethinkDB connection to use for caching operations + member val Connection : IConnection = null with get, set - /// The RethinkDB database to use (leave blank for connection default) - member val Database = "" with get, set + /// The RethinkDB database to use (leave blank for connection default) + member val Database = "" with get, set - /// The RethinkDB table name to use for cache entries (defaults to "Cache") - member val TableName = "" with get, set + /// The RethinkDB table name to use for cache entries (defaults to "Cache") + member val TableName = "" with get, set - /// Whether this configuration is valid - member this.IsValid () = - seq { - match this.Connection with null -> yield "Connection cannot be null" | _ -> () - } \ No newline at end of file + /// Whether this configuration is valid + member this.IsValid () = + seq { + match this.Connection with null -> yield "Connection cannot be null" | _ -> () + } + \ No newline at end of file diff --git a/src/RethinkDB.DistributedCache/RethinkDB.DistributedCache.fsproj b/src/RethinkDB.DistributedCache/RethinkDB.DistributedCache.fsproj new file mode 100644 index 0000000..d67bc54 --- /dev/null +++ b/src/RethinkDB.DistributedCache/RethinkDB.DistributedCache.fsproj @@ -0,0 +1,21 @@ + + + + net6.0;netstandard2.0 + true + + + + + + + + + + + + + + + + diff --git a/src/RethinkDB.DistributedCache/project.json b/src/RethinkDB.DistributedCache/project.json deleted file mode 100644 index 93e5bb0..0000000 --- a/src/RethinkDB.DistributedCache/project.json +++ /dev/null @@ -1,32 +0,0 @@ -{ - "buildOptions": { - "compile": { - "includeFiles": [ - "DistributedRethinkDBCacheOptions.fs", - "DistributedRethinkDBCache.fs", - "IServiceCollectionExtensions.fs" - ] - }, - "compilerName": "fsc", - "debugType": "portable" - }, - "dependencies": { - "Microsoft.Extensions.Caching.Abstractions": "1.0.0", - "Microsoft.Extensions.Logging": "1.0.0", - "Microsoft.Extensions.Options": "1.0.0", - "Newtonsoft.Json": "9.0.1", - "RethinkDb.Driver": "2.3.15" - }, - "frameworks": { - "netstandard1.6": { - "dependencies": { - "Microsoft.FSharp.Core.netcore": "1.0.0-alpha-160831", - "NETStandard.Library": "1.6.0" - } - } - }, - "tools": { - "dotnet-compile-fsc":"1.0.0-preview2-*" - }, - "version": "0.9.0" -} \ No newline at end of file