From b46f2a83f06f7bb45eacb2b7501d055773c17502 Mon Sep 17 00:00:00 2001 From: "Daniel J. Summers" Date: Tue, 27 Sep 2016 22:14:10 -0500 Subject: [PATCH] A provider Still need to test before building a package --- .gitignore | 1 + .../DistributedRethinkDBCache.fs | 214 ++++++++++++++++++ .../DistributedRethinkDBCacheOptions.fs | 21 ++ .../IServiceCollectionExtensions.fs | 26 +++ src/RethinkDB.DistributedCache/project.json | 32 +++ 5 files changed, 294 insertions(+) create mode 100644 src/RethinkDB.DistributedCache/DistributedRethinkDBCache.fs create mode 100644 src/RethinkDB.DistributedCache/DistributedRethinkDBCacheOptions.fs create mode 100644 src/RethinkDB.DistributedCache/IServiceCollectionExtensions.fs create mode 100644 src/RethinkDB.DistributedCache/project.json diff --git a/.gitignore b/.gitignore index f1e3d20..46e6ab0 100644 --- a/.gitignore +++ b/.gitignore @@ -24,6 +24,7 @@ bld/ # Visual Studio 2015 cache/options directory .vs/ +.vscode/ # Uncomment if you have tasks that create the project's static files in wwwroot #wwwroot/ diff --git a/src/RethinkDB.DistributedCache/DistributedRethinkDBCache.fs b/src/RethinkDB.DistributedCache/DistributedRethinkDBCache.fs new file mode 100644 index 0000000..9fd047e --- /dev/null +++ b/src/RethinkDB.DistributedCache/DistributedRethinkDBCache.fs @@ -0,0 +1,214 @@ +namespace RethinkDB.DistributedCache + +open Microsoft.Extensions.Caching.Distributed +open Microsoft.Extensions.Logging +open Microsoft.Extensions.Options +open Newtonsoft.Json +open RethinkDb.Driver +open RethinkDb.Driver.Net +open System +open System.Text +open System.Threading.Tasks + +// H/T: Suave +[] +module AsyncExtensions = + type Microsoft.FSharp.Control.AsyncBuilder with + /// An extension method that overloads the standard 'Bind' of the 'async' builder. The new overload awaits on + /// a standard .NET task + member x.Bind(t : Task<'T>, f:'T -> Async<'R>) : Async<'R> = async.Bind(Async.AwaitTask t, f) + /// An extension method that overloads the standard 'Bind' of the 'async' builder. The new overload awaits on + /// a standard .NET task which does not commpute a value + member x.Bind(t : Task, f : unit -> Async<'R>) : Async<'R> = async.Bind(Async.AwaitTask t, f) + +/// Persistence object for a cache entry +type CacheEntry = { + /// The Id for the cache entry + [] + Id : string + /// The payload for the cache entry (as a UTF-8 string) + Payload : string + /// The ticks at which this entry expires + ExpiresAt : int64 + /// The number of seconds in the sliding expiration + SlidingExpiration : int +} + +/// Record to update sliding expiration for an entry +type SlidingExpirationUpdate = { ExpiresAt : int64 } + +/// IDistributedCache implementation utilizing RethinkDB +[] +type DistributedRethinkDBCache(options : IOptions, + log : ILogger) = + + /// RethinkDB + static let r = RethinkDB.R + + /// Whether the environment has been checked to ensure that the database, table, and relevant indexes exist + static let mutable environmentChecked = false + + do + match options with + | null | _ when isNull options.Value -> nullArg "options" + | _ when isNull options.Value.Connection -> nullArg "Connection" + | _ -> () + + /// Options + let opts = options.Value + + /// Shorthand to get the database + let database = match String.IsNullOrEmpty opts.Database with true -> r.Db() | db -> r.Db(db) + + /// Default the table name to "Cache" if it is not provided + let tableName = match String.IsNullOrEmpty opts.Database with true -> "Cache" | _ -> opts.TableName + + /// Shorthand to get the table + let table = database.Table tableName + + /// The name of the cache + let cacheName = + seq { + match String.IsNullOrEmpty opts.Database with true -> () | _ -> yield opts.Database; yield "." + yield tableName + } + |> Seq.reduce (+) + + /// Debug message + let dbug text = + match log.IsEnabled LogLevel.Debug with + | true -> text () |> sprintf "[%s] %s" cacheName |> log.LogDebug + | _ -> () + + /// Make sure the RethinkDB database, table, expiration index exist + let checkEnvironment () = + async { + match environmentChecked with + | true -> dbug <| fun () -> "Skipping environment check because it has already been performed" + | _ -> + dbug <| fun () -> "|> Checking for proper RethinkDB cache environment" + // Database + match opts.Database with + | "" -> dbug <| fun () -> " Skipping database check because it was not specified" + | db -> dbug <| fun () -> sprintf " Checking for database %s existence..." db + let! dbs = r.DbList().RunResultAsync(opts.Connection) + match dbs |> List.contains db with + | true -> () + | _ -> dbug <| fun () -> sprintf " ...creating database %s..." db + do! r.DbCreate(db).RunResultAsync(opts.Connection) + dbug <| fun () -> " ...done" + // Table + dbug <| fun () -> sprintf " Checking for table %s existence..." tableName + let! tables = database.TableList().RunResultAsync(opts.Connection) + match tables |> List.contains tableName with + | true -> () + | _ -> dbug <| fun () -> sprintf " ...creating table %s..." tableName + do! database.TableCreate(tableName).RunResultAsync(opts.Connection) + dbug <| fun () -> " ...done" + // Index + dbug <| fun () -> sprintf " Checking for index %s.ExpiresAt..." tableName + let! indexes = table.IndexList().RunResultAsync(opts.Connection) + match indexes |> List.contains "ExpiresAt" with + | true -> () + | _ -> dbug <| fun () -> sprintf " ...creating index ExpiresAt on table %s..." tableName + do! table.IndexCreate("ExpiresAt").RunResultAsync(opts.Connection) + dbug <| fun () -> " ...done" + dbug <| fun () -> "|> RethinkDB cache environment check complete. Carry on..." + environmentChecked <- true + } + + /// Remove entries from the cache that are expired + let purgeExpired () = + async { + let tix = DateTime.UtcNow.Ticks - 1L + dbug <| fun () -> sprintf "Purging expired entries (<= %i)" tix + do! table.Between(r.Minval, tix).OptArg("index", "ExpiresAt").Delete().RunResultAsync(opts.Connection) + } + + /// Calculate ticks from now for the given number of seconds + let ticksFromNow seconds = DateTime.UtcNow.Ticks + int64 (seconds * 10000000) + + /// Get the cache entry specified + let getCacheEntry (key : string) = + async { + let! entry = table.Get(key).RunResultAsync(opts.Connection) + return entry + } + + /// Refresh (update expiration based on sliding expiration) the cache entry specified + let refreshCacheEntry (entry : CacheEntry) = + async { + match entry.SlidingExpiration with + | 0 -> () + | seconds -> do! table.Get(entry.Id) + .Update({ ExpiresAt = ticksFromNow seconds }) + .RunResultAsync(opts.Connection) + } + + /// Get the payload for the cache entry + let getEntry key = + async { + do! checkEnvironment () + do! purgeExpired () + let! entry = getCacheEntry key + match box entry with + | null -> dbug <| fun () -> sprintf "Cache key %s not found" key + return null + | _ -> dbug <| fun () -> sprintf "Cache key %s found" key + do! refreshCacheEntry entry + return UTF8Encoding.UTF8.GetBytes entry.Payload + } + + /// Update the sliding expiration for a cache entry + let refreshEntry key = + async { + do! checkEnvironment () + let! entry = getCacheEntry key + match box entry with null -> () | _ -> do! refreshCacheEntry entry + do! purgeExpired () + return () + } + + /// Remove the specified cache entry + let removeEntry (key : string) = + async { + do! checkEnvironment () + do! table.Get(key).Delete().RunResultAsync(opts.Connection) + do! purgeExpired () + } + + /// Set the value of a cache entry + let setEntry key payload (options : DistributedCacheEntryOptions) = + async { + do! checkEnvironment () + do! purgeExpired () + let addExpiration entry = + match true with + | _ when options.SlidingExpiration.HasValue -> + { entry with ExpiresAt = ticksFromNow options.SlidingExpiration.Value.Seconds + SlidingExpiration = options.SlidingExpiration.Value.Seconds } + | _ when options.AbsoluteExpiration.HasValue -> + { entry with ExpiresAt = options.AbsoluteExpiration.Value.UtcTicks } + | _ when options.AbsoluteExpirationRelativeToNow.HasValue -> + { entry with ExpiresAt = ticksFromNow options.AbsoluteExpirationRelativeToNow.Value.Seconds } + | _ -> entry + let entry = { Id = key + Payload = UTF8Encoding.UTF8.GetString payload + ExpiresAt = Int64.MaxValue + SlidingExpiration = 0 } + |> addExpiration + do! match box (getCacheEntry key) with + | null -> table.Insert(entry).RunResultAsync(opts.Connection) + | _ -> table.Get(key).Replace(entry).RunResultAsync(opts.Connection) + return () + } + + interface IDistributedCache with + member this.Get key = getEntry key |> Async.RunSynchronously + member this.GetAsync key = getEntry key |> Async.StartAsTask + member this.Refresh key = refreshEntry key |> Async.RunSynchronously + member this.RefreshAsync key = refreshEntry key |> Async.StartAsTask :> Task + member this.Remove key = removeEntry key |> Async.RunSynchronously + member this.RemoveAsync key = removeEntry key |> Async.StartAsTask :> Task + member this.Set (key, value, options) = setEntry key value options |> Async.RunSynchronously + member this.SetAsync (key, value, options) = setEntry key value options |> Async.StartAsTask :> Task diff --git a/src/RethinkDB.DistributedCache/DistributedRethinkDBCacheOptions.fs b/src/RethinkDB.DistributedCache/DistributedRethinkDBCacheOptions.fs new file mode 100644 index 0000000..1dc837e --- /dev/null +++ b/src/RethinkDB.DistributedCache/DistributedRethinkDBCacheOptions.fs @@ -0,0 +1,21 @@ +namespace RethinkDB.DistributedCache + +open RethinkDb.Driver.Net + +/// Options to use to configure the RethinkDB cache +[] +type DistributedRethinkDBCacheOptions() = + /// The RethinkDB connection to use for caching operations + member val Connection : IConnection = null with get, set + + /// The RethinkDB database to use (leave blank for connection default) + member val Database = "" with get, set + + /// The RethinkDB table name to use for cache entries (defaults to "Cache") + member val TableName = "" with get, set + + /// Whether this configuration is valid + member this.IsValid () = + seq { + match this.Connection with null -> yield "Connection cannot be null" | _ -> () + } \ No newline at end of file diff --git a/src/RethinkDB.DistributedCache/IServiceCollectionExtensions.fs b/src/RethinkDB.DistributedCache/IServiceCollectionExtensions.fs new file mode 100644 index 0000000..2cc350e --- /dev/null +++ b/src/RethinkDB.DistributedCache/IServiceCollectionExtensions.fs @@ -0,0 +1,26 @@ +/// Extensions for to add the RethinkDB cache +[] +[] +module RethinkDB.DistributedCache.IServiceCollectionExtensions + +open Microsoft.Extensions.Caching.Distributed +open Microsoft.Extensions.DependencyInjection +open System + +type IServiceCollection with + + member this.AddDistributedRethinkDBCache(options : Action) = + match options with null -> nullArg "options" | _ -> () + ignore <| this.AddOptions () + ignore <| this.Configure options + ignore <| this.Add (ServiceDescriptor.Transient()) + this + +/// +/// Add RethinkDB options to the services collection +/// +/// An action to set the options for the cache +/// The given for further manipulation +[] +let AddDistributedRethinkDBCache (this : IServiceCollection, options : Action) = + this.AddDistributedRethinkDBCache options \ No newline at end of file diff --git a/src/RethinkDB.DistributedCache/project.json b/src/RethinkDB.DistributedCache/project.json new file mode 100644 index 0000000..93e5bb0 --- /dev/null +++ b/src/RethinkDB.DistributedCache/project.json @@ -0,0 +1,32 @@ +{ + "buildOptions": { + "compile": { + "includeFiles": [ + "DistributedRethinkDBCacheOptions.fs", + "DistributedRethinkDBCache.fs", + "IServiceCollectionExtensions.fs" + ] + }, + "compilerName": "fsc", + "debugType": "portable" + }, + "dependencies": { + "Microsoft.Extensions.Caching.Abstractions": "1.0.0", + "Microsoft.Extensions.Logging": "1.0.0", + "Microsoft.Extensions.Options": "1.0.0", + "Newtonsoft.Json": "9.0.1", + "RethinkDb.Driver": "2.3.15" + }, + "frameworks": { + "netstandard1.6": { + "dependencies": { + "Microsoft.FSharp.Core.netcore": "1.0.0-alpha-160831", + "NETStandard.Library": "1.6.0" + } + } + }, + "tools": { + "dotnet-compile-fsc":"1.0.0-preview2-*" + }, + "version": "0.9.0" +} \ No newline at end of file