Version 8 #43
							
								
								
									
										218
									
								
								src/PrayerTracker.Data/DistributedCache.fs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										218
									
								
								src/PrayerTracker.Data/DistributedCache.fs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,218 @@ | ||||
| namespace PrayerTracker.Data | ||||
| 
 | ||||
| open System.Threading | ||||
| open System.Threading.Tasks | ||||
| open Microsoft.Extensions.Caching.Distributed | ||||
| open NodaTime | ||||
| open Npgsql | ||||
| open Npgsql.FSharp | ||||
| 
 | ||||
| /// Helper types and functions for the cache | ||||
| [<AutoOpen>] | ||||
| module private CacheHelpers = | ||||
|      | ||||
|     open System | ||||
|      | ||||
|     /// The cache entry | ||||
|     type Entry = | ||||
|         {   /// The ID of the cache entry | ||||
|             Id : string | ||||
|              | ||||
|             /// The value to be cached | ||||
|             Payload : byte[] | ||||
|              | ||||
|             /// When this entry will expire | ||||
|             ExpireAt : Instant | ||||
|              | ||||
|             /// The duration by which the expiration should be pushed out when being refreshed | ||||
|             SlidingExpiration : Duration option | ||||
|              | ||||
|             /// The must-expire-by date/time for the cache entry | ||||
|             AbsoluteExpiration : Instant option | ||||
|         } | ||||
|      | ||||
|     /// Run a task synchronously | ||||
|     let sync<'T> (it : Task<'T>) = it |> (Async.AwaitTask >> Async.RunSynchronously) | ||||
|      | ||||
|     /// Get the current instant | ||||
|     let getNow () = SystemClock.Instance.GetCurrentInstant () | ||||
|      | ||||
|     /// Create a parameter for the expire-at time | ||||
|     let expireParam (it : Instant) = | ||||
|         "@expireAt", Sql.parameter (NpgsqlParameter ("@expireAt", it)) | ||||
|      | ||||
|     /// Create a parameter for a possibly-missing NodaTime type | ||||
|     let optParam<'T> name (it : 'T option) = | ||||
|         let p = NpgsqlParameter ($"@%s{name}", if Option.isSome it then box it.Value else DBNull.Value) | ||||
|         p.ParameterName, Sql.parameter p | ||||
| 
 | ||||
| 
 | ||||
| /// A distributed cache implementation in PostgreSQL used to handle sessions for myWebLog | ||||
| type DistributedCache (connStr : string) = | ||||
|      | ||||
|     // ~~~ INITIALIZATION ~~~ | ||||
|      | ||||
|     do | ||||
|         task { | ||||
|             let! exists = | ||||
|                 Sql.connect connStr | ||||
|                 |> Sql.query $" | ||||
|                     SELECT EXISTS | ||||
|                         (SELECT 1 FROM pg_tables WHERE schemaname = 'public' AND tablename = 'session') | ||||
|                       AS does_exist" | ||||
|                 |> Sql.executeRowAsync (fun row -> row.bool "does_exist") | ||||
|             if not exists then | ||||
|                 let! _ = | ||||
|                     Sql.connect connStr | ||||
|                     |> Sql.query | ||||
|                         "CREATE TABLE session ( | ||||
|                             id                  TEXT        NOT NULL PRIMARY KEY, | ||||
|                             payload             BYTEA       NOT NULL, | ||||
|                             expire_at           TIMESTAMPTZ NOT NULL, | ||||
|                             sliding_expiration  INTERVAL, | ||||
|                             absolute_expiration TIMESTAMPTZ); | ||||
|                         CREATE INDEX idx_session_expiration ON session (expire_at)" | ||||
|                     |> Sql.executeNonQueryAsync | ||||
|                 () | ||||
|         } |> sync | ||||
|      | ||||
|     // ~~~ SUPPORT FUNCTIONS ~~~ | ||||
|      | ||||
|     /// Get an entry, updating it for sliding expiration | ||||
|     let getEntry key = backgroundTask { | ||||
|         let idParam = "@id", Sql.string key | ||||
|         let! tryEntry = | ||||
|             Sql.connect connStr | ||||
|             |> Sql.query "SELECT * FROM session WHERE id = @id" | ||||
|             |> Sql.parameters [ idParam ] | ||||
|             |> Sql.executeAsync (fun row -> | ||||
|                 {   Id                 = row.string                     "id" | ||||
|                     Payload            = row.bytea                      "payload" | ||||
|                     ExpireAt           = row.fieldValue<Instant>        "expire_at" | ||||
|                     SlidingExpiration  = row.fieldValueOrNone<Duration> "sliding_expiration" | ||||
|                     AbsoluteExpiration = row.fieldValueOrNone<Instant>  "absolute_expiration"   }) | ||||
|         match List.tryHead tryEntry with | ||||
|         | Some entry -> | ||||
|             let now      = getNow () | ||||
|             let slideExp = defaultArg entry.SlidingExpiration  Duration.MinValue | ||||
|             let absExp   = defaultArg entry.AbsoluteExpiration Instant.MinValue | ||||
|             let needsRefresh, item = | ||||
|                 if entry.ExpireAt = absExp then false, entry | ||||
|                 elif slideExp = Duration.MinValue && absExp = Instant.MinValue then false, entry | ||||
|                 elif absExp > Instant.MinValue && entry.ExpireAt.Plus slideExp > absExp then | ||||
|                     true, { entry with ExpireAt = absExp } | ||||
|                 else true, { entry with ExpireAt = now.Plus slideExp } | ||||
|             if needsRefresh then | ||||
|                 let! _ = | ||||
|                     Sql.connect connStr | ||||
|                     |> Sql.query "UPDATE session SET expire_at = @expireAt WHERE id = @id" | ||||
|                     |> Sql.parameters [ expireParam item.ExpireAt; idParam ] | ||||
|                     |> Sql.executeNonQueryAsync | ||||
|                 () | ||||
|             return if item.ExpireAt > now then Some entry else None | ||||
|         | None -> return None | ||||
|     } | ||||
|      | ||||
|     /// The last time expired entries were purged (runs every 30 minutes) | ||||
|     let mutable lastPurge = Instant.MinValue | ||||
|      | ||||
|     /// Purge expired entries every 30 minutes | ||||
|     let purge () = backgroundTask { | ||||
|         let now = getNow () | ||||
|         if lastPurge.Plus (Duration.FromMinutes 30L) < now then | ||||
|             let! _ = | ||||
|                 Sql.connect connStr | ||||
|                 |> Sql.query "DELETE FROM session WHERE expire_at < @expireAt" | ||||
|                 |> Sql.parameters [ expireParam now ] | ||||
|                 |> Sql.executeNonQueryAsync | ||||
|             lastPurge <- now | ||||
|     } | ||||
|      | ||||
|     /// Remove a cache entry | ||||
|     let removeEntry key = backgroundTask { | ||||
|         let! _ = | ||||
|             Sql.connect connStr | ||||
|             |> Sql.query "DELETE FROM session WHERE id = @id" | ||||
|             |> Sql.parameters [ "@id", Sql.string key ] | ||||
|             |> Sql.executeNonQueryAsync | ||||
|         () | ||||
|     } | ||||
|      | ||||
|     /// Save an entry | ||||
|     let saveEntry (opts : DistributedCacheEntryOptions) key payload = backgroundTask { | ||||
|         let now = getNow () | ||||
|         let expireAt, slideExp, absExp = | ||||
|             if opts.SlidingExpiration.HasValue then | ||||
|                 let slide = Duration.FromTimeSpan opts.SlidingExpiration.Value | ||||
|                 now.Plus slide, Some slide, None | ||||
|             elif opts.AbsoluteExpiration.HasValue then | ||||
|                 let exp = Instant.FromDateTimeOffset opts.AbsoluteExpiration.Value | ||||
|                 exp, None, Some exp | ||||
|             elif opts.AbsoluteExpirationRelativeToNow.HasValue then | ||||
|                 let exp = now.Plus (Duration.FromTimeSpan opts.AbsoluteExpirationRelativeToNow.Value) | ||||
|                 exp, None, Some exp | ||||
|             else | ||||
|                 // Default to 2 hour sliding expiration | ||||
|                 let slide = Duration.FromHours 2 | ||||
|                 now.Plus slide, Some slide, None | ||||
|         let! _ = | ||||
|             Sql.connect connStr | ||||
|             |> Sql.query | ||||
|                 "INSERT INTO session ( | ||||
|                     id, payload, expire_at, sliding_expiration, absolute_expiration | ||||
|                 ) VALUES ( | ||||
|                     @id, @payload, @expireAt, @slideExp, @absExp | ||||
|                 ) ON CONFLICT (id) DO UPDATE | ||||
|                 SET payload             = EXCLUDED.payload, | ||||
|                     expire_at           = EXCLUDED.expire_at, | ||||
|                     sliding_expiration  = EXCLUDED.sliding_expiration, | ||||
|                     absolute_expiration = EXCLUDED.absolute_expiration" | ||||
|             |> Sql.parameters | ||||
|                 [   "@id",      Sql.string key | ||||
|                     "@payload", Sql.bytea payload | ||||
|                     expireParam expireAt | ||||
|                     optParam "slideExp" slideExp | ||||
|                     optParam "absExp"   absExp ] | ||||
|             |> Sql.executeNonQueryAsync | ||||
|         () | ||||
|     } | ||||
|          | ||||
|     // ~~~ IMPLEMENTATION FUNCTIONS ~~~ | ||||
|      | ||||
|     /// Retrieve the data for a cache entry | ||||
|     let get key (_ : CancellationToken) = backgroundTask { | ||||
|         match! getEntry key with | ||||
|         | Some entry -> | ||||
|             do! purge () | ||||
|             return entry.Payload | ||||
|         | None -> return null | ||||
|     } | ||||
|      | ||||
|     /// Refresh an entry | ||||
|     let refresh key (cancelToken : CancellationToken) = backgroundTask { | ||||
|         let! _ = get key cancelToken | ||||
|         () | ||||
|     } | ||||
|      | ||||
|     /// Remove an entry | ||||
|     let remove key (_ : CancellationToken) = backgroundTask { | ||||
|         do! removeEntry key | ||||
|         do! purge () | ||||
|     } | ||||
|      | ||||
|     /// Set an entry | ||||
|     let set key value options (_ : CancellationToken) = backgroundTask { | ||||
|         do! saveEntry options key value | ||||
|         do! purge () | ||||
|     } | ||||
|      | ||||
|     interface IDistributedCache with | ||||
|         member this.Get key = get key CancellationToken.None |> sync | ||||
|         member this.GetAsync (key, token) = get key token | ||||
|         member this.Refresh key = refresh key CancellationToken.None |> sync | ||||
|         member this.RefreshAsync (key, token) = refresh key token | ||||
|         member this.Remove key = remove key CancellationToken.None |> sync | ||||
|         member this.RemoveAsync (key, token) = remove key token | ||||
|         member this.Set (key, value, options) = set key value options CancellationToken.None |> sync | ||||
|         member this.SetAsync (key, value, options, token) = set key value options token | ||||
| 
 | ||||
| @ -7,11 +7,12 @@ | ||||
|   <ItemGroup> | ||||
|     <Compile Include="Entities.fs" /> | ||||
|     <Compile Include="Access.fs" /> | ||||
|     <Compile Include="DistributedCache.fs" /> | ||||
|   </ItemGroup> | ||||
| 
 | ||||
|   <ItemGroup> | ||||
|     <PackageReference Include="Giraffe" Version="6.0.0" /> | ||||
|     <PackageReference Include="NodaTime" Version="3.1.1" /> | ||||
|     <PackageReference Include="NodaTime" Version="3.1.2" /> | ||||
|     <PackageReference Update="FSharp.Core" Version="6.0.5" /> | ||||
|     <PackageReference Include="Npgsql.FSharp" Version="5.3.0" /> | ||||
|     <PackageReference Include="Npgsql.NodaTime" Version="6.0.6" /> | ||||
|  | ||||
| @ -16,7 +16,7 @@ | ||||
| 
 | ||||
|   <ItemGroup> | ||||
|     <PackageReference Include="Expecto" Version="9.0.4" /> | ||||
|     <PackageReference Include="NodaTime.Testing" Version="3.1.1" /> | ||||
|     <PackageReference Include="NodaTime.Testing" Version="3.1.2" /> | ||||
|     <PackageReference Update="FSharp.Core" Version="6.0.5" /> | ||||
|   </ItemGroup> | ||||
| 
 | ||||
|  | ||||
| @ -35,13 +35,13 @@ module Configure = | ||||
|         (ctx.Configuration.GetSection >> opts.Configure >> ignore) "Kestrel" | ||||
| 
 | ||||
|     open System.Globalization | ||||
|     open System.IO | ||||
|     open Microsoft.AspNetCore.Authentication.Cookies | ||||
|     open Microsoft.AspNetCore.Localization | ||||
|     open Microsoft.Extensions.Caching.Distributed | ||||
|     open Microsoft.Extensions.DependencyInjection | ||||
|     open NeoSmart.Caching.Sqlite | ||||
|     open NodaTime | ||||
|     open Npgsql | ||||
|     open PrayerTracker.Data | ||||
|      | ||||
|     /// Configure ASP.NET Core's service collection (dependency injection container) | ||||
|     let services (svc : IServiceCollection) = | ||||
| @ -63,7 +63,10 @@ module Configure = | ||||
|                     opts.SlidingExpiration <- true | ||||
|                     opts.AccessDeniedPath  <- "/error/403") | ||||
|         let _ = svc.AddAuthorization () | ||||
|         let _ = svc.AddSqliteCache (fun opts -> opts.CachePath <- Path.Combine (".", "session.db")) | ||||
|         let _ = | ||||
|             svc.AddSingleton<IDistributedCache> (fun sp -> | ||||
|                 let cfg = sp.GetService<IConfiguration> () | ||||
|                 DistributedCache (cfg.GetConnectionString "PrayerTracker") :> IDistributedCache) | ||||
|         let _ = svc.AddSession () | ||||
|         let _ = svc.AddAntiforgery () | ||||
|         let _ = svc.AddRouting () | ||||
|  | ||||
| @ -26,7 +26,6 @@ | ||||
|   <ItemGroup> | ||||
|     <PackageReference Include="Giraffe" Version="6.0.0" /> | ||||
|     <PackageReference Include="Giraffe.Htmx" Version="1.8.0" /> | ||||
|     <PackageReference Include="NeoSmart.Caching.Sqlite" Version="6.0.1" /> | ||||
|     <PackageReference Include="NodaTime.Serialization.JsonNet" Version="3.0.0" /> | ||||
|     <PackageReference Update="FSharp.Core" Version="6.0.5" /> | ||||
|   </ItemGroup> | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user