Move caching for servers to store layer.

This commit is contained in:
Nick Blakely 2020-08-11 21:03:02 -07:00
parent d8a107b5ca
commit e44d685d55
3 changed files with 104 additions and 65 deletions

View File

@ -16,24 +16,16 @@
private readonly ILogger<ServerRegistryService> logger; private readonly ILogger<ServerRegistryService> logger;
private readonly IServerRegistryStore registryStore; private readonly IServerRegistryStore registryStore;
private readonly IConfiguration configuration; private readonly IConfiguration configuration;
private readonly IDateTimeProvider dateTimeProvider;
private readonly IMemoryCache memoryCache;
private readonly TimeSpan timeoutPeriod;
private readonly int maxServerCount; private readonly int maxServerCount;
public ServerRegistryService( public ServerRegistryService(
ILogger<ServerRegistryService> logger, ILogger<ServerRegistryService> logger,
IServerRegistryStore registryStore, IServerRegistryStore registryStore,
IConfiguration configuration, IConfiguration configuration)
IDateTimeProvider dateTimeProvider,
IMemoryCache memoryCache)
{ {
this.logger = logger; this.logger = logger;
this.registryStore = registryStore; this.registryStore = registryStore;
this.configuration = configuration; this.configuration = configuration;
this.dateTimeProvider = dateTimeProvider;
this.memoryCache = memoryCache;
this.timeoutPeriod = TimeSpan.FromMinutes(Convert.ToDouble(this.configuration["ServerTimeoutPeriodInMinutes"]));
this.maxServerCount = Convert.ToInt32(this.configuration["MaxServerCount"]); this.maxServerCount = Convert.ToInt32(this.configuration["MaxServerCount"]);
} }
@ -48,41 +40,13 @@
throw new ArgumentException($"Unsupported game name {serverInfo.GameName}", nameof(serverInfo)); throw new ArgumentException($"Unsupported game name {serverInfo.GameName}", nameof(serverInfo));
} }
// Check cache before we write to store
ConcurrentDictionary<string, ServerInfo> allServerInfo = await this.memoryCache.GetOrCreateAsync(CacheKeys.ServerInfo, this.PopulateCache);
if (allServerInfo.ContainsKey(serverInfo.HostIpAddress))
{
if (allServerInfo[serverInfo.HostIpAddress].Equals(serverInfo))
{
this.logger.LogInformation("State for {IPAddress} is unchanged. Skipping write to store.", serverInfo.HostIpAddress);
return;
}
else
{
this.logger.LogInformation("State for {IPAddress} has changed. Committing update to store.", serverInfo.HostIpAddress);
}
}
await this.registryStore.UpsertServerInfo(serverInfo); await this.registryStore.UpsertServerInfo(serverInfo);
this.logger.LogInformation("Updating cache for request from {IPAddress}.", serverInfo.HostIpAddress);
if (allServerInfo.ContainsKey(serverInfo.HostIpAddress))
{
allServerInfo[serverInfo.HostIpAddress] = serverInfo;
}
else
{
allServerInfo.TryAdd(serverInfo.HostIpAddress, serverInfo);
}
} }
public async Task<IEnumerable<ServerInfo>> GetAllServers() public async Task<IEnumerable<ServerInfo>> GetAllServers()
{ {
ConcurrentDictionary<string, ServerInfo> serverInfo = await this.memoryCache.GetOrCreateAsync(CacheKeys.ServerInfo, this.PopulateCache); return (await this.registryStore.GetServerInfos())
.Take(this.maxServerCount);
return serverInfo.Values;
} }
public async Task DeleteServer(string ipAddress) public async Task DeleteServer(string ipAddress)
@ -96,17 +60,5 @@
await this.registryStore.DeleteServer(serverInfo.id); await this.registryStore.DeleteServer(serverInfo.id);
} }
} }
private async Task<ConcurrentDictionary<string, ServerInfo>> PopulateCache(ICacheEntry entry)
{
entry.AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(1);
IDictionary<string, ServerInfo> serverInfo = (await this.registryStore
.GetServerInfos(whereExpression: c => c.LastHeartbeat > this.dateTimeProvider.UtcNow - this.timeoutPeriod))
.Take(this.maxServerCount)
.ToDictionary(x => x.HostIpAddress, y => y);
return new ConcurrentDictionary<string, ServerInfo>(serverInfo);
}
} }
} }

View File

@ -1,10 +1,14 @@
namespace Giants.Services namespace Giants.Services
{ {
using System; using System;
using System.Collections.Concurrent;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions; using System.Linq.Expressions;
using System.Threading.Tasks; using System.Threading.Tasks;
using Giants.Services.Core;
using Microsoft.Azure.Cosmos; using Microsoft.Azure.Cosmos;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -12,21 +16,41 @@
{ {
private readonly ILogger<CosmosDbServerRegistryStore> logger; private readonly ILogger<CosmosDbServerRegistryStore> logger;
private readonly IConfiguration configuration; private readonly IConfiguration configuration;
private readonly IMemoryCache memoryCache;
private readonly IDateTimeProvider dateTimeProvider;
private readonly TimeSpan timeoutPeriod;
private CosmosDbClient client; private CosmosDbClient client;
public CosmosDbServerRegistryStore( public CosmosDbServerRegistryStore(
ILogger<CosmosDbServerRegistryStore> logger, ILogger<CosmosDbServerRegistryStore> logger,
IConfiguration configuration) IConfiguration configuration,
IMemoryCache memoryCache,
IDateTimeProvider dateTimeProvider)
{ {
this.logger = logger; this.logger = logger;
this.configuration = configuration; this.configuration = configuration;
this.memoryCache = memoryCache;
this.dateTimeProvider = dateTimeProvider;
this.timeoutPeriod = TimeSpan.FromMinutes(Convert.ToDouble(this.configuration["ServerTimeoutPeriodInMinutes"]));
} }
public async Task<IEnumerable<ServerInfo>> GetServerInfos( public async Task<IEnumerable<ServerInfo>> GetServerInfos(
Expression<Func<ServerInfo, bool>> whereExpression = null, Expression<Func<ServerInfo, bool>> whereExpression = null,
string partitionKey = null) string partitionKey = null)
{ {
return await this.client.GetItems<ServerInfo>(whereExpression); ConcurrentDictionary<string, ServerInfo> serverInfo = await this.memoryCache.GetOrCreateAsync(CacheKeys.ServerInfo, this.PopulateCache);
IQueryable<ServerInfo> serverInfoQuery = serverInfo
.Values
.AsQueryable();
if (whereExpression != null)
{
serverInfoQuery = serverInfoQuery.Where(whereExpression);
}
return serverInfoQuery.Where(c => c.LastHeartbeat > this.dateTimeProvider.UtcNow - this.timeoutPeriod)
.ToList();
} }
public async Task<IEnumerable<TSelect>> GetServerInfos<TSelect>( public async Task<IEnumerable<TSelect>> GetServerInfos<TSelect>(
@ -34,18 +58,40 @@
Expression<Func<ServerInfo, bool>> whereExpression = null, Expression<Func<ServerInfo, bool>> whereExpression = null,
string partitionKey = null) string partitionKey = null)
{ {
ArgumentUtility.CheckForNull(selectExpression, nameof(selectExpression)); ConcurrentDictionary<string, ServerInfo> serverInfo = await this.memoryCache.GetOrCreateAsync(CacheKeys.ServerInfo, this.PopulateCache);
return await this.client.GetItems<ServerInfo, TSelect>( IQueryable<ServerInfo> serverInfoQuery = serverInfo
selectExpression: selectExpression, .Values
whereExpression: whereExpression, .AsQueryable();
partitionKey: partitionKey);
if (serverInfoQuery != null)
{
serverInfoQuery = serverInfoQuery.Where(whereExpression);
}
return serverInfoQuery.Where(c => c.LastHeartbeat > this.dateTimeProvider.UtcNow - this.timeoutPeriod)
.Select(selectExpression)
.ToList();
} }
public async Task<ServerInfo> GetServerInfo(string ipAddress) public async Task<ServerInfo> GetServerInfo(string ipAddress)
{ {
ArgumentUtility.CheckStringForNullOrEmpty(ipAddress, nameof(ipAddress)); ArgumentUtility.CheckStringForNullOrEmpty(ipAddress, nameof(ipAddress));
ConcurrentDictionary<string, ServerInfo> serverInfo = await this.memoryCache.GetOrCreateAsync(CacheKeys.ServerInfo, this.PopulateCache);
if (serverInfo.ContainsKey(ipAddress))
{
try
{
return serverInfo[ipAddress];
}
catch (Exception e)
{
this.logger.LogInformation("Cached server for {IPAddress} no longer found: {Exception}", ipAddress, e.ToString());
// May have been removed from the cache by another thread. Ignore and query DB instead.
}
}
return await this.client.GetItemById<ServerInfo>(ipAddress); return await this.client.GetItemById<ServerInfo>(ipAddress);
} }
@ -53,14 +99,45 @@
{ {
ArgumentUtility.CheckForNull(serverInfo, nameof(serverInfo)); ArgumentUtility.CheckForNull(serverInfo, nameof(serverInfo));
// Check cache before we write to store
ConcurrentDictionary<string, ServerInfo> allServerInfo = await this.memoryCache.GetOrCreateAsync(CacheKeys.ServerInfo, this.PopulateCache);
if (allServerInfo.ContainsKey(serverInfo.HostIpAddress))
{
if (allServerInfo[serverInfo.HostIpAddress].Equals(serverInfo))
{
this.logger.LogInformation("State for {IPAddress} is unchanged. Skipping write to store.", serverInfo.HostIpAddress);
return;
}
else
{
this.logger.LogInformation("State for {IPAddress} has changed. Committing update to store.", serverInfo.HostIpAddress);
}
}
await this.client.UpsertItem( await this.client.UpsertItem(
item: serverInfo, item: serverInfo,
partitionKey: new PartitionKey(serverInfo.DocumentType)); partitionKey: new PartitionKey(serverInfo.DocumentType));
this.logger.LogInformation("Updating cache for request from {IPAddress}.", serverInfo.HostIpAddress);
if (allServerInfo.ContainsKey(serverInfo.HostIpAddress))
{
allServerInfo[serverInfo.HostIpAddress] = serverInfo;
}
else
{
allServerInfo.TryAdd(serverInfo.HostIpAddress, serverInfo);
}
} }
public async Task DeleteServer(string id, string partitionKey = null) public async Task DeleteServer(string id, string partitionKey = null)
{ {
await this.client.DeleteItem<ServerInfo>(id, partitionKey); await this.client.DeleteItem<ServerInfo>(id, partitionKey);
// Remove from cache
ConcurrentDictionary<string, ServerInfo> allServerInfo = await this.memoryCache.GetOrCreateAsync(CacheKeys.ServerInfo, this.PopulateCache);
allServerInfo.TryRemove(id, out ServerInfo _);
} }
public async Task DeleteServers(IEnumerable<string> ids, string partitionKey = null) public async Task DeleteServers(IEnumerable<string> ids, string partitionKey = null)
@ -71,7 +148,7 @@
{ {
this.logger.LogInformation("Deleting server for host IP {IPAddress}", id); this.logger.LogInformation("Deleting server for host IP {IPAddress}", id);
await this.client.DeleteItem<ServerInfo>(id, partitionKey); await this.DeleteServer(id, partitionKey);
} }
} }
@ -85,5 +162,16 @@
await this.client.Initialize(); await this.client.Initialize();
} }
private async Task<ConcurrentDictionary<string, ServerInfo>> PopulateCache(ICacheEntry entry)
{
entry.AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(1);
IDictionary<string, ServerInfo> serverInfo =
(await this.client.GetItems<ServerInfo>())
.ToDictionary(x => x.HostIpAddress, y => y);
return new ConcurrentDictionary<string, ServerInfo>(serverInfo);
}
} }
} }

View File

@ -1,5 +1,4 @@
{ {
"$schema": "http://json.schemastore.org/launchsettings.json",
"iisSettings": { "iisSettings": {
"windowsAuthentication": false, "windowsAuthentication": false,
"anonymousAuthentication": true, "anonymousAuthentication": true,
@ -8,11 +7,11 @@
"sslPort": 44304 "sslPort": 44304
} }
}, },
"$schema": "http://json.schemastore.org/launchsettings.json",
"profiles": { "profiles": {
"IIS Express": { "IIS Express": {
"commandName": "IISExpress", "commandName": "IISExpress",
"launchBrowser": true, "launchBrowser": true,
"launchUrl": "weatherforecast",
"environmentVariables": { "environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development" "ASPNETCORE_ENVIRONMENT": "Development"
} }
@ -21,10 +20,10 @@
"commandName": "Project", "commandName": "Project",
"launchBrowser": true, "launchBrowser": true,
"launchUrl": "weatherforecast", "launchUrl": "weatherforecast",
"applicationUrl": "https://localhost:5001;http://localhost:5000",
"environmentVariables": { "environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development" "ASPNETCORE_ENVIRONMENT": "Development"
} },
"applicationUrl": "https://localhost:5001;http://localhost:5000"
} }
} }
} }