diff --git a/src/JobsJobsJobs/Citizens/Data.fs b/src/JobsJobsJobs/Citizens/Data.fs index 4e2aae0..edbb074 100644 --- a/src/JobsJobsJobs/Citizens/Data.fs +++ b/src/JobsJobsJobs/Citizens/Data.fs @@ -33,9 +33,8 @@ let saveSecurityInfo (security : SecurityInfo) = /// Purge expired tokens let private purgeExpiredTokens now = backgroundTask { let! info = - dataSource () - |> Sql.query $"{Query.selectFromTable Table.SecurityInfo} WHERE data ->> 'tokenExpires' IS NOT NULL" - |> Sql.executeAsync fromData + Custom.list $"{Query.selectFromTable Table.SecurityInfo} WHERE data ->> 'tokenExpires' IS NOT NULL" [] + fromData for expired in info |> List.filter (fun it -> it.TokenExpires.Value < now) do if expired.TokenUsage.Value = "confirm" then // Unconfirmed account; delete the entire thing @@ -66,7 +65,8 @@ let save citizen = let register (citizen : Citizen) (security : SecurityInfo) = backgroundTask { try let! _ = - dataSource () + Configuration.dataSource () + |> Sql.fromDataSource |> Sql.executeTransactionAsync [ Query.save Table.Citizen, [ Query.docParameters (CitizenId.toString citizen.Id) citizen ] Query.save Table.SecurityInfo, [ Query.docParameters (CitizenId.toString citizen.Id) security ] diff --git a/src/JobsJobsJobs/Common/Cache.fs b/src/JobsJobsJobs/Common/Cache.fs index c7ef8f7..1a98b90 100644 --- a/src/JobsJobsJobs/Common/Cache.fs +++ b/src/JobsJobsJobs/Common/Cache.fs @@ -35,12 +35,6 @@ module private CacheHelpers = /// Get the current instant let getNow () = SystemClock.Instance.GetCurrentInstant () - /// Get the first result of the given query - let tryHead<'T> (query : Task<'T list>) = backgroundTask { - let! results = query - return List.tryHead results - } - /// Create a parameter for a non-standard type let typedParam<'T> name (it : 'T) = $"@%s{name}", Sql.parameter (NpgsqlParameter ($"@{name}", it)) @@ -56,6 +50,7 @@ module private CacheHelpers = open System.Threading +open BitBadger.Npgsql.FSharp.Documents open JobsJobsJobs.Common.Data open Microsoft.Extensions.Caching.Distributed @@ -69,46 +64,38 @@ type DistributedCache () = do task { - let dataSource = dataSource () let! exists = - dataSource - |> Sql.query $" - SELECT EXISTS + Custom.scalar + $"SELECT EXISTS (SELECT 1 FROM pg_tables WHERE schemaname = 'jjj' AND tablename = 'session') AS does_exist" - |> Sql.executeRowAsync (fun row -> row.bool "does_exist") + [] (fun row -> row.bool "does_exist") if not exists then - let! _ = - dataSource - |> Sql.query + do! Custom.nonQuery "CREATE TABLE jjj.session ( id TEXT NOT NULL PRIMARY KEY, payload BYTEA NOT NULL, expire_at TIMESTAMPTZ NOT NULL, sliding_expiration INTERVAL, absolute_expiration TIMESTAMPTZ); - CREATE INDEX idx_session_expiration ON jjj.session (expire_at)" - |> Sql.executeNonQueryAsync - () + CREATE INDEX idx_session_expiration ON jjj.session (expire_at)" [] } |> sync // ~~~ SUPPORT FUNCTIONS ~~~ /// Get an entry, updating it for sliding expiration let getEntry key = backgroundTask { - let dataSource = dataSource () let idParam = "@id", Sql.string key let! tryEntry = - dataSource - |> Sql.query "SELECT * FROM jjj.session WHERE id = @id" - |> Sql.parameters [ idParam ] - |> Sql.executeAsync (fun row -> - { Id = row.string "id" - Payload = row.bytea "payload" - ExpireAt = row.fieldValue "expire_at" - SlidingExpiration = row.fieldValueOrNone "sliding_expiration" - AbsoluteExpiration = row.fieldValueOrNone "absolute_expiration" }) - |> tryHead + Custom.single + "SELECT * FROM jjj.session WHERE id = @id" [ idParam ] + (fun row -> + { Id = row.string "id" + Payload = row.bytea "payload" + ExpireAt = row.fieldValue "expire_at" + SlidingExpiration = row.fieldValueOrNone "sliding_expiration" + AbsoluteExpiration = row.fieldValueOrNone "absolute_expiration" + }) match tryEntry with | Some entry -> let now = getNow () @@ -121,12 +108,9 @@ type DistributedCache () = true, { entry with ExpireAt = absExp } else true, { entry with ExpireAt = now.Plus slideExp } if needsRefresh then - let! _ = - dataSource - |> Sql.query "UPDATE jjj.session SET expire_at = @expireAt WHERE id = @id" - |> Sql.parameters [ expireParam item.ExpireAt; idParam ] - |> Sql.executeNonQueryAsync - () + do! Custom.nonQuery + "UPDATE jjj.session SET expire_at = @expireAt WHERE id = @id" + [ expireParam item.ExpireAt; idParam ] return if item.ExpireAt > now then Some entry else None | None -> return None } @@ -138,23 +122,13 @@ type DistributedCache () = let purge () = backgroundTask { let now = getNow () if lastPurge.Plus (Duration.FromMinutes 30L) < now then - let! _ = - dataSource () - |> Sql.query "DELETE FROM jjj.session WHERE expire_at < @expireAt" - |> Sql.parameters [ expireParam now ] - |> Sql.executeNonQueryAsync + do! Custom.nonQuery "DELETE FROM jjj.session WHERE expire_at < @expireAt" [ expireParam now ] lastPurge <- now } /// Remove a cache entry - let removeEntry key = backgroundTask { - let! _ = - dataSource () - |> Sql.query "DELETE FROM jjj.session WHERE id = @id" - |> Sql.parameters [ "@id", Sql.string key ] - |> Sql.executeNonQueryAsync - () - } + let removeEntry key = + Custom.nonQuery "DELETE FROM jjj.session WHERE id = @id" [ "@id", Sql.string key ] /// Save an entry let saveEntry (opts : DistributedCacheEntryOptions) key payload = backgroundTask { @@ -173,9 +147,7 @@ type DistributedCache () = // Default to 1 hour sliding expiration let slide = Duration.FromHours 1 now.Plus slide, Some slide, None - let! _ = - dataSource () - |> Sql.query + do! Custom.nonQuery "INSERT INTO jjj.session ( id, payload, expire_at, sliding_expiration, absolute_expiration ) VALUES ( @@ -185,14 +157,11 @@ type DistributedCache () = expire_at = EXCLUDED.expire_at, sliding_expiration = EXCLUDED.sliding_expiration, absolute_expiration = EXCLUDED.absolute_expiration" - |> Sql.parameters [ "@id", Sql.string key "@payload", Sql.bytea payload expireParam expireAt optParam "slideExp" slideExp optParam "absExp" absExp ] - |> Sql.executeNonQueryAsync - () } // ~~~ IMPLEMENTATION FUNCTIONS ~~~ diff --git a/src/JobsJobsJobs/Common/Data.fs b/src/JobsJobsJobs/Common/Data.fs index fdbad61..b41d176 100644 --- a/src/JobsJobsJobs/Common/Data.fs +++ b/src/JobsJobsJobs/Common/Data.fs @@ -36,72 +36,56 @@ open Npgsql.FSharp [] module DataConnection = + open System.Text.Json + open BitBadger.Npgsql.Documents + open JobsJobsJobs open Microsoft.Extensions.Configuration open Npgsql - /// Get the data source as the start of a SQL statement - let dataSource = - Configuration.dataSource >> Sql.fromDataSource - /// Create tables let private createTables () = backgroundTask { - let! _ = - dataSource () - |> Sql.query "CREATE SCHEMA IF NOT EXISTS jjj" - |> Sql.executeNonQueryAsync + do! Custom.nonQuery "CREATE SCHEMA IF NOT EXISTS jjj" [] do! Definition.ensureTable Table.Citizen do! Definition.ensureTable Table.Continent do! Definition.ensureTable Table.Listing do! Definition.ensureTable Table.Success - let sql = [ - // Tables that use more than the default document configuration - $"CREATE TABLE IF NOT EXISTS {Table.Profile} (id TEXT NOT NULL PRIMARY KEY, data JSONB NOT NULL, - text_search TSVECTOR NOT NULL, - CONSTRAINT fk_profile_citizen FOREIGN KEY (id) REFERENCES {Table.Citizen} (id) ON DELETE CASCADE)" - $"CREATE TABLE IF NOT EXISTS {Table.SecurityInfo} (id TEXT NOT NULL PRIMARY KEY, data JSONB NOT NULL, - CONSTRAINT fk_security_info_citizen FOREIGN KEY (id) REFERENCES {Table.Citizen} (id) ON DELETE CASCADE)" - // Key indexes - $"CREATE UNIQUE INDEX IF NOT EXISTS uk_citizen_email ON {Table.Citizen} ((data -> 'email'))" - $"CREATE INDEX IF NOT EXISTS idx_listing_citizen ON {Table.Listing} ((data -> 'citizenId'))" - $"CREATE INDEX IF NOT EXISTS idx_listing_continent ON {Table.Listing} ((data -> 'continentId'))" - $"CREATE INDEX IF NOT EXISTS idx_profile_continent ON {Table.Profile} ((data -> 'continentId'))" - $"CREATE INDEX IF NOT EXISTS idx_success_citizen ON {Table.Success} ((data -> 'citizenId'))" - // Profile text search index - $"CREATE INDEX IF NOT EXISTS idx_profile_search ON {Table.Profile} USING GIN(text_search)" - ] - let! _ = - dataSource () - |> Sql.executeTransactionAsync (sql |> List.map (fun sql -> sql, [ [] ])) - () + // Tables that use more than the default document configuration, key indexes, and text search index + do! Custom.nonQuery + $"CREATE TABLE IF NOT EXISTS {Table.Profile} + (id TEXT NOT NULL PRIMARY KEY, data JSONB NOT NULL, text_search TSVECTOR NOT NULL, + CONSTRAINT fk_profile_citizen FOREIGN KEY (id) REFERENCES {Table.Citizen} (id) ON DELETE CASCADE); + CREATE TABLE IF NOT EXISTS {Table.SecurityInfo} (id TEXT NOT NULL PRIMARY KEY, data JSONB NOT NULL, + CONSTRAINT fk_security_info_citizen + FOREIGN KEY (id) REFERENCES {Table.Citizen} (id) ON DELETE CASCADE); + CREATE UNIQUE INDEX IF NOT EXISTS uk_citizen_email ON {Table.Citizen} ((data -> 'email')); + CREATE INDEX IF NOT EXISTS idx_listing_citizen ON {Table.Listing} ((data -> 'citizenId')); + CREATE INDEX IF NOT EXISTS idx_listing_continent ON {Table.Listing} ((data -> 'continentId')); + CREATE INDEX IF NOT EXISTS idx_profile_continent ON {Table.Profile} ((data -> 'continentId')); + CREATE INDEX IF NOT EXISTS idx_success_citizen ON {Table.Success} ((data -> 'citizenId')); + CREATE INDEX IF NOT EXISTS idx_profile_search ON {Table.Profile} USING GIN(text_search)" + [] } - /// Create functions and triggers required to - let createTriggers () = backgroundTask { + /// Create functions and triggers required to keep the search index current + let private createTriggers () = backgroundTask { let! functions = - dataSource () - |> Sql.query + Custom.list "SELECT p.proname FROM pg_catalog.pg_proc p LEFT JOIN pg_catalog.pg_namespace n ON n.oid = p.pronamespace WHERE n.nspname = 'jjj'" - |> Sql.executeAsync (fun row -> row.string "proname") + [] (fun row -> row.string "proname") if not (functions |> List.contains "indexable_array_string") then - let! _ = - dataSource () - |> Sql.query """ - CREATE FUNCTION jjj.indexable_array_string(target jsonb, path jsonpath) RETURNS text AS $$ + do! Custom.nonQuery + """CREATE FUNCTION jjj.indexable_array_string(target jsonb, path jsonpath) RETURNS text AS $$ BEGIN RETURN REPLACE(REPLACE(REPLACE(REPLACE(jsonb_path_query_array(target, path)::text, '["', ''), '", "', ' '), '"]', ''), '[]', ''); END; - $$ LANGUAGE plpgsql;""" - |> Sql.executeNonQueryAsync - () + $$ LANGUAGE plpgsql;""" [] if not (functions |> List.contains "set_text_search") then - let! _ = - dataSource () - |> Sql.query $" - CREATE FUNCTION jjj.set_text_search() RETURNS trigger AS $$ + do! Custom.nonQuery + $"CREATE FUNCTION jjj.set_text_search() RETURNS trigger AS $$ BEGIN NEW.text_search := to_tsvector('english', COALESCE(NEW.data ->> 'region', '') || ' ' @@ -115,9 +99,7 @@ module DataConnection = END; $$ LANGUAGE plpgsql; CREATE TRIGGER set_text_search BEFORE INSERT OR UPDATE ON {Table.Profile} - FOR EACH ROW EXECUTE FUNCTION jjj.set_text_search();" - |> Sql.executeNonQueryAsync - () + FOR EACH ROW EXECUTE FUNCTION jjj.set_text_search();" [] } /// Set up the data connection from the given configuration @@ -125,6 +107,11 @@ module DataConnection = let builder = NpgsqlDataSourceBuilder (cfg.GetConnectionString "PostgreSQL") let _ = builder.UseNodaTime () Configuration.useDataSource (builder.Build ()) + Configuration.useSerializer + { new IDocumentSerializer with + member _.Serialize<'T> (it : 'T) = JsonSerializer.Serialize (it, Json.options) + member _.Deserialize<'T> (it : string) = JsonSerializer.Deserialize<'T> (it, Json.options) + } do! createTables () do! createTriggers () }