From 28b116925b5959df5ac27fb12405487112858d28 Mon Sep 17 00:00:00 2001 From: "Daniel J. Summers" Date: Tue, 6 Jul 2021 23:05:36 -0400 Subject: [PATCH] Add first cut of data migration --- src/JobsJobsJobs.sln | 7 + src/JobsJobsJobs/Api/App.fs | 11 +- src/JobsJobsJobs/Api/Data.fs | 114 ++++++++++++++- .../DataMigrate/DataMigrate.fsproj | 23 +++ src/JobsJobsJobs/DataMigrate/Program.fs | 133 ++++++++++++++++++ src/JobsJobsJobs/Domain/Types.fs | 5 + 6 files changed, 282 insertions(+), 11 deletions(-) create mode 100644 src/JobsJobsJobs/DataMigrate/DataMigrate.fsproj create mode 100644 src/JobsJobsJobs/DataMigrate/Program.fs diff --git a/src/JobsJobsJobs.sln b/src/JobsJobsJobs.sln index 91046a9..44a3c9d 100644 --- a/src/JobsJobsJobs.sln +++ b/src/JobsJobsJobs.sln @@ -26,6 +26,8 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain", "JobsJobsJobs\Doma EndProject Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Api", "JobsJobsJobs\Api\Api.fsproj", "{8F5A3D1E-562B-4F27-9787-6CB14B35E69E}" EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "DataMigrate", "JobsJobsJobs\DataMigrate\DataMigrate.fsproj", "{C5774E4F-2930-4B64-8407-77BF7EB79F39}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -52,6 +54,10 @@ Global {8F5A3D1E-562B-4F27-9787-6CB14B35E69E}.Debug|Any CPU.Build.0 = Debug|Any CPU {8F5A3D1E-562B-4F27-9787-6CB14B35E69E}.Release|Any CPU.ActiveCfg = Release|Any CPU {8F5A3D1E-562B-4F27-9787-6CB14B35E69E}.Release|Any CPU.Build.0 = Release|Any CPU + {C5774E4F-2930-4B64-8407-77BF7EB79F39}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C5774E4F-2930-4B64-8407-77BF7EB79F39}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C5774E4F-2930-4B64-8407-77BF7EB79F39}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C5774E4F-2930-4B64-8407-77BF7EB79F39}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -62,5 +68,6 @@ Global GlobalSection(NestedProjects) = preSolution {C81278DA-DA97-4E55-AB39-4B88565B615D} = {FA833B24-B8F6-4CE6-A044-99257EAC02FF} {8F5A3D1E-562B-4F27-9787-6CB14B35E69E} = {FA833B24-B8F6-4CE6-A044-99257EAC02FF} + {C5774E4F-2930-4B64-8407-77BF7EB79F39} = {FA833B24-B8F6-4CE6-A044-99257EAC02FF} EndGlobalSection EndGlobal diff --git a/src/JobsJobsJobs/Api/App.fs b/src/JobsJobsJobs/Api/App.fs index 887b689..bbcbc72 100644 --- a/src/JobsJobsJobs/Api/App.fs +++ b/src/JobsJobsJobs/Api/App.fs @@ -26,13 +26,12 @@ open Microsoft.Extensions.Logging /// Configure dependency injection let configureServices (svc : IServiceCollection) = - svc.AddGiraffe() - .AddSingleton(SystemClock.Instance) - .AddLogging () - |> ignore + svc.AddGiraffe () |> ignore + svc.AddSingleton SystemClock.Instance |> ignore + svc.AddLogging () |> ignore let svcs = svc.BuildServiceProvider() - let cfg = svcs.GetRequiredService().GetSection "Rethink" - let log = svcs.GetRequiredService().CreateLogger "Data.Startup" + let cfg = svcs.GetRequiredService().GetSection "Rethink" + let log = svcs.GetRequiredService().CreateLogger "Data.Startup" let conn = Data.Startup.createConnection cfg log svc.AddSingleton conn |> ignore Data.Startup.establishEnvironment cfg log conn |> Data.awaitIgnore diff --git a/src/JobsJobsJobs/Api/Data.fs b/src/JobsJobsJobs/Api/Data.fs index 7ede6f9..6075914 100644 --- a/src/JobsJobsJobs/Api/Data.fs +++ b/src/JobsJobsJobs/Api/Data.fs @@ -1,14 +1,11 @@ /// Data access functions for Jobs, Jobs, Jobs module JobsJobsJobs.Api.Data -open JobsJobsJobs.Domain +open FSharp.Control.Tasks open JobsJobsJobs.Domain.Types open Polly open RethinkDb.Driver open RethinkDb.Driver.Net -open Microsoft.Extensions.Configuration -open FSharp.Control.Tasks -open Microsoft.Extensions.Logging /// Shorthand for the RethinkDB R variable (how every command starts) let private r = RethinkDB.R @@ -20,6 +17,7 @@ let awaitIgnore x = x |> Async.AwaitTask |> Async.RunSynchronously |> ignore /// JSON converters used with RethinkDB persistence module Converters = + open JobsJobsJobs.Domain open Microsoft.FSharpLu.Json open Newtonsoft.Json open System @@ -106,8 +104,11 @@ module Table = [] module Startup = + open Microsoft.Extensions.Configuration + open Microsoft.Extensions.Logging + /// Create a RethinkDB connection - let createConnection (cfg : IConfigurationSection) (log : ILogger)= + let createConnection (cfg : IConfigurationSection) (log : ILogger) = // Add all required JSON converters Converters.all () @@ -167,3 +168,106 @@ module Startup = do! ensureIndexes Table.Profile [ "continentId" ] do! ensureIndexes Table.Success [ "citizenId" ] } + + +/// Determine if a record type (not nullable) is null +let toOption x = match x |> box |> isNull with true -> None | false -> Some x + +/// A retry policy where we will reconnect to RethinkDB if it has gone away +let withReconn (conn : IConnection) = + Policy + .Handle() + .RetryAsync(System.Action(fun ex _ -> + printf "Encountered RethinkDB exception: %s" ex.Message + match ex.Message.Contains "socket" with + | true -> + printf "Reconnecting to RethinkDB" + (conn :?> Connection).Reconnect() + | false -> ())) + + +/// Citizen data access functions +[] +module Citizen = + + /// Find a citizen by their ID + let findById (citizenId : CitizenId) conn = task { + let! citizen = + withReconn(conn).ExecuteAsync(fun () -> + r.Table(Table.Citizen) + .Get(citizenId) + .RunResultAsync conn) + return toOption citizen + } + + /// Find a citizen by their No Agenda Social username + let findByNaUser (naUser : string) conn = task { + let! citizen = + withReconn(conn).ExecuteAsync(fun () -> + r.Table(Table.Citizen) + .GetAll(naUser).OptArg("index", "naUser").Nth(0) + .RunResultAsync conn) + return toOption citizen + } + + /// Add a citizen + let add (citizen : Citizen) conn = task { + let! _ = + withReconn(conn).ExecuteAsync(fun () -> + r.Table(Table.Citizen) + .Insert(citizen) + .RunWriteAsync conn) + () + } + + +/// Profile data access functions +[] +module Profile = + + /// Find a profile by citizen ID + let findById (citizenId : CitizenId) conn = task { + let! profile = + withReconn(conn).ExecuteAsync(fun () -> + r.Table(Table.Profile) + .Get(citizenId) + .RunResultAsync conn) + return toOption profile + } + + /// Insert or update a profile + let save (profile : Profile) conn = task { + let! _ = + withReconn(conn).ExecuteAsync(fun () -> + r.Table(Table.Profile) + .Get(profile.id) + .Replace(profile) + .RunWriteAsync conn) + () + } + + +/// Success story data access functions +[] +module Success = + + /// Find a success report by its ID + let findById (successId : SuccessId) conn = task { + let! success = + withReconn(conn).ExecuteAsync(fun () -> + r.Table(Table.Success) + .Get(successId) + .RunResultAsync conn) + return toOption success + } + + /// Insert or update a success story + let save (success : Success) conn = task { + let! _ = + withReconn(conn).ExecuteAsync(fun () -> + r.Table(Table.Success) + .Get(success.id) + .Replace(success) + .RunWriteAsync conn) + () + } diff --git a/src/JobsJobsJobs/DataMigrate/DataMigrate.fsproj b/src/JobsJobsJobs/DataMigrate/DataMigrate.fsproj new file mode 100644 index 0000000..3481d11 --- /dev/null +++ b/src/JobsJobsJobs/DataMigrate/DataMigrate.fsproj @@ -0,0 +1,23 @@ + + + + Exe + net5.0 + 3390;$(WarnOn) + + + + + + + + + + + + + + + + + diff --git a/src/JobsJobsJobs/DataMigrate/Program.fs b/src/JobsJobsJobs/DataMigrate/Program.fs new file mode 100644 index 0000000..a11ea96 --- /dev/null +++ b/src/JobsJobsJobs/DataMigrate/Program.fs @@ -0,0 +1,133 @@ +// Learn more about F# at http://docs.microsoft.com/dotnet/fsharp + +open FSharp.Control.Tasks +open System +open System.Linq +open JobsJobsJobs.Api.Data +open JobsJobsJobs.Domain +open JobsJobsJobs.Server.Data +open Microsoft.EntityFrameworkCore +open Microsoft.Extensions.Hosting +open Microsoft.Extensions.DependencyInjection +open Microsoft.Extensions.Logging +open RethinkDb.Driver.Net + +/// Create the host (reads configuration and initializes both databases) +let createHostBuilder argv = + Host.CreateDefaultBuilder(argv) + .ConfigureServices( + Action ( + fun hostCtx svcs -> + svcs.AddSingleton hostCtx.Configuration |> ignore + // PostgreSQL via EF Core + svcs.AddDbContext(fun options -> + options.UseNpgsql(hostCtx.Configuration.["ConnectionStrings:JobsDb"], + fun o -> o.UseNodaTime() |> ignore) + |> ignore) + |> ignore + // RethinkDB + let cfg = hostCtx.Configuration.GetSection "Rethink" + let log = svcs.BuildServiceProvider().GetRequiredService().CreateLogger "Data" + let conn = Startup.createConnection cfg log + svcs.AddSingleton conn |> ignore + Startup.establishEnvironment cfg log conn |> awaitIgnore + )) + .Build() + +[] +let main argv = + let host = createHostBuilder argv + let r = RethinkDb.Driver.RethinkDB.R + + use db = host.Services.GetRequiredService () + let conn = host.Services.GetRequiredService () + + task { + // Migrate continents + let mutable continentXref = Map.empty + let! continents = db.Continents.AsNoTracking().ToListAsync () + let reContinents = + continents + |> Seq.map (fun c -> + let reContinentId = ContinentId.create () + continentXref <- continentXref.Add (string c.Id, reContinentId) + let it : Types.Continent = { + id = reContinentId + name = c.Name + } + it) + |> List.ofSeq + let! _ = r.Table(Table.Continent).Insert(reContinents).RunWriteAsync conn + + // Migrate citizens + let mutable citizenXref = Map.empty + let! citizens = db.Citizens.AsNoTracking().ToListAsync () + let reCitizens = + citizens + |> Seq.map (fun c -> + let reCitizenId = CitizenId.create () + citizenXref <- citizenXref.Add (string c.Id, reCitizenId) + let it : Types.Citizen = { + id = reCitizenId + naUser = c.NaUser + displayName = Option.ofObj c.DisplayName + realName = Option.ofObj c.RealName + profileUrl = c.ProfileUrl + joinedOn = c.JoinedOn + lastSeenOn = c.LastSeenOn + } + it) + let! _ = r.Table(Table.Citizen).Insert(reCitizens).RunWriteAsync conn + + // Migrate profile information (includes skills) + let! profiles = db.Profiles.AsNoTracking().ToListAsync () + let reProfiles = + profiles + |> Seq.map (fun p -> + let skills = db.Skills.AsNoTracking().Where(fun s -> s.CitizenId = p.Id).ToList () + let reSkills = + skills + |> Seq.map (fun skill -> + let it : Types.Skill = { + id = SkillId.create() + description = skill.Description + notes = Option.ofObj skill.Notes + } + it) + |> List.ofSeq + let it : Types.Profile = { + id = citizenXref.[string p.Id] + seekingEmployment = p.SeekingEmployment + isPublic = p.IsPublic + continentId = continentXref.[string p.ContinentId] + region = p.Region + remoteWork = p.RemoteWork + fullTime = p.FullTime + biography = Types.Text (string p.Biography) + lastUpdatedOn = p.LastUpdatedOn + experience = match p.Experience with null -> None | x -> (string >> Types.Text >> Some) x + skills = reSkills + } + it) + let! _ = r.Table(Table.Profile).Insert(reProfiles).RunWriteAsync conn + + // Migrate success stories + let! successes = db.Successes.AsNoTracking().ToListAsync () + let reSuccesses = + successes + |> Seq.map (fun s -> + let it : Types.Success = { + id = SuccessId.create () + citizenId = citizenXref.[string s.CitizenId] + recordedOn = s.RecordedOn + fromHere = s.FromHere + source = "profile" + story = match s.Story with null -> None | x -> (string >> Types.Text >> Some) x + } + it) + let! _ = r.Table(Table.Success).Insert(reSuccesses).RunWriteAsync conn + () + } + |> awaitIgnore + + 0 \ No newline at end of file diff --git a/src/JobsJobsJobs/Domain/Types.fs b/src/JobsJobsJobs/Domain/Types.fs index fb746d0..9659e10 100644 --- a/src/JobsJobsJobs/Domain/Types.fs +++ b/src/JobsJobsJobs/Domain/Types.fs @@ -10,6 +10,7 @@ open System type CitizenId = CitizenId of Guid /// A user of Jobs, Jobs, Jobs +[] type Citizen = { /// The ID of the user id : CitizenId @@ -32,6 +33,7 @@ type Citizen = { type ContinentId = ContinentId of Guid /// A continent +[] type Continent = { /// The ID of the continent id : ContinentId @@ -48,6 +50,7 @@ type MarkdownString = Text of string type ListingId = ListingId of Guid /// A job listing +[] type Listing = { /// The ID of the job listing id : ListingId @@ -91,6 +94,7 @@ type Skill = { /// A job seeker profile +[] type Profile = { /// The ID of the citizen to whom this profile belongs id : CitizenId @@ -120,6 +124,7 @@ type Profile = { type SuccessId = SuccessId of Guid /// A record of success finding employment +[] type Success = { /// The ID of the success report id : SuccessId