CosmosDb store implementation.

This commit is contained in:
Nick Blakely 2020-08-08 16:31:16 -07:00
parent ee0fddbf6d
commit aa16e3507c
22 changed files with 327 additions and 55 deletions

1
.gitignore vendored
View File

@ -9,6 +9,7 @@
*.user
*.userosscache
*.sln.docstates
appsettings.Development.json
# User-specific files (MonoDevelop/Xamarin Studio)
*.userprefs

View File

@ -0,0 +1,11 @@
namespace Giants.Services.Core
{
using System;
public class DefaultDateTimeProvider : IDateTimeProvider
{
public static DefaultDateTimeProvider Instance { get; } = new DefaultDateTimeProvider();
public DateTime UtcNow => DateTime.UtcNow;
}
}

View File

@ -0,0 +1,9 @@
namespace Giants.Services.Core.Entities
{
public interface IIdentifiable
{
string id { get; }
string DocumentType { get; }
}
}

View File

@ -1,11 +1,16 @@
namespace Giants.Services
{
using System;
using Giants.Services.Core.Entities;
public class ServerInfo : DataContract.ServerInfo
public class ServerInfo : DataContract.ServerInfo, IIdentifiable
{
public string id => HostIpAddress;
public string HostIpAddress { get; set; }
public DateTime LastHeartbeat { get; set; }
public string DocumentType => nameof(ServerInfo);
}
}

View File

@ -0,0 +1,9 @@
namespace Giants.Services.Core
{
using System;
public interface IDateTimeProvider
{
DateTime UtcNow { get; }
}
}

View File

@ -1,13 +1,16 @@
namespace Giants.Services
{
using Giants.Services.Core;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
public static class ServicesModule
{
public static void RegisterServices(IServiceCollection services)
public static void RegisterServices(IServiceCollection services, IConfiguration configuration)
{
services.AddSingleton<IServerRegistryService, ServerRegistryService>();
services.AddSingleton<IServerRegistryStore, InMemoryServerRegistryStore>();
services.AddSingleton<IServerRegistryStore, CosmosDbServerRegistryStore>();
services.AddSingleton<IDateTimeProvider, DefaultDateTimeProvider>();
}
}
}

View File

@ -6,6 +6,8 @@
<ItemGroup>
<PackageReference Include="AutoMapper" Version="10.0.0" />
<PackageReference Include="Microsoft.Azure.Cosmos" Version="3.12.0" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="3.1.6" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.6" />
</ItemGroup>

View File

@ -10,6 +10,6 @@
{
Task<IEnumerable<ServerInfo>> GetAllServers();
Task AddServer(IPAddress ipAddress, ServerInfo server);
Task AddServer(ServerInfo server);
}
}

View File

@ -1,17 +0,0 @@
namespace Giants.Services
{
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;
using System.Threading.Tasks;
public interface IServerRegistryStore
{
public Task<IEnumerable<ServerInfo>> GetAllServerInfos();
public Task<ServerInfo> GetServerInfo(IPAddress ipAddress);
public Task UpsertServerInfo(IPAddress ipAddress, ServerInfo serverInfo);
}
}

View File

@ -2,34 +2,52 @@
{
using System;
using System.Collections.Generic;
using System.Net;
using System.Linq;
using System.Threading.Tasks;
using Giants.Services.Core;
using Microsoft.Extensions.Configuration;
public class ServerRegistryService : IServerRegistryService
{
private readonly IServerRegistryStore registryStore;
private readonly IConfiguration configuration;
private readonly IDateTimeProvider dateTimeProvider;
private readonly TimeSpan timeoutPeriod;
private readonly int maxServerCount;
public ServerRegistryService(IServerRegistryStore registryStore)
public ServerRegistryService(
IServerRegistryStore registryStore,
IConfiguration configuration,
IDateTimeProvider dateTimeProvider)
{
this.registryStore = registryStore;
this.configuration = configuration;
this.dateTimeProvider = dateTimeProvider;
this.timeoutPeriod = TimeSpan.FromMinutes(Convert.ToDouble(this.configuration["ServerTimeoutPeriodInMinutes"]));
this.maxServerCount = Convert.ToInt32(this.configuration["MaxServerCount"]);
}
public async Task AddServer(
IPAddress ipAddress,
ServerInfo server)
{
ServerInfo existingServer = await this.registryStore.GetServerInfo(ipAddress ?? throw new ArgumentNullException(nameof(ipAddress)));
if (existingServer != null)
if (server == null)
{
throw new ArgumentNullException(nameof(server));
}
await this.registryStore.UpsertServerInfo(ipAddress, server ?? throw new ArgumentNullException(nameof(server)));
if (string.IsNullOrEmpty(server.HostIpAddress))
{
throw new ArgumentException(nameof(server.HostIpAddress));
}
await this.registryStore.UpsertServerInfo(server ?? throw new ArgumentNullException(nameof(server)));
}
public async Task<IEnumerable<ServerInfo>> GetAllServers()
{
return await this.registryStore.GetAllServerInfos();
return (await this.registryStore
.GetServerInfos(c => c.LastHeartbeat > this.dateTimeProvider.UtcNow - this.timeoutPeriod))
.Take(this.maxServerCount);
}
}
}

View File

@ -0,0 +1,135 @@
namespace Giants.Services
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading.Tasks;
using Giants.Services.Core.Entities;
using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Cosmos.Linq;
public class CosmosDbClient
{
private readonly string connectionString;
private readonly string authKeyOrResourceToken;
private readonly string databaseId;
private readonly string containerId;
private CosmosClient client;
private Container container;
public CosmosDbClient(
string connectionString,
string authKeyOrResourceToken,
string databaseId,
string containerId)
{
this.connectionString = connectionString;
this.authKeyOrResourceToken = authKeyOrResourceToken;
this.databaseId = databaseId;
this.containerId = containerId;
}
public async Task<IEnumerable<TSelect>> GetItems<T, TSelect>(
Expression<Func<T, TSelect>> selectExpression,
string partitionKey,
Expression<Func<T, bool>> whereExpression = null)
where T : IIdentifiable
{
IQueryable<T> query = this.container
.GetItemLinqQueryable<T>(requestOptions: new QueryRequestOptions
{
PartitionKey = new PartitionKey(partitionKey),
});
if (whereExpression != null)
{
query = query.Where(whereExpression);
}
var feedIteratorQuery = query
.Select(selectExpression)
.ToFeedIterator();
var items = new List<TSelect>();
while (feedIteratorQuery.HasMoreResults)
{
var results = await feedIteratorQuery.ReadNextAsync();
foreach (var result in results)
{
items.Add(result);
}
}
return items;
}
public async Task<IEnumerable<T>> GetItems<T>(
Expression<Func<T, bool>> whereExpression = null,
string partitionKey = null)
where T : IIdentifiable
{
if (partitionKey == null)
{
partitionKey = typeof(T).Name;
}
IQueryable<T> query = this.container
.GetItemLinqQueryable<T>(requestOptions: new QueryRequestOptions
{
PartitionKey = new PartitionKey(partitionKey),
});
if (whereExpression != null)
{
query = query.Where(whereExpression);
}
var feedIteratorQuery = query
.ToFeedIterator();
var items = new List<T>();
while (feedIteratorQuery.HasMoreResults)
{
var results = await feedIteratorQuery.ReadNextAsync();
foreach (var result in results)
{
items.Add(result);
}
}
return items;
}
public async Task<T> GetItemById<T>(string id, string partitionKey = null)
where T : IIdentifiable
{
return (await this.GetItems<T>(t => t.id == id, partitionKey)).FirstOrDefault();
}
public async Task UpsertItem<T>(
T item,
PartitionKey? partitionKey = null,
ItemRequestOptions itemRequestOptions = null)
where T : IIdentifiable
{
await this.container.UpsertItemAsync(item, partitionKey, itemRequestOptions);
}
public async Task Initialize(string partitionKeyPath = null)
{
this.client = new CosmosClient(
this.connectionString,
this.authKeyOrResourceToken);
var databaseResponse = await this.client.CreateDatabaseIfNotExistsAsync(databaseId);
var containerResponse = await databaseResponse.Database.CreateContainerIfNotExistsAsync(new ContainerProperties()
{
Id = containerId,
PartitionKeyPath = partitionKeyPath ?? "/DocumentType"
});
this.container = containerResponse.Container;
}
}
}

View File

@ -0,0 +1,47 @@
namespace Giants.Services
{
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Configuration;
public class CosmosDbServerRegistryStore : IServerRegistryStore
{
private readonly IConfiguration configuration;
private CosmosDbClient client;
public CosmosDbServerRegistryStore(IConfiguration configuration)
{
this.configuration = configuration;
}
public async Task<IEnumerable<ServerInfo>> GetServerInfos(
Expression<Func<ServerInfo, bool>> whereExpression = null)
{
return await this.client.GetItems<ServerInfo>(whereExpression);
}
public async Task<ServerInfo> GetServerInfo(string ipAddress)
{
return await this.client.GetItemById<ServerInfo>(ipAddress);
}
public async Task UpsertServerInfo(ServerInfo serverInfo)
{
await this.client.UpsertItem(serverInfo, new PartitionKey(serverInfo.DocumentType));
}
public async Task Initialize()
{
this.client = new CosmosDbClient(
connectionString: this.configuration["CosmosDbEndpoint"],
authKeyOrResourceToken: this.configuration["CosmosDbKey"],
databaseId: this.configuration["DatabaseId"],
containerId: this.configuration["ContainerId"]);
await this.client.Initialize();
}
}
}

View File

@ -0,0 +1,18 @@
namespace Giants.Services
{
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Threading.Tasks;
public interface IServerRegistryStore
{
public Task Initialize();
public Task<IEnumerable<ServerInfo>> GetServerInfos(Expression<Func<ServerInfo, bool>> whereExpression = null);
public Task<ServerInfo> GetServerInfo(string ipAddress);
public Task UpsertServerInfo(ServerInfo serverInfo);
}
}

View File

@ -1,22 +1,25 @@
namespace Giants.Services
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Net;
using System.Threading.Tasks;
public class InMemoryServerRegistryStore : IServerRegistryStore
{
private ConcurrentDictionary<IPAddress, ServerInfo> servers = new ConcurrentDictionary<IPAddress, ServerInfo>();
private ConcurrentDictionary<string, ServerInfo> servers = new ConcurrentDictionary<string, ServerInfo>();
public Task<IEnumerable<ServerInfo>> GetAllServerInfos()
public Task<IEnumerable<ServerInfo>> GetServerInfos(
Expression<Func<ServerInfo, bool>> whereExpression = null)
{
return Task.FromResult(
this.servers.Values.AsEnumerable());
}
public Task<ServerInfo> GetServerInfo(IPAddress ipAddress)
public Task<ServerInfo> GetServerInfo(string ipAddress)
{
if (servers.ContainsKey(ipAddress))
{
@ -26,9 +29,16 @@
return Task.FromResult((ServerInfo)null);
}
public Task UpsertServerInfo(IPAddress ipAddress, ServerInfo serverInfo)
public Task Initialize()
{
servers.TryAdd(ipAddress, serverInfo);
this.servers.Clear();
return Task.CompletedTask;
}
public Task UpsertServerInfo(ServerInfo serverInfo)
{
this.servers.TryAdd(serverInfo.HostIpAddress, serverInfo);
return Task.CompletedTask;
}

View File

@ -59,7 +59,7 @@ namespace Giants.Web.Controllers
serverInfoEntity.HostIpAddress = requestIpAddress.ToString();
serverInfoEntity.LastHeartbeat = DateTime.UtcNow;
await this.serverRegistryService.AddServer(requestIpAddress, serverInfoEntity);
await this.serverRegistryService.AddServer(serverInfoEntity);
}
}
}

View File

@ -0,0 +1,30 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Giants.Services;
using Microsoft.Extensions.Hosting;
namespace Giants.WebApi
{
public class InitializerHostedService : IHostedService
{
private readonly IServerRegistryStore serverRegistryStore;
public InitializerHostedService(IServerRegistryStore serverRegistryStore)
{
this.serverRegistryStore = serverRegistryStore;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await this.serverRegistryStore.Initialize();
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}

View File

@ -1,14 +1,8 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace Giants.Web
{
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Hosting;
public class Program
{
public static void Main(string[] args)

View File

@ -1,5 +1,6 @@
using AutoMapper;
using Giants.Services;
using Giants.WebApi;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Mvc.Infrastructure;
@ -26,8 +27,9 @@ namespace Giants.Web
services.AddHttpContextAccessor();
services.TryAddSingleton<IActionContextAccessor, ActionContextAccessor>();
services.AddHostedService<InitializerHostedService>();
ServicesModule.RegisterServices(services);
ServicesModule.RegisterServices(services, Configuration);
IMapper mapper = Services.Mapper.GetMapper();
services.AddSingleton(mapper);

View File

@ -1,9 +0,0 @@
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
}
}

View File

@ -6,5 +6,9 @@
"Microsoft.Hosting.Lifetime": "Information"
}
},
"AllowedHosts": "*"
"AllowedHosts": "*",
"DatabaseId": "DefaultDatabase",
"ContainerId": "DefaultContainer",
"ServerTimeoutPeriodInMinutes": "7",
"MaxServerCount": 1000
}