mirror of
https://github.com/bitwarden/server
synced 2025-12-29 14:43:39 +00:00
[PM-17562] Update documentation for event integrations (#5924)
* [PM-17562] Update documentation for event integrations * Fix SonarQube suggestion, bring ASB event listener in line with integration listener * Apply suggestions from code review Co-authored-by: Matt Bishop <mbishop@bitwarden.com> * Updates to README - PR fixes, additional context, tense alignment * Fix the formatting for inlined code snippets * Add links to different sections; remove inline code formatting in favor of single bacticks for JSON --------- Co-authored-by: Matt Bishop <mbishop@bitwarden.com>
This commit is contained in:
@@ -0,0 +1,59 @@
|
||||
#nullable enable
|
||||
|
||||
using System.Text;
|
||||
using Azure.Messaging.ServiceBus;
|
||||
using Bit.Core.Settings;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class AzureServiceBusEventListenerService : EventLoggingListenerService
|
||||
{
|
||||
private readonly ServiceBusProcessor _processor;
|
||||
|
||||
public AzureServiceBusEventListenerService(
|
||||
IEventMessageHandler handler,
|
||||
IAzureServiceBusService serviceBusService,
|
||||
string subscriptionName,
|
||||
GlobalSettings globalSettings,
|
||||
ILogger<AzureServiceBusEventListenerService> logger) : base(handler, logger)
|
||||
{
|
||||
_processor = serviceBusService.CreateProcessor(
|
||||
globalSettings.EventLogging.AzureServiceBus.EventTopicName,
|
||||
subscriptionName,
|
||||
new ServiceBusProcessorOptions());
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_processor.ProcessMessageAsync += ProcessReceivedMessageAsync;
|
||||
_processor.ProcessErrorAsync += ProcessErrorAsync;
|
||||
|
||||
await _processor.StartProcessingAsync(cancellationToken);
|
||||
}
|
||||
|
||||
public override async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await _processor.StopProcessingAsync(cancellationToken);
|
||||
await _processor.DisposeAsync();
|
||||
await base.StopAsync(cancellationToken);
|
||||
}
|
||||
|
||||
internal Task ProcessErrorAsync(ProcessErrorEventArgs args)
|
||||
{
|
||||
_logger.LogError(
|
||||
args.Exception,
|
||||
"An error occurred. Entity Path: {EntityPath}, Error Source: {ErrorSource}",
|
||||
args.EntityPath,
|
||||
args.ErrorSource
|
||||
);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
private async Task ProcessReceivedMessageAsync(ProcessMessageEventArgs args)
|
||||
{
|
||||
await ProcessReceivedMessageAsync(Encoding.UTF8.GetString(args.Message.Body), args.Message.MessageId);
|
||||
await args.CompleteMessageAsync(args.Message);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,107 @@
|
||||
#nullable enable
|
||||
|
||||
using Azure.Messaging.ServiceBus;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class AzureServiceBusIntegrationListenerService : BackgroundService
|
||||
{
|
||||
private readonly int _maxRetries;
|
||||
private readonly IAzureServiceBusService _serviceBusService;
|
||||
private readonly IIntegrationHandler _handler;
|
||||
private readonly ServiceBusProcessor _processor;
|
||||
private readonly ILogger<AzureServiceBusIntegrationListenerService> _logger;
|
||||
|
||||
public AzureServiceBusIntegrationListenerService(IIntegrationHandler handler,
|
||||
string topicName,
|
||||
string subscriptionName,
|
||||
int maxRetries,
|
||||
IAzureServiceBusService serviceBusService,
|
||||
ILogger<AzureServiceBusIntegrationListenerService> logger)
|
||||
{
|
||||
_handler = handler;
|
||||
_logger = logger;
|
||||
_maxRetries = maxRetries;
|
||||
_serviceBusService = serviceBusService;
|
||||
|
||||
_processor = _serviceBusService.CreateProcessor(topicName, subscriptionName, new ServiceBusProcessorOptions());
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_processor.ProcessMessageAsync += HandleMessageAsync;
|
||||
_processor.ProcessErrorAsync += ProcessErrorAsync;
|
||||
|
||||
await _processor.StartProcessingAsync(cancellationToken);
|
||||
}
|
||||
|
||||
public override async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await _processor.StopProcessingAsync(cancellationToken);
|
||||
await _processor.DisposeAsync();
|
||||
await base.StopAsync(cancellationToken);
|
||||
}
|
||||
|
||||
internal Task ProcessErrorAsync(ProcessErrorEventArgs args)
|
||||
{
|
||||
_logger.LogError(
|
||||
args.Exception,
|
||||
"An error occurred. Entity Path: {EntityPath}, Error Source: {ErrorSource}",
|
||||
args.EntityPath,
|
||||
args.ErrorSource
|
||||
);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
internal async Task<bool> HandleMessageAsync(string body)
|
||||
{
|
||||
try
|
||||
{
|
||||
var result = await _handler.HandleAsync(body);
|
||||
var message = result.Message;
|
||||
|
||||
if (result.Success)
|
||||
{
|
||||
// Successful integration. Return true to indicate the message has been handled
|
||||
return true;
|
||||
}
|
||||
|
||||
message.ApplyRetry(result.DelayUntilDate);
|
||||
|
||||
if (result.Retryable && message.RetryCount < _maxRetries)
|
||||
{
|
||||
// Publish message to the retry queue. It will be re-published for retry after a delay
|
||||
// Return true to indicate the message has been handled
|
||||
await _serviceBusService.PublishToRetryAsync(message);
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
// Non-recoverable failure or exceeded the max number of retries
|
||||
// Return false to indicate this message should be dead-lettered
|
||||
return false;
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Unknown exception - log error, return true so the message will be acknowledged and not resent
|
||||
_logger.LogError(ex, "Unhandled error processing ASB message");
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private async Task HandleMessageAsync(ProcessMessageEventArgs args)
|
||||
{
|
||||
var json = args.Message.Body.ToString();
|
||||
if (await HandleMessageAsync(json))
|
||||
{
|
||||
await args.CompleteMessageAsync(args.Message);
|
||||
}
|
||||
else
|
||||
{
|
||||
await args.DeadLetterMessageAsync(args.Message, "Retry limit exceeded or non-retryable");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,70 @@
|
||||
using Azure.Messaging.ServiceBus;
|
||||
using Bit.Core.AdminConsole.Models.Data.EventIntegrations;
|
||||
using Bit.Core.Enums;
|
||||
using Bit.Core.Settings;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class AzureServiceBusService : IAzureServiceBusService
|
||||
{
|
||||
private readonly ServiceBusClient _client;
|
||||
private readonly ServiceBusSender _eventSender;
|
||||
private readonly ServiceBusSender _integrationSender;
|
||||
|
||||
public AzureServiceBusService(GlobalSettings globalSettings)
|
||||
{
|
||||
_client = new ServiceBusClient(globalSettings.EventLogging.AzureServiceBus.ConnectionString);
|
||||
_eventSender = _client.CreateSender(globalSettings.EventLogging.AzureServiceBus.EventTopicName);
|
||||
_integrationSender = _client.CreateSender(globalSettings.EventLogging.AzureServiceBus.IntegrationTopicName);
|
||||
}
|
||||
|
||||
public ServiceBusProcessor CreateProcessor(string topicName, string subscriptionName, ServiceBusProcessorOptions options)
|
||||
{
|
||||
return _client.CreateProcessor(topicName, subscriptionName, options);
|
||||
}
|
||||
|
||||
public async Task PublishAsync(IIntegrationMessage message)
|
||||
{
|
||||
var json = message.ToJson();
|
||||
|
||||
var serviceBusMessage = new ServiceBusMessage(json)
|
||||
{
|
||||
Subject = message.IntegrationType.ToRoutingKey(),
|
||||
MessageId = message.MessageId
|
||||
};
|
||||
|
||||
await _integrationSender.SendMessageAsync(serviceBusMessage);
|
||||
}
|
||||
|
||||
public async Task PublishToRetryAsync(IIntegrationMessage message)
|
||||
{
|
||||
var json = message.ToJson();
|
||||
|
||||
var serviceBusMessage = new ServiceBusMessage(json)
|
||||
{
|
||||
Subject = message.IntegrationType.ToRoutingKey(),
|
||||
ScheduledEnqueueTime = message.DelayUntilDate ?? DateTime.UtcNow,
|
||||
MessageId = message.MessageId
|
||||
};
|
||||
|
||||
await _integrationSender.SendMessageAsync(serviceBusMessage);
|
||||
}
|
||||
|
||||
public async Task PublishEventAsync(string body)
|
||||
{
|
||||
var message = new ServiceBusMessage(body)
|
||||
{
|
||||
ContentType = "application/json",
|
||||
MessageId = Guid.NewGuid().ToString()
|
||||
};
|
||||
|
||||
await _eventSender.SendMessageAsync(message);
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await _eventSender.DisposeAsync();
|
||||
await _integrationSender.DisposeAsync();
|
||||
await _client.DisposeAsync();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,32 @@
|
||||
#nullable enable
|
||||
|
||||
using System.Text.Json;
|
||||
using Bit.Core.Models.Data;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
public class EventIntegrationEventWriteService : IEventWriteService, IAsyncDisposable
|
||||
{
|
||||
private readonly IEventIntegrationPublisher _eventIntegrationPublisher;
|
||||
|
||||
public EventIntegrationEventWriteService(IEventIntegrationPublisher eventIntegrationPublisher)
|
||||
{
|
||||
_eventIntegrationPublisher = eventIntegrationPublisher;
|
||||
}
|
||||
|
||||
public async Task CreateAsync(IEvent e)
|
||||
{
|
||||
var body = JsonSerializer.Serialize(e);
|
||||
await _eventIntegrationPublisher.PublishEventAsync(body: body);
|
||||
}
|
||||
|
||||
public async Task CreateManyAsync(IEnumerable<IEvent> events)
|
||||
{
|
||||
var body = JsonSerializer.Serialize(events);
|
||||
await _eventIntegrationPublisher.PublishEventAsync(body: body);
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await _eventIntegrationPublisher.DisposeAsync();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,85 @@
|
||||
#nullable enable
|
||||
|
||||
using System.Text.Json;
|
||||
using Bit.Core.AdminConsole.Models.Data.EventIntegrations;
|
||||
using Bit.Core.AdminConsole.Utilities;
|
||||
using Bit.Core.Enums;
|
||||
using Bit.Core.Models.Data;
|
||||
using Bit.Core.Repositories;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class EventIntegrationHandler<T>(
|
||||
IntegrationType integrationType,
|
||||
IEventIntegrationPublisher eventIntegrationPublisher,
|
||||
IOrganizationIntegrationConfigurationRepository configurationRepository,
|
||||
IUserRepository userRepository,
|
||||
IOrganizationRepository organizationRepository)
|
||||
: IEventMessageHandler
|
||||
{
|
||||
public async Task HandleEventAsync(EventMessage eventMessage)
|
||||
{
|
||||
if (eventMessage.OrganizationId is not Guid organizationId)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var configurations = await configurationRepository.GetConfigurationDetailsAsync(
|
||||
organizationId,
|
||||
integrationType,
|
||||
eventMessage.Type);
|
||||
|
||||
foreach (var configuration in configurations)
|
||||
{
|
||||
var template = configuration.Template ?? string.Empty;
|
||||
var context = await BuildContextAsync(eventMessage, template);
|
||||
var renderedTemplate = IntegrationTemplateProcessor.ReplaceTokens(template, context);
|
||||
var messageId = eventMessage.IdempotencyId ?? Guid.NewGuid();
|
||||
|
||||
var config = configuration.MergedConfiguration.Deserialize<T>()
|
||||
?? throw new InvalidOperationException($"Failed to deserialize to {typeof(T).Name}");
|
||||
|
||||
var message = new IntegrationMessage<T>
|
||||
{
|
||||
IntegrationType = integrationType,
|
||||
MessageId = messageId.ToString(),
|
||||
Configuration = config,
|
||||
RenderedTemplate = renderedTemplate,
|
||||
RetryCount = 0,
|
||||
DelayUntilDate = null
|
||||
};
|
||||
|
||||
await eventIntegrationPublisher.PublishAsync(message);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task HandleManyEventsAsync(IEnumerable<EventMessage> eventMessages)
|
||||
{
|
||||
foreach (var eventMessage in eventMessages)
|
||||
{
|
||||
await HandleEventAsync(eventMessage);
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<IntegrationTemplateContext> BuildContextAsync(EventMessage eventMessage, string template)
|
||||
{
|
||||
var context = new IntegrationTemplateContext(eventMessage);
|
||||
|
||||
if (IntegrationTemplateProcessor.TemplateRequiresUser(template) && eventMessage.UserId.HasValue)
|
||||
{
|
||||
context.User = await userRepository.GetByIdAsync(eventMessage.UserId.Value);
|
||||
}
|
||||
|
||||
if (IntegrationTemplateProcessor.TemplateRequiresActingUser(template) && eventMessage.ActingUserId.HasValue)
|
||||
{
|
||||
context.ActingUser = await userRepository.GetByIdAsync(eventMessage.ActingUserId.Value);
|
||||
}
|
||||
|
||||
if (IntegrationTemplateProcessor.TemplateRequiresOrganization(template) && eventMessage.OrganizationId.HasValue)
|
||||
{
|
||||
context.Organization = await organizationRepository.GetByIdAsync(eventMessage.OrganizationId.Value);
|
||||
}
|
||||
|
||||
return context;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
#nullable enable
|
||||
|
||||
using Bit.Core.Models.Data;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class EventRepositoryHandler(
|
||||
[FromKeyedServices("persistent")] IEventWriteService eventWriteService)
|
||||
: IEventMessageHandler
|
||||
{
|
||||
public Task HandleEventAsync(EventMessage eventMessage)
|
||||
{
|
||||
return eventWriteService.CreateAsync(eventMessage);
|
||||
}
|
||||
|
||||
public Task HandleManyEventsAsync(IEnumerable<EventMessage> eventMessages)
|
||||
{
|
||||
return eventWriteService.CreateManyAsync(eventMessages);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
#nullable enable
|
||||
|
||||
using Bit.Core.Models.Data;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class EventRouteService(
|
||||
[FromKeyedServices("broadcast")] IEventWriteService broadcastEventWriteService,
|
||||
[FromKeyedServices("storage")] IEventWriteService storageEventWriteService,
|
||||
IFeatureService _featureService) : IEventWriteService
|
||||
{
|
||||
public async Task CreateAsync(IEvent e)
|
||||
{
|
||||
if (_featureService.IsEnabled(FeatureFlagKeys.EventBasedOrganizationIntegrations))
|
||||
{
|
||||
await broadcastEventWriteService.CreateAsync(e);
|
||||
}
|
||||
else
|
||||
{
|
||||
await storageEventWriteService.CreateAsync(e);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task CreateManyAsync(IEnumerable<IEvent> e)
|
||||
{
|
||||
if (_featureService.IsEnabled(FeatureFlagKeys.EventBasedOrganizationIntegrations))
|
||||
{
|
||||
await broadcastEventWriteService.CreateManyAsync(e);
|
||||
}
|
||||
else
|
||||
{
|
||||
await storageEventWriteService.CreateManyAsync(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,375 @@
|
||||
# Design goals
|
||||
|
||||
The main goal of event integrations is to easily enable adding new integrations over time without the need
|
||||
for a lot of custom work to expose events to a new integration. The ability of fan-out offered by AMQP
|
||||
(either in RabbitMQ or in Azure Service Bus) gives us a way to attach any number of new integrations to the
|
||||
existing event system without needing to add special handling. By adding a new listener to the existing
|
||||
pipeline, it gains an independent stream of events without the need for additional broadcast code.
|
||||
|
||||
We want to enable robust handling of failures and retries. By utilizing the two-tier approach
|
||||
([described below](#two-tier-exchange)), we build in support at the service level for retries. When we add
|
||||
new integrations, they can focus solely on the integration-specific logic and reporting status, with all the
|
||||
process of retries and delays managed by the messaging system.
|
||||
|
||||
Another goal is to not only support this functionality in the cloud version, but offer it as well to
|
||||
self-hosted instances. RabbitMQ provides a lightweight way for self-hosted instances to tie into the event system
|
||||
using the same robust architecture for integrations without the need for Azure Service Bus.
|
||||
|
||||
Finally, we want to offer organization admins flexibility and control over what events are significant, where
|
||||
to send events, and the data to be included in the message. The configuration architecture allows Organizations
|
||||
to customize details of a specific integration; see [Integrations and integration
|
||||
configurations](#integrations-and-integration-configurations) below for more details on the configuration piece.
|
||||
|
||||
# Architecture
|
||||
|
||||
The entry point for the event integrations is the `IEventWriteService`. By configuring the
|
||||
`EventIntegrationEventWriteService` as the `EventWriteService`, all events sent to the
|
||||
service are broadcast on the RabbitMQ or Azure Service Bus message exchange. To abstract away
|
||||
the specifics of publishing to a specific AMQP provider, an `IEventIntegrationPublisher`
|
||||
is injected into `EventIntegrationEventWriteService` to handle the publishing of events to the
|
||||
RabbitMQ or Azure Service Bus service.
|
||||
|
||||
## Two-tier exchange
|
||||
|
||||
When `EventIntegrationEventWriteService` publishes, it posts to the first tier of our two-tier
|
||||
approach to handling messages. Each tier is represented in the AMQP stack by a separate exchange
|
||||
(in RabbitMQ terminology) or topic (in Azure Service Bus).
|
||||
|
||||
``` mermaid
|
||||
flowchart TD
|
||||
B1[EventService]
|
||||
B2[EventIntegrationEventWriteService]
|
||||
B3[Event Exchange / Topic]
|
||||
B4[EventRepositoryHandler]
|
||||
B5[WebhookIntegrationHandler]
|
||||
B6[Events in Database / Azure Tables]
|
||||
B7[HTTP Server]
|
||||
B8[SlackIntegrationHandler]
|
||||
B9[Slack]
|
||||
B10[EventIntegrationHandler]
|
||||
B12[Integration Exchange / Topic]
|
||||
|
||||
B1 -->|IEventWriteService| B2 --> B3
|
||||
B3-->|EventListenerService| B4 --> B6
|
||||
B3-->|EventListenerService| B10
|
||||
B3-->|EventListenerService| B10
|
||||
B10 --> B12
|
||||
B12 -->|IntegrationListenerService| B5
|
||||
B12 -->|IntegrationListenerService| B8
|
||||
B5 -->|HTTP POST| B7
|
||||
B8 -->|HTTP POST| B9
|
||||
```
|
||||
|
||||
### Event tier
|
||||
|
||||
In the first tier, events are broadcast in a fan-out to a series of listeners. The message body
|
||||
is a JSON representation of an individual `EventMessage` or an array of `EventMessage`. Handlers at
|
||||
this level are responsible for handling each event or array of events. There are currently two handlers
|
||||
at this level:
|
||||
- `EventRepositoryHandler`
|
||||
- The `EventRepositoryHandler` is responsible for long term storage of events. It receives all events
|
||||
and stores them via an injected `IEventRepository` into the database.
|
||||
- This mirrors the behavior of when event integrations are turned off - cloud stores to Azure Tables
|
||||
and self-hosted is stored to the database.
|
||||
- `EventIntegrationHandler`
|
||||
- The `EventIntegrationHandler` is a generic class that is customized to each integration (via the
|
||||
configuration details of the integration) and is responsible for determining if there's a configuration
|
||||
for this event / organization / integration, fetching that configuration, and parsing the details of the
|
||||
event into a template string.
|
||||
- The `EventIntegrationHandler` uses the injected `IOrganizationIntegrationConfigurationRepository` to pull
|
||||
the specific set of configuration and template based on the event type, organization, and integration type.
|
||||
This configuration is what determines if an integration should be sent, what details are necessary for sending
|
||||
it, and the actual message to send.
|
||||
- The output of `EventIntegrationHandler` is a new `IntegrationMessage`, with the details of this
|
||||
the configuration necessary to interact with the integration and the message to send (with all the event
|
||||
details incorporated), published to the integration level of the message bus.
|
||||
|
||||
### Integration tier
|
||||
|
||||
At the integration level, messages are JSON representations of `IIntegrationMessage` - specifically they
|
||||
will be concrete types of the generic `IntegrationMessage<T>` where `<T>` is the configuration details of the
|
||||
specific integration for which they've been sent. These messages represent the details required for
|
||||
sending a specific event to a specific integration, including handling retries and delays.
|
||||
|
||||
Handlers at the integration level are tied directly to the integration (e.g. `SlackIntegrationHandler`,
|
||||
`WebhookIntegrationHandler`). These handlers take in `IntegrationMessage<T>` and output
|
||||
`IntegrationHandlerResult`, which tells the listener the outcome of the integration (e.g. success / fail,
|
||||
if it can be retried and any minimum delay that should occur). This makes them easy to unit test in isolation
|
||||
without any of the concerns of AMQP or messaging.
|
||||
|
||||
The listeners at this level are responsible for firing off the handler when a new message comes in and then
|
||||
taking the correct action based on the result. Successful results simply acknowledge the message and resolve.
|
||||
Failures will either be sent to the dead letter queue (DLQ) or re-published for retry after the correct amount of delay.
|
||||
|
||||
### Retries
|
||||
|
||||
One of the goals of introducing the integration level is to simplify and enable the process of multiple retries
|
||||
for a specific event integration. For instance, if a service is temporarily down, we don't want one of our handlers
|
||||
blocking the rest of the queue while it waits to retry. In addition, we don't want to retry _all_ integrations for a
|
||||
specific event if only one integration fails nor do we want to re-lookup the configuration details. By splitting
|
||||
out the `IntegrationMessage<T>` with the configuration, message, and details around retries, we can process each
|
||||
event / integration individually and retry easily.
|
||||
|
||||
When the `IntegrationHandlerResult.Success` is set to `false` (indicating that the integration attempt failed) the
|
||||
`Retryable` flag tells the listener whether this failure is temporary or final. If the `Retryable` is `false`, then
|
||||
the message is immediately sent to the DLQ. If it is `true`, the listener uses the `ApplyRetry(DateTime)` method
|
||||
in `IntegrationMessage` which handles both incrementing the `RetryCount` and updating the `DelayUntilDate` using
|
||||
the provided DateTime, but also adding exponential backoff (based on `RetryCount`) and jitter. The listener compares
|
||||
the `RetryCount` in the `IntegrationMessage` to see if it's over the `MaxRetries` defined in Global Settings. If it
|
||||
is over the `MaxRetries`, the message is sent to the DLQ. Otherwise, it is scheduled for retry.
|
||||
|
||||
``` mermaid
|
||||
flowchart TD
|
||||
A[Success == false] --> B{Retryable?}
|
||||
B -- No --> C[Send to Dead Letter Queue DLQ]
|
||||
B -- Yes --> D[Check RetryCount vs MaxRetries]
|
||||
D -->|RetryCount >= MaxRetries| E[Send to Dead Letter Queue DLQ]
|
||||
D -->|RetryCount < MaxRetries| F[Schedule for Retry]
|
||||
```
|
||||
|
||||
Azure Service Bus supports scheduling messages as part of its core functionality. Retries are scheduled to a specific
|
||||
time and then ASB holds the message and publishes it at the correct time.
|
||||
|
||||
#### RabbitMQ retry options
|
||||
|
||||
For RabbitMQ (which will be used by self-host only), we have two different options. The `useDelayPlugin` flag in
|
||||
`GlobalSettings.RabbitMqSettings` determines which one is used. If it is set to `true`, we use the delay plugin. It
|
||||
defaults to `false` which indicates we should use retry queues with a timing check.
|
||||
|
||||
1. Delay plugin
|
||||
- [Delay plugin GitHub repo](https://github.com/rabbitmq/rabbitmq-delayed-message-exchange)
|
||||
- This plugin enables a delayed message exchange in RabbitMQ that supports delaying a message for an amount
|
||||
of time specified in a special header.
|
||||
- This allows us to forego using any retry queues and rely instead on the delay exchange. When a message is
|
||||
marked with the header it gets published to the exchange and the exchange handles all the functionality of
|
||||
holding it until the appropriate time (similar to ASB's built-in support).
|
||||
- The plugin must be setup and enabled before turning this option on (which is why it defaults to off).
|
||||
|
||||
2. Retry queues + timing check
|
||||
- If the delay plugin setting is off, we push the message to a retry queue which has a fixed amount of time before
|
||||
it gets re-published back to the main queue.
|
||||
- When a message comes off the queue, we check to see if the `DelayUntilDate` has already passed.
|
||||
- If it has passed, we then handle the integration normally and retry the request.
|
||||
- If it is still in the future, we put the message back on the retry queue for an additional wait.
|
||||
- While this does use extra processing, it gives us better support for honoring the delays even if the delay plugin
|
||||
isn't enabled. Since this solution is only intended for self-host, it should be a pretty minimal impact with short
|
||||
delays and a small number of retries.
|
||||
|
||||
## Listener / Handler pattern
|
||||
|
||||
To make it easy to support multiple AMQP services (RabbitMQ and Azure Service Bus), the act
|
||||
of listening to the stream of messages is decoupled from the act of responding to a message.
|
||||
|
||||
### Listeners
|
||||
|
||||
- Listeners handle the details of the communication platform (i.e. RabbitMQ and Azure Service Bus).
|
||||
- There is one listener for each platform (RabbitMQ / ASB) for each of the two levels - i.e. one event listener
|
||||
and one integration listener.
|
||||
- Perform all the aspects of setup / teardown, subscription, message acknowledgement, etc. for the messaging platform,
|
||||
but do not directly process any events themselves. Instead, they delegate to the handler with which they
|
||||
are configured.
|
||||
- Multiple instances can be configured to run independently, each with its own handler and
|
||||
subscription / queue.
|
||||
|
||||
### Handlers
|
||||
|
||||
- One handler per queue / subscription (e.g. per integration at the integration level).
|
||||
- Completely isolated from and know nothing of the messaging platform in use. This allows them to be
|
||||
freely reused across different communication platforms.
|
||||
- Perform all aspects of handling an event.
|
||||
- Allows them to be highly testable as they are isolated and decoupled from the more complicated
|
||||
aspects of messaging.
|
||||
|
||||
This combination allows for a configuration inside of `ServiceCollectionExtensions.cs` that pairs
|
||||
instances of the listener service for the currently running messaging platform with any number of
|
||||
handlers. It also allows for quick development of new handlers as they are focused only on the
|
||||
task of handling a specific event.
|
||||
|
||||
## Publishers and Services
|
||||
|
||||
Listeners (and `EventIntegrationHandler`) interact with the messaging system via the `IEventPublisher` interface,
|
||||
which is backed by a RabbitMQ and ASB specific service. By placing most of the messaging platform details in the
|
||||
service layer, we are able to handle common things like configuring the connection, binding or creating a specific
|
||||
queue, etc. in one place. The `IRabbitMqService` and `IAzureServiceBusService` implement the `IEventPublisher`
|
||||
interface and therefore can also handle directly all the message publishing functionality.
|
||||
|
||||
## Integrations and integration configurations
|
||||
|
||||
Organizations can configure integration configurations to send events to different endpoints -- each
|
||||
handler maps to a specific integration and checks for the configuration when it receives an event.
|
||||
Currently, there are integrations / handlers for Slack and webhooks (as mentioned above).
|
||||
|
||||
### `OrganizationIntegration`
|
||||
|
||||
- The top-level object that enables a specific integration for the organization.
|
||||
- Includes any properties that apply to the entire integration across all events.
|
||||
- For Slack, it consists of the token: `{ "token": "xoxb-token-from-slack" }`
|
||||
- For webhooks, it is `null`. However, even though there is no configuration, an organization must
|
||||
have a webhook `OrganizationIntegration` to enable configuration via `OrganizationIntegrationConfiguration`.
|
||||
|
||||
### `OrganizationIntegrationConfiguration`
|
||||
|
||||
- This contains the configurations specific to each `EventType` for the integration.
|
||||
- `Configuration` contains the event-specific configuration.
|
||||
- For Slack, this would contain what channel to send the message to: `{ "channelId": "C123456" }`
|
||||
- For Webhook, this is the URL the request should be sent to: `{ "url": "https://api.example.com" }`
|
||||
- `Template` contains a template string that is expected to be filled in with the contents of the actual event.
|
||||
- The tokens in the string are wrapped in `#` characters. For instance, the UserId would be `#UserId#`.
|
||||
- The `IntegrationTemplateProcessor` does the actual work of replacing these tokens with introspected values from
|
||||
the provided `EventMessage`.
|
||||
- The template does not enforce any structure — it could be a freeform text message to send via Slack, or a
|
||||
JSON body to send via webhook; it is simply stored and used as a string for the most flexibility.
|
||||
|
||||
### `OrganizationIntegrationConfigurationDetails`
|
||||
|
||||
- This is the combination of both the `OrganizationIntegration` and `OrganizationIntegrationConfiguration` into
|
||||
a single object. The combined contents tell the integration's handler all the details needed to send to an
|
||||
external service.
|
||||
- An array of `OrganizationIntegrationConfigurationDetails` is what the `EventIntegrationHandler` fetches from
|
||||
the database to determine what to publish at the integration level.
|
||||
|
||||
# Building a new integration
|
||||
|
||||
These are all the pieces required in the process of building out a new integration. For
|
||||
clarity in naming, these assume a new integration called "Example".
|
||||
|
||||
## IntegrationType
|
||||
|
||||
Add a new type to `IntegrationType` for the new integration.
|
||||
|
||||
## Configuration Models
|
||||
|
||||
The configuration models are the classes that will determine what is stored in the database for
|
||||
`OrganizationIntegration` and `OrganizationIntegrationConfiguration`. The `Configuration` columns are the
|
||||
serialized version of the corresponding objects and represent the coonfiguration details for this integration
|
||||
and event type.
|
||||
|
||||
1. `ExampleIntegration`
|
||||
- Configuration details for the whole integration (e.g. a token in Slack).
|
||||
- Applies to every event type configuration defined for this integration.
|
||||
- Maps to the JSON structure stored in `Configuration` in ``OrganizationIntegration`.
|
||||
2. `ExampleIntegrationConfiguration`
|
||||
- Configuration details that could change from event to event (e.g. channelId in Slack).
|
||||
- Maps to the JSON structure stored in `Configuration` in `OrganizationIntegrationConfiguration`.
|
||||
3. `ExampleIntegrationConfigurationDetails`
|
||||
- Combined configuration of both Integration _and_ IntegrationConfiguration.
|
||||
- This will be the deserialized version of the `MergedConfiguration` in
|
||||
`OrganizationIntegrationConfigurationDetails`.
|
||||
|
||||
## Request Models
|
||||
|
||||
1. Add a new case to the switch method in `OrganizationIntegrationRequestModel.Validate`.
|
||||
2. Add a new case to the switch method in `OrganizationIntegrationConfigurationRequestModel.IsValidForType`.
|
||||
|
||||
## Integration Handler
|
||||
|
||||
e.g. `ExampleIntegrationHandler`
|
||||
- This is where the actual code will go to perform the integration (i.e. send an HTTP request, etc.).
|
||||
- Handlers receive an `IntegrationMessage<T>` where `<T>` is the `ExampleIntegrationConfigurationDetails`
|
||||
defined above. This has the Configuration as well as the rendered template message to be sent.
|
||||
- Handlers return an `IntegrationHandlerResult` with details about if the request - success / failure,
|
||||
if it can be retried, when it should be delayed until, etc.
|
||||
- The scope of the handler is simply to do the integration and report the result.
|
||||
Everything else (such as how many times to retry, when to retry, what to do with failures)
|
||||
is done in the Listener.
|
||||
|
||||
## GlobalSettings
|
||||
|
||||
### RabbitMQ
|
||||
Add the queue names for the integration. These are typically set with a default value so
|
||||
that they will be created when first accessed in code by RabbitMQ.
|
||||
|
||||
1. `ExampleEventQueueName`
|
||||
2. `ExampleIntegrationQueueName`
|
||||
3. `ExampleIntegrationRetryQueueName`
|
||||
|
||||
### Azure Service Bus
|
||||
Add the subscription names to use for ASB for this integration. Similar to RabbitMQ a
|
||||
default value is provided so that we don't require configuring it in secrets but allow
|
||||
it to be overridden. **However**, unlike RabbitMQ these subscriptions must exist prior
|
||||
to the code accessing them. They will not be created on the fly. See [Deploying a new
|
||||
integration](#deploying-a-new-integration) below
|
||||
|
||||
1. `ExmpleEventSubscriptionName`
|
||||
2. `ExmpleIntegrationSubscriptionName`
|
||||
|
||||
#### Service Bus Emulator, local config
|
||||
In order to create ASB resources locally, we need to also update the `servicebusemulator_config.json` file
|
||||
to include any new subscriptions.
|
||||
- Under the existing event topic (`event-logging`) add a subscription for the event level for this
|
||||
new integration (`events-example-subscription`).
|
||||
- Under the existing integration topic (`event-integrations`) add a new subscription for the integration
|
||||
level messages (`integration-example-subscription`).
|
||||
- Copy the correlation filter from the other integration level subscriptions. It should filter based on
|
||||
the `IntegrationType.ToRoutingKey`, or in this example `example`.
|
||||
|
||||
These names added here are what must match the values provided in the secrets or the defaults provided
|
||||
in Global Settings. This must be in place (and the local ASB emulator restarted) before you can use any
|
||||
code locally that accesses ASB resources.
|
||||
|
||||
## ServiceCollectionExtensions
|
||||
In our `ServiceCollectionExtensions`, we pull all the above pieces together to start listeners on each message
|
||||
tier with handlers to process the integration. There are a number of helper methods in here to make this simple
|
||||
to add a new integration - one call per platform.
|
||||
|
||||
Also note that if an integration needs a custom singleton / service defined, the add listeners method is a
|
||||
good place to set that up. For instance, `SlackIntegrationHandler` needs a `SlackService`, so the singleton
|
||||
declaration is right above the add integration method for slack. Same thing for webhooks when it comes to
|
||||
defining a custom HttpClient by name.
|
||||
|
||||
1. In `AddRabbitMqListeners` add the integration:
|
||||
``` csharp
|
||||
services.AddRabbitMqIntegration<ExampleIntegrationConfigurationDetails, ExampleIntegrationHandler>(
|
||||
globalSettings.EventLogging.RabbitMq.ExampleEventsQueueName,
|
||||
globalSettings.EventLogging.RabbitMq.ExampleIntegrationQueueName,
|
||||
globalSettings.EventLogging.RabbitMq.ExampleIntegrationRetryQueueName,
|
||||
globalSettings.EventLogging.RabbitMq.MaxRetries,
|
||||
IntegrationType.Example);
|
||||
```
|
||||
|
||||
2. In `AddAzureServiceBusListeners` add the integration:
|
||||
``` csharp
|
||||
services.AddAzureServiceBusIntegration<ExampleIntegrationConfigurationDetails, ExampleIntegrationHandler>(
|
||||
eventSubscriptionName: globalSettings.EventLogging.AzureServiceBus.ExampleEventSubscriptionName,
|
||||
integrationSubscriptionName: globalSettings.EventLogging.AzureServiceBus.ExampleIntegrationSubscriptionName,
|
||||
integrationType: IntegrationType.Example,
|
||||
globalSettings: globalSettings);
|
||||
```
|
||||
|
||||
# Deploying a new integration
|
||||
|
||||
## RabbitMQ
|
||||
|
||||
RabbitMQ dynamically creates queues and exchanges when they are first accessed in code.
|
||||
Therefore, there is no need to manually create queues when deploying a new integration.
|
||||
They can be created and configured ahead of time, but it's not required. Note that once
|
||||
they are created, if any configurations need to be changed, the queue or exchange must be
|
||||
deleted and recreated.
|
||||
|
||||
## Azure Service Bus
|
||||
|
||||
Unlike RabbitMQ, ASB resources **must** be allocated before the code accesses them and
|
||||
will not be created on the fly. This means that any subscriptions needed for a new
|
||||
integration must be created in ASB before that code is deployed.
|
||||
|
||||
The two subscriptions created above in Global Settings and `servicebusemulator_config.json`
|
||||
need to be created in the Azure portal or CLI for the environment before deploying the
|
||||
code.
|
||||
|
||||
1. `ExmpleEventSubscriptionName`
|
||||
- This subscription is a fan-out subscription from the main event topic.
|
||||
- As such, it will start receiving all the events as soon as it is declared.
|
||||
- This can create a backlog before the integration-specific handler is declared and deployed.
|
||||
- One strategy to avoid this is to create the subscription with a false filter (e.g. `1 = 0`).
|
||||
- This will create the subscription, but the filter will ensure that no messages
|
||||
actually land in the subscription.
|
||||
- Code can be deployed that references the subscription, because the subscription
|
||||
legitimately exists (it is simply empty).
|
||||
- When the code is in place, and we're ready to start receiving messages on the new
|
||||
integration, we simply remove the filter to return the subscription to receiving
|
||||
all messages via fan-out.
|
||||
2. `ExmpleIntegrationSubscriptionName`
|
||||
- This subscription must be created before the new integration code can be deployed.
|
||||
- However, it is not fan-out, but rather a filter based on the `IntegrationType.ToRoutingKey`.
|
||||
- Therefore, it won't start receiving messages until organizations have active configurations.
|
||||
This means there's no risk of building up a backlog by declaring it ahead of time.
|
||||
@@ -0,0 +1,68 @@
|
||||
#nullable enable
|
||||
|
||||
using System.Text;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using RabbitMQ.Client;
|
||||
using RabbitMQ.Client.Events;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class RabbitMqEventListenerService : EventLoggingListenerService
|
||||
{
|
||||
private readonly Lazy<Task<IChannel>> _lazyChannel;
|
||||
private readonly string _queueName;
|
||||
private readonly IRabbitMqService _rabbitMqService;
|
||||
|
||||
public RabbitMqEventListenerService(
|
||||
IEventMessageHandler handler,
|
||||
string queueName,
|
||||
IRabbitMqService rabbitMqService,
|
||||
ILogger<RabbitMqEventListenerService> logger) : base(handler, logger)
|
||||
{
|
||||
_logger = logger;
|
||||
_queueName = queueName;
|
||||
_rabbitMqService = rabbitMqService;
|
||||
_lazyChannel = new Lazy<Task<IChannel>>(() => _rabbitMqService.CreateChannelAsync());
|
||||
}
|
||||
|
||||
public override async Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await _rabbitMqService.CreateEventQueueAsync(_queueName, cancellationToken);
|
||||
await base.StartAsync(cancellationToken);
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var channel = await _lazyChannel.Value;
|
||||
var consumer = new AsyncEventingBasicConsumer(channel);
|
||||
consumer.ReceivedAsync += async (_, eventArgs) => { await ProcessReceivedMessageAsync(eventArgs); };
|
||||
|
||||
await channel.BasicConsumeAsync(_queueName, autoAck: true, consumer: consumer, cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
internal async Task ProcessReceivedMessageAsync(BasicDeliverEventArgs eventArgs)
|
||||
{
|
||||
await ProcessReceivedMessageAsync(
|
||||
Encoding.UTF8.GetString(eventArgs.Body.Span),
|
||||
eventArgs.BasicProperties.MessageId);
|
||||
}
|
||||
|
||||
public override async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (_lazyChannel.IsValueCreated)
|
||||
{
|
||||
var channel = await _lazyChannel.Value;
|
||||
await channel.CloseAsync(cancellationToken);
|
||||
}
|
||||
await base.StopAsync(cancellationToken);
|
||||
}
|
||||
|
||||
public override void Dispose()
|
||||
{
|
||||
if (_lazyChannel.IsValueCreated && _lazyChannel.Value.IsCompletedSuccessfully)
|
||||
{
|
||||
_lazyChannel.Value.Result.Dispose();
|
||||
}
|
||||
base.Dispose();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,148 @@
|
||||
#nullable enable
|
||||
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using Bit.Core.AdminConsole.Models.Data.EventIntegrations;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using RabbitMQ.Client;
|
||||
using RabbitMQ.Client.Events;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class RabbitMqIntegrationListenerService : BackgroundService
|
||||
{
|
||||
private readonly int _maxRetries;
|
||||
private readonly string _queueName;
|
||||
private readonly string _routingKey;
|
||||
private readonly string _retryQueueName;
|
||||
private readonly IIntegrationHandler _handler;
|
||||
private readonly Lazy<Task<IChannel>> _lazyChannel;
|
||||
private readonly IRabbitMqService _rabbitMqService;
|
||||
private readonly ILogger<RabbitMqIntegrationListenerService> _logger;
|
||||
|
||||
public RabbitMqIntegrationListenerService(IIntegrationHandler handler,
|
||||
string routingKey,
|
||||
string queueName,
|
||||
string retryQueueName,
|
||||
int maxRetries,
|
||||
IRabbitMqService rabbitMqService,
|
||||
ILogger<RabbitMqIntegrationListenerService> logger)
|
||||
{
|
||||
_handler = handler;
|
||||
_routingKey = routingKey;
|
||||
_retryQueueName = retryQueueName;
|
||||
_queueName = queueName;
|
||||
_rabbitMqService = rabbitMqService;
|
||||
_logger = logger;
|
||||
_maxRetries = maxRetries;
|
||||
_lazyChannel = new Lazy<Task<IChannel>>(() => _rabbitMqService.CreateChannelAsync());
|
||||
}
|
||||
|
||||
public override async Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
await _rabbitMqService.CreateIntegrationQueuesAsync(
|
||||
_queueName,
|
||||
_retryQueueName,
|
||||
_routingKey,
|
||||
cancellationToken: cancellationToken);
|
||||
|
||||
await base.StartAsync(cancellationToken);
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var channel = await _lazyChannel.Value;
|
||||
var consumer = new AsyncEventingBasicConsumer(channel);
|
||||
consumer.ReceivedAsync += async (_, ea) =>
|
||||
{
|
||||
await ProcessReceivedMessageAsync(ea, cancellationToken);
|
||||
};
|
||||
|
||||
await channel.BasicConsumeAsync(queue: _queueName, autoAck: false, consumer: consumer, cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
internal async Task ProcessReceivedMessageAsync(BasicDeliverEventArgs ea, CancellationToken cancellationToken)
|
||||
{
|
||||
var channel = await _lazyChannel.Value;
|
||||
try
|
||||
{
|
||||
var json = Encoding.UTF8.GetString(ea.Body.Span);
|
||||
|
||||
// Determine if the message came off of the retry queue too soon
|
||||
// If so, place it back on the retry queue
|
||||
var integrationMessage = JsonSerializer.Deserialize<IntegrationMessage>(json);
|
||||
if (integrationMessage is not null &&
|
||||
integrationMessage.DelayUntilDate.HasValue &&
|
||||
integrationMessage.DelayUntilDate.Value > DateTime.UtcNow)
|
||||
{
|
||||
await _rabbitMqService.RepublishToRetryQueueAsync(channel, ea);
|
||||
await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
|
||||
return;
|
||||
}
|
||||
|
||||
var result = await _handler.HandleAsync(json);
|
||||
var message = result.Message;
|
||||
|
||||
if (result.Success)
|
||||
{
|
||||
// Successful integration send. Acknowledge message delivery and return
|
||||
await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
|
||||
return;
|
||||
}
|
||||
|
||||
if (result.Retryable)
|
||||
{
|
||||
// Integration failed, but is retryable - apply delay and check max retries
|
||||
message.ApplyRetry(result.DelayUntilDate);
|
||||
|
||||
if (message.RetryCount < _maxRetries)
|
||||
{
|
||||
// Publish message to the retry queue. It will be re-published for retry after a delay
|
||||
await _rabbitMqService.PublishToRetryAsync(channel, message, cancellationToken);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Exceeded the max number of retries; fail and send to dead letter queue
|
||||
await _rabbitMqService.PublishToDeadLetterAsync(channel, message, cancellationToken);
|
||||
_logger.LogWarning("Max retry attempts reached. Sent to DLQ.");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Fatal error (i.e. not retryable) occurred. Send message to dead letter queue without any retries
|
||||
await _rabbitMqService.PublishToDeadLetterAsync(channel, message, cancellationToken);
|
||||
_logger.LogWarning("Non-retryable failure. Sent to DLQ.");
|
||||
}
|
||||
|
||||
// Message has been sent to retry or dead letter queues.
|
||||
// Acknowledge receipt so Rabbit knows it's been processed
|
||||
await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// Unknown error occurred. Acknowledge so Rabbit doesn't keep attempting. Log the error
|
||||
_logger.LogError(ex, "Unhandled error processing integration message.");
|
||||
await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
public override async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
if (_lazyChannel.IsValueCreated)
|
||||
{
|
||||
var channel = await _lazyChannel.Value;
|
||||
await channel.CloseAsync(cancellationToken);
|
||||
}
|
||||
await base.StopAsync(cancellationToken);
|
||||
}
|
||||
|
||||
public override void Dispose()
|
||||
{
|
||||
if (_lazyChannel.IsValueCreated && _lazyChannel.Value.IsCompletedSuccessfully)
|
||||
{
|
||||
_lazyChannel.Value.Result.Dispose();
|
||||
}
|
||||
base.Dispose();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,244 @@
|
||||
#nullable enable
|
||||
|
||||
using System.Text;
|
||||
using Bit.Core.AdminConsole.Models.Data.EventIntegrations;
|
||||
using Bit.Core.Enums;
|
||||
using Bit.Core.Settings;
|
||||
using RabbitMQ.Client;
|
||||
using RabbitMQ.Client.Events;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class RabbitMqService : IRabbitMqService
|
||||
{
|
||||
private const string _deadLetterRoutingKey = "dead-letter";
|
||||
|
||||
private readonly ConnectionFactory _factory;
|
||||
private readonly Lazy<Task<IConnection>> _lazyConnection;
|
||||
private readonly string _deadLetterQueueName;
|
||||
private readonly string _eventExchangeName;
|
||||
private readonly string _integrationExchangeName;
|
||||
private readonly int _retryTiming;
|
||||
private readonly bool _useDelayPlugin;
|
||||
|
||||
public RabbitMqService(GlobalSettings globalSettings)
|
||||
{
|
||||
_factory = new ConnectionFactory
|
||||
{
|
||||
HostName = globalSettings.EventLogging.RabbitMq.HostName,
|
||||
UserName = globalSettings.EventLogging.RabbitMq.Username,
|
||||
Password = globalSettings.EventLogging.RabbitMq.Password
|
||||
};
|
||||
_deadLetterQueueName = globalSettings.EventLogging.RabbitMq.IntegrationDeadLetterQueueName;
|
||||
_eventExchangeName = globalSettings.EventLogging.RabbitMq.EventExchangeName;
|
||||
_integrationExchangeName = globalSettings.EventLogging.RabbitMq.IntegrationExchangeName;
|
||||
_retryTiming = globalSettings.EventLogging.RabbitMq.RetryTiming;
|
||||
_useDelayPlugin = globalSettings.EventLogging.RabbitMq.UseDelayPlugin;
|
||||
|
||||
_lazyConnection = new Lazy<Task<IConnection>>(CreateConnectionAsync);
|
||||
}
|
||||
|
||||
public async Task<IChannel> CreateChannelAsync(CancellationToken cancellationToken = default)
|
||||
{
|
||||
var connection = await _lazyConnection.Value;
|
||||
return await connection.CreateChannelAsync(cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
public async Task CreateEventQueueAsync(string queueName, CancellationToken cancellationToken = default)
|
||||
{
|
||||
using var channel = await CreateChannelAsync(cancellationToken);
|
||||
await channel.QueueDeclareAsync(queue: queueName,
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
autoDelete: false,
|
||||
arguments: null,
|
||||
cancellationToken: cancellationToken);
|
||||
await channel.QueueBindAsync(queue: queueName,
|
||||
exchange: _eventExchangeName,
|
||||
routingKey: string.Empty,
|
||||
cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
public async Task CreateIntegrationQueuesAsync(
|
||||
string queueName,
|
||||
string retryQueueName,
|
||||
string routingKey,
|
||||
CancellationToken cancellationToken = default)
|
||||
{
|
||||
using var channel = await CreateChannelAsync(cancellationToken);
|
||||
var retryRoutingKey = $"{routingKey}-retry";
|
||||
|
||||
// Declare main integration queue
|
||||
await channel.QueueDeclareAsync(
|
||||
queue: queueName,
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
autoDelete: false,
|
||||
arguments: null,
|
||||
cancellationToken: cancellationToken);
|
||||
await channel.QueueBindAsync(
|
||||
queue: queueName,
|
||||
exchange: _integrationExchangeName,
|
||||
routingKey: routingKey,
|
||||
cancellationToken: cancellationToken);
|
||||
|
||||
if (!_useDelayPlugin)
|
||||
{
|
||||
// Declare retry queue (Configurable TTL, dead-letters back to main queue)
|
||||
// Only needed if NOT using delay plugin
|
||||
await channel.QueueDeclareAsync(queue: retryQueueName,
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
autoDelete: false,
|
||||
arguments: new Dictionary<string, object?>
|
||||
{
|
||||
{ "x-dead-letter-exchange", _integrationExchangeName },
|
||||
{ "x-dead-letter-routing-key", routingKey },
|
||||
{ "x-message-ttl", _retryTiming }
|
||||
},
|
||||
cancellationToken: cancellationToken);
|
||||
await channel.QueueBindAsync(queue: retryQueueName,
|
||||
exchange: _integrationExchangeName,
|
||||
routingKey: retryRoutingKey,
|
||||
cancellationToken: cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
public async Task PublishAsync(IIntegrationMessage message)
|
||||
{
|
||||
var routingKey = message.IntegrationType.ToRoutingKey();
|
||||
await using var channel = await CreateChannelAsync();
|
||||
|
||||
var body = Encoding.UTF8.GetBytes(message.ToJson());
|
||||
var properties = new BasicProperties
|
||||
{
|
||||
MessageId = message.MessageId,
|
||||
Persistent = true
|
||||
};
|
||||
|
||||
await channel.BasicPublishAsync(
|
||||
exchange: _integrationExchangeName,
|
||||
mandatory: true,
|
||||
basicProperties: properties,
|
||||
routingKey: routingKey,
|
||||
body: body);
|
||||
}
|
||||
|
||||
public async Task PublishEventAsync(string body)
|
||||
{
|
||||
await using var channel = await CreateChannelAsync();
|
||||
var properties = new BasicProperties
|
||||
{
|
||||
MessageId = Guid.NewGuid().ToString(),
|
||||
Persistent = true
|
||||
};
|
||||
|
||||
await channel.BasicPublishAsync(
|
||||
exchange: _eventExchangeName,
|
||||
mandatory: true,
|
||||
basicProperties: properties,
|
||||
routingKey: string.Empty,
|
||||
body: Encoding.UTF8.GetBytes(body));
|
||||
}
|
||||
|
||||
public async Task PublishToRetryAsync(IChannel channel, IIntegrationMessage message, CancellationToken cancellationToken)
|
||||
{
|
||||
var routingKey = message.IntegrationType.ToRoutingKey();
|
||||
var retryRoutingKey = $"{routingKey}-retry";
|
||||
var properties = new BasicProperties
|
||||
{
|
||||
Persistent = true,
|
||||
MessageId = message.MessageId,
|
||||
Headers = _useDelayPlugin && message.DelayUntilDate.HasValue ?
|
||||
new Dictionary<string, object?>
|
||||
{
|
||||
["x-delay"] = Math.Max((int)(message.DelayUntilDate.Value - DateTime.UtcNow).TotalMilliseconds, 0)
|
||||
} :
|
||||
null
|
||||
};
|
||||
|
||||
await channel.BasicPublishAsync(
|
||||
exchange: _integrationExchangeName,
|
||||
routingKey: _useDelayPlugin ? routingKey : retryRoutingKey,
|
||||
mandatory: true,
|
||||
basicProperties: properties,
|
||||
body: Encoding.UTF8.GetBytes(message.ToJson()),
|
||||
cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
public async Task PublishToDeadLetterAsync(
|
||||
IChannel channel,
|
||||
IIntegrationMessage message,
|
||||
CancellationToken cancellationToken)
|
||||
{
|
||||
var properties = new BasicProperties
|
||||
{
|
||||
MessageId = message.MessageId,
|
||||
Persistent = true
|
||||
};
|
||||
|
||||
await channel.BasicPublishAsync(
|
||||
exchange: _integrationExchangeName,
|
||||
mandatory: true,
|
||||
basicProperties: properties,
|
||||
routingKey: _deadLetterRoutingKey,
|
||||
body: Encoding.UTF8.GetBytes(message.ToJson()),
|
||||
cancellationToken: cancellationToken);
|
||||
}
|
||||
|
||||
public async Task RepublishToRetryQueueAsync(IChannel channel, BasicDeliverEventArgs eventArgs)
|
||||
{
|
||||
await channel.BasicPublishAsync(
|
||||
exchange: _integrationExchangeName,
|
||||
routingKey: eventArgs.RoutingKey,
|
||||
mandatory: true,
|
||||
basicProperties: new BasicProperties(eventArgs.BasicProperties),
|
||||
body: eventArgs.Body);
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_lazyConnection.IsValueCreated)
|
||||
{
|
||||
var connection = await _lazyConnection.Value;
|
||||
await connection.DisposeAsync();
|
||||
}
|
||||
}
|
||||
|
||||
private async Task<IConnection> CreateConnectionAsync()
|
||||
{
|
||||
var connection = await _factory.CreateConnectionAsync();
|
||||
using var channel = await connection.CreateChannelAsync();
|
||||
|
||||
// Declare Exchanges
|
||||
await channel.ExchangeDeclareAsync(exchange: _eventExchangeName, type: ExchangeType.Fanout, durable: true);
|
||||
if (_useDelayPlugin)
|
||||
{
|
||||
await channel.ExchangeDeclareAsync(
|
||||
exchange: _integrationExchangeName,
|
||||
type: "x-delayed-message",
|
||||
durable: true,
|
||||
arguments: new Dictionary<string, object?>
|
||||
{
|
||||
{ "x-delayed-type", "direct" }
|
||||
}
|
||||
);
|
||||
}
|
||||
else
|
||||
{
|
||||
await channel.ExchangeDeclareAsync(exchange: _integrationExchangeName, type: ExchangeType.Direct, durable: true);
|
||||
}
|
||||
|
||||
// Declare dead letter queue for Integration exchange
|
||||
await channel.QueueDeclareAsync(queue: _deadLetterQueueName,
|
||||
durable: true,
|
||||
exclusive: false,
|
||||
autoDelete: false,
|
||||
arguments: null);
|
||||
await channel.QueueBindAsync(queue: _deadLetterQueueName,
|
||||
exchange: _integrationExchangeName,
|
||||
routingKey: _deadLetterRoutingKey);
|
||||
|
||||
return connection;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,21 @@
|
||||
#nullable enable
|
||||
|
||||
using Bit.Core.AdminConsole.Models.Data.EventIntegrations;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class SlackIntegrationHandler(
|
||||
ISlackService slackService)
|
||||
: IntegrationHandlerBase<SlackIntegrationConfigurationDetails>
|
||||
{
|
||||
public override async Task<IntegrationHandlerResult> HandleAsync(IntegrationMessage<SlackIntegrationConfigurationDetails> message)
|
||||
{
|
||||
await slackService.SendSlackMessageByChannelIdAsync(
|
||||
message.Configuration.token,
|
||||
message.RenderedTemplate,
|
||||
message.Configuration.channelId
|
||||
);
|
||||
|
||||
return new IntegrationHandlerResult(success: true, message: message);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,174 @@
|
||||
#nullable enable
|
||||
|
||||
using System.Net.Http.Headers;
|
||||
using System.Net.Http.Json;
|
||||
using System.Web;
|
||||
using Bit.Core.Models.Slack;
|
||||
using Bit.Core.Settings;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class SlackService(
|
||||
IHttpClientFactory httpClientFactory,
|
||||
GlobalSettings globalSettings,
|
||||
ILogger<SlackService> logger) : ISlackService
|
||||
{
|
||||
private readonly HttpClient _httpClient = httpClientFactory.CreateClient(HttpClientName);
|
||||
private readonly string _clientId = globalSettings.Slack.ClientId;
|
||||
private readonly string _clientSecret = globalSettings.Slack.ClientSecret;
|
||||
private readonly string _scopes = globalSettings.Slack.Scopes;
|
||||
private readonly string _slackApiBaseUrl = globalSettings.Slack.ApiBaseUrl;
|
||||
|
||||
public const string HttpClientName = "SlackServiceHttpClient";
|
||||
|
||||
public async Task<string> GetChannelIdAsync(string token, string channelName)
|
||||
{
|
||||
return (await GetChannelIdsAsync(token, [channelName])).FirstOrDefault() ?? string.Empty;
|
||||
}
|
||||
|
||||
public async Task<List<string>> GetChannelIdsAsync(string token, List<string> channelNames)
|
||||
{
|
||||
var matchingChannelIds = new List<string>();
|
||||
var baseUrl = $"{_slackApiBaseUrl}/conversations.list";
|
||||
var nextCursor = string.Empty;
|
||||
|
||||
do
|
||||
{
|
||||
var uriBuilder = new UriBuilder(baseUrl);
|
||||
var queryParameters = HttpUtility.ParseQueryString(uriBuilder.Query);
|
||||
queryParameters["types"] = "public_channel,private_channel";
|
||||
queryParameters["limit"] = "1000";
|
||||
if (!string.IsNullOrEmpty(nextCursor))
|
||||
{
|
||||
queryParameters["cursor"] = nextCursor;
|
||||
}
|
||||
uriBuilder.Query = queryParameters.ToString();
|
||||
|
||||
var request = new HttpRequestMessage(HttpMethod.Get, uriBuilder.Uri);
|
||||
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token);
|
||||
|
||||
var response = await _httpClient.SendAsync(request);
|
||||
var result = await response.Content.ReadFromJsonAsync<SlackChannelListResponse>();
|
||||
|
||||
if (result is { Ok: true })
|
||||
{
|
||||
matchingChannelIds.AddRange(result.Channels
|
||||
.Where(channel => channelNames.Contains(channel.Name))
|
||||
.Select(channel => channel.Id));
|
||||
nextCursor = result.ResponseMetadata.NextCursor;
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.LogError("Error getting Channel Ids: {Error}", result?.Error ?? "Unknown Error");
|
||||
nextCursor = string.Empty;
|
||||
}
|
||||
|
||||
} while (!string.IsNullOrEmpty(nextCursor));
|
||||
|
||||
return matchingChannelIds;
|
||||
}
|
||||
|
||||
public async Task<string> GetDmChannelByEmailAsync(string token, string email)
|
||||
{
|
||||
var userId = await GetUserIdByEmailAsync(token, email);
|
||||
return await OpenDmChannel(token, userId);
|
||||
}
|
||||
|
||||
public string GetRedirectUrl(string redirectUrl)
|
||||
{
|
||||
return $"https://slack.com/oauth/v2/authorize?client_id={_clientId}&scope={_scopes}&redirect_uri={redirectUrl}";
|
||||
}
|
||||
|
||||
public async Task<string> ObtainTokenViaOAuth(string code, string redirectUrl)
|
||||
{
|
||||
var tokenResponse = await _httpClient.PostAsync($"{_slackApiBaseUrl}/oauth.v2.access",
|
||||
new FormUrlEncodedContent(new[]
|
||||
{
|
||||
new KeyValuePair<string, string>("client_id", _clientId),
|
||||
new KeyValuePair<string, string>("client_secret", _clientSecret),
|
||||
new KeyValuePair<string, string>("code", code),
|
||||
new KeyValuePair<string, string>("redirect_uri", redirectUrl)
|
||||
}));
|
||||
|
||||
SlackOAuthResponse? result;
|
||||
try
|
||||
{
|
||||
result = await tokenResponse.Content.ReadFromJsonAsync<SlackOAuthResponse>();
|
||||
}
|
||||
catch
|
||||
{
|
||||
result = null;
|
||||
}
|
||||
|
||||
if (result is null)
|
||||
{
|
||||
logger.LogError("Error obtaining token via OAuth: Unknown error");
|
||||
return string.Empty;
|
||||
}
|
||||
if (!result.Ok)
|
||||
{
|
||||
logger.LogError("Error obtaining token via OAuth: {Error}", result.Error);
|
||||
return string.Empty;
|
||||
}
|
||||
|
||||
return result.AccessToken;
|
||||
}
|
||||
|
||||
public async Task SendSlackMessageByChannelIdAsync(string token, string message, string channelId)
|
||||
{
|
||||
var payload = JsonContent.Create(new { channel = channelId, text = message });
|
||||
var request = new HttpRequestMessage(HttpMethod.Post, $"{_slackApiBaseUrl}/chat.postMessage");
|
||||
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token);
|
||||
request.Content = payload;
|
||||
|
||||
await _httpClient.SendAsync(request);
|
||||
}
|
||||
|
||||
private async Task<string> GetUserIdByEmailAsync(string token, string email)
|
||||
{
|
||||
var request = new HttpRequestMessage(HttpMethod.Get, $"{_slackApiBaseUrl}/users.lookupByEmail?email={email}");
|
||||
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token);
|
||||
var response = await _httpClient.SendAsync(request);
|
||||
var result = await response.Content.ReadFromJsonAsync<SlackUserResponse>();
|
||||
|
||||
if (result is null)
|
||||
{
|
||||
logger.LogError("Error retrieving Slack user ID: Unknown error");
|
||||
return string.Empty;
|
||||
}
|
||||
if (!result.Ok)
|
||||
{
|
||||
logger.LogError("Error retrieving Slack user ID: {Error}", result.Error);
|
||||
return string.Empty;
|
||||
}
|
||||
|
||||
return result.User.Id;
|
||||
}
|
||||
|
||||
private async Task<string> OpenDmChannel(string token, string userId)
|
||||
{
|
||||
if (string.IsNullOrEmpty(userId))
|
||||
return string.Empty;
|
||||
|
||||
var payload = JsonContent.Create(new { users = userId });
|
||||
var request = new HttpRequestMessage(HttpMethod.Post, $"{_slackApiBaseUrl}/conversations.open");
|
||||
request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", token);
|
||||
request.Content = payload;
|
||||
var response = await _httpClient.SendAsync(request);
|
||||
var result = await response.Content.ReadFromJsonAsync<SlackDmResponse>();
|
||||
|
||||
if (result is null)
|
||||
{
|
||||
logger.LogError("Error opening DM channel: Unknown error");
|
||||
return string.Empty;
|
||||
}
|
||||
if (!result.Ok)
|
||||
{
|
||||
logger.LogError("Error opening DM channel: {Error}", result.Error);
|
||||
return string.Empty;
|
||||
}
|
||||
|
||||
return result.Channel.Id;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,63 @@
|
||||
#nullable enable
|
||||
|
||||
using System.Globalization;
|
||||
using System.Net;
|
||||
using System.Text;
|
||||
using Bit.Core.AdminConsole.Models.Data.EventIntegrations;
|
||||
|
||||
#nullable enable
|
||||
|
||||
namespace Bit.Core.Services;
|
||||
|
||||
public class WebhookIntegrationHandler(IHttpClientFactory httpClientFactory)
|
||||
: IntegrationHandlerBase<WebhookIntegrationConfigurationDetails>
|
||||
{
|
||||
private readonly HttpClient _httpClient = httpClientFactory.CreateClient(HttpClientName);
|
||||
|
||||
public const string HttpClientName = "WebhookIntegrationHandlerHttpClient";
|
||||
|
||||
public override async Task<IntegrationHandlerResult> HandleAsync(IntegrationMessage<WebhookIntegrationConfigurationDetails> message)
|
||||
{
|
||||
var content = new StringContent(message.RenderedTemplate, Encoding.UTF8, "application/json");
|
||||
var response = await _httpClient.PostAsync(message.Configuration.url, content);
|
||||
var result = new IntegrationHandlerResult(success: response.IsSuccessStatusCode, message);
|
||||
|
||||
switch (response.StatusCode)
|
||||
{
|
||||
case HttpStatusCode.TooManyRequests:
|
||||
case HttpStatusCode.RequestTimeout:
|
||||
case HttpStatusCode.InternalServerError:
|
||||
case HttpStatusCode.BadGateway:
|
||||
case HttpStatusCode.ServiceUnavailable:
|
||||
case HttpStatusCode.GatewayTimeout:
|
||||
result.Retryable = true;
|
||||
result.FailureReason = response.ReasonPhrase ?? $"Failure with status code: {(int)response.StatusCode}";
|
||||
|
||||
if (response.Headers.TryGetValues("Retry-After", out var values))
|
||||
{
|
||||
var value = values.FirstOrDefault();
|
||||
if (int.TryParse(value, out var seconds))
|
||||
{
|
||||
// Retry-after was specified in seconds. Adjust DelayUntilDate by the requested number of seconds.
|
||||
result.DelayUntilDate = DateTime.UtcNow.AddSeconds(seconds);
|
||||
}
|
||||
else if (DateTimeOffset.TryParseExact(value,
|
||||
"r", // "r" is the round-trip format: RFC1123
|
||||
CultureInfo.InvariantCulture,
|
||||
DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal,
|
||||
out var retryDate))
|
||||
{
|
||||
// Retry-after was specified as a date. Adjust DelayUntilDate to the specified date.
|
||||
result.DelayUntilDate = retryDate.UtcDateTime;
|
||||
}
|
||||
}
|
||||
break;
|
||||
default:
|
||||
result.Retryable = false;
|
||||
result.FailureReason = response.ReasonPhrase ?? $"Failure with status code {(int)response.StatusCode}";
|
||||
break;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user