mirror of
https://github.com/bitwarden/server
synced 2026-01-08 11:33:26 +00:00
[PM-23845] Update cache service to handle concurrency (#6170)
This commit is contained in:
@@ -0,0 +1,10 @@
|
||||
using Bit.Core.AdminConsole.Entities;
|
||||
|
||||
namespace Bit.Core.AdminConsole.AbilitiesCache;
|
||||
|
||||
public interface IApplicationCacheServiceBusMessaging
|
||||
{
|
||||
Task NotifyOrganizationAbilityUpsertedAsync(Organization organization);
|
||||
Task NotifyOrganizationAbilityDeletedAsync(Guid organizationId);
|
||||
Task NotifyProviderAbilityDeletedAsync(Guid providerId);
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
using Bit.Core.AdminConsole.Entities;
|
||||
using Bit.Core.AdminConsole.Entities.Provider;
|
||||
using Bit.Core.AdminConsole.Models.Data.Provider;
|
||||
using Bit.Core.Models.Data.Organizations;
|
||||
|
||||
namespace Bit.Core.AdminConsole.AbilitiesCache;
|
||||
|
||||
public interface IVCurrentInMemoryApplicationCacheService
|
||||
{
|
||||
Task<IDictionary<Guid, OrganizationAbility>> GetOrganizationAbilitiesAsync();
|
||||
#nullable enable
|
||||
Task<OrganizationAbility?> GetOrganizationAbilityAsync(Guid orgId);
|
||||
#nullable disable
|
||||
Task<IDictionary<Guid, ProviderAbility>> GetProviderAbilitiesAsync();
|
||||
Task UpsertOrganizationAbilityAsync(Organization organization);
|
||||
Task UpsertProviderAbilityAsync(Provider provider);
|
||||
Task DeleteOrganizationAbilityAsync(Guid organizationId);
|
||||
Task DeleteProviderAbilityAsync(Guid providerId);
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
using Bit.Core.AdminConsole.Entities;
|
||||
using Bit.Core.AdminConsole.Entities.Provider;
|
||||
using Bit.Core.AdminConsole.Models.Data.Provider;
|
||||
using Bit.Core.Models.Data.Organizations;
|
||||
|
||||
namespace Bit.Core.AdminConsole.AbilitiesCache;
|
||||
|
||||
public interface IVNextInMemoryApplicationCacheService
|
||||
{
|
||||
Task<IDictionary<Guid, OrganizationAbility>> GetOrganizationAbilitiesAsync();
|
||||
#nullable enable
|
||||
Task<OrganizationAbility?> GetOrganizationAbilityAsync(Guid orgId);
|
||||
#nullable disable
|
||||
Task<IDictionary<Guid, ProviderAbility>> GetProviderAbilitiesAsync();
|
||||
Task UpsertOrganizationAbilityAsync(Organization organization);
|
||||
Task UpsertProviderAbilityAsync(Provider provider);
|
||||
Task DeleteOrganizationAbilityAsync(Guid organizationId);
|
||||
Task DeleteProviderAbilityAsync(Guid providerId);
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
using Bit.Core.AdminConsole.Entities;
|
||||
|
||||
namespace Bit.Core.AdminConsole.AbilitiesCache;
|
||||
|
||||
public class NoOpApplicationCacheMessaging : IApplicationCacheServiceBusMessaging
|
||||
{
|
||||
public Task NotifyOrganizationAbilityUpsertedAsync(Organization organization)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task NotifyOrganizationAbilityDeletedAsync(Guid organizationId)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public Task NotifyProviderAbilityDeletedAsync(Guid providerId)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,63 @@
|
||||
using Azure.Messaging.ServiceBus;
|
||||
using Bit.Core.AdminConsole.Entities;
|
||||
using Bit.Core.Enums;
|
||||
using Bit.Core.Settings;
|
||||
using Bit.Core.Utilities;
|
||||
|
||||
namespace Bit.Core.AdminConsole.AbilitiesCache;
|
||||
|
||||
public class ServiceBusApplicationCacheMessaging : IApplicationCacheServiceBusMessaging
|
||||
{
|
||||
private readonly ServiceBusSender _topicMessageSender;
|
||||
private readonly string _subName;
|
||||
|
||||
public ServiceBusApplicationCacheMessaging(
|
||||
GlobalSettings globalSettings)
|
||||
{
|
||||
_subName = CoreHelpers.GetApplicationCacheServiceBusSubscriptionName(globalSettings);
|
||||
var serviceBusClient = new ServiceBusClient(globalSettings.ServiceBus.ConnectionString);
|
||||
_topicMessageSender = serviceBusClient.CreateSender(globalSettings.ServiceBus.ApplicationCacheTopicName);
|
||||
}
|
||||
|
||||
public async Task NotifyOrganizationAbilityUpsertedAsync(Organization organization)
|
||||
{
|
||||
var message = new ServiceBusMessage
|
||||
{
|
||||
Subject = _subName,
|
||||
ApplicationProperties =
|
||||
{
|
||||
{ "type", (byte)ApplicationCacheMessageType.UpsertOrganizationAbility },
|
||||
{ "id", organization.Id },
|
||||
}
|
||||
};
|
||||
await _topicMessageSender.SendMessageAsync(message);
|
||||
}
|
||||
|
||||
public async Task NotifyOrganizationAbilityDeletedAsync(Guid organizationId)
|
||||
{
|
||||
var message = new ServiceBusMessage
|
||||
{
|
||||
Subject = _subName,
|
||||
ApplicationProperties =
|
||||
{
|
||||
{ "type", (byte)ApplicationCacheMessageType.DeleteOrganizationAbility },
|
||||
{ "id", organizationId },
|
||||
}
|
||||
};
|
||||
await _topicMessageSender.SendMessageAsync(message);
|
||||
}
|
||||
|
||||
public async Task NotifyProviderAbilityDeletedAsync(Guid providerId)
|
||||
{
|
||||
var message = new ServiceBusMessage
|
||||
{
|
||||
Subject = _subName,
|
||||
ApplicationProperties =
|
||||
{
|
||||
{ "type", (byte)ApplicationCacheMessageType.DeleteProviderAbility },
|
||||
{ "id", providerId },
|
||||
}
|
||||
};
|
||||
await _topicMessageSender.SendMessageAsync(message);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,137 @@
|
||||
using System.Collections.Concurrent;
|
||||
using Bit.Core.AdminConsole.Entities;
|
||||
using Bit.Core.AdminConsole.Entities.Provider;
|
||||
using Bit.Core.AdminConsole.Models.Data.Provider;
|
||||
using Bit.Core.AdminConsole.Repositories;
|
||||
using Bit.Core.Models.Data.Organizations;
|
||||
using Bit.Core.Repositories;
|
||||
|
||||
namespace Bit.Core.AdminConsole.AbilitiesCache;
|
||||
|
||||
public class VNextInMemoryApplicationCacheService(
|
||||
IOrganizationRepository organizationRepository,
|
||||
IProviderRepository providerRepository,
|
||||
TimeProvider timeProvider) : IVNextInMemoryApplicationCacheService
|
||||
{
|
||||
private ConcurrentDictionary<Guid, OrganizationAbility> _orgAbilities = new();
|
||||
private readonly SemaphoreSlim _orgInitLock = new(1, 1);
|
||||
private DateTimeOffset _lastOrgAbilityRefresh = DateTimeOffset.MinValue;
|
||||
|
||||
private ConcurrentDictionary<Guid, ProviderAbility> _providerAbilities = new();
|
||||
private readonly SemaphoreSlim _providerInitLock = new(1, 1);
|
||||
private DateTimeOffset _lastProviderAbilityRefresh = DateTimeOffset.MinValue;
|
||||
|
||||
private readonly TimeSpan _refreshInterval = TimeSpan.FromMinutes(10);
|
||||
|
||||
public virtual async Task<IDictionary<Guid, OrganizationAbility>> GetOrganizationAbilitiesAsync()
|
||||
{
|
||||
await InitOrganizationAbilitiesAsync();
|
||||
return _orgAbilities;
|
||||
}
|
||||
|
||||
public async Task<OrganizationAbility?> GetOrganizationAbilityAsync(Guid organizationId)
|
||||
{
|
||||
(await GetOrganizationAbilitiesAsync())
|
||||
.TryGetValue(organizationId, out var organizationAbility);
|
||||
return organizationAbility;
|
||||
}
|
||||
|
||||
public virtual async Task<IDictionary<Guid, ProviderAbility>> GetProviderAbilitiesAsync()
|
||||
{
|
||||
await InitProviderAbilitiesAsync();
|
||||
return _providerAbilities;
|
||||
}
|
||||
|
||||
public virtual async Task UpsertProviderAbilityAsync(Provider provider)
|
||||
{
|
||||
await InitProviderAbilitiesAsync();
|
||||
_providerAbilities.AddOrUpdate(
|
||||
provider.Id,
|
||||
static (_, provider) => new ProviderAbility(provider),
|
||||
static (_, _, provider) => new ProviderAbility(provider),
|
||||
provider);
|
||||
}
|
||||
|
||||
public virtual async Task UpsertOrganizationAbilityAsync(Organization organization)
|
||||
{
|
||||
await InitOrganizationAbilitiesAsync();
|
||||
|
||||
_orgAbilities.AddOrUpdate(
|
||||
organization.Id,
|
||||
static (_, organization) => new OrganizationAbility(organization),
|
||||
static (_, _, organization) => new OrganizationAbility(organization),
|
||||
organization);
|
||||
}
|
||||
|
||||
public virtual Task DeleteOrganizationAbilityAsync(Guid organizationId)
|
||||
{
|
||||
_orgAbilities.TryRemove(organizationId, out _);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
public virtual Task DeleteProviderAbilityAsync(Guid providerId)
|
||||
{
|
||||
_providerAbilities.TryRemove(providerId, out _);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private async Task InitOrganizationAbilitiesAsync() =>
|
||||
await InitAbilitiesAsync<OrganizationAbility>(
|
||||
dict => _orgAbilities = dict,
|
||||
() => _lastOrgAbilityRefresh,
|
||||
dt => _lastOrgAbilityRefresh = dt,
|
||||
_orgInitLock,
|
||||
async () => await organizationRepository.GetManyAbilitiesAsync(),
|
||||
_refreshInterval,
|
||||
ability => ability.Id);
|
||||
|
||||
private async Task InitProviderAbilitiesAsync() =>
|
||||
await InitAbilitiesAsync<ProviderAbility>(
|
||||
dict => _providerAbilities = dict,
|
||||
() => _lastProviderAbilityRefresh,
|
||||
dateTime => _lastProviderAbilityRefresh = dateTime,
|
||||
_providerInitLock,
|
||||
async () => await providerRepository.GetManyAbilitiesAsync(),
|
||||
_refreshInterval,
|
||||
ability => ability.Id);
|
||||
|
||||
|
||||
private async Task InitAbilitiesAsync<TAbility>(
|
||||
Action<ConcurrentDictionary<Guid, TAbility>> setCache,
|
||||
Func<DateTimeOffset> getLastRefresh,
|
||||
Action<DateTimeOffset> setLastRefresh,
|
||||
SemaphoreSlim @lock,
|
||||
Func<Task<IEnumerable<TAbility>>> fetchFunc,
|
||||
TimeSpan refreshInterval,
|
||||
Func<TAbility, Guid> getId)
|
||||
{
|
||||
if (SkipRefresh())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
await @lock.WaitAsync();
|
||||
try
|
||||
{
|
||||
if (SkipRefresh())
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var sources = await fetchFunc();
|
||||
var abilities = new ConcurrentDictionary<Guid, TAbility>(
|
||||
sources.ToDictionary(getId));
|
||||
setCache(abilities);
|
||||
setLastRefresh(timeProvider.GetUtcNow());
|
||||
}
|
||||
finally
|
||||
{
|
||||
@lock.Release();
|
||||
}
|
||||
|
||||
bool SkipRefresh()
|
||||
{
|
||||
return timeProvider.GetUtcNow() - getLastRefresh() <= refreshInterval;
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user