diff --git a/src/RethinkDB.DistributedCache/Cache.fs b/src/RethinkDB.DistributedCache/Cache.fs index f1774d7..cfeb017 100644 --- a/src/RethinkDB.DistributedCache/Cache.fs +++ b/src/RethinkDB.DistributedCache/Cache.fs @@ -22,10 +22,10 @@ let debug cacheOpts (log : ILogger) text = if log.IsEnabled LogLevel.Debug then log.LogDebug $"[{table cacheOpts}] %s{text ()}" /// Convert seconds to .NET ticks -let secondsToTicks seconds = int64 (seconds * 10000000) +let secondsToTicks (span : TimeSpan) = int64 ((int span.TotalSeconds) * 10000000) /// Calculate ticks from now for the given number of seconds -let ticksFromNow seconds = DateTime.UtcNow.Ticks + (secondsToTicks seconds) +let ticksFromNow seconds = DateTime.UtcNow.Ticks + (secondsToTicks (TimeSpan.FromSeconds seconds)) /// Ensure that the necessary environment exists for this cache @@ -77,9 +77,7 @@ module Environment = /// Cache entry manipulation functions module Entry = - open System.Text open Microsoft.Extensions.Caching.Distributed - open RethinkDb.Driver.Model /// RethinkDB let r = RethinkDb.Driver.RethinkDB.R @@ -89,7 +87,7 @@ module Entry = let table = table cacheOpts match DateTime.UtcNow - lastCheck > cacheOpts.DeleteExpiredInterval with | true -> - let tix = ticksFromNow 0 + let tix = ticksFromNow 0.0 debug cacheOpts log <| fun () -> $"Purging expired entries (<= %i{tix})" do! rethink { withTable table @@ -102,37 +100,49 @@ module Entry = } /// Get the cache entry specified, refreshing sliding expiration then checking for expiration - let get cacheOpts (key : string) (cancelToken : CancellationToken) = backgroundTask { + let get cacheOpts log (key : string) (cancelToken : CancellationToken) = backgroundTask { let table = table cacheOpts - let now = ticksFromNow 0 - let! result = rethink { + let debug = debug cacheOpts log + debug <| fun () -> $"Retriving cache entry {key}" + match! rethink { withTable table get key - update (fun row -> - r.HashMap( - expiresAt, - r.Branch( - // If we have neither sliding nor absolute expiration, do not change the expiry time - row.G(slidingExp).Le(0).Or(row.G(absoluteExp).Le(0)).Or(row.G(absoluteExp).Eq(row.G(expiresAt))), - row.G(expiresAt), - // If the sliding expiry increment exceeds the absolute expiry, use the absolute - row.G(expiresAt).Add(row.G(slidingExp)).Gt(row.G(absoluteExp)), - row.G(absoluteExp), - // Else adjust for the sliding expiry increment - row.G(slidingExp).Add(now))) :> obj) [ ReturnChanges All ] - write cancelToken; withRetryDefault cacheOpts.Connection - } - match result.Changes.Count with - | 0 -> return None - | _ -> - match (box >> Option.ofObj) (result.ChangesAs().[0].NewValue) with - | Some boxedEntry -> - let entry = unbox boxedEntry - return if entry.expiresAt > now then Some entry else None - | _ -> return None + resultOption cancelToken; withRetryOptionDefault cacheOpts.Connection + } with + | Some entry -> + debug <| fun () -> $"Refreshing cache entry {key}" + let now = ticksFromNow 0.0 + let entry = + match true with + | _ when entry.absoluteExp = entry.expiresAt -> + // Already expired + entry + | _ when entry.slidingExp <= 0L && entry.absoluteExp <= 0L -> + // Not enough information to adjust expiration + entry + | _ when entry.absoluteExp > 0 && entry.expiresAt + entry.slidingExp > entry.absoluteExp -> + // Sliding update would push it past absolute; use absolute expiration + { entry with expiresAt = entry.absoluteExp } + | _ -> + // Update sliding expiration + { entry with expiresAt = now + entry.slidingExp } + do! rethink { + withTable table + get key + replace entry + write cancelToken; withRetryDefault; ignoreResult cacheOpts.Connection + } + debug <| fun () -> + let state = if entry.expiresAt > now then "valid" else "expired" + $"Cache entry {key} is {state}" + return (if entry.expiresAt > now then Some entry else None) + | None -> + debug <| fun () -> $"Cache entry {key} not found" + return None } - let remove cacheOpts (key : string) (cancelToken : CancellationToken) = backgroundTask { + let remove cacheOpts log (key : string) (cancelToken : CancellationToken) = backgroundTask { + debug cacheOpts log <| fun () -> $"Deleting cache entry {key}" let table = table cacheOpts do! rethink { withTable table @@ -143,27 +153,27 @@ module Entry = } /// Set a cache entry - let set cacheOpts (entryOpts : DistributedCacheEntryOptions) key (payload : byte[]) - (cancelToken : CancellationToken) = + let set cacheOpts log (entryOpts : DistributedCacheEntryOptions) key payload (cancelToken : CancellationToken) = backgroundTask { + debug cacheOpts log <| fun () -> $"Creating cache entry {key}" let table = table cacheOpts let addExpiration entry = match true with | _ when entryOpts.SlidingExpiration.HasValue -> - let expTicks = secondsToTicks entryOpts.SlidingExpiration.Value.Seconds + let expTicks = secondsToTicks entryOpts.SlidingExpiration.Value { entry with expiresAt = ticksFromNow 0 + expTicks; slidingExp = expTicks } | _ when entryOpts.AbsoluteExpiration.HasValue -> let exp = entryOpts.AbsoluteExpiration.Value.UtcTicks { entry with expiresAt = exp; absoluteExp = exp } | _ when entryOpts.AbsoluteExpirationRelativeToNow.HasValue -> - let exp = entryOpts.AbsoluteExpirationRelativeToNow.Value.Seconds + let exp = ticksFromNow 0.0 + secondsToTicks entryOpts.AbsoluteExpirationRelativeToNow.Value { entry with expiresAt = exp; absoluteExp = exp } | _ -> - let expTicks = secondsToTicks cacheOpts.DefaultSlidingExpiration.Seconds - { entry with expiresAt = ticksFromNow 0 + expTicks; slidingExp = expTicks } + let expTicks = secondsToTicks cacheOpts.DefaultSlidingExpiration + { entry with expiresAt = ticksFromNow 0.0 + expTicks; slidingExp = expTicks } let entry = { id = key - payload = UTF8Encoding.UTF8.GetString payload + payload = Convert.ToBase64String payload expiresAt = Int64.MinValue slidingExp = 0L absoluteExp = 0L @@ -171,7 +181,7 @@ module Entry = |> addExpiration do! rethink { withTable table - replace entry + insert entry [ OnConflict Replace ] write cancelToken; withRetryDefault; ignoreResult cacheOpts.Connection } } diff --git a/src/RethinkDB.DistributedCache/DistributedRethinkDBCache.fs b/src/RethinkDB.DistributedCache/DistributedRethinkDBCache.fs index 8f84ca4..42e370e 100644 --- a/src/RethinkDB.DistributedCache/DistributedRethinkDBCache.fs +++ b/src/RethinkDB.DistributedCache/DistributedRethinkDBCache.fs @@ -1,7 +1,6 @@ namespace RethinkDB.DistributedCache open System -open System.Text open System.Threading open System.Threading.Tasks open Microsoft.Extensions.Caching.Distributed @@ -49,34 +48,34 @@ type DistributedRethinkDBCache (options : IOptions debug <| fun () -> $"Cache key {key} not found" return null | Some entry -> debug <| fun () -> $"Cache key {key} found" - return UTF8Encoding.UTF8.GetBytes entry.payload + return Convert.FromBase64String entry.payload } /// Update the sliding expiration for a cache entry let refreshEntry key cancelToken = backgroundTask { do! checkEnvironment cancelToken - let! _ = Cache.Entry.get opts key cancelToken + let! _ = Cache.Entry.get opts log key cancelToken do! purgeExpired cancelToken } /// Remove the specified cache entry let removeEntry key cancelToken = backgroundTask { do! checkEnvironment cancelToken - do! Cache.Entry.remove opts key cancelToken + do! Cache.Entry.remove opts log key cancelToken do! purgeExpired cancelToken } /// Set the value of a cache entry let setEntry key payload options cancelToken = backgroundTask { - do! Cache.Entry.set opts options key payload cancelToken + do! Cache.Entry.set opts log options key payload cancelToken do! purgeExpired cancelToken } @@ -85,11 +84,11 @@ type DistributedRethinkDBCache (options : IOptions (Async.AwaitTask >> Async.RunSynchronously) interface IDistributedCache with - member this.Get key = getEntry key |> runSync - member this.GetAsync (key, cancelToken) = getEntry key cancelToken - member this.Refresh key = refreshEntry key |> runSync - member this.RefreshAsync (key, cancelToken) = refreshEntry key cancelToken - member this.Remove key = removeEntry key |> runSync - member this.RemoveAsync (key, cancelToken) = removeEntry key cancelToken - member this.Set (key, value, options) = setEntry key value options |> runSync - member this.SetAsync (key, value, options, cancelToken) = setEntry key value options cancelToken + member _.Get key = getEntry key |> runSync + member _.GetAsync (key, cancelToken) = getEntry key cancelToken + member _.Refresh key = refreshEntry key |> runSync + member _.RefreshAsync (key, cancelToken) = refreshEntry key cancelToken + member _.Remove key = removeEntry key |> runSync + member _.RemoveAsync (key, cancelToken) = removeEntry key cancelToken + member _.Set (key, value, options) = setEntry key value options |> runSync + member _.SetAsync (key, value, options, cancelToken) = setEntry key value options cancelToken diff --git a/src/RethinkDB.DistributedCache/RethinkDB.DistributedCache.fsproj b/src/RethinkDB.DistributedCache/RethinkDB.DistributedCache.fsproj index 5225bdd..ad60837 100644 --- a/src/RethinkDB.DistributedCache/RethinkDB.DistributedCache.fsproj +++ b/src/RethinkDB.DistributedCache/RethinkDB.DistributedCache.fsproj @@ -13,8 +13,8 @@ MIT RethinkDB IDistributedCache ASP.NET Core An IDistributedCache implementation utilizing RethinkDB for storage - alpha04 - Work toward starting a new session when encountering an expired one + alpha05 + Provider now stores successfully; do not use an earlier version