mirror of
https://github.com/bitwarden/server
synced 2025-12-23 11:43:23 +00:00
[PM-17562] Add Azure Service Bus support for event integration retries (#5880)
* [PM-17562] Add Azure Service Bus support for event integration retries * Cleanup AzureServiceBusIntegrationListenerService.cs; add nullable * Removed IntegrationHandlerBase* since it is no longer used (We removed the subclasses previously) * Changed strategy to assume ApplyRetry always gives us a non-null DelayUntilDate; Added test to confirm as well
This commit is contained in:
@@ -20,7 +20,7 @@ public class AzureServiceBusEventListenerService : EventLoggingListenerService
|
||||
string subscriptionName) : base(handler)
|
||||
{
|
||||
_client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString);
|
||||
_processor = _client.CreateProcessor(globalSettings.EventLogging.AzureServiceBus.TopicName, subscriptionName, new ServiceBusProcessorOptions());
|
||||
_processor = _client.CreateProcessor(globalSettings.EventLogging.AzureServiceBus.EventTopicName, subscriptionName, new ServiceBusProcessorOptions());
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ public class AzureServiceBusEventWriteService : IEventWriteService, IAsyncDispos
|
||||
public AzureServiceBusEventWriteService(GlobalSettings globalSettings)
|
||||
{
|
||||
_client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString);
|
||||
_sender = _client.CreateSender(globalSettings.EventLogging.AzureServiceBus.TopicName);
|
||||
_sender = _client.CreateSender(globalSettings.EventLogging.AzureServiceBus.EventTopicName);
|
||||
}
|
||||
|
||||
public async Task CreateAsync(IEvent e)
|
||||
|
||||
@@ -0,0 +1,101 @@
|
||||
#nullable enable
|
||||
|
||||
using Azure.Messaging.ServiceBus;
|
||||
using Bit.Core.Settings;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class AzureServiceBusIntegrationListenerService : BackgroundService
|
||||
{
|
||||
private readonly int _maxRetries;
|
||||
private readonly string _subscriptionName;
|
||||
private readonly string _topicName;
|
||||
private readonly IIntegrationHandler _handler;
|
||||
private readonly ServiceBusClient _client;
|
||||
private readonly ServiceBusProcessor _processor;
|
||||
private readonly ServiceBusSender _sender;
|
||||
private readonly ILogger<AzureServiceBusIntegrationListenerService> _logger;
|
||||
|
||||
public AzureServiceBusIntegrationListenerService(
|
||||
IIntegrationHandler handler,
|
||||
string subscriptionName,
|
||||
GlobalSettings globalSettings,
|
||||
ILogger<AzureServiceBusIntegrationListenerService> logger)
|
||||
{
|
||||
_handler = handler;
|
||||
_logger = logger;
|
||||
_maxRetries = globalSettings.EventLogging.AzureServiceBus.MaxRetries;
|
||||
_topicName = globalSettings.EventLogging.AzureServiceBus.IntegrationTopicName;
|
||||
_subscriptionName = subscriptionName;
|
||||
|
||||
_client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString);
|
||||
_processor = _client.CreateProcessor(_topicName, _subscriptionName, new ServiceBusProcessorOptions());
|
||||
_sender = _client.CreateSender(_topicName);
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_processor.ProcessMessageAsync += HandleMessageAsync;
|
||||
_processor.ProcessErrorAsync += args =>
|
||||
{
|
||||
_logger.LogError(args.Exception, "Azure Service Bus error");
|
||||
return Task.CompletedTask;
|
||||
};
|
||||
|
||||
await _processor.StartProcessingAsync(cancellationToken);
|
||||
}
|
||||
|
||||
public override async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await _processor.StopProcessingAsync(cancellationToken);
|
||||
await _processor.DisposeAsync();
|
||||
await _sender.DisposeAsync();
|
||||
await _client.DisposeAsync();
|
||||
await base.StopAsync(cancellationToken);
|
||||
}
|
||||
|
||||
private async Task HandleMessageAsync(ProcessMessageEventArgs args)
|
||||
{
|
||||
var json = args.Message.Body.ToString();
|
||||
|
||||
try
|
||||
{
|
||||
var result = await _handler.HandleAsync(json);
|
||||
var message = result.Message;
|
||||
|
||||
if (result.Success)
|
||||
{
|
||||
await args.CompleteMessageAsync(args.Message);
|
||||
return;
|
||||
}
|
||||
|
||||
message.ApplyRetry(result.DelayUntilDate);
|
||||
|
||||
if (result.Retryable && message.RetryCount < _maxRetries)
|
||||
{
|
||||
var scheduledTime = (DateTime)message.DelayUntilDate!;
|
||||
var retryMsg = new ServiceBusMessage(message.ToJson())
|
||||
{
|
||||
Subject = args.Message.Subject,
|
||||
ScheduledEnqueueTime = scheduledTime
|
||||
};
|
||||
|
||||
await _sender.SendMessageAsync(retryMsg);
|
||||
}
|
||||
else
|
||||
{
|
||||
await args.DeadLetterMessageAsync(args.Message, "Retry limit exceeded or non-retryable");
|
||||
return;
|
||||
}
|
||||
|
||||
await args.CompleteMessageAsync(args.Message);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Unhandled error processing ASB message");
|
||||
await args.CompleteMessageAsync(args.Message);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
using Azure.Messaging.ServiceBus;
|
||||
using Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
using Bit.Core.Enums;
|
||||
using Bit.Core.Settings;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class AzureServiceBusIntegrationPublisher : IIntegrationPublisher, IAsyncDisposable
|
||||
{
|
||||
private readonly ServiceBusClient _client;
|
||||
private readonly ServiceBusSender _sender;
|
||||
|
||||
public AzureServiceBusIntegrationPublisher(GlobalSettings globalSettings)
|
||||
{
|
||||
_client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString);
|
||||
_sender = _client.CreateSender(globalSettings.EventLogging.AzureServiceBus.IntegrationTopicName);
|
||||
}
|
||||
|
||||
public async Task PublishAsync(IIntegrationMessage message)
|
||||
{
|
||||
var json = message.ToJson();
|
||||
|
||||
var serviceBusMessage = new ServiceBusMessage(json)
|
||||
{
|
||||
Subject = message.IntegrationType.ToRoutingKey(),
|
||||
};
|
||||
|
||||
await _sender.SendMessageAsync(serviceBusMessage);
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await _sender.DisposeAsync();
|
||||
await _client.DisposeAsync();
|
||||
}
|
||||
}
|
||||
@@ -1,66 +0,0 @@
|
||||
using System.Text.Json.Nodes;
|
||||
using Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
using Bit.Core.AdminConsole.Utilities;
|
||||
using Bit.Core.Enums;
|
||||
using Bit.Core.Models.Data;
|
||||
using Bit.Core.Repositories;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public abstract class IntegrationEventHandlerBase(
|
||||
IUserRepository userRepository,
|
||||
IOrganizationRepository organizationRepository,
|
||||
IOrganizationIntegrationConfigurationRepository configurationRepository)
|
||||
: IEventMessageHandler
|
||||
{
|
||||
public async Task HandleEventAsync(EventMessage eventMessage)
|
||||
{
|
||||
var organizationId = eventMessage.OrganizationId ?? Guid.Empty;
|
||||
var configurations = await configurationRepository.GetConfigurationDetailsAsync(
|
||||
organizationId,
|
||||
GetIntegrationType(),
|
||||
eventMessage.Type);
|
||||
|
||||
foreach (var configuration in configurations)
|
||||
{
|
||||
var context = await BuildContextAsync(eventMessage, configuration.Template);
|
||||
var renderedTemplate = IntegrationTemplateProcessor.ReplaceTokens(configuration.Template, context);
|
||||
|
||||
await ProcessEventIntegrationAsync(configuration.MergedConfiguration, renderedTemplate);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task HandleManyEventsAsync(IEnumerable<EventMessage> eventMessages)
|
||||
{
|
||||
foreach (var eventMessage in eventMessages)
|
||||
{
|
||||
await HandleEventAsync(eventMessage);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<IntegrationTemplateContext> BuildContextAsync(EventMessage eventMessage, string template)
|
||||
{
|
||||
var context = new IntegrationTemplateContext(eventMessage);
|
||||
|
||||
if (IntegrationTemplateProcessor.TemplateRequiresUser(template) && eventMessage.UserId.HasValue)
|
||||
{
|
||||
context.User = await userRepository.GetByIdAsync(eventMessage.UserId.Value);
|
||||
}
|
||||
|
||||
if (IntegrationTemplateProcessor.TemplateRequiresActingUser(template) && eventMessage.ActingUserId.HasValue)
|
||||
{
|
||||
context.ActingUser = await userRepository.GetByIdAsync(eventMessage.ActingUserId.Value);
|
||||
}
|
||||
|
||||
if (IntegrationTemplateProcessor.TemplateRequiresOrganization(template) && eventMessage.OrganizationId.HasValue)
|
||||
{
|
||||
context.Organization = await organizationRepository.GetByIdAsync(eventMessage.OrganizationId.Value);
|
||||
}
|
||||
|
||||
return context;
|
||||
}
|
||||
|
||||
protected abstract IntegrationType GetIntegrationType();
|
||||
|
||||
protected abstract Task ProcessEventIntegrationAsync(JsonObject mergedConfiguration, string renderedTemplate);
|
||||
}
|
||||
@@ -1,35 +0,0 @@
|
||||
using System.Text.Json;
|
||||
using System.Text.Json.Nodes;
|
||||
using Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
using Bit.Core.Enums;
|
||||
using Bit.Core.Repositories;
|
||||
|
||||
#nullable enable
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class SlackEventHandler(
|
||||
IUserRepository userRepository,
|
||||
IOrganizationRepository organizationRepository,
|
||||
IOrganizationIntegrationConfigurationRepository configurationRepository,
|
||||
ISlackService slackService)
|
||||
: IntegrationEventHandlerBase(userRepository, organizationRepository, configurationRepository)
|
||||
{
|
||||
protected override IntegrationType GetIntegrationType() => IntegrationType.Slack;
|
||||
|
||||
protected override async Task ProcessEventIntegrationAsync(JsonObject mergedConfiguration,
|
||||
string renderedTemplate)
|
||||
{
|
||||
var config = mergedConfiguration.Deserialize<SlackIntegrationConfigurationDetails>();
|
||||
if (config is null)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
await slackService.SendSlackMessageByChannelIdAsync(
|
||||
config.token,
|
||||
renderedTemplate,
|
||||
config.channelId
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1,38 +0,0 @@
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using System.Text.Json.Nodes;
|
||||
using Bit.Core.AdminConsole.Models.Data.Integrations;
|
||||
using Bit.Core.Enums;
|
||||
using Bit.Core.Repositories;
|
||||
|
||||
#nullable enable
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class WebhookEventHandler(
|
||||
IHttpClientFactory httpClientFactory,
|
||||
IUserRepository userRepository,
|
||||
IOrganizationRepository organizationRepository,
|
||||
IOrganizationIntegrationConfigurationRepository configurationRepository)
|
||||
: IntegrationEventHandlerBase(userRepository, organizationRepository, configurationRepository)
|
||||
{
|
||||
private readonly HttpClient _httpClient = httpClientFactory.CreateClient(HttpClientName);
|
||||
|
||||
public const string HttpClientName = "WebhookEventHandlerHttpClient";
|
||||
|
||||
protected override IntegrationType GetIntegrationType() => IntegrationType.Webhook;
|
||||
|
||||
protected override async Task ProcessEventIntegrationAsync(JsonObject mergedConfiguration,
|
||||
string renderedTemplate)
|
||||
{
|
||||
var config = mergedConfiguration.Deserialize<WebhookIntegrationConfigurationDetails>();
|
||||
if (config is null || string.IsNullOrEmpty(config.url))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var content = new StringContent(renderedTemplate, Encoding.UTF8, "application/json");
|
||||
var response = await _httpClient.PostAsync(config.url, content);
|
||||
response.EnsureSuccessStatusCode();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user