Use Custom funcs for SQL calls
This commit is contained in:
parent
fbbb15027f
commit
465d18b81f
|
@ -33,9 +33,8 @@ let saveSecurityInfo (security : SecurityInfo) =
|
||||||
/// Purge expired tokens
|
/// Purge expired tokens
|
||||||
let private purgeExpiredTokens now = backgroundTask {
|
let private purgeExpiredTokens now = backgroundTask {
|
||||||
let! info =
|
let! info =
|
||||||
dataSource ()
|
Custom.list $"{Query.selectFromTable Table.SecurityInfo} WHERE data ->> 'tokenExpires' IS NOT NULL" []
|
||||||
|> Sql.query $"{Query.selectFromTable Table.SecurityInfo} WHERE data ->> 'tokenExpires' IS NOT NULL"
|
fromData<SecurityInfo>
|
||||||
|> Sql.executeAsync fromData<SecurityInfo>
|
|
||||||
for expired in info |> List.filter (fun it -> it.TokenExpires.Value < now) do
|
for expired in info |> List.filter (fun it -> it.TokenExpires.Value < now) do
|
||||||
if expired.TokenUsage.Value = "confirm" then
|
if expired.TokenUsage.Value = "confirm" then
|
||||||
// Unconfirmed account; delete the entire thing
|
// Unconfirmed account; delete the entire thing
|
||||||
|
@ -66,7 +65,8 @@ let save citizen =
|
||||||
let register (citizen : Citizen) (security : SecurityInfo) = backgroundTask {
|
let register (citizen : Citizen) (security : SecurityInfo) = backgroundTask {
|
||||||
try
|
try
|
||||||
let! _ =
|
let! _ =
|
||||||
dataSource ()
|
Configuration.dataSource ()
|
||||||
|
|> Sql.fromDataSource
|
||||||
|> Sql.executeTransactionAsync
|
|> Sql.executeTransactionAsync
|
||||||
[ Query.save Table.Citizen, [ Query.docParameters (CitizenId.toString citizen.Id) citizen ]
|
[ Query.save Table.Citizen, [ Query.docParameters (CitizenId.toString citizen.Id) citizen ]
|
||||||
Query.save Table.SecurityInfo, [ Query.docParameters (CitizenId.toString citizen.Id) security ]
|
Query.save Table.SecurityInfo, [ Query.docParameters (CitizenId.toString citizen.Id) security ]
|
||||||
|
|
|
@ -35,12 +35,6 @@ module private CacheHelpers =
|
||||||
/// Get the current instant
|
/// Get the current instant
|
||||||
let getNow () = SystemClock.Instance.GetCurrentInstant ()
|
let getNow () = SystemClock.Instance.GetCurrentInstant ()
|
||||||
|
|
||||||
/// Get the first result of the given query
|
|
||||||
let tryHead<'T> (query : Task<'T list>) = backgroundTask {
|
|
||||||
let! results = query
|
|
||||||
return List.tryHead results
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create a parameter for a non-standard type
|
/// Create a parameter for a non-standard type
|
||||||
let typedParam<'T> name (it : 'T) =
|
let typedParam<'T> name (it : 'T) =
|
||||||
$"@%s{name}", Sql.parameter (NpgsqlParameter ($"@{name}", it))
|
$"@%s{name}", Sql.parameter (NpgsqlParameter ($"@{name}", it))
|
||||||
|
@ -56,6 +50,7 @@ module private CacheHelpers =
|
||||||
|
|
||||||
|
|
||||||
open System.Threading
|
open System.Threading
|
||||||
|
open BitBadger.Npgsql.FSharp.Documents
|
||||||
open JobsJobsJobs.Common.Data
|
open JobsJobsJobs.Common.Data
|
||||||
open Microsoft.Extensions.Caching.Distributed
|
open Microsoft.Extensions.Caching.Distributed
|
||||||
|
|
||||||
|
@ -69,46 +64,38 @@ type DistributedCache () =
|
||||||
|
|
||||||
do
|
do
|
||||||
task {
|
task {
|
||||||
let dataSource = dataSource ()
|
|
||||||
let! exists =
|
let! exists =
|
||||||
dataSource
|
Custom.scalar
|
||||||
|> Sql.query $"
|
$"SELECT EXISTS
|
||||||
SELECT EXISTS
|
|
||||||
(SELECT 1 FROM pg_tables WHERE schemaname = 'jjj' AND tablename = 'session')
|
(SELECT 1 FROM pg_tables WHERE schemaname = 'jjj' AND tablename = 'session')
|
||||||
AS does_exist"
|
AS does_exist"
|
||||||
|> Sql.executeRowAsync (fun row -> row.bool "does_exist")
|
[] (fun row -> row.bool "does_exist")
|
||||||
if not exists then
|
if not exists then
|
||||||
let! _ =
|
do! Custom.nonQuery
|
||||||
dataSource
|
|
||||||
|> Sql.query
|
|
||||||
"CREATE TABLE jjj.session (
|
"CREATE TABLE jjj.session (
|
||||||
id TEXT NOT NULL PRIMARY KEY,
|
id TEXT NOT NULL PRIMARY KEY,
|
||||||
payload BYTEA NOT NULL,
|
payload BYTEA NOT NULL,
|
||||||
expire_at TIMESTAMPTZ NOT NULL,
|
expire_at TIMESTAMPTZ NOT NULL,
|
||||||
sliding_expiration INTERVAL,
|
sliding_expiration INTERVAL,
|
||||||
absolute_expiration TIMESTAMPTZ);
|
absolute_expiration TIMESTAMPTZ);
|
||||||
CREATE INDEX idx_session_expiration ON jjj.session (expire_at)"
|
CREATE INDEX idx_session_expiration ON jjj.session (expire_at)" []
|
||||||
|> Sql.executeNonQueryAsync
|
|
||||||
()
|
|
||||||
} |> sync
|
} |> sync
|
||||||
|
|
||||||
// ~~~ SUPPORT FUNCTIONS ~~~
|
// ~~~ SUPPORT FUNCTIONS ~~~
|
||||||
|
|
||||||
/// Get an entry, updating it for sliding expiration
|
/// Get an entry, updating it for sliding expiration
|
||||||
let getEntry key = backgroundTask {
|
let getEntry key = backgroundTask {
|
||||||
let dataSource = dataSource ()
|
|
||||||
let idParam = "@id", Sql.string key
|
let idParam = "@id", Sql.string key
|
||||||
let! tryEntry =
|
let! tryEntry =
|
||||||
dataSource
|
Custom.single
|
||||||
|> Sql.query "SELECT * FROM jjj.session WHERE id = @id"
|
"SELECT * FROM jjj.session WHERE id = @id" [ idParam ]
|
||||||
|> Sql.parameters [ idParam ]
|
(fun row ->
|
||||||
|> Sql.executeAsync (fun row ->
|
|
||||||
{ Id = row.string "id"
|
{ Id = row.string "id"
|
||||||
Payload = row.bytea "payload"
|
Payload = row.bytea "payload"
|
||||||
ExpireAt = row.fieldValue<Instant> "expire_at"
|
ExpireAt = row.fieldValue<Instant> "expire_at"
|
||||||
SlidingExpiration = row.fieldValueOrNone<Duration> "sliding_expiration"
|
SlidingExpiration = row.fieldValueOrNone<Duration> "sliding_expiration"
|
||||||
AbsoluteExpiration = row.fieldValueOrNone<Instant> "absolute_expiration" })
|
AbsoluteExpiration = row.fieldValueOrNone<Instant> "absolute_expiration"
|
||||||
|> tryHead
|
})
|
||||||
match tryEntry with
|
match tryEntry with
|
||||||
| Some entry ->
|
| Some entry ->
|
||||||
let now = getNow ()
|
let now = getNow ()
|
||||||
|
@ -121,12 +108,9 @@ type DistributedCache () =
|
||||||
true, { entry with ExpireAt = absExp }
|
true, { entry with ExpireAt = absExp }
|
||||||
else true, { entry with ExpireAt = now.Plus slideExp }
|
else true, { entry with ExpireAt = now.Plus slideExp }
|
||||||
if needsRefresh then
|
if needsRefresh then
|
||||||
let! _ =
|
do! Custom.nonQuery
|
||||||
dataSource
|
"UPDATE jjj.session SET expire_at = @expireAt WHERE id = @id"
|
||||||
|> Sql.query "UPDATE jjj.session SET expire_at = @expireAt WHERE id = @id"
|
[ expireParam item.ExpireAt; idParam ]
|
||||||
|> Sql.parameters [ expireParam item.ExpireAt; idParam ]
|
|
||||||
|> Sql.executeNonQueryAsync
|
|
||||||
()
|
|
||||||
return if item.ExpireAt > now then Some entry else None
|
return if item.ExpireAt > now then Some entry else None
|
||||||
| None -> return None
|
| None -> return None
|
||||||
}
|
}
|
||||||
|
@ -138,23 +122,13 @@ type DistributedCache () =
|
||||||
let purge () = backgroundTask {
|
let purge () = backgroundTask {
|
||||||
let now = getNow ()
|
let now = getNow ()
|
||||||
if lastPurge.Plus (Duration.FromMinutes 30L) < now then
|
if lastPurge.Plus (Duration.FromMinutes 30L) < now then
|
||||||
let! _ =
|
do! Custom.nonQuery "DELETE FROM jjj.session WHERE expire_at < @expireAt" [ expireParam now ]
|
||||||
dataSource ()
|
|
||||||
|> Sql.query "DELETE FROM jjj.session WHERE expire_at < @expireAt"
|
|
||||||
|> Sql.parameters [ expireParam now ]
|
|
||||||
|> Sql.executeNonQueryAsync
|
|
||||||
lastPurge <- now
|
lastPurge <- now
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Remove a cache entry
|
/// Remove a cache entry
|
||||||
let removeEntry key = backgroundTask {
|
let removeEntry key =
|
||||||
let! _ =
|
Custom.nonQuery "DELETE FROM jjj.session WHERE id = @id" [ "@id", Sql.string key ]
|
||||||
dataSource ()
|
|
||||||
|> Sql.query "DELETE FROM jjj.session WHERE id = @id"
|
|
||||||
|> Sql.parameters [ "@id", Sql.string key ]
|
|
||||||
|> Sql.executeNonQueryAsync
|
|
||||||
()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Save an entry
|
/// Save an entry
|
||||||
let saveEntry (opts : DistributedCacheEntryOptions) key payload = backgroundTask {
|
let saveEntry (opts : DistributedCacheEntryOptions) key payload = backgroundTask {
|
||||||
|
@ -173,9 +147,7 @@ type DistributedCache () =
|
||||||
// Default to 1 hour sliding expiration
|
// Default to 1 hour sliding expiration
|
||||||
let slide = Duration.FromHours 1
|
let slide = Duration.FromHours 1
|
||||||
now.Plus slide, Some slide, None
|
now.Plus slide, Some slide, None
|
||||||
let! _ =
|
do! Custom.nonQuery
|
||||||
dataSource ()
|
|
||||||
|> Sql.query
|
|
||||||
"INSERT INTO jjj.session (
|
"INSERT INTO jjj.session (
|
||||||
id, payload, expire_at, sliding_expiration, absolute_expiration
|
id, payload, expire_at, sliding_expiration, absolute_expiration
|
||||||
) VALUES (
|
) VALUES (
|
||||||
|
@ -185,14 +157,11 @@ type DistributedCache () =
|
||||||
expire_at = EXCLUDED.expire_at,
|
expire_at = EXCLUDED.expire_at,
|
||||||
sliding_expiration = EXCLUDED.sliding_expiration,
|
sliding_expiration = EXCLUDED.sliding_expiration,
|
||||||
absolute_expiration = EXCLUDED.absolute_expiration"
|
absolute_expiration = EXCLUDED.absolute_expiration"
|
||||||
|> Sql.parameters
|
|
||||||
[ "@id", Sql.string key
|
[ "@id", Sql.string key
|
||||||
"@payload", Sql.bytea payload
|
"@payload", Sql.bytea payload
|
||||||
expireParam expireAt
|
expireParam expireAt
|
||||||
optParam "slideExp" slideExp
|
optParam "slideExp" slideExp
|
||||||
optParam "absExp" absExp ]
|
optParam "absExp" absExp ]
|
||||||
|> Sql.executeNonQueryAsync
|
|
||||||
()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ~~~ IMPLEMENTATION FUNCTIONS ~~~
|
// ~~~ IMPLEMENTATION FUNCTIONS ~~~
|
||||||
|
|
|
@ -36,72 +36,56 @@ open Npgsql.FSharp
|
||||||
[<AutoOpen>]
|
[<AutoOpen>]
|
||||||
module DataConnection =
|
module DataConnection =
|
||||||
|
|
||||||
|
open System.Text.Json
|
||||||
|
open BitBadger.Npgsql.Documents
|
||||||
|
open JobsJobsJobs
|
||||||
open Microsoft.Extensions.Configuration
|
open Microsoft.Extensions.Configuration
|
||||||
open Npgsql
|
open Npgsql
|
||||||
|
|
||||||
/// Get the data source as the start of a SQL statement
|
|
||||||
let dataSource =
|
|
||||||
Configuration.dataSource >> Sql.fromDataSource
|
|
||||||
|
|
||||||
/// Create tables
|
/// Create tables
|
||||||
let private createTables () = backgroundTask {
|
let private createTables () = backgroundTask {
|
||||||
let! _ =
|
do! Custom.nonQuery "CREATE SCHEMA IF NOT EXISTS jjj" []
|
||||||
dataSource ()
|
|
||||||
|> Sql.query "CREATE SCHEMA IF NOT EXISTS jjj"
|
|
||||||
|> Sql.executeNonQueryAsync
|
|
||||||
do! Definition.ensureTable Table.Citizen
|
do! Definition.ensureTable Table.Citizen
|
||||||
do! Definition.ensureTable Table.Continent
|
do! Definition.ensureTable Table.Continent
|
||||||
do! Definition.ensureTable Table.Listing
|
do! Definition.ensureTable Table.Listing
|
||||||
do! Definition.ensureTable Table.Success
|
do! Definition.ensureTable Table.Success
|
||||||
let sql = [
|
// Tables that use more than the default document configuration, key indexes, and text search index
|
||||||
// Tables that use more than the default document configuration
|
do! Custom.nonQuery
|
||||||
$"CREATE TABLE IF NOT EXISTS {Table.Profile} (id TEXT NOT NULL PRIMARY KEY, data JSONB NOT NULL,
|
$"CREATE TABLE IF NOT EXISTS {Table.Profile}
|
||||||
text_search TSVECTOR NOT NULL,
|
(id TEXT NOT NULL PRIMARY KEY, data JSONB NOT NULL, text_search TSVECTOR NOT NULL,
|
||||||
CONSTRAINT fk_profile_citizen FOREIGN KEY (id) REFERENCES {Table.Citizen} (id) ON DELETE CASCADE)"
|
CONSTRAINT fk_profile_citizen FOREIGN KEY (id) REFERENCES {Table.Citizen} (id) ON DELETE CASCADE);
|
||||||
$"CREATE TABLE IF NOT EXISTS {Table.SecurityInfo} (id TEXT NOT NULL PRIMARY KEY, data JSONB NOT NULL,
|
CREATE TABLE IF NOT EXISTS {Table.SecurityInfo} (id TEXT NOT NULL PRIMARY KEY, data JSONB NOT NULL,
|
||||||
CONSTRAINT fk_security_info_citizen FOREIGN KEY (id) REFERENCES {Table.Citizen} (id) ON DELETE CASCADE)"
|
CONSTRAINT fk_security_info_citizen
|
||||||
// Key indexes
|
FOREIGN KEY (id) REFERENCES {Table.Citizen} (id) ON DELETE CASCADE);
|
||||||
$"CREATE UNIQUE INDEX IF NOT EXISTS uk_citizen_email ON {Table.Citizen} ((data -> 'email'))"
|
CREATE UNIQUE INDEX IF NOT EXISTS uk_citizen_email ON {Table.Citizen} ((data -> 'email'));
|
||||||
$"CREATE INDEX IF NOT EXISTS idx_listing_citizen ON {Table.Listing} ((data -> 'citizenId'))"
|
CREATE INDEX IF NOT EXISTS idx_listing_citizen ON {Table.Listing} ((data -> 'citizenId'));
|
||||||
$"CREATE INDEX IF NOT EXISTS idx_listing_continent ON {Table.Listing} ((data -> 'continentId'))"
|
CREATE INDEX IF NOT EXISTS idx_listing_continent ON {Table.Listing} ((data -> 'continentId'));
|
||||||
$"CREATE INDEX IF NOT EXISTS idx_profile_continent ON {Table.Profile} ((data -> 'continentId'))"
|
CREATE INDEX IF NOT EXISTS idx_profile_continent ON {Table.Profile} ((data -> 'continentId'));
|
||||||
$"CREATE INDEX IF NOT EXISTS idx_success_citizen ON {Table.Success} ((data -> 'citizenId'))"
|
CREATE INDEX IF NOT EXISTS idx_success_citizen ON {Table.Success} ((data -> 'citizenId'));
|
||||||
// Profile text search index
|
CREATE INDEX IF NOT EXISTS idx_profile_search ON {Table.Profile} USING GIN(text_search)"
|
||||||
$"CREATE INDEX IF NOT EXISTS idx_profile_search ON {Table.Profile} USING GIN(text_search)"
|
[]
|
||||||
]
|
|
||||||
let! _ =
|
|
||||||
dataSource ()
|
|
||||||
|> Sql.executeTransactionAsync (sql |> List.map (fun sql -> sql, [ [] ]))
|
|
||||||
()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create functions and triggers required to
|
/// Create functions and triggers required to keep the search index current
|
||||||
let createTriggers () = backgroundTask {
|
let private createTriggers () = backgroundTask {
|
||||||
let! functions =
|
let! functions =
|
||||||
dataSource ()
|
Custom.list
|
||||||
|> Sql.query
|
|
||||||
"SELECT p.proname
|
"SELECT p.proname
|
||||||
FROM pg_catalog.pg_proc p
|
FROM pg_catalog.pg_proc p
|
||||||
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace
|
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace
|
||||||
WHERE n.nspname = 'jjj'"
|
WHERE n.nspname = 'jjj'"
|
||||||
|> Sql.executeAsync (fun row -> row.string "proname")
|
[] (fun row -> row.string "proname")
|
||||||
if not (functions |> List.contains "indexable_array_string") then
|
if not (functions |> List.contains "indexable_array_string") then
|
||||||
let! _ =
|
do! Custom.nonQuery
|
||||||
dataSource ()
|
"""CREATE FUNCTION jjj.indexable_array_string(target jsonb, path jsonpath) RETURNS text AS $$
|
||||||
|> Sql.query """
|
|
||||||
CREATE FUNCTION jjj.indexable_array_string(target jsonb, path jsonpath) RETURNS text AS $$
|
|
||||||
BEGIN
|
BEGIN
|
||||||
RETURN REPLACE(REPLACE(REPLACE(REPLACE(jsonb_path_query_array(target, path)::text,
|
RETURN REPLACE(REPLACE(REPLACE(REPLACE(jsonb_path_query_array(target, path)::text,
|
||||||
'["', ''), '", "', ' '), '"]', ''), '[]', '');
|
'["', ''), '", "', ' '), '"]', ''), '[]', '');
|
||||||
END;
|
END;
|
||||||
$$ LANGUAGE plpgsql;"""
|
$$ LANGUAGE plpgsql;""" []
|
||||||
|> Sql.executeNonQueryAsync
|
|
||||||
()
|
|
||||||
if not (functions |> List.contains "set_text_search") then
|
if not (functions |> List.contains "set_text_search") then
|
||||||
let! _ =
|
do! Custom.nonQuery
|
||||||
dataSource ()
|
$"CREATE FUNCTION jjj.set_text_search() RETURNS trigger AS $$
|
||||||
|> Sql.query $"
|
|
||||||
CREATE FUNCTION jjj.set_text_search() RETURNS trigger AS $$
|
|
||||||
BEGIN
|
BEGIN
|
||||||
NEW.text_search := to_tsvector('english',
|
NEW.text_search := to_tsvector('english',
|
||||||
COALESCE(NEW.data ->> 'region', '') || ' '
|
COALESCE(NEW.data ->> 'region', '') || ' '
|
||||||
|
@ -115,9 +99,7 @@ module DataConnection =
|
||||||
END;
|
END;
|
||||||
$$ LANGUAGE plpgsql;
|
$$ LANGUAGE plpgsql;
|
||||||
CREATE TRIGGER set_text_search BEFORE INSERT OR UPDATE ON {Table.Profile}
|
CREATE TRIGGER set_text_search BEFORE INSERT OR UPDATE ON {Table.Profile}
|
||||||
FOR EACH ROW EXECUTE FUNCTION jjj.set_text_search();"
|
FOR EACH ROW EXECUTE FUNCTION jjj.set_text_search();" []
|
||||||
|> Sql.executeNonQueryAsync
|
|
||||||
()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Set up the data connection from the given configuration
|
/// Set up the data connection from the given configuration
|
||||||
|
@ -125,6 +107,11 @@ module DataConnection =
|
||||||
let builder = NpgsqlDataSourceBuilder (cfg.GetConnectionString "PostgreSQL")
|
let builder = NpgsqlDataSourceBuilder (cfg.GetConnectionString "PostgreSQL")
|
||||||
let _ = builder.UseNodaTime ()
|
let _ = builder.UseNodaTime ()
|
||||||
Configuration.useDataSource (builder.Build ())
|
Configuration.useDataSource (builder.Build ())
|
||||||
|
Configuration.useSerializer
|
||||||
|
{ new IDocumentSerializer with
|
||||||
|
member _.Serialize<'T> (it : 'T) = JsonSerializer.Serialize (it, Json.options)
|
||||||
|
member _.Deserialize<'T> (it : string) = JsonSerializer.Deserialize<'T> (it, Json.options)
|
||||||
|
}
|
||||||
do! createTables ()
|
do! createTables ()
|
||||||
do! createTriggers ()
|
do! createTriggers ()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user