V2 #36
|
@ -2,6 +2,7 @@ namespace MyWebLog.Data.Postgres
|
|||
|
||||
open System.Threading
|
||||
open System.Threading.Tasks
|
||||
open BitBadger.Npgsql.FSharp.Documents
|
||||
open Microsoft.Extensions.Caching.Distributed
|
||||
open NodaTime
|
||||
open Npgsql.FSharp
|
||||
|
@ -39,35 +40,30 @@ module private Helpers =
|
|||
typedParam "expireAt"
|
||||
|
||||
|
||||
open Npgsql
|
||||
|
||||
/// A distributed cache implementation in PostgreSQL used to handle sessions for myWebLog
|
||||
type DistributedCache (dataSource : NpgsqlDataSource) =
|
||||
type DistributedCache () =
|
||||
|
||||
// ~~~ INITIALIZATION ~~~
|
||||
|
||||
do
|
||||
task {
|
||||
let! exists =
|
||||
Sql.fromDataSource dataSource
|
||||
Configuration.dataSource ()
|
||||
|> Sql.fromDataSource
|
||||
|> Sql.query $"
|
||||
SELECT EXISTS
|
||||
(SELECT 1 FROM pg_tables WHERE schemaname = 'public' AND tablename = 'session')
|
||||
AS {existsName}"
|
||||
|> Sql.executeRowAsync Map.toExists
|
||||
if not exists then
|
||||
let! _ =
|
||||
Sql.fromDataSource dataSource
|
||||
|> Sql.query
|
||||
do! Custom.nonQuery
|
||||
"CREATE TABLE session (
|
||||
id TEXT NOT NULL PRIMARY KEY,
|
||||
payload BYTEA NOT NULL,
|
||||
expire_at TIMESTAMPTZ NOT NULL,
|
||||
sliding_expiration INTERVAL,
|
||||
absolute_expiration TIMESTAMPTZ);
|
||||
CREATE INDEX idx_session_expiration ON session (expire_at)"
|
||||
|> Sql.executeNonQueryAsync
|
||||
()
|
||||
CREATE INDEX idx_session_expiration ON session (expire_at)" []
|
||||
} |> sync
|
||||
|
||||
// ~~~ SUPPORT FUNCTIONS ~~~
|
||||
|
@ -76,16 +72,13 @@ type DistributedCache (dataSource : NpgsqlDataSource) =
|
|||
let getEntry key = backgroundTask {
|
||||
let idParam = "@id", Sql.string key
|
||||
let! tryEntry =
|
||||
Sql.fromDataSource dataSource
|
||||
|> Sql.query "SELECT * FROM session WHERE id = @id"
|
||||
|> Sql.parameters [ idParam ]
|
||||
|> Sql.executeAsync (fun row ->
|
||||
Custom.single "SELECT * FROM session WHERE id = @id" [ idParam ]
|
||||
(fun row ->
|
||||
{ Id = row.string "id"
|
||||
Payload = row.bytea "payload"
|
||||
ExpireAt = row.fieldValue<Instant> "expire_at"
|
||||
SlidingExpiration = row.fieldValueOrNone<Duration> "sliding_expiration"
|
||||
AbsoluteExpiration = row.fieldValueOrNone<Instant> "absolute_expiration" })
|
||||
|> tryHead
|
||||
match tryEntry with
|
||||
| Some entry ->
|
||||
let now = getNow ()
|
||||
|
@ -98,11 +91,8 @@ type DistributedCache (dataSource : NpgsqlDataSource) =
|
|||
true, { entry with ExpireAt = absExp }
|
||||
else true, { entry with ExpireAt = now.Plus slideExp }
|
||||
if needsRefresh then
|
||||
let! _ =
|
||||
Sql.fromDataSource dataSource
|
||||
|> Sql.query "UPDATE session SET expire_at = @expireAt WHERE id = @id"
|
||||
|> Sql.parameters [ expireParam item.ExpireAt; idParam ]
|
||||
|> Sql.executeNonQueryAsync
|
||||
do! Custom.nonQuery "UPDATE session SET expire_at = @expireAt WHERE id = @id"
|
||||
[ expireParam item.ExpireAt; idParam ]
|
||||
()
|
||||
return if item.ExpireAt > now then Some entry else None
|
||||
| None -> return None
|
||||
|
@ -115,26 +105,16 @@ type DistributedCache (dataSource : NpgsqlDataSource) =
|
|||
let purge () = backgroundTask {
|
||||
let now = getNow ()
|
||||
if lastPurge.Plus (Duration.FromMinutes 30L) < now then
|
||||
let! _ =
|
||||
Sql.fromDataSource dataSource
|
||||
|> Sql.query "DELETE FROM session WHERE expire_at < @expireAt"
|
||||
|> Sql.parameters [ expireParam now ]
|
||||
|> Sql.executeNonQueryAsync
|
||||
do! Custom.nonQuery "DELETE FROM session WHERE expire_at < @expireAt" [ expireParam now ]
|
||||
lastPurge <- now
|
||||
}
|
||||
|
||||
/// Remove a cache entry
|
||||
let removeEntry key = backgroundTask {
|
||||
let! _ =
|
||||
Sql.fromDataSource dataSource
|
||||
|> Sql.query "DELETE FROM session WHERE id = @id"
|
||||
|> Sql.parameters [ "@id", Sql.string key ]
|
||||
|> Sql.executeNonQueryAsync
|
||||
()
|
||||
}
|
||||
let removeEntry key =
|
||||
Delete.byId "session" key
|
||||
|
||||
/// Save an entry
|
||||
let saveEntry (opts : DistributedCacheEntryOptions) key payload = backgroundTask {
|
||||
let saveEntry (opts : DistributedCacheEntryOptions) key payload =
|
||||
let now = getNow ()
|
||||
let expireAt, slideExp, absExp =
|
||||
if opts.SlidingExpiration.HasValue then
|
||||
|
@ -150,9 +130,7 @@ type DistributedCache (dataSource : NpgsqlDataSource) =
|
|||
// Default to 1 hour sliding expiration
|
||||
let slide = Duration.FromHours 1
|
||||
now.Plus slide, Some slide, None
|
||||
let! _ =
|
||||
Sql.fromDataSource dataSource
|
||||
|> Sql.query
|
||||
Custom.nonQuery
|
||||
"INSERT INTO session (
|
||||
id, payload, expire_at, sliding_expiration, absolute_expiration
|
||||
) VALUES (
|
||||
|
@ -162,15 +140,11 @@ type DistributedCache (dataSource : NpgsqlDataSource) =
|
|||
expire_at = EXCLUDED.expire_at,
|
||||
sliding_expiration = EXCLUDED.sliding_expiration,
|
||||
absolute_expiration = EXCLUDED.absolute_expiration"
|
||||
|> Sql.parameters
|
||||
[ "@id", Sql.string key
|
||||
"@payload", Sql.bytea payload
|
||||
expireParam expireAt
|
||||
optParam "slideExp" slideExp
|
||||
optParam "absExp" absExp ]
|
||||
|> Sql.executeNonQueryAsync
|
||||
()
|
||||
}
|
||||
|
||||
// ~~~ IMPLEMENTATION FUNCTIONS ~~~
|
||||
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
namespace MyWebLog.Data
|
||||
|
||||
open Microsoft.Extensions.Logging
|
||||
open BitBadger.Npgsql.Documents
|
||||
open BitBadger.Npgsql.FSharp.Documents
|
||||
open Microsoft.Extensions.Logging
|
||||
open MyWebLog
|
||||
open MyWebLog.Data.Postgres
|
||||
open Newtonsoft.Json
|
||||
|
@ -139,6 +139,23 @@ type PostgresData (source : NpgsqlDataSource, log : ILogger<PostgresData>, ser :
|
|||
let migrate version = backgroundTask {
|
||||
match version with
|
||||
| Some "v2-rc2" -> ()
|
||||
| Some "v2" ->
|
||||
printfn "** MANUAL DATABASE UPGRADE REQUIRED **\n"
|
||||
printfn "The data structure for PostgreSQL changed significantly between v2-rc2 and v2."
|
||||
printfn "To migrate your data:"
|
||||
printfn " - Using a v2-rc2 executable, back up each web log"
|
||||
printfn " - Drop all tables from the database"
|
||||
printfn " - Using this executable, restore each backup"
|
||||
|
||||
let! webLogs =
|
||||
Configuration.dataSource ()
|
||||
|> Sql.fromDataSource
|
||||
|> Sql.query $"SELECT url_base FROM {Table.WebLog}"
|
||||
|> Sql.executeAsync (fun row -> row.string "url_base")
|
||||
|
||||
printfn "\nCommands to back up all web logs:"
|
||||
webLogs |> List.iter (printfn "myWebLog backup %s")
|
||||
exit 1
|
||||
// Future versions will be inserted here
|
||||
| Some _
|
||||
| None ->
|
||||
|
|
|
@ -10,7 +10,7 @@ type WebLogMiddleware (next : RequestDelegate, log : ILogger<WebLogMiddleware>)
|
|||
/// Is the debug level enabled on the logger?
|
||||
let isDebug = log.IsEnabled LogLevel.Debug
|
||||
|
||||
member this.InvokeAsync (ctx : HttpContext) = task {
|
||||
member _.InvokeAsync (ctx : HttpContext) = task {
|
||||
/// Create the full path of the request
|
||||
let path = $"{ctx.Request.Scheme}://{ctx.Request.Host.Value}{ctx.Request.Path.Value}"
|
||||
match WebLogCache.tryGet path with
|
||||
|
@ -165,8 +165,8 @@ let rec main args =
|
|||
DataImplementation.createNpgsqlDataSource (sp.GetRequiredService<IConfiguration> ()))
|
||||
let _ = builder.Services.AddSingleton<IData> postgres
|
||||
let _ =
|
||||
builder.Services.AddSingleton<IDistributedCache> (fun sp ->
|
||||
Postgres.DistributedCache (sp.GetRequiredService<NpgsqlDataSource> ()) :> IDistributedCache)
|
||||
builder.Services.AddSingleton<IDistributedCache> (fun _ ->
|
||||
Postgres.DistributedCache () :> IDistributedCache)
|
||||
()
|
||||
| _ -> ()
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user