Skip to content

Commit 0f5339f

Browse files
committed
Added RetryFlow sample and retry-option overloads to Flow base-class.
1 parent bb8f773 commit 0f5339f

File tree

6 files changed

+104
-14
lines changed

6 files changed

+104
-14
lines changed

Cleipnir.Flows/Flow.cs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,48 @@ public abstract class BaseFlow
1414
public Utilities Utilities => Workflow.Utilities;
1515
public Messages Messages => Workflow.Messages;
1616
public Effect Effect => Workflow.Effect;
17-
17+
18+
#region Capture explicit id with ResiliencyLevel
19+
1820
public Task<T> Capture<T>(string id, Func<Task<T>> work, ResiliencyLevel resiliencyLevel = ResiliencyLevel.AtLeastOnce)
1921
=> Effect.Capture(id, work, resiliencyLevel);
2022
public Task<T> Capture<T>(string id, Func<T> work, ResiliencyLevel resiliencyLevel = ResiliencyLevel.AtLeastOnce)
2123
=> Effect.Capture(id, work, resiliencyLevel);
2224
public Task Capture(string id, Func<Task> work, ResiliencyLevel resiliencyLevel = ResiliencyLevel.AtLeastOnce)
2325
=> Effect.Capture(id, work, resiliencyLevel);
2426
public Task Capture(string id, Action work, ResiliencyLevel resiliencyLevel = ResiliencyLevel.AtLeastOnce)
25-
=> Effect.Capture(id, work, resiliencyLevel);
27+
=> Effect.Capture(id, work, resiliencyLevel);
28+
29+
#endregion
30+
31+
#region Capture explicit id with RetryPolicy
32+
33+
public Task<T> Capture<T>(string id, Func<Task<T>> work, RetryPolicy retryPolicy, bool flush = true)
34+
=> Effect.Capture(id, work, retryPolicy, flush);
35+
public Task<T> Capture<T>(string id, Func<T> work, RetryPolicy retryPolicy, bool flush = true)
36+
=> Effect.Capture(id, work, retryPolicy, flush);
37+
public Task Capture(string id, Func<Task> work, RetryPolicy retryPolicy, bool flush = true)
38+
=> Effect.Capture(id, work, retryPolicy, flush);
39+
public Task Capture(string id, Action work, RetryPolicy retryPolicy, bool flush = true)
40+
=> Effect.Capture(id, work, retryPolicy, flush);
41+
42+
#endregion
43+
44+
#region Capture implicit id with RetryPolicy
45+
46+
public Task<T> Capture<T>(Func<Task<T>> work, RetryPolicy retryPolicy, bool flush = true)
47+
=> Effect.Capture(work, retryPolicy, flush);
48+
public Task<T> Capture<T>(Func<T> work, RetryPolicy retryPolicy, bool flush = true)
49+
=> Effect.Capture(work, retryPolicy, flush);
50+
public Task Capture(Func<Task> work, RetryPolicy retryPolicy, bool flush = true)
51+
=> Effect.Capture(work, retryPolicy, flush);
52+
public Task Capture(Action work, RetryPolicy retryPolicy, bool flush = true)
53+
=> Effect.Capture(work, retryPolicy, flush);
54+
55+
#endregion
2656

57+
#region Capture implicit id with ResiliencyLevel
58+
2759
public Task<T> Capture<T>(Func<Task<T>> work, ResiliencyLevel resiliencyLevel = ResiliencyLevel.AtLeastOnce)
2860
=> Effect.Capture(work, resiliencyLevel);
2961
public Task<T> Capture<T>(Func<T> work, ResiliencyLevel resiliencyLevel = ResiliencyLevel.AtLeastOnce)
@@ -32,6 +64,8 @@ public Task Capture(Func<Task> work, ResiliencyLevel resiliencyLevel = Resilienc
3264
=> Effect.Capture(work, resiliencyLevel);
3365
public Task Capture(Action work, ResiliencyLevel resiliencyLevel = ResiliencyLevel.AtLeastOnce)
3466
=> Effect.Capture(work, resiliencyLevel);
67+
68+
#endregion
3569

3670
public Task<TMessage> Message<TMessage>() => Workflow.Messages.FirstOfType<TMessage>();
3771
public Task<Option<TMessage>> Message<TMessage>(string timeoutId, DateTime timesOutAt) => Workflow

Cleipnir.Flows/Options.cs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,17 @@ public class Options
3131
/// Configuration options for Cleipnir
3232
/// </summary>
3333
/// <param name="unhandledExceptionHandler">Callback handler for unhandled flow exceptions</param>
34-
/// <param name="retentionPeriod">Period to keep completed flows before deletion</param>
35-
/// <param name="retentionCleanUpFrequency">Retention clean-up check frequency</param>
36-
/// <param name="leaseLength">Flow lease-length. Leases are automatically renewed</param>
37-
/// <param name="enableWatchdogs">Enable background crashed, interrupted and postponed flow scheduling</param>
38-
/// <param name="watchdogCheckFrequency">Check frequency for eligible crashed, interrupted and postponed flows</param>
39-
/// <param name="messagesPullFrequency">Pull frequency for active/max-waiting messages</param>
40-
/// <param name="messagesDefaultMaxWaitForCompletion">Default wait duration before suspension for messages</param>
41-
/// <param name="delayStartup">Delay watchdog start-up</param>
42-
/// <param name="maxParallelRetryInvocations">Limit the number of watchdog started invocations</param>
43-
/// <param name="serializer">Specify custom serializer</param>
44-
/// <param name="utcNow">Provide custom delegate for providing current utc datetime</param>
34+
/// <param name="retentionPeriod">Period to keep completed flows before deletion. Default infinite.</param>
35+
/// <param name="retentionCleanUpFrequency">Retention clean-up check frequency. Default 1 hour when retention period is not infinite.</param>
36+
/// <param name="leaseLength">Flow lease-length. Leases are automatically renewed. Default 60 seconds.</param>
37+
/// <param name="enableWatchdogs">Enable background crashed, interrupted and postponed flow scheduling. Default true.</param>
38+
/// <param name="watchdogCheckFrequency">Check frequency for eligible crashed, interrupted and postponed flows. Default 1 second.</param>
39+
/// <param name="messagesPullFrequency">Pull frequency for active/max-waiting messages. Default: 250ms</param>
40+
/// <param name="messagesDefaultMaxWaitForCompletion">Default wait duration before suspension for messages. Defaults to none.</param>
41+
/// <param name="delayStartup">Delay watchdog start-up. Defaults to none.</param>
42+
/// <param name="maxParallelRetryInvocations">Limit the number of watchdog started invocations. Default: 100.</param>
43+
/// <param name="serializer">Specify custom serializer. Default built-in json-serializer.</param>
44+
/// <param name="utcNow">Provide custom delegate for providing current utc datetime. Default: () => DateTime.UtcNow</param>
4545
public Options(
4646
Action<FrameworkException>? unhandledExceptionHandler = null,
4747
TimeSpan? retentionPeriod = null,

Samples/Cleipnir.Flows.Samples.Console/Cleipnir.Flows.Sample.ConsoleApp.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
</PropertyGroup>
99

1010
<ItemGroup>
11+
<ProjectReference Include="..\..\Cleipnir.ResilientFunctions\Stores\PostgreSQL\Cleipnir.ResilientFunctions.PostgreSQL\Cleipnir.ResilientFunctions.PostgreSQL.csproj" />
1112
<ProjectReference Include="..\..\SourceGeneration\Cleipnir.Flows.SourceGenerator\Cleipnir.Flows.SourceGenerator.csproj" OutputItemType="Analyzer" ReferenceOutputAssembly="false" />
1213
<ProjectReference Include="..\..\Cleipnir.Flows\Cleipnir.Flows.csproj" />
1314
</ItemGroup>

Samples/Cleipnir.Flows.Samples.Console/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,6 @@ public static class Program
44
{
55
private static async Task Main(string[] args)
66
{
7-
await Middleware.Example.Do();
7+
await Retry.Example.Do();
88
}
99
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
using Cleipnir.ResilientFunctions.PostgreSQL;
2+
using Microsoft.Extensions.DependencyInjection;
3+
4+
namespace Cleipnir.Flows.Sample.ConsoleApp.Retry;
5+
6+
public static class Example
7+
{
8+
public static async Task Do()
9+
{
10+
const string connStr = "Server=localhost;Database=retryflows;User Id=postgres;Password=Pa55word!; Include Error Detail=true;";
11+
await DatabaseHelper.RecreateDatabase(connStr);
12+
var store = new PostgreSqlFunctionStore(connStr);
13+
await store.Initialize();
14+
15+
var serviceCollection = new ServiceCollection();
16+
serviceCollection.AddTransient<RetryFlows>();
17+
serviceCollection.AddTransient<RetryFlow>();
18+
19+
var flowsContainer = new FlowsContainer(
20+
store,
21+
serviceCollection.BuildServiceProvider(),
22+
new Options(unhandledExceptionHandler: Console.WriteLine)
23+
);
24+
25+
var flows = new RetryFlows(flowsContainer);
26+
var flowId = "MK-54321";
27+
await flows.Schedule(flowId);
28+
29+
Console.ReadLine();
30+
}
31+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
using Cleipnir.ResilientFunctions.Domain;
2+
3+
namespace Cleipnir.Flows.Sample.ConsoleApp.Retry;
4+
5+
[GenerateFlows]
6+
public class RetryFlow : Flow
7+
{
8+
private readonly RetryPolicy _retryPolicy = RetryPolicy.CreateConstantDelay(interval: TimeSpan.FromSeconds(1), suspendThreshold: TimeSpan.Zero);
9+
10+
public override async Task Run()
11+
{
12+
await Capture(async () =>
13+
{
14+
var i = await Effect.CreateOrGet("i", 0);
15+
while (true)
16+
{
17+
Console.WriteLine($"Retrying: {i}");
18+
i++;
19+
await Effect.Upsert("i", i);
20+
throw new TimeoutException();
21+
}
22+
}, _retryPolicy);
23+
}
24+
}

0 commit comments

Comments
 (0)