Skip to content

[improve]: Support multiple nodes to access Amoro Rest service in a high availability environment #3567

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,21 @@ public static void main(String[] args) {
new Thread(
() -> {
LOG.info("AMS service is shutting down...");
service.dispose();
service.disposeAllService();
LOG.info("AMS service has been shut down");
}));
service.startNoHighAvailableService();
while (true) {
try {
service.waitLeaderShip();
service.startService();
service.startOptimizingService();
service.waitFollowerShip();
// become follower, dispose optimizingService stop
service.stopOptimizingService();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why didn't you put this in service.dispose? or you may change service.dispose in finally block to service.stopOptimizingService ???

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dispose will be close all service (include rest) .if this node become follower,we just close optimizingService,and will be run waitting become Leader service.waitLeaderShip()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the finally block will dispose all services. so you should use stopOptimizingService to repalce dsipose in finally block ?

} catch (Exception e) {
LOG.error("AMS start error", e);
} finally {
service.dispose();
service.disposeAllService();
}
}
} catch (Throwable t) {
Expand All @@ -143,6 +146,14 @@ public static void main(String[] args) {
}
}

private void stopOptimizingService() {
if (optimizingService != null) {
LOG.info("AMS Become follower and Stopping optimizing service...");
optimizingService.dispose();
optimizingService = null;
}
}

public void waitLeaderShip() throws Exception {
haContainer.waitLeaderShip();
}
Expand All @@ -151,14 +162,21 @@ public void waitFollowerShip() throws Exception {
haContainer.waitFollowerShip();
}

public void startService() throws Exception {
public void startNoHighAvailableService() throws Exception {
EventsManager.getInstance();
MetricManager.getInstance();

catalogManager = new DefaultCatalogManager(serviceConfig);
tableManager = new DefaultTableManager(serviceConfig, catalogManager);
optimizerManager = new DefaultOptimizerManager(serviceConfig, catalogManager);
terminalManager = new TerminalManager(serviceConfig, catalogManager);

initHttpService();
startHttpService();
registerAmsServiceMetric();
}

public void startOptimizingService() throws Exception {
tableService = new DefaultTableService(serviceConfig, catalogManager);

optimizingService =
Expand All @@ -180,14 +198,9 @@ public void startService() throws Exception {
tableService.initialize();
LOG.info("AMS table service have been initialized");
tableManager.setTableService(tableService);
terminalManager = new TerminalManager(serviceConfig, catalogManager);

initThriftService();
startThriftService();

initHttpService();
startHttpService();
registerAmsServiceMetric();
}

private void addHandlerChain(RuntimeHandlerChain chain) {
Expand All @@ -196,7 +209,7 @@ private void addHandlerChain(RuntimeHandlerChain chain) {
}
}

public void dispose() {
public void disposeAllService() {
if (tableManagementServer != null && tableManagementServer.isServing()) {
LOG.info("Stopping table management server...");
tableManagementServer.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public void stop() throws IOException {

stopOptimizer();
if (this.serviceContainer != null) {
this.serviceContainer.dispose();
this.serviceContainer.disposeAllService();
}
testHMS.stop();
MoreFiles.deleteRecursively(Paths.get(rootPath), RecursiveDeleteOption.ALLOW_INSECURE);
Expand Down Expand Up @@ -349,7 +349,8 @@ private void startAms() throws Exception {
serviceConfig.set(
AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL,
Duration.ofMillis(1000L));
serviceContainer.startService();
serviceContainer.startNoHighAvailableService();
serviceContainer.startOptimizingService();
break;
} catch (TTransportException e) {
if (e.getCause() instanceof BindException) {
Expand Down