mirror of
https://github.com/bitwarden/server
synced 2026-01-06 18:43:36 +00:00
[PM-28265] storage reconciliation job (#6615)
This commit is contained in:
36
src/Billing/Controllers/JobsController.cs
Normal file
36
src/Billing/Controllers/JobsController.cs
Normal file
@@ -0,0 +1,36 @@
|
||||
using Bit.Billing.Jobs;
|
||||
using Bit.Core.Utilities;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
|
||||
namespace Bit.Billing.Controllers;
|
||||
|
||||
[Route("jobs")]
|
||||
[SelfHosted(NotSelfHostedOnly = true)]
|
||||
[RequireLowerEnvironment]
|
||||
public class JobsController(
|
||||
JobsHostedService jobsHostedService) : Controller
|
||||
{
|
||||
[HttpPost("run/{jobName}")]
|
||||
public async Task<IActionResult> RunJobAsync(string jobName)
|
||||
{
|
||||
if (jobName == nameof(ReconcileAdditionalStorageJob))
|
||||
{
|
||||
await jobsHostedService.RunJobAdHocAsync<ReconcileAdditionalStorageJob>();
|
||||
return Ok(new { message = $"Job {jobName} scheduled successfully" });
|
||||
}
|
||||
|
||||
return BadRequest(new { error = $"Unknown job name: {jobName}" });
|
||||
}
|
||||
|
||||
[HttpPost("stop/{jobName}")]
|
||||
public async Task<IActionResult> StopJobAsync(string jobName)
|
||||
{
|
||||
if (jobName == nameof(ReconcileAdditionalStorageJob))
|
||||
{
|
||||
await jobsHostedService.InterruptAdHocJobAsync<ReconcileAdditionalStorageJob>();
|
||||
return Ok(new { message = $"Job {jobName} queued for cancellation" });
|
||||
}
|
||||
|
||||
return BadRequest(new { error = $"Unknown job name: {jobName}" });
|
||||
}
|
||||
}
|
||||
@@ -10,4 +10,13 @@ public class AliveJob(ILogger<AliveJob> logger) : BaseJob(logger)
|
||||
_logger.LogInformation(Core.Constants.BypassFiltersEventId, null, "Billing service is alive!");
|
||||
return Task.FromResult(0);
|
||||
}
|
||||
|
||||
public static ITrigger GetTrigger()
|
||||
{
|
||||
return TriggerBuilder.Create()
|
||||
.WithIdentity("EveryTopOfTheHourTrigger")
|
||||
.StartNow()
|
||||
.WithCronSchedule("0 0 * * * ?")
|
||||
.Build();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,29 +1,27 @@
|
||||
using Bit.Core.Jobs;
|
||||
using Bit.Core.Exceptions;
|
||||
using Bit.Core.Jobs;
|
||||
using Bit.Core.Settings;
|
||||
using Quartz;
|
||||
|
||||
namespace Bit.Billing.Jobs;
|
||||
|
||||
public class JobsHostedService : BaseJobsHostedService
|
||||
public class JobsHostedService(
|
||||
GlobalSettings globalSettings,
|
||||
IServiceProvider serviceProvider,
|
||||
ILogger<JobsHostedService> logger,
|
||||
ILogger<JobListener> listenerLogger,
|
||||
ISchedulerFactory schedulerFactory)
|
||||
: BaseJobsHostedService(globalSettings, serviceProvider, logger, listenerLogger)
|
||||
{
|
||||
public JobsHostedService(
|
||||
GlobalSettings globalSettings,
|
||||
IServiceProvider serviceProvider,
|
||||
ILogger<JobsHostedService> logger,
|
||||
ILogger<JobListener> listenerLogger)
|
||||
: base(globalSettings, serviceProvider, logger, listenerLogger) { }
|
||||
private List<JobKey> AdHocJobKeys { get; } = [];
|
||||
private IScheduler? _adHocScheduler;
|
||||
|
||||
public override async Task StartAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var everyTopOfTheHourTrigger = TriggerBuilder.Create()
|
||||
.WithIdentity("EveryTopOfTheHourTrigger")
|
||||
.StartNow()
|
||||
.WithCronSchedule("0 0 * * * ?")
|
||||
.Build();
|
||||
|
||||
Jobs = new List<Tuple<Type, ITrigger>>
|
||||
{
|
||||
new Tuple<Type, ITrigger>(typeof(AliveJob), everyTopOfTheHourTrigger)
|
||||
new(typeof(AliveJob), AliveJob.GetTrigger()),
|
||||
new(typeof(ReconcileAdditionalStorageJob), ReconcileAdditionalStorageJob.GetTrigger())
|
||||
};
|
||||
|
||||
await base.StartAsync(cancellationToken);
|
||||
@@ -33,5 +31,54 @@ public class JobsHostedService : BaseJobsHostedService
|
||||
{
|
||||
services.AddTransient<AliveJob>();
|
||||
services.AddTransient<SubscriptionCancellationJob>();
|
||||
services.AddTransient<ReconcileAdditionalStorageJob>();
|
||||
// add this service as a singleton so we can inject it where needed
|
||||
services.AddSingleton<JobsHostedService>();
|
||||
services.AddHostedService(sp => sp.GetRequiredService<JobsHostedService>());
|
||||
}
|
||||
|
||||
public async Task InterruptAdHocJobAsync<T>(CancellationToken cancellationToken = default) where T : class, IJob
|
||||
{
|
||||
if (_adHocScheduler == null)
|
||||
{
|
||||
throw new InvalidOperationException("AdHocScheduler is null, cannot interrupt ad-hoc job.");
|
||||
}
|
||||
|
||||
var jobKey = AdHocJobKeys.FirstOrDefault(j => j.Name == typeof(T).ToString());
|
||||
if (jobKey == null)
|
||||
{
|
||||
throw new NotFoundException($"Cannot find job key: {typeof(T)}, not running?");
|
||||
}
|
||||
logger.LogInformation("CANCELLING ad-hoc job with key: {JobKey}", jobKey);
|
||||
AdHocJobKeys.Remove(jobKey);
|
||||
await _adHocScheduler.Interrupt(jobKey, cancellationToken);
|
||||
}
|
||||
|
||||
public async Task RunJobAdHocAsync<T>(CancellationToken cancellationToken = default) where T : class, IJob
|
||||
{
|
||||
_adHocScheduler ??= await schedulerFactory.GetScheduler(cancellationToken);
|
||||
|
||||
var jobKey = new JobKey(typeof(T).ToString());
|
||||
|
||||
var currentlyExecuting = await _adHocScheduler.GetCurrentlyExecutingJobs(cancellationToken);
|
||||
if (currentlyExecuting.Any(j => j.JobDetail.Key.Equals(jobKey)))
|
||||
{
|
||||
throw new InvalidOperationException($"Job {jobKey} is already running");
|
||||
}
|
||||
|
||||
AdHocJobKeys.Add(jobKey);
|
||||
|
||||
var job = JobBuilder.Create<T>()
|
||||
.WithIdentity(jobKey)
|
||||
.Build();
|
||||
|
||||
var trigger = TriggerBuilder.Create()
|
||||
.WithIdentity(typeof(T).ToString())
|
||||
.StartNow()
|
||||
.Build();
|
||||
|
||||
logger.LogInformation("Scheduling ad-hoc job with key: {JobKey}", jobKey);
|
||||
|
||||
await _adHocScheduler.ScheduleJob(job, trigger, cancellationToken);
|
||||
}
|
||||
}
|
||||
|
||||
207
src/Billing/Jobs/ReconcileAdditionalStorageJob.cs
Normal file
207
src/Billing/Jobs/ReconcileAdditionalStorageJob.cs
Normal file
@@ -0,0 +1,207 @@
|
||||
using System.Globalization;
|
||||
using System.Text.Json;
|
||||
using Bit.Billing.Services;
|
||||
using Bit.Core;
|
||||
using Bit.Core.Billing.Constants;
|
||||
using Bit.Core.Jobs;
|
||||
using Bit.Core.Services;
|
||||
using Quartz;
|
||||
using Stripe;
|
||||
|
||||
namespace Bit.Billing.Jobs;
|
||||
|
||||
public class ReconcileAdditionalStorageJob(
|
||||
IStripeFacade stripeFacade,
|
||||
ILogger<ReconcileAdditionalStorageJob> logger,
|
||||
IFeatureService featureService) : BaseJob(logger)
|
||||
{
|
||||
private const string _storageGbMonthlyPriceId = "storage-gb-monthly";
|
||||
private const string _storageGbAnnuallyPriceId = "storage-gb-annually";
|
||||
private const string _personalStorageGbAnnuallyPriceId = "personal-storage-gb-annually";
|
||||
private const int _storageGbToRemove = 4;
|
||||
|
||||
protected override async Task ExecuteJobAsync(IJobExecutionContext context)
|
||||
{
|
||||
if (!featureService.IsEnabled(FeatureFlagKeys.PM28265_EnableReconcileAdditionalStorageJob))
|
||||
{
|
||||
logger.LogInformation("Skipping ReconcileAdditionalStorageJob, feature flag off.");
|
||||
return;
|
||||
}
|
||||
|
||||
var liveMode = featureService.IsEnabled(FeatureFlagKeys.PM28265_ReconcileAdditionalStorageJobEnableLiveMode);
|
||||
|
||||
// Execution tracking
|
||||
var subscriptionsFound = 0;
|
||||
var subscriptionsUpdated = 0;
|
||||
var subscriptionsWithErrors = 0;
|
||||
var failures = new List<string>();
|
||||
|
||||
logger.LogInformation("Starting ReconcileAdditionalStorageJob (live mode: {LiveMode})", liveMode);
|
||||
|
||||
var priceIds = new[] { _storageGbMonthlyPriceId, _storageGbAnnuallyPriceId, _personalStorageGbAnnuallyPriceId };
|
||||
|
||||
foreach (var priceId in priceIds)
|
||||
{
|
||||
var options = new SubscriptionListOptions
|
||||
{
|
||||
Limit = 100,
|
||||
Status = StripeConstants.SubscriptionStatus.Active,
|
||||
Price = priceId
|
||||
};
|
||||
|
||||
await foreach (var subscription in stripeFacade.ListSubscriptionsAutoPagingAsync(options))
|
||||
{
|
||||
if (context.CancellationToken.IsCancellationRequested)
|
||||
{
|
||||
logger.LogWarning(
|
||||
"Job cancelled!! Exiting. Progress at time of cancellation: Subscriptions found: {SubscriptionsFound}, " +
|
||||
"Updated: {SubscriptionsUpdated}, Errors: {SubscriptionsWithErrors}{Failures}",
|
||||
subscriptionsFound,
|
||||
liveMode
|
||||
? subscriptionsUpdated
|
||||
: $"(In live mode, would have updated) {subscriptionsUpdated}",
|
||||
subscriptionsWithErrors,
|
||||
failures.Count > 0
|
||||
? $", Failures: {Environment.NewLine}{string.Join(Environment.NewLine, failures)}"
|
||||
: string.Empty
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
if (subscription == null)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
logger.LogInformation("Processing subscription: {SubscriptionId}", subscription.Id);
|
||||
subscriptionsFound++;
|
||||
|
||||
if (subscription.Metadata?.TryGetValue(StripeConstants.MetadataKeys.StorageReconciled2025, out var dateString) == true)
|
||||
{
|
||||
if (DateTime.TryParse(dateString, null, DateTimeStyles.RoundtripKind, out var dateProcessed))
|
||||
{
|
||||
logger.LogInformation("Skipping subscription {SubscriptionId} - already processed on {Date}",
|
||||
subscription.Id,
|
||||
dateProcessed.ToString("f"));
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
var updateOptions = BuildSubscriptionUpdateOptions(subscription, priceId);
|
||||
|
||||
if (updateOptions == null)
|
||||
{
|
||||
logger.LogInformation("Skipping subscription {SubscriptionId} - no updates needed", subscription.Id);
|
||||
continue;
|
||||
}
|
||||
|
||||
subscriptionsUpdated++;
|
||||
|
||||
if (!liveMode)
|
||||
{
|
||||
logger.LogInformation(
|
||||
"Not live mode (dry-run): Would have updated subscription {SubscriptionId} with item changes: {NewLine}{UpdateOptions}",
|
||||
subscription.Id,
|
||||
Environment.NewLine,
|
||||
JsonSerializer.Serialize(updateOptions));
|
||||
continue;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await stripeFacade.UpdateSubscription(subscription.Id, updateOptions);
|
||||
logger.LogInformation("Successfully updated subscription: {SubscriptionId}", subscription.Id);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
subscriptionsWithErrors++;
|
||||
failures.Add($"Subscription {subscription.Id}: {ex.Message}");
|
||||
logger.LogError(ex, "Failed to update subscription {SubscriptionId}: {ErrorMessage}",
|
||||
subscription.Id, ex.Message);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.LogInformation(
|
||||
"ReconcileAdditionalStorageJob completed. Subscriptions found: {SubscriptionsFound}, " +
|
||||
"Updated: {SubscriptionsUpdated}, Errors: {SubscriptionsWithErrors}{Failures}",
|
||||
subscriptionsFound,
|
||||
liveMode
|
||||
? subscriptionsUpdated
|
||||
: $"(In live mode, would have updated) {subscriptionsUpdated}",
|
||||
subscriptionsWithErrors,
|
||||
failures.Count > 0
|
||||
? $", Failures: {Environment.NewLine}{string.Join(Environment.NewLine, failures)}"
|
||||
: string.Empty
|
||||
);
|
||||
}
|
||||
|
||||
private SubscriptionUpdateOptions? BuildSubscriptionUpdateOptions(
|
||||
Subscription subscription,
|
||||
string targetPriceId)
|
||||
{
|
||||
if (subscription.Items?.Data == null)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
var updateOptions = new SubscriptionUpdateOptions
|
||||
{
|
||||
ProrationBehavior = StripeConstants.ProrationBehavior.CreateProrations,
|
||||
Metadata = new Dictionary<string, string>
|
||||
{
|
||||
[StripeConstants.MetadataKeys.StorageReconciled2025] = DateTime.UtcNow.ToString("o")
|
||||
},
|
||||
Items = []
|
||||
};
|
||||
|
||||
var hasUpdates = false;
|
||||
|
||||
foreach (var item in subscription.Items.Data.Where(item => item?.Price?.Id == targetPriceId))
|
||||
{
|
||||
hasUpdates = true;
|
||||
var currentQuantity = item.Quantity;
|
||||
|
||||
if (currentQuantity > _storageGbToRemove)
|
||||
{
|
||||
var newQuantity = currentQuantity - _storageGbToRemove;
|
||||
logger.LogInformation(
|
||||
"Subscription {SubscriptionId}: reducing quantity from {CurrentQuantity} to {NewQuantity} for price {PriceId}",
|
||||
subscription.Id,
|
||||
currentQuantity,
|
||||
newQuantity,
|
||||
item.Price.Id);
|
||||
|
||||
updateOptions.Items.Add(new SubscriptionItemOptions
|
||||
{
|
||||
Id = item.Id,
|
||||
Quantity = newQuantity
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
logger.LogInformation("Subscription {SubscriptionId}: deleting storage item with quantity {CurrentQuantity} for price {PriceId}",
|
||||
subscription.Id,
|
||||
currentQuantity,
|
||||
item.Price.Id);
|
||||
|
||||
updateOptions.Items.Add(new SubscriptionItemOptions
|
||||
{
|
||||
Id = item.Id,
|
||||
Deleted = true
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return hasUpdates ? updateOptions : null;
|
||||
}
|
||||
|
||||
public static ITrigger GetTrigger()
|
||||
{
|
||||
return TriggerBuilder.Create()
|
||||
.WithIdentity("EveryMorningTrigger")
|
||||
.StartNow()
|
||||
.WithCronSchedule("0 0 16 * * ?") // 10am CST daily; the pods execute in UTC time
|
||||
.Build();
|
||||
}
|
||||
}
|
||||
@@ -78,6 +78,11 @@ public interface IStripeFacade
|
||||
RequestOptions requestOptions = null,
|
||||
CancellationToken cancellationToken = default);
|
||||
|
||||
IAsyncEnumerable<Subscription> ListSubscriptionsAutoPagingAsync(
|
||||
SubscriptionListOptions options = null,
|
||||
RequestOptions requestOptions = null,
|
||||
CancellationToken cancellationToken = default);
|
||||
|
||||
Task<Subscription> GetSubscription(
|
||||
string subscriptionId,
|
||||
SubscriptionGetOptions subscriptionGetOptions = null,
|
||||
|
||||
@@ -98,6 +98,12 @@ public class StripeFacade : IStripeFacade
|
||||
CancellationToken cancellationToken = default) =>
|
||||
await _subscriptionService.ListAsync(options, requestOptions, cancellationToken);
|
||||
|
||||
public IAsyncEnumerable<Subscription> ListSubscriptionsAutoPagingAsync(
|
||||
SubscriptionListOptions options = null,
|
||||
RequestOptions requestOptions = null,
|
||||
CancellationToken cancellationToken = default) =>
|
||||
_subscriptionService.ListAutoPagingAsync(options, requestOptions, cancellationToken);
|
||||
|
||||
public async Task<Subscription> GetSubscription(
|
||||
string subscriptionId,
|
||||
SubscriptionGetOptions subscriptionGetOptions = null,
|
||||
|
||||
@@ -65,6 +65,7 @@ public static class StripeConstants
|
||||
public const string Region = "region";
|
||||
public const string RetiredBraintreeCustomerId = "btCustomerId_old";
|
||||
public const string UserId = "userId";
|
||||
public const string StorageReconciled2025 = "storage_reconciled_2025";
|
||||
}
|
||||
|
||||
public static class PaymentBehavior
|
||||
|
||||
@@ -197,6 +197,8 @@ public static class FeatureFlagKeys
|
||||
public const string PM26793_FetchPremiumPriceFromPricingService = "pm-26793-fetch-premium-price-from-pricing-service";
|
||||
public const string PM23341_Milestone_2 = "pm-23341-milestone-2";
|
||||
public const string PM26462_Milestone_3 = "pm-26462-milestone-3";
|
||||
public const string PM28265_EnableReconcileAdditionalStorageJob = "pm-28265-enable-reconcile-additional-storage-job";
|
||||
public const string PM28265_ReconcileAdditionalStorageJobEnableLiveMode = "pm-28265-reconcile-additional-storage-job-enable-live-mode";
|
||||
|
||||
/* Key Management Team */
|
||||
public const string ReturnErrorOnExistingKeypair = "return-error-on-existing-keypair";
|
||||
|
||||
24
src/Core/Utilities/RequireLowerEnvironmentAttribute.cs
Normal file
24
src/Core/Utilities/RequireLowerEnvironmentAttribute.cs
Normal file
@@ -0,0 +1,24 @@
|
||||
using Microsoft.AspNetCore.Hosting;
|
||||
using Microsoft.AspNetCore.Mvc;
|
||||
using Microsoft.AspNetCore.Mvc.Filters;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
|
||||
namespace Bit.Core.Utilities;
|
||||
|
||||
/// <summary>
|
||||
/// Authorization attribute that restricts controller/action access to Development and QA environments only.
|
||||
/// Returns 404 Not Found in all other environments.
|
||||
/// </summary>
|
||||
public class RequireLowerEnvironmentAttribute() : TypeFilterAttribute(typeof(LowerEnvironmentFilter))
|
||||
{
|
||||
private class LowerEnvironmentFilter(IWebHostEnvironment environment) : IAuthorizationFilter
|
||||
{
|
||||
public void OnAuthorization(AuthorizationFilterContext context)
|
||||
{
|
||||
if (!environment.IsDevelopment() && !environment.IsEnvironment("QA"))
|
||||
{
|
||||
context.Result = new NotFoundResult();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user