diff --git a/src/PrayerTracker.Data/DistributedCache.fs b/src/PrayerTracker.Data/DistributedCache.fs new file mode 100644 index 0000000..8abf8c8 --- /dev/null +++ b/src/PrayerTracker.Data/DistributedCache.fs @@ -0,0 +1,218 @@ +namespace PrayerTracker.Data + +open System.Threading +open System.Threading.Tasks +open Microsoft.Extensions.Caching.Distributed +open NodaTime +open Npgsql +open Npgsql.FSharp + +/// Helper types and functions for the cache +[] +module private CacheHelpers = + + open System + + /// The cache entry + type Entry = + { /// The ID of the cache entry + Id : string + + /// The value to be cached + Payload : byte[] + + /// When this entry will expire + ExpireAt : Instant + + /// The duration by which the expiration should be pushed out when being refreshed + SlidingExpiration : Duration option + + /// The must-expire-by date/time for the cache entry + AbsoluteExpiration : Instant option + } + + /// Run a task synchronously + let sync<'T> (it : Task<'T>) = it |> (Async.AwaitTask >> Async.RunSynchronously) + + /// Get the current instant + let getNow () = SystemClock.Instance.GetCurrentInstant () + + /// Create a parameter for the expire-at time + let expireParam (it : Instant) = + "@expireAt", Sql.parameter (NpgsqlParameter ("@expireAt", it)) + + /// Create a parameter for a possibly-missing NodaTime type + let optParam<'T> name (it : 'T option) = + let p = NpgsqlParameter ($"@%s{name}", if Option.isSome it then box it.Value else DBNull.Value) + p.ParameterName, Sql.parameter p + + +/// A distributed cache implementation in PostgreSQL used to handle sessions for myWebLog +type DistributedCache (connStr : string) = + + // ~~~ INITIALIZATION ~~~ + + do + task { + let! exists = + Sql.connect connStr + |> Sql.query $" + SELECT EXISTS + (SELECT 1 FROM pg_tables WHERE schemaname = 'public' AND tablename = 'session') + AS does_exist" + |> Sql.executeRowAsync (fun row -> row.bool "does_exist") + if not exists then + let! _ = + Sql.connect connStr + |> Sql.query + "CREATE TABLE session ( + id TEXT NOT NULL PRIMARY KEY, + payload BYTEA NOT NULL, + expire_at TIMESTAMPTZ NOT NULL, + sliding_expiration INTERVAL, + absolute_expiration TIMESTAMPTZ); + CREATE INDEX idx_session_expiration ON session (expire_at)" + |> Sql.executeNonQueryAsync + () + } |> sync + + // ~~~ SUPPORT FUNCTIONS ~~~ + + /// Get an entry, updating it for sliding expiration + let getEntry key = backgroundTask { + let idParam = "@id", Sql.string key + let! tryEntry = + Sql.connect connStr + |> Sql.query "SELECT * FROM session WHERE id = @id" + |> Sql.parameters [ idParam ] + |> Sql.executeAsync (fun row -> + { Id = row.string "id" + Payload = row.bytea "payload" + ExpireAt = row.fieldValue "expire_at" + SlidingExpiration = row.fieldValueOrNone "sliding_expiration" + AbsoluteExpiration = row.fieldValueOrNone "absolute_expiration" }) + match List.tryHead tryEntry with + | Some entry -> + let now = getNow () + let slideExp = defaultArg entry.SlidingExpiration Duration.MinValue + let absExp = defaultArg entry.AbsoluteExpiration Instant.MinValue + let needsRefresh, item = + if entry.ExpireAt = absExp then false, entry + elif slideExp = Duration.MinValue && absExp = Instant.MinValue then false, entry + elif absExp > Instant.MinValue && entry.ExpireAt.Plus slideExp > absExp then + true, { entry with ExpireAt = absExp } + else true, { entry with ExpireAt = now.Plus slideExp } + if needsRefresh then + let! _ = + Sql.connect connStr + |> Sql.query "UPDATE session SET expire_at = @expireAt WHERE id = @id" + |> Sql.parameters [ expireParam item.ExpireAt; idParam ] + |> Sql.executeNonQueryAsync + () + return if item.ExpireAt > now then Some entry else None + | None -> return None + } + + /// The last time expired entries were purged (runs every 30 minutes) + let mutable lastPurge = Instant.MinValue + + /// Purge expired entries every 30 minutes + let purge () = backgroundTask { + let now = getNow () + if lastPurge.Plus (Duration.FromMinutes 30L) < now then + let! _ = + Sql.connect connStr + |> Sql.query "DELETE FROM session WHERE expire_at < @expireAt" + |> Sql.parameters [ expireParam now ] + |> Sql.executeNonQueryAsync + lastPurge <- now + } + + /// Remove a cache entry + let removeEntry key = backgroundTask { + let! _ = + Sql.connect connStr + |> Sql.query "DELETE FROM session WHERE id = @id" + |> Sql.parameters [ "@id", Sql.string key ] + |> Sql.executeNonQueryAsync + () + } + + /// Save an entry + let saveEntry (opts : DistributedCacheEntryOptions) key payload = backgroundTask { + let now = getNow () + let expireAt, slideExp, absExp = + if opts.SlidingExpiration.HasValue then + let slide = Duration.FromTimeSpan opts.SlidingExpiration.Value + now.Plus slide, Some slide, None + elif opts.AbsoluteExpiration.HasValue then + let exp = Instant.FromDateTimeOffset opts.AbsoluteExpiration.Value + exp, None, Some exp + elif opts.AbsoluteExpirationRelativeToNow.HasValue then + let exp = now.Plus (Duration.FromTimeSpan opts.AbsoluteExpirationRelativeToNow.Value) + exp, None, Some exp + else + // Default to 2 hour sliding expiration + let slide = Duration.FromHours 2 + now.Plus slide, Some slide, None + let! _ = + Sql.connect connStr + |> Sql.query + "INSERT INTO session ( + id, payload, expire_at, sliding_expiration, absolute_expiration + ) VALUES ( + @id, @payload, @expireAt, @slideExp, @absExp + ) ON CONFLICT (id) DO UPDATE + SET payload = EXCLUDED.payload, + expire_at = EXCLUDED.expire_at, + sliding_expiration = EXCLUDED.sliding_expiration, + absolute_expiration = EXCLUDED.absolute_expiration" + |> Sql.parameters + [ "@id", Sql.string key + "@payload", Sql.bytea payload + expireParam expireAt + optParam "slideExp" slideExp + optParam "absExp" absExp ] + |> Sql.executeNonQueryAsync + () + } + + // ~~~ IMPLEMENTATION FUNCTIONS ~~~ + + /// Retrieve the data for a cache entry + let get key (_ : CancellationToken) = backgroundTask { + match! getEntry key with + | Some entry -> + do! purge () + return entry.Payload + | None -> return null + } + + /// Refresh an entry + let refresh key (cancelToken : CancellationToken) = backgroundTask { + let! _ = get key cancelToken + () + } + + /// Remove an entry + let remove key (_ : CancellationToken) = backgroundTask { + do! removeEntry key + do! purge () + } + + /// Set an entry + let set key value options (_ : CancellationToken) = backgroundTask { + do! saveEntry options key value + do! purge () + } + + interface IDistributedCache with + member this.Get key = get key CancellationToken.None |> sync + member this.GetAsync (key, token) = get key token + member this.Refresh key = refresh key CancellationToken.None |> sync + member this.RefreshAsync (key, token) = refresh key token + member this.Remove key = remove key CancellationToken.None |> sync + member this.RemoveAsync (key, token) = remove key token + member this.Set (key, value, options) = set key value options CancellationToken.None |> sync + member this.SetAsync (key, value, options, token) = set key value options token + diff --git a/src/PrayerTracker.Data/PrayerTracker.Data.fsproj b/src/PrayerTracker.Data/PrayerTracker.Data.fsproj index 5480031..40cae83 100644 --- a/src/PrayerTracker.Data/PrayerTracker.Data.fsproj +++ b/src/PrayerTracker.Data/PrayerTracker.Data.fsproj @@ -7,11 +7,12 @@ + - + diff --git a/src/PrayerTracker.Tests/PrayerTracker.Tests.fsproj b/src/PrayerTracker.Tests/PrayerTracker.Tests.fsproj index 469b647..e98224a 100644 --- a/src/PrayerTracker.Tests/PrayerTracker.Tests.fsproj +++ b/src/PrayerTracker.Tests/PrayerTracker.Tests.fsproj @@ -16,7 +16,7 @@ - + diff --git a/src/PrayerTracker/App.fs b/src/PrayerTracker/App.fs index a84d339..d7484a8 100644 --- a/src/PrayerTracker/App.fs +++ b/src/PrayerTracker/App.fs @@ -35,13 +35,13 @@ module Configure = (ctx.Configuration.GetSection >> opts.Configure >> ignore) "Kestrel" open System.Globalization - open System.IO open Microsoft.AspNetCore.Authentication.Cookies open Microsoft.AspNetCore.Localization + open Microsoft.Extensions.Caching.Distributed open Microsoft.Extensions.DependencyInjection - open NeoSmart.Caching.Sqlite open NodaTime open Npgsql + open PrayerTracker.Data /// Configure ASP.NET Core's service collection (dependency injection container) let services (svc : IServiceCollection) = @@ -63,7 +63,10 @@ module Configure = opts.SlidingExpiration <- true opts.AccessDeniedPath <- "/error/403") let _ = svc.AddAuthorization () - let _ = svc.AddSqliteCache (fun opts -> opts.CachePath <- Path.Combine (".", "session.db")) + let _ = + svc.AddSingleton (fun sp -> + let cfg = sp.GetService () + DistributedCache (cfg.GetConnectionString "PrayerTracker") :> IDistributedCache) let _ = svc.AddSession () let _ = svc.AddAntiforgery () let _ = svc.AddRouting () diff --git a/src/PrayerTracker/PrayerTracker.fsproj b/src/PrayerTracker/PrayerTracker.fsproj index ffe1198..6864a3d 100644 --- a/src/PrayerTracker/PrayerTracker.fsproj +++ b/src/PrayerTracker/PrayerTracker.fsproj @@ -26,7 +26,6 @@ -