Add first cut of data migration
This commit is contained in:
parent
6e31f83c82
commit
28b116925b
@ -26,6 +26,8 @@ Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain", "JobsJobsJobs\Doma
|
||||
EndProject
|
||||
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Api", "JobsJobsJobs\Api\Api.fsproj", "{8F5A3D1E-562B-4F27-9787-6CB14B35E69E}"
|
||||
EndProject
|
||||
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "DataMigrate", "JobsJobsJobs\DataMigrate\DataMigrate.fsproj", "{C5774E4F-2930-4B64-8407-77BF7EB79F39}"
|
||||
EndProject
|
||||
Global
|
||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||
Debug|Any CPU = Debug|Any CPU
|
||||
@ -52,6 +54,10 @@ Global
|
||||
{8F5A3D1E-562B-4F27-9787-6CB14B35E69E}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{8F5A3D1E-562B-4F27-9787-6CB14B35E69E}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{8F5A3D1E-562B-4F27-9787-6CB14B35E69E}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{C5774E4F-2930-4B64-8407-77BF7EB79F39}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{C5774E4F-2930-4B64-8407-77BF7EB79F39}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{C5774E4F-2930-4B64-8407-77BF7EB79F39}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{C5774E4F-2930-4B64-8407-77BF7EB79F39}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
EndGlobalSection
|
||||
GlobalSection(SolutionProperties) = preSolution
|
||||
HideSolutionNode = FALSE
|
||||
@ -62,5 +68,6 @@ Global
|
||||
GlobalSection(NestedProjects) = preSolution
|
||||
{C81278DA-DA97-4E55-AB39-4B88565B615D} = {FA833B24-B8F6-4CE6-A044-99257EAC02FF}
|
||||
{8F5A3D1E-562B-4F27-9787-6CB14B35E69E} = {FA833B24-B8F6-4CE6-A044-99257EAC02FF}
|
||||
{C5774E4F-2930-4B64-8407-77BF7EB79F39} = {FA833B24-B8F6-4CE6-A044-99257EAC02FF}
|
||||
EndGlobalSection
|
||||
EndGlobal
|
||||
|
@ -26,13 +26,12 @@ open Microsoft.Extensions.Logging
|
||||
|
||||
/// Configure dependency injection
|
||||
let configureServices (svc : IServiceCollection) =
|
||||
svc.AddGiraffe()
|
||||
.AddSingleton<IClock>(SystemClock.Instance)
|
||||
.AddLogging ()
|
||||
|> ignore
|
||||
svc.AddGiraffe () |> ignore
|
||||
svc.AddSingleton<IClock> SystemClock.Instance |> ignore
|
||||
svc.AddLogging () |> ignore
|
||||
let svcs = svc.BuildServiceProvider()
|
||||
let cfg = svcs.GetRequiredService<IConfiguration>().GetSection "Rethink"
|
||||
let log = svcs.GetRequiredService<ILoggerFactory>().CreateLogger "Data.Startup"
|
||||
let cfg = svcs.GetRequiredService<IConfiguration>().GetSection "Rethink"
|
||||
let log = svcs.GetRequiredService<ILoggerFactory>().CreateLogger "Data.Startup"
|
||||
let conn = Data.Startup.createConnection cfg log
|
||||
svc.AddSingleton conn |> ignore
|
||||
Data.Startup.establishEnvironment cfg log conn |> Data.awaitIgnore
|
||||
|
@ -1,14 +1,11 @@
|
||||
/// Data access functions for Jobs, Jobs, Jobs
|
||||
module JobsJobsJobs.Api.Data
|
||||
|
||||
open JobsJobsJobs.Domain
|
||||
open FSharp.Control.Tasks
|
||||
open JobsJobsJobs.Domain.Types
|
||||
open Polly
|
||||
open RethinkDb.Driver
|
||||
open RethinkDb.Driver.Net
|
||||
open Microsoft.Extensions.Configuration
|
||||
open FSharp.Control.Tasks
|
||||
open Microsoft.Extensions.Logging
|
||||
|
||||
/// Shorthand for the RethinkDB R variable (how every command starts)
|
||||
let private r = RethinkDB.R
|
||||
@ -20,6 +17,7 @@ let awaitIgnore x = x |> Async.AwaitTask |> Async.RunSynchronously |> ignore
|
||||
/// JSON converters used with RethinkDB persistence
|
||||
module Converters =
|
||||
|
||||
open JobsJobsJobs.Domain
|
||||
open Microsoft.FSharpLu.Json
|
||||
open Newtonsoft.Json
|
||||
open System
|
||||
@ -106,8 +104,11 @@ module Table =
|
||||
[<RequireQualifiedAccess>]
|
||||
module Startup =
|
||||
|
||||
open Microsoft.Extensions.Configuration
|
||||
open Microsoft.Extensions.Logging
|
||||
|
||||
/// Create a RethinkDB connection
|
||||
let createConnection (cfg : IConfigurationSection) (log : ILogger)=
|
||||
let createConnection (cfg : IConfigurationSection) (log : ILogger) =
|
||||
|
||||
// Add all required JSON converters
|
||||
Converters.all ()
|
||||
@ -167,3 +168,106 @@ module Startup =
|
||||
do! ensureIndexes Table.Profile [ "continentId" ]
|
||||
do! ensureIndexes Table.Success [ "citizenId" ]
|
||||
}
|
||||
|
||||
|
||||
/// Determine if a record type (not nullable) is null
|
||||
let toOption x = match x |> box |> isNull with true -> None | false -> Some x
|
||||
|
||||
/// A retry policy where we will reconnect to RethinkDB if it has gone away
|
||||
let withReconn (conn : IConnection) =
|
||||
Policy
|
||||
.Handle<ReqlDriverError>()
|
||||
.RetryAsync(System.Action<exn, int>(fun ex _ ->
|
||||
printf "Encountered RethinkDB exception: %s" ex.Message
|
||||
match ex.Message.Contains "socket" with
|
||||
| true ->
|
||||
printf "Reconnecting to RethinkDB"
|
||||
(conn :?> Connection).Reconnect()
|
||||
| false -> ()))
|
||||
|
||||
|
||||
/// Citizen data access functions
|
||||
[<RequireQualifiedAccess>]
|
||||
module Citizen =
|
||||
|
||||
/// Find a citizen by their ID
|
||||
let findById (citizenId : CitizenId) conn = task {
|
||||
let! citizen =
|
||||
withReconn(conn).ExecuteAsync(fun () ->
|
||||
r.Table(Table.Citizen)
|
||||
.Get(citizenId)
|
||||
.RunResultAsync<Citizen> conn)
|
||||
return toOption citizen
|
||||
}
|
||||
|
||||
/// Find a citizen by their No Agenda Social username
|
||||
let findByNaUser (naUser : string) conn = task {
|
||||
let! citizen =
|
||||
withReconn(conn).ExecuteAsync(fun () ->
|
||||
r.Table(Table.Citizen)
|
||||
.GetAll(naUser).OptArg("index", "naUser").Nth(0)
|
||||
.RunResultAsync<Citizen> conn)
|
||||
return toOption citizen
|
||||
}
|
||||
|
||||
/// Add a citizen
|
||||
let add (citizen : Citizen) conn = task {
|
||||
let! _ =
|
||||
withReconn(conn).ExecuteAsync(fun () ->
|
||||
r.Table(Table.Citizen)
|
||||
.Insert(citizen)
|
||||
.RunWriteAsync conn)
|
||||
()
|
||||
}
|
||||
|
||||
|
||||
/// Profile data access functions
|
||||
[<RequireQualifiedAccess>]
|
||||
module Profile =
|
||||
|
||||
/// Find a profile by citizen ID
|
||||
let findById (citizenId : CitizenId) conn = task {
|
||||
let! profile =
|
||||
withReconn(conn).ExecuteAsync(fun () ->
|
||||
r.Table(Table.Profile)
|
||||
.Get(citizenId)
|
||||
.RunResultAsync<Profile> conn)
|
||||
return toOption profile
|
||||
}
|
||||
|
||||
/// Insert or update a profile
|
||||
let save (profile : Profile) conn = task {
|
||||
let! _ =
|
||||
withReconn(conn).ExecuteAsync(fun () ->
|
||||
r.Table(Table.Profile)
|
||||
.Get(profile.id)
|
||||
.Replace(profile)
|
||||
.RunWriteAsync conn)
|
||||
()
|
||||
}
|
||||
|
||||
|
||||
/// Success story data access functions
|
||||
[<RequireQualifiedAccess>]
|
||||
module Success =
|
||||
|
||||
/// Find a success report by its ID
|
||||
let findById (successId : SuccessId) conn = task {
|
||||
let! success =
|
||||
withReconn(conn).ExecuteAsync(fun () ->
|
||||
r.Table(Table.Success)
|
||||
.Get(successId)
|
||||
.RunResultAsync<Success> conn)
|
||||
return toOption success
|
||||
}
|
||||
|
||||
/// Insert or update a success story
|
||||
let save (success : Success) conn = task {
|
||||
let! _ =
|
||||
withReconn(conn).ExecuteAsync(fun () ->
|
||||
r.Table(Table.Success)
|
||||
.Get(success.id)
|
||||
.Replace(success)
|
||||
.RunWriteAsync conn)
|
||||
()
|
||||
}
|
||||
|
23
src/JobsJobsJobs/DataMigrate/DataMigrate.fsproj
Normal file
23
src/JobsJobsJobs/DataMigrate/DataMigrate.fsproj
Normal file
@ -0,0 +1,23 @@
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
|
||||
<PropertyGroup>
|
||||
<OutputType>Exe</OutputType>
|
||||
<TargetFramework>net5.0</TargetFramework>
|
||||
<WarnOn>3390;$(WarnOn)</WarnOn>
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<Compile Include="Program.fs" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<ProjectReference Include="..\Server\JobsJobsJobs.Server.csproj" />
|
||||
<ProjectReference Include="..\Api\Api.fsproj" />
|
||||
<ProjectReference Include="..\Domain\Domain.fsproj" />
|
||||
</ItemGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Npgsql" Version="5.0.7" />
|
||||
</ItemGroup>
|
||||
|
||||
</Project>
|
133
src/JobsJobsJobs/DataMigrate/Program.fs
Normal file
133
src/JobsJobsJobs/DataMigrate/Program.fs
Normal file
@ -0,0 +1,133 @@
|
||||
// Learn more about F# at http://docs.microsoft.com/dotnet/fsharp
|
||||
|
||||
open FSharp.Control.Tasks
|
||||
open System
|
||||
open System.Linq
|
||||
open JobsJobsJobs.Api.Data
|
||||
open JobsJobsJobs.Domain
|
||||
open JobsJobsJobs.Server.Data
|
||||
open Microsoft.EntityFrameworkCore
|
||||
open Microsoft.Extensions.Hosting
|
||||
open Microsoft.Extensions.DependencyInjection
|
||||
open Microsoft.Extensions.Logging
|
||||
open RethinkDb.Driver.Net
|
||||
|
||||
/// Create the host (reads configuration and initializes both databases)
|
||||
let createHostBuilder argv =
|
||||
Host.CreateDefaultBuilder(argv)
|
||||
.ConfigureServices(
|
||||
Action<HostBuilderContext, IServiceCollection> (
|
||||
fun hostCtx svcs ->
|
||||
svcs.AddSingleton hostCtx.Configuration |> ignore
|
||||
// PostgreSQL via EF Core
|
||||
svcs.AddDbContext<JobsDbContext>(fun options ->
|
||||
options.UseNpgsql(hostCtx.Configuration.["ConnectionStrings:JobsDb"],
|
||||
fun o -> o.UseNodaTime() |> ignore)
|
||||
|> ignore)
|
||||
|> ignore
|
||||
// RethinkDB
|
||||
let cfg = hostCtx.Configuration.GetSection "Rethink"
|
||||
let log = svcs.BuildServiceProvider().GetRequiredService<ILoggerFactory>().CreateLogger "Data"
|
||||
let conn = Startup.createConnection cfg log
|
||||
svcs.AddSingleton conn |> ignore
|
||||
Startup.establishEnvironment cfg log conn |> awaitIgnore
|
||||
))
|
||||
.Build()
|
||||
|
||||
[<EntryPoint>]
|
||||
let main argv =
|
||||
let host = createHostBuilder argv
|
||||
let r = RethinkDb.Driver.RethinkDB.R
|
||||
|
||||
use db = host.Services.GetRequiredService<JobsDbContext> ()
|
||||
let conn = host.Services.GetRequiredService<IConnection> ()
|
||||
|
||||
task {
|
||||
// Migrate continents
|
||||
let mutable continentXref = Map.empty<string, Types.ContinentId>
|
||||
let! continents = db.Continents.AsNoTracking().ToListAsync ()
|
||||
let reContinents =
|
||||
continents
|
||||
|> Seq.map (fun c ->
|
||||
let reContinentId = ContinentId.create ()
|
||||
continentXref <- continentXref.Add (string c.Id, reContinentId)
|
||||
let it : Types.Continent = {
|
||||
id = reContinentId
|
||||
name = c.Name
|
||||
}
|
||||
it)
|
||||
|> List.ofSeq
|
||||
let! _ = r.Table(Table.Continent).Insert(reContinents).RunWriteAsync conn
|
||||
|
||||
// Migrate citizens
|
||||
let mutable citizenXref = Map.empty<string, Types.CitizenId>
|
||||
let! citizens = db.Citizens.AsNoTracking().ToListAsync ()
|
||||
let reCitizens =
|
||||
citizens
|
||||
|> Seq.map (fun c ->
|
||||
let reCitizenId = CitizenId.create ()
|
||||
citizenXref <- citizenXref.Add (string c.Id, reCitizenId)
|
||||
let it : Types.Citizen = {
|
||||
id = reCitizenId
|
||||
naUser = c.NaUser
|
||||
displayName = Option.ofObj c.DisplayName
|
||||
realName = Option.ofObj c.RealName
|
||||
profileUrl = c.ProfileUrl
|
||||
joinedOn = c.JoinedOn
|
||||
lastSeenOn = c.LastSeenOn
|
||||
}
|
||||
it)
|
||||
let! _ = r.Table(Table.Citizen).Insert(reCitizens).RunWriteAsync conn
|
||||
|
||||
// Migrate profile information (includes skills)
|
||||
let! profiles = db.Profiles.AsNoTracking().ToListAsync ()
|
||||
let reProfiles =
|
||||
profiles
|
||||
|> Seq.map (fun p ->
|
||||
let skills = db.Skills.AsNoTracking().Where(fun s -> s.CitizenId = p.Id).ToList ()
|
||||
let reSkills =
|
||||
skills
|
||||
|> Seq.map (fun skill ->
|
||||
let it : Types.Skill = {
|
||||
id = SkillId.create()
|
||||
description = skill.Description
|
||||
notes = Option.ofObj skill.Notes
|
||||
}
|
||||
it)
|
||||
|> List.ofSeq
|
||||
let it : Types.Profile = {
|
||||
id = citizenXref.[string p.Id]
|
||||
seekingEmployment = p.SeekingEmployment
|
||||
isPublic = p.IsPublic
|
||||
continentId = continentXref.[string p.ContinentId]
|
||||
region = p.Region
|
||||
remoteWork = p.RemoteWork
|
||||
fullTime = p.FullTime
|
||||
biography = Types.Text (string p.Biography)
|
||||
lastUpdatedOn = p.LastUpdatedOn
|
||||
experience = match p.Experience with null -> None | x -> (string >> Types.Text >> Some) x
|
||||
skills = reSkills
|
||||
}
|
||||
it)
|
||||
let! _ = r.Table(Table.Profile).Insert(reProfiles).RunWriteAsync conn
|
||||
|
||||
// Migrate success stories
|
||||
let! successes = db.Successes.AsNoTracking().ToListAsync ()
|
||||
let reSuccesses =
|
||||
successes
|
||||
|> Seq.map (fun s ->
|
||||
let it : Types.Success = {
|
||||
id = SuccessId.create ()
|
||||
citizenId = citizenXref.[string s.CitizenId]
|
||||
recordedOn = s.RecordedOn
|
||||
fromHere = s.FromHere
|
||||
source = "profile"
|
||||
story = match s.Story with null -> None | x -> (string >> Types.Text >> Some) x
|
||||
}
|
||||
it)
|
||||
let! _ = r.Table(Table.Success).Insert(reSuccesses).RunWriteAsync conn
|
||||
()
|
||||
}
|
||||
|> awaitIgnore
|
||||
|
||||
0
|
@ -10,6 +10,7 @@ open System
|
||||
type CitizenId = CitizenId of Guid
|
||||
|
||||
/// A user of Jobs, Jobs, Jobs
|
||||
[<CLIMutable; NoComparison; NoEquality>]
|
||||
type Citizen = {
|
||||
/// The ID of the user
|
||||
id : CitizenId
|
||||
@ -32,6 +33,7 @@ type Citizen = {
|
||||
type ContinentId = ContinentId of Guid
|
||||
|
||||
/// A continent
|
||||
[<CLIMutable; NoComparison; NoEquality>]
|
||||
type Continent = {
|
||||
/// The ID of the continent
|
||||
id : ContinentId
|
||||
@ -48,6 +50,7 @@ type MarkdownString = Text of string
|
||||
type ListingId = ListingId of Guid
|
||||
|
||||
/// A job listing
|
||||
[<CLIMutable; NoComparison; NoEquality>]
|
||||
type Listing = {
|
||||
/// The ID of the job listing
|
||||
id : ListingId
|
||||
@ -91,6 +94,7 @@ type Skill = {
|
||||
|
||||
|
||||
/// A job seeker profile
|
||||
[<CLIMutable; NoComparison; NoEquality>]
|
||||
type Profile = {
|
||||
/// The ID of the citizen to whom this profile belongs
|
||||
id : CitizenId
|
||||
@ -120,6 +124,7 @@ type Profile = {
|
||||
type SuccessId = SuccessId of Guid
|
||||
|
||||
/// A record of success finding employment
|
||||
[<CLIMutable; NoComparison; NoEquality>]
|
||||
type Success = {
|
||||
/// The ID of the success report
|
||||
id : SuccessId
|
||||
|
Loading…
Reference in New Issue
Block a user