mirror of
https://github.com/bitwarden/server
synced 2025-12-26 05:03:18 +00:00
Platform/pm 2535/upgrade to azure messaging servicebus (#3102)
* `dotnet add package Azure.Messaging.ServiceBus` 🤖 * Move to Azure.Messaging.ServiceBus * `dotnet restore --locked-mode --force-evaluate` 🤖 Remove Microsoft.Azure.ServiceBus * `dotnet restore --locked-mode --force-evaluate` 🤖 * Include broker filter * `dotnet restore --locked-mode --force-evaluate` 🤖
This commit is contained in:
@@ -1,10 +1,10 @@
|
||||
using Bit.Core.Enums;
|
||||
using Azure.Messaging.ServiceBus;
|
||||
using Azure.Messaging.ServiceBus.Administration;
|
||||
using Bit.Core.Enums;
|
||||
using Bit.Core.Repositories;
|
||||
using Bit.Core.Services;
|
||||
using Bit.Core.Settings;
|
||||
using Bit.Core.Utilities;
|
||||
using Microsoft.Azure.ServiceBus;
|
||||
using Microsoft.Azure.ServiceBus.Management;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
@@ -15,10 +15,14 @@ public class ApplicationCacheHostedService : IHostedService, IDisposable
|
||||
private readonly InMemoryServiceBusApplicationCacheService _applicationCacheService;
|
||||
private readonly IOrganizationRepository _organizationRepository;
|
||||
protected readonly ILogger<ApplicationCacheHostedService> _logger;
|
||||
private readonly SubscriptionClient _subscriptionClient;
|
||||
private readonly ManagementClient _managementClient;
|
||||
private readonly ServiceBusClient _serviceBusClient;
|
||||
private readonly ServiceBusReceiver _subscriptionReceiver;
|
||||
private readonly ServiceBusAdministrationClient _serviceBusAdministrationClient;
|
||||
private readonly string _subName;
|
||||
private readonly string _topicName;
|
||||
private CancellationTokenSource _cts;
|
||||
private Task _executingTask;
|
||||
|
||||
|
||||
public ApplicationCacheHostedService(
|
||||
IApplicationCacheService applicationCacheService,
|
||||
@@ -31,53 +35,73 @@ public class ApplicationCacheHostedService : IHostedService, IDisposable
|
||||
_applicationCacheService = applicationCacheService as InMemoryServiceBusApplicationCacheService;
|
||||
_organizationRepository = organizationRepository;
|
||||
_logger = logger;
|
||||
_managementClient = new ManagementClient(globalSettings.ServiceBus.ConnectionString);
|
||||
_subscriptionClient = new SubscriptionClient(globalSettings.ServiceBus.ConnectionString,
|
||||
_topicName, _subName);
|
||||
_serviceBusClient = new ServiceBusClient(globalSettings.ServiceBus.ConnectionString);
|
||||
_subscriptionReceiver = _serviceBusClient.CreateReceiver(_topicName, _subName);
|
||||
_serviceBusAdministrationClient = new ServiceBusAdministrationClient(globalSettings.ServiceBus.ConnectionString);
|
||||
}
|
||||
|
||||
public virtual async Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
try
|
||||
{
|
||||
await _managementClient.CreateSubscriptionAsync(new SubscriptionDescription(_topicName, _subName)
|
||||
await _serviceBusAdministrationClient.CreateSubscriptionAsync(new CreateSubscriptionOptions(_topicName, _subName)
|
||||
{
|
||||
DefaultMessageTimeToLive = TimeSpan.FromDays(14),
|
||||
LockDuration = TimeSpan.FromSeconds(30),
|
||||
EnableDeadLetteringOnFilterEvaluationExceptions = true,
|
||||
EnableDeadLetteringOnMessageExpiration = true,
|
||||
}, new RuleDescription("default", new SqlFilter($"sys.Label != '{_subName}'")));
|
||||
}
|
||||
catch (MessagingEntityAlreadyExistsException) { }
|
||||
_subscriptionClient.RegisterMessageHandler(ProcessMessageAsync,
|
||||
new MessageHandlerOptions(ExceptionReceivedHandlerAsync)
|
||||
DeadLetteringOnMessageExpiration = true,
|
||||
}, new CreateRuleOptions
|
||||
{
|
||||
MaxConcurrentCalls = 2,
|
||||
AutoComplete = false,
|
||||
});
|
||||
Filter = new SqlRuleFilter($"sys.label != '{_subName}'")
|
||||
}, cancellationToken);
|
||||
}
|
||||
catch (ServiceBusException e)
|
||||
when (e.Reason == ServiceBusFailureReason.MessagingEntityAlreadyExists)
|
||||
{ }
|
||||
|
||||
_cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
|
||||
_executingTask = ExecuteAsync(_cts.Token);
|
||||
}
|
||||
|
||||
public virtual async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await _subscriptionClient.CloseAsync();
|
||||
await _subscriptionReceiver.CloseAsync(cancellationToken);
|
||||
await _serviceBusClient.DisposeAsync();
|
||||
_cts.Cancel();
|
||||
try
|
||||
{
|
||||
await _managementClient.DeleteSubscriptionAsync(_topicName, _subName, cancellationToken);
|
||||
await _serviceBusAdministrationClient.DeleteSubscriptionAsync(_topicName, _subName, cancellationToken);
|
||||
}
|
||||
catch { }
|
||||
await _executingTask;
|
||||
}
|
||||
|
||||
public virtual void Dispose()
|
||||
{ }
|
||||
|
||||
private async Task ProcessMessageAsync(Message message, CancellationToken cancellationToken)
|
||||
private async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (message.Label != _subName && _applicationCacheService != null)
|
||||
await foreach (var message in _subscriptionReceiver.ReceiveMessagesAsync(cancellationToken))
|
||||
{
|
||||
switch ((ApplicationCacheMessageType)message.UserProperties["type"])
|
||||
try
|
||||
{
|
||||
await ProcessMessageAsync(message, cancellationToken);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
_logger.LogError(e, "Error processing messages in ApplicationCacheHostedService");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ProcessMessageAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken)
|
||||
{
|
||||
if (message.Subject != _subName && _applicationCacheService != null)
|
||||
{
|
||||
switch ((ApplicationCacheMessageType)message.ApplicationProperties["type"])
|
||||
{
|
||||
case ApplicationCacheMessageType.UpsertOrganizationAbility:
|
||||
var upsertedOrgId = (Guid)message.UserProperties["id"];
|
||||
var upsertedOrgId = (Guid)message.ApplicationProperties["id"];
|
||||
var upsertedOrg = await _organizationRepository.GetByIdAsync(upsertedOrgId);
|
||||
if (upsertedOrg != null)
|
||||
{
|
||||
@@ -86,7 +110,7 @@ public class ApplicationCacheHostedService : IHostedService, IDisposable
|
||||
break;
|
||||
case ApplicationCacheMessageType.DeleteOrganizationAbility:
|
||||
await _applicationCacheService.BaseDeleteOrganizationAbilityAsync(
|
||||
(Guid)message.UserProperties["id"]);
|
||||
(Guid)message.ApplicationProperties["id"]);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
@@ -94,13 +118,7 @@ public class ApplicationCacheHostedService : IHostedService, IDisposable
|
||||
}
|
||||
if (!cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
await _subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
|
||||
await _subscriptionReceiver.CompleteMessageAsync(message, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
private Task ExceptionReceivedHandlerAsync(ExceptionReceivedEventArgs args)
|
||||
{
|
||||
_logger.LogError(args.Exception, "Message handler encountered an exception.");
|
||||
return Task.FromResult(0);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user