Fix cache entry creation
This commit is contained in:
parent
b87f3fbead
commit
652cf5ae91
|
@ -22,10 +22,10 @@ let debug cacheOpts (log : ILogger) text =
|
||||||
if log.IsEnabled LogLevel.Debug then log.LogDebug $"[{table cacheOpts}] %s{text ()}"
|
if log.IsEnabled LogLevel.Debug then log.LogDebug $"[{table cacheOpts}] %s{text ()}"
|
||||||
|
|
||||||
/// Convert seconds to .NET ticks
|
/// Convert seconds to .NET ticks
|
||||||
let secondsToTicks seconds = int64 (seconds * 10000000)
|
let secondsToTicks (span : TimeSpan) = int64 ((int span.TotalSeconds) * 10000000)
|
||||||
|
|
||||||
/// Calculate ticks from now for the given number of seconds
|
/// Calculate ticks from now for the given number of seconds
|
||||||
let ticksFromNow seconds = DateTime.UtcNow.Ticks + (secondsToTicks seconds)
|
let ticksFromNow seconds = DateTime.UtcNow.Ticks + (secondsToTicks (TimeSpan.FromSeconds seconds))
|
||||||
|
|
||||||
|
|
||||||
/// Ensure that the necessary environment exists for this cache
|
/// Ensure that the necessary environment exists for this cache
|
||||||
|
@ -77,9 +77,7 @@ module Environment =
|
||||||
/// Cache entry manipulation functions
|
/// Cache entry manipulation functions
|
||||||
module Entry =
|
module Entry =
|
||||||
|
|
||||||
open System.Text
|
|
||||||
open Microsoft.Extensions.Caching.Distributed
|
open Microsoft.Extensions.Caching.Distributed
|
||||||
open RethinkDb.Driver.Model
|
|
||||||
|
|
||||||
/// RethinkDB
|
/// RethinkDB
|
||||||
let r = RethinkDb.Driver.RethinkDB.R
|
let r = RethinkDb.Driver.RethinkDB.R
|
||||||
|
@ -89,7 +87,7 @@ module Entry =
|
||||||
let table = table cacheOpts
|
let table = table cacheOpts
|
||||||
match DateTime.UtcNow - lastCheck > cacheOpts.DeleteExpiredInterval with
|
match DateTime.UtcNow - lastCheck > cacheOpts.DeleteExpiredInterval with
|
||||||
| true ->
|
| true ->
|
||||||
let tix = ticksFromNow 0
|
let tix = ticksFromNow 0.0
|
||||||
debug cacheOpts log <| fun () -> $"Purging expired entries (<= %i{tix})"
|
debug cacheOpts log <| fun () -> $"Purging expired entries (<= %i{tix})"
|
||||||
do! rethink {
|
do! rethink {
|
||||||
withTable table
|
withTable table
|
||||||
|
@ -102,37 +100,49 @@ module Entry =
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the cache entry specified, refreshing sliding expiration then checking for expiration
|
/// Get the cache entry specified, refreshing sliding expiration then checking for expiration
|
||||||
let get cacheOpts (key : string) (cancelToken : CancellationToken) = backgroundTask {
|
let get cacheOpts log (key : string) (cancelToken : CancellationToken) = backgroundTask {
|
||||||
let table = table cacheOpts
|
let table = table cacheOpts
|
||||||
let now = ticksFromNow 0
|
let debug = debug cacheOpts log
|
||||||
let! result = rethink<Result> {
|
debug <| fun () -> $"Retriving cache entry {key}"
|
||||||
|
match! rethink<CacheEntry> {
|
||||||
withTable table
|
withTable table
|
||||||
get key
|
get key
|
||||||
update (fun row ->
|
resultOption cancelToken; withRetryOptionDefault cacheOpts.Connection
|
||||||
r.HashMap(
|
} with
|
||||||
expiresAt,
|
| Some entry ->
|
||||||
r.Branch(
|
debug <| fun () -> $"Refreshing cache entry {key}"
|
||||||
// If we have neither sliding nor absolute expiration, do not change the expiry time
|
let now = ticksFromNow 0.0
|
||||||
row.G(slidingExp).Le(0).Or(row.G(absoluteExp).Le(0)).Or(row.G(absoluteExp).Eq(row.G(expiresAt))),
|
let entry =
|
||||||
row.G(expiresAt),
|
match true with
|
||||||
// If the sliding expiry increment exceeds the absolute expiry, use the absolute
|
| _ when entry.absoluteExp = entry.expiresAt ->
|
||||||
row.G(expiresAt).Add(row.G(slidingExp)).Gt(row.G(absoluteExp)),
|
// Already expired
|
||||||
row.G(absoluteExp),
|
entry
|
||||||
// Else adjust for the sliding expiry increment
|
| _ when entry.slidingExp <= 0L && entry.absoluteExp <= 0L ->
|
||||||
row.G(slidingExp).Add(now))) :> obj) [ ReturnChanges All ]
|
// Not enough information to adjust expiration
|
||||||
write cancelToken; withRetryDefault cacheOpts.Connection
|
entry
|
||||||
}
|
| _ when entry.absoluteExp > 0 && entry.expiresAt + entry.slidingExp > entry.absoluteExp ->
|
||||||
match result.Changes.Count with
|
// Sliding update would push it past absolute; use absolute expiration
|
||||||
| 0 -> return None
|
{ entry with expiresAt = entry.absoluteExp }
|
||||||
| _ ->
|
| _ ->
|
||||||
match (box >> Option.ofObj) (result.ChangesAs<CacheEntry>().[0].NewValue) with
|
// Update sliding expiration
|
||||||
| Some boxedEntry ->
|
{ entry with expiresAt = now + entry.slidingExp }
|
||||||
let entry = unbox boxedEntry
|
do! rethink {
|
||||||
return if entry.expiresAt > now then Some entry else None
|
withTable table
|
||||||
| _ -> return None
|
get key
|
||||||
|
replace entry
|
||||||
|
write cancelToken; withRetryDefault; ignoreResult cacheOpts.Connection
|
||||||
|
}
|
||||||
|
debug <| fun () ->
|
||||||
|
let state = if entry.expiresAt > now then "valid" else "expired"
|
||||||
|
$"Cache entry {key} is {state}"
|
||||||
|
return (if entry.expiresAt > now then Some entry else None)
|
||||||
|
| None ->
|
||||||
|
debug <| fun () -> $"Cache entry {key} not found"
|
||||||
|
return None
|
||||||
}
|
}
|
||||||
|
|
||||||
let remove cacheOpts (key : string) (cancelToken : CancellationToken) = backgroundTask {
|
let remove cacheOpts log (key : string) (cancelToken : CancellationToken) = backgroundTask {
|
||||||
|
debug cacheOpts log <| fun () -> $"Deleting cache entry {key}"
|
||||||
let table = table cacheOpts
|
let table = table cacheOpts
|
||||||
do! rethink {
|
do! rethink {
|
||||||
withTable table
|
withTable table
|
||||||
|
@ -143,27 +153,27 @@ module Entry =
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set a cache entry
|
/// Set a cache entry
|
||||||
let set cacheOpts (entryOpts : DistributedCacheEntryOptions) key (payload : byte[])
|
let set cacheOpts log (entryOpts : DistributedCacheEntryOptions) key payload (cancelToken : CancellationToken) =
|
||||||
(cancelToken : CancellationToken) =
|
|
||||||
backgroundTask {
|
backgroundTask {
|
||||||
|
debug cacheOpts log <| fun () -> $"Creating cache entry {key}"
|
||||||
let table = table cacheOpts
|
let table = table cacheOpts
|
||||||
let addExpiration entry =
|
let addExpiration entry =
|
||||||
match true with
|
match true with
|
||||||
| _ when entryOpts.SlidingExpiration.HasValue ->
|
| _ when entryOpts.SlidingExpiration.HasValue ->
|
||||||
let expTicks = secondsToTicks entryOpts.SlidingExpiration.Value.Seconds
|
let expTicks = secondsToTicks entryOpts.SlidingExpiration.Value
|
||||||
{ entry with expiresAt = ticksFromNow 0 + expTicks; slidingExp = expTicks }
|
{ entry with expiresAt = ticksFromNow 0 + expTicks; slidingExp = expTicks }
|
||||||
| _ when entryOpts.AbsoluteExpiration.HasValue ->
|
| _ when entryOpts.AbsoluteExpiration.HasValue ->
|
||||||
let exp = entryOpts.AbsoluteExpiration.Value.UtcTicks
|
let exp = entryOpts.AbsoluteExpiration.Value.UtcTicks
|
||||||
{ entry with expiresAt = exp; absoluteExp = exp }
|
{ entry with expiresAt = exp; absoluteExp = exp }
|
||||||
| _ when entryOpts.AbsoluteExpirationRelativeToNow.HasValue ->
|
| _ when entryOpts.AbsoluteExpirationRelativeToNow.HasValue ->
|
||||||
let exp = entryOpts.AbsoluteExpirationRelativeToNow.Value.Seconds
|
let exp = ticksFromNow 0.0 + secondsToTicks entryOpts.AbsoluteExpirationRelativeToNow.Value
|
||||||
{ entry with expiresAt = exp; absoluteExp = exp }
|
{ entry with expiresAt = exp; absoluteExp = exp }
|
||||||
| _ ->
|
| _ ->
|
||||||
let expTicks = secondsToTicks cacheOpts.DefaultSlidingExpiration.Seconds
|
let expTicks = secondsToTicks cacheOpts.DefaultSlidingExpiration
|
||||||
{ entry with expiresAt = ticksFromNow 0 + expTicks; slidingExp = expTicks }
|
{ entry with expiresAt = ticksFromNow 0.0 + expTicks; slidingExp = expTicks }
|
||||||
let entry =
|
let entry =
|
||||||
{ id = key
|
{ id = key
|
||||||
payload = UTF8Encoding.UTF8.GetString payload
|
payload = Convert.ToBase64String payload
|
||||||
expiresAt = Int64.MinValue
|
expiresAt = Int64.MinValue
|
||||||
slidingExp = 0L
|
slidingExp = 0L
|
||||||
absoluteExp = 0L
|
absoluteExp = 0L
|
||||||
|
@ -171,7 +181,7 @@ module Entry =
|
||||||
|> addExpiration
|
|> addExpiration
|
||||||
do! rethink {
|
do! rethink {
|
||||||
withTable table
|
withTable table
|
||||||
replace entry
|
insert entry [ OnConflict Replace ]
|
||||||
write cancelToken; withRetryDefault; ignoreResult cacheOpts.Connection
|
write cancelToken; withRetryDefault; ignoreResult cacheOpts.Connection
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
namespace RethinkDB.DistributedCache
|
namespace RethinkDB.DistributedCache
|
||||||
|
|
||||||
open System
|
open System
|
||||||
open System.Text
|
|
||||||
open System.Threading
|
open System.Threading
|
||||||
open System.Threading.Tasks
|
open System.Threading.Tasks
|
||||||
open Microsoft.Extensions.Caching.Distributed
|
open Microsoft.Extensions.Caching.Distributed
|
||||||
|
@ -49,7 +48,7 @@ type DistributedRethinkDBCache (options : IOptions<DistributedRethinkDBCacheOpti
|
||||||
/// Get the payload for the cache entry
|
/// Get the payload for the cache entry
|
||||||
let getEntry key cancelToken = backgroundTask {
|
let getEntry key cancelToken = backgroundTask {
|
||||||
do! checkEnvironment cancelToken
|
do! checkEnvironment cancelToken
|
||||||
let! result = Cache.Entry.get opts key cancelToken
|
let! result = Cache.Entry.get opts log key cancelToken
|
||||||
do! purgeExpired cancelToken
|
do! purgeExpired cancelToken
|
||||||
match result with
|
match result with
|
||||||
| None ->
|
| None ->
|
||||||
|
@ -57,26 +56,26 @@ type DistributedRethinkDBCache (options : IOptions<DistributedRethinkDBCacheOpti
|
||||||
return null
|
return null
|
||||||
| Some entry ->
|
| Some entry ->
|
||||||
debug <| fun () -> $"Cache key {key} found"
|
debug <| fun () -> $"Cache key {key} found"
|
||||||
return UTF8Encoding.UTF8.GetBytes entry.payload
|
return Convert.FromBase64String entry.payload
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Update the sliding expiration for a cache entry
|
/// Update the sliding expiration for a cache entry
|
||||||
let refreshEntry key cancelToken = backgroundTask {
|
let refreshEntry key cancelToken = backgroundTask {
|
||||||
do! checkEnvironment cancelToken
|
do! checkEnvironment cancelToken
|
||||||
let! _ = Cache.Entry.get opts key cancelToken
|
let! _ = Cache.Entry.get opts log key cancelToken
|
||||||
do! purgeExpired cancelToken
|
do! purgeExpired cancelToken
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove the specified cache entry
|
/// Remove the specified cache entry
|
||||||
let removeEntry key cancelToken = backgroundTask {
|
let removeEntry key cancelToken = backgroundTask {
|
||||||
do! checkEnvironment cancelToken
|
do! checkEnvironment cancelToken
|
||||||
do! Cache.Entry.remove opts key cancelToken
|
do! Cache.Entry.remove opts log key cancelToken
|
||||||
do! purgeExpired cancelToken
|
do! purgeExpired cancelToken
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set the value of a cache entry
|
/// Set the value of a cache entry
|
||||||
let setEntry key payload options cancelToken = backgroundTask {
|
let setEntry key payload options cancelToken = backgroundTask {
|
||||||
do! Cache.Entry.set opts options key payload cancelToken
|
do! Cache.Entry.set opts log options key payload cancelToken
|
||||||
do! purgeExpired cancelToken
|
do! purgeExpired cancelToken
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,11 +84,11 @@ type DistributedRethinkDBCache (options : IOptions<DistributedRethinkDBCacheOpti
|
||||||
task CancellationToken.None |> (Async.AwaitTask >> Async.RunSynchronously)
|
task CancellationToken.None |> (Async.AwaitTask >> Async.RunSynchronously)
|
||||||
|
|
||||||
interface IDistributedCache with
|
interface IDistributedCache with
|
||||||
member this.Get key = getEntry key |> runSync
|
member _.Get key = getEntry key |> runSync
|
||||||
member this.GetAsync (key, cancelToken) = getEntry key cancelToken
|
member _.GetAsync (key, cancelToken) = getEntry key cancelToken
|
||||||
member this.Refresh key = refreshEntry key |> runSync
|
member _.Refresh key = refreshEntry key |> runSync
|
||||||
member this.RefreshAsync (key, cancelToken) = refreshEntry key cancelToken
|
member _.RefreshAsync (key, cancelToken) = refreshEntry key cancelToken
|
||||||
member this.Remove key = removeEntry key |> runSync
|
member _.Remove key = removeEntry key |> runSync
|
||||||
member this.RemoveAsync (key, cancelToken) = removeEntry key cancelToken
|
member _.RemoveAsync (key, cancelToken) = removeEntry key cancelToken
|
||||||
member this.Set (key, value, options) = setEntry key value options |> runSync
|
member _.Set (key, value, options) = setEntry key value options |> runSync
|
||||||
member this.SetAsync (key, value, options, cancelToken) = setEntry key value options cancelToken
|
member _.SetAsync (key, value, options, cancelToken) = setEntry key value options cancelToken
|
||||||
|
|
|
@ -13,8 +13,8 @@
|
||||||
<PackageLicenseExpression>MIT</PackageLicenseExpression>
|
<PackageLicenseExpression>MIT</PackageLicenseExpression>
|
||||||
<PackageTags>RethinkDB IDistributedCache ASP.NET Core</PackageTags>
|
<PackageTags>RethinkDB IDistributedCache ASP.NET Core</PackageTags>
|
||||||
<Description>An IDistributedCache implementation utilizing RethinkDB for storage</Description>
|
<Description>An IDistributedCache implementation utilizing RethinkDB for storage</Description>
|
||||||
<VersionSuffix>alpha04</VersionSuffix>
|
<VersionSuffix>alpha05</VersionSuffix>
|
||||||
<PackageReleaseNotes>Work toward starting a new session when encountering an expired one</PackageReleaseNotes>
|
<PackageReleaseNotes>Provider now stores successfully; do not use an earlier version</PackageReleaseNotes>
|
||||||
</PropertyGroup>
|
</PropertyGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
|
Loading…
Reference in New Issue
Block a user