1
0
mirror of https://github.com/ncblakely/GiantsTools synced 2024-11-22 14:15:37 +01:00
GiantsTools/Giants.Services/Store/CosmosDbServerRegistryStore.cs

184 lines
7.4 KiB
C#
Raw Normal View History

2020-08-09 01:31:16 +02:00
namespace Giants.Services
{
using System;
using System.Collections.Concurrent;
2020-08-09 01:31:16 +02:00
using System.Collections.Generic;
using System.Linq;
2020-08-09 01:31:16 +02:00
using System.Linq.Expressions;
using System.Threading.Tasks;
using Giants.Services.Core;
2020-08-09 01:31:16 +02:00
using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Caching.Memory;
2020-08-09 01:31:16 +02:00
using Microsoft.Extensions.Configuration;
2020-08-09 02:26:41 +02:00
using Microsoft.Extensions.Logging;
2020-08-09 01:31:16 +02:00
public class CosmosDbServerRegistryStore : IServerRegistryStore
{
2020-08-09 02:26:41 +02:00
private readonly ILogger<CosmosDbServerRegistryStore> logger;
2020-08-09 01:31:16 +02:00
private readonly IConfiguration configuration;
private readonly IMemoryCache memoryCache;
private readonly IDateTimeProvider dateTimeProvider;
private readonly TimeSpan timeoutPeriod;
2020-08-09 01:31:16 +02:00
private CosmosDbClient client;
2020-08-12 07:15:13 +02:00
private const int ServerRefreshIntervalInMinutes = 1;
2020-08-09 01:31:16 +02:00
2020-08-09 02:26:41 +02:00
public CosmosDbServerRegistryStore(
ILogger<CosmosDbServerRegistryStore> logger,
IConfiguration configuration,
IMemoryCache memoryCache,
IDateTimeProvider dateTimeProvider)
2020-08-09 01:31:16 +02:00
{
2020-08-09 02:26:41 +02:00
this.logger = logger;
2020-08-09 01:31:16 +02:00
this.configuration = configuration;
this.memoryCache = memoryCache;
this.dateTimeProvider = dateTimeProvider;
this.timeoutPeriod = TimeSpan.FromMinutes(Convert.ToDouble(this.configuration["ServerTimeoutPeriodInMinutes"]));
2020-08-09 01:31:16 +02:00
}
public async Task<IEnumerable<ServerInfo>> GetServerInfos(
2020-08-09 02:26:41 +02:00
Expression<Func<ServerInfo, bool>> whereExpression = null,
string partitionKey = null)
2020-08-09 01:31:16 +02:00
{
ConcurrentDictionary<string, ServerInfo> serverInfo = await this.memoryCache.GetOrCreateAsync(CacheKeys.ServerInfo, this.PopulateCache);
IQueryable<ServerInfo> serverInfoQuery = serverInfo
.Values
.AsQueryable();
if (whereExpression != null)
{
serverInfoQuery = serverInfoQuery.Where(whereExpression);
}
return serverInfoQuery.Where(c => c.LastHeartbeat > this.dateTimeProvider.UtcNow - this.timeoutPeriod)
.ToList();
2020-08-09 01:31:16 +02:00
}
2020-08-09 02:26:41 +02:00
public async Task<IEnumerable<TSelect>> GetServerInfos<TSelect>(
Expression<Func<ServerInfo, TSelect>> selectExpression,
Expression<Func<ServerInfo, bool>> whereExpression = null,
string partitionKey = null)
{
ConcurrentDictionary<string, ServerInfo> serverInfo = await this.memoryCache.GetOrCreateAsync(CacheKeys.ServerInfo, this.PopulateCache);
IQueryable<ServerInfo> serverInfoQuery = serverInfo
.Values
.AsQueryable();
2020-08-09 02:52:26 +02:00
if (serverInfoQuery != null)
{
serverInfoQuery = serverInfoQuery.Where(whereExpression);
}
return serverInfoQuery.Where(c => c.LastHeartbeat > this.dateTimeProvider.UtcNow - this.timeoutPeriod)
.Select(selectExpression)
.ToList();
2020-08-09 02:26:41 +02:00
}
2020-08-09 01:31:16 +02:00
public async Task<ServerInfo> GetServerInfo(string ipAddress)
{
2020-08-09 02:52:26 +02:00
ArgumentUtility.CheckStringForNullOrEmpty(ipAddress, nameof(ipAddress));
ConcurrentDictionary<string, ServerInfo> serverInfo = await this.memoryCache.GetOrCreateAsync(CacheKeys.ServerInfo, this.PopulateCache);
if (serverInfo.ContainsKey(ipAddress))
{
try
{
return serverInfo[ipAddress];
}
catch (Exception e)
{
this.logger.LogInformation("Cached server for {IPAddress} no longer found: {Exception}", ipAddress, e.ToString());
// May have been removed from the cache by another thread. Ignore and query DB instead.
}
}
2020-08-09 01:31:16 +02:00
return await this.client.GetItemById<ServerInfo>(ipAddress);
}
public async Task UpsertServerInfo(ServerInfo serverInfo)
{
2020-08-09 02:52:26 +02:00
ArgumentUtility.CheckForNull(serverInfo, nameof(serverInfo));
// Check cache before we write to store
ConcurrentDictionary<string, ServerInfo> allServerInfo = await this.memoryCache.GetOrCreateAsync(CacheKeys.ServerInfo, this.PopulateCache);
if (allServerInfo.ContainsKey(serverInfo.HostIpAddress))
{
2020-08-12 07:15:13 +02:00
ServerInfo existingServerInfo = allServerInfo[serverInfo.HostIpAddress];
// DDOS protection: skip write to Cosmos if parameters have not changed,
// or it's not been long enough.
if (existingServerInfo.Equals(serverInfo)
&& Math.Abs((existingServerInfo.LastHeartbeat - serverInfo.LastHeartbeat).TotalMinutes) < ServerRefreshIntervalInMinutes)
{
this.logger.LogInformation("State for {IPAddress} is unchanged. Skipping write to store.", serverInfo.HostIpAddress);
return;
}
else
{
this.logger.LogInformation("State for {IPAddress} has changed. Committing update to store.", serverInfo.HostIpAddress);
}
}
2020-08-09 02:26:41 +02:00
await this.client.UpsertItem(
item: serverInfo,
partitionKey: new PartitionKey(serverInfo.DocumentType));
this.logger.LogInformation("Updating cache for request from {IPAddress}.", serverInfo.HostIpAddress);
if (allServerInfo.ContainsKey(serverInfo.HostIpAddress))
{
allServerInfo[serverInfo.HostIpAddress] = serverInfo;
}
else
{
allServerInfo.TryAdd(serverInfo.HostIpAddress, serverInfo);
}
2020-08-09 02:26:41 +02:00
}
2020-08-10 07:11:10 +02:00
public async Task DeleteServer(string id, string partitionKey = null)
{
await this.client.DeleteItem<ServerInfo>(id, partitionKey);
// Remove from cache
ConcurrentDictionary<string, ServerInfo> allServerInfo = await this.memoryCache.GetOrCreateAsync(CacheKeys.ServerInfo, this.PopulateCache);
allServerInfo.TryRemove(id, out ServerInfo _);
2020-08-10 07:11:10 +02:00
}
2020-08-09 02:26:41 +02:00
public async Task DeleteServers(IEnumerable<string> ids, string partitionKey = null)
{
2020-08-09 02:52:26 +02:00
ArgumentUtility.CheckEnumerableForNullOrEmpty(ids, nameof(ids));
2020-08-09 02:26:41 +02:00
foreach (string id in ids)
{
this.logger.LogInformation("Deleting server for host IP {IPAddress}", id);
await this.DeleteServer(id, partitionKey);
2020-08-09 02:26:41 +02:00
}
2020-08-09 01:31:16 +02:00
}
public async Task Initialize()
{
this.client = new CosmosDbClient(
connectionString: this.configuration["CosmosDbEndpoint"],
authKeyOrResourceToken: this.configuration["CosmosDbKey"],
databaseId: this.configuration["DatabaseId"],
containerId: this.configuration["ContainerId"]);
await this.client.Initialize();
}
private async Task<ConcurrentDictionary<string, ServerInfo>> PopulateCache(ICacheEntry entry)
{
entry.AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(1);
IDictionary<string, ServerInfo> serverInfo =
(await this.client.GetItems<ServerInfo>())
.ToDictionary(x => x.HostIpAddress, y => y);
return new ConcurrentDictionary<string, ServerInfo>(serverInfo);
}
2020-08-09 01:31:16 +02:00
}
2020-08-09 02:26:41 +02:00
}