Skip to content

Commit adc205c

Browse files
committed
Created dedicated timeout for task
1 parent 43f4204 commit adc205c

File tree

2 files changed

+13
-3
lines changed

2 files changed

+13
-3
lines changed

src/main/java/tutorial/buildon/aws/streaming/kafka/MyFirstKafkaConnectorConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ public MyFirstKafkaConnectorConfig(final Map<?, ?> originalProps) {
2727
private static final String SECOND_NONREQUIRED_PARAM_DOC = "This is the 2nd non-required parameter";
2828
private static final String SECOND_NONREQUIRED_PARAM_DEFAULT = "bar";
2929

30+
public static final String TASK_SLEEP_TIMEOUT_CONFIG = "task.sleep.timeout";
31+
private static final String TASK_SLEEP_TIMEOUT_DOC = "Sleep timeout used by tasks during each poll";
32+
private static final int TASK_SLEEP_TIMEOUT_DEFAULT = 5000;
33+
3034
public static final String MONITOR_THREAD_TIMEOUT_CONFIG = "monitor.thread.timeout";
3135
private static final String MONITOR_THREAD_TIMEOUT_DOC = "Timeout used by the monitoring thread";
3236
private static final int MONITOR_THREAD_TIMEOUT_DEFAULT = 10000;
@@ -62,6 +66,12 @@ private static void addParams(final ConfigDef configDef) {
6266
SECOND_NONREQUIRED_PARAM_DEFAULT,
6367
Importance.HIGH,
6468
SECOND_NONREQUIRED_PARAM_DOC)
69+
.define(
70+
TASK_SLEEP_TIMEOUT_CONFIG,
71+
Type.INT,
72+
TASK_SLEEP_TIMEOUT_DEFAULT,
73+
Importance.HIGH,
74+
TASK_SLEEP_TIMEOUT_DOC)
6575
.define(
6676
MONITOR_THREAD_TIMEOUT_CONFIG,
6777
Type.INT,

src/main/java/tutorial/buildon/aws/streaming/kafka/MyFirstKafkaConnectorTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class MyFirstKafkaConnectorTask extends SourceTask {
2727
private final Logger log = LoggerFactory.getLogger(MyFirstKafkaConnectorTask.class);
2828

2929
private MyFirstKafkaConnectorConfig config;
30-
private int monitorThreadTimeout;
30+
private int taskSleepTimeout;
3131
private List<String> sources;
3232
private Schema recordSchema;
3333

@@ -39,7 +39,7 @@ public String version() {
3939
@Override
4040
public void start(Map<String, String> properties) {
4141
config = new MyFirstKafkaConnectorConfig(properties);
42-
monitorThreadTimeout = config.getInt(MONITOR_THREAD_TIMEOUT_CONFIG);
42+
taskSleepTimeout = config.getInt(TASK_SLEEP_TIMEOUT_CONFIG);
4343
String sourcesStr = properties.get("sources");
4444
sources = Arrays.asList(sourcesStr.split(","));
4545
recordSchema = SchemaBuilder.struct()
@@ -51,7 +51,7 @@ public void start(Map<String, String> properties) {
5151

5252
@Override
5353
public List<SourceRecord> poll() throws InterruptedException {
54-
Thread.sleep(monitorThreadTimeout / 2);
54+
Thread.sleep(taskSleepTimeout);
5555
List<SourceRecord> records = new ArrayList<>();
5656
for (String source : sources) {
5757
log.info("Polling data from the source '" + source + "'");

0 commit comments

Comments
 (0)