
NOISSUE - Add health check for MQTT broker #1305

Merged - 16 commits into master from mqtthealthcheck, Jan 5, 2021

Conversation

@blokovi (Contributor) commented Dec 15, 2020:

Signed-off-by: Ivan Milosevic [email protected]

What does this do?

If a health-check path is set, the MQTT adapter waits until that path is up and returns a 200 status code. For VerneMQ the health-check path is /health, so that is the default in the Docker composition where VerneMQ is used.
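
For illustration, a minimal sketch in Go of that wait-until-healthy behavior; the URL, interval, and log calls are assumptions for the example, not the adapter's actual code:

    package main

    import (
        "log"
        "net/http"
        "time"
    )

    // Minimal sketch: block until the broker's health endpoint returns 200.
    func main() {
        const healthCheckURL = "http://localhost:8888/health" // illustrative
        for {
            res, err := http.Get(healthCheckURL)
            if err == nil {
                healthy := res.StatusCode == http.StatusOK
                res.Body.Close()
                if healthy {
                    break
                }
            }
            log.Println("broker not ready, retrying...")
            time.Sleep(time.Second)
        }
        log.Println("broker is healthy, starting MQTT adapter")
    }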

Which issue(s) does this PR fix/relate to?

Related to an issue in the mProxy repo.

@blokovi blokovi requested a review from a team as a code owner December 15, 2020 12:56
@codecov-io commented Dec 15, 2020:

Codecov Report

Merging #1305 (844b760) into master (e326494) will decrease coverage by 0.02%.
The diff coverage is n/a.


@@            Coverage Diff             @@
##           master    #1305      +/-   ##
==========================================
- Coverage   59.81%   59.79%   -0.03%     
==========================================
  Files         113      113              
  Lines        8797     8797              
==========================================
- Hits         5262     5260       -2     
- Misses       3078     3080       +2     
  Partials      457      457              
Impacted Files    | Coverage Δ
things/service.go | 53.40% <0.00%> (-1.14%) ⬇️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update e326494...844b760.

@drasko (Contributor) previously approved these changes Dec 16, 2020, leaving a comment:

LGTM

@blokovi blokovi marked this pull request as draft December 16, 2020 15:44
@blokovi force-pushed the mqtthealthcheck branch 2 times, most recently from 44b8271 to adf5550, December 16, 2020 19:29
@blokovi blokovi marked this pull request as ready for review December 16, 2020 19:29
@drasko (Contributor) left a comment:

LGTM, very nice

cmd/mqtt/main.go Outdated
@@ -122,6 +126,30 @@ func main() {
log.Fatalf(err.Error())
}

if cfg.mqttTargetHealthCheck != "" {
for {
Contributor: Should this be in a goroutine?

Contributor: Makes sense, a good candidate for a goroutine.

Contributor: But wait - this is done only on init, and mProxy is blocked here until the broker is up. However, I agree that we could keep checking health in the background and restart mProxy if VerneMQ goes down.

Contributor: I took a quick look; this approach seems interesting. The mentioned go-sundheit library lets you schedule an async check in a goroutine.

Contributor Author: Restarting mProxy when VerneMQ is down will be handled by Kubernetes. There is a PR in the devops repo that defines a liveness probe for the mProxy container, checking the health endpoint on VerneMQ. If it is not healthy, both containers (VerneMQ and mProxy) will be restarted by Kubernetes. It would be redundant, and probably conflicting, to have the same job done both here in code and by the orchestration tool.


dborovcanin previously approved these changes Dec 17, 2020
mteodor previously approved these changes Dec 18, 2020
@blokovi dismissed stale reviews from mteodor and dborovcanin via 19bae66 December 18, 2020 11:21
mteodor previously approved these changes Dec 18, 2020
@@ -47,7 +47,7 @@ EXPOSE 1883 8883 8080 44053 4369 8888 \

VOLUME ["/vernemq/log", "/vernemq/data", "/vernemq/etc"]

- HEALTHCHECK CMD vernemq ping | grep -q pong
+ HEALTHCHECK CMD wget --no-verbose --tries=1 --spider http://localhost:8888/health || exit 1
Contributor: Can we use curl here instead of wget?

@blokovi (Contributor Author), Dec 23, 2020:

An alternative with curl could be:
HEALTHCHECK CMD [[ "$(curl -o /dev/null -s -w "%{http_code}\n" http://localhost:8888/health)" == "200" ]]

The command's exit status indicates the health status of the container. The possible values are:
0: success - the container is healthy and ready for use
1: unhealthy - the container is not working correctly

That's why this command compares the HTTP code returned by curl with "200".

Contributor: I prefer the curl command. It looks more complicated at first glance, but it explains better what it does. But I would prefer even more to follow @jpetazzo's instructions and use curl -f.

Contributor Author: Changed.

cmd/mqtt/main.go Outdated
defMQTTTargetHost = "0.0.0.0"
defMQTTTargetPort = "1883"
defMQTTForwarderTimeout = "30s" // 30 seconds
healthCheckSleep = 1 * time.Second
Collaborator: It's OK not to have this configurable, but I'd extract it to a separate constant (const mqttHealthCheckSleep = time.Second).

cmd/mqtt/main.go Outdated
}
logger.Info(fmt.Sprintf("Broker not ready, status code: %d, body: %s", res.StatusCode, body))
}
time.Sleep(healthCheckSleep)
Collaborator: I would add a backoff to prevent this loop from running indefinitely:

    if i == maxHealthCheckRetries {
        logger.Error("MQTT healthcheck limit exceeded, exiting.")
        os.Exit(1)
    }
    time.Sleep(time.Duration(i) * mqttHealthcheckSleep) // the int counter must be converted to time.Duration

This can come in handy when running the service natively.

Contributor: I do not like this whole construct; it looks clunky. Can you please use https://github.com/cenkalti/backoff and solve this with the Retry() function? Then, if Retry() succeeds, you can try reading the body; if that fails, just restart (it means the broker is ready but there was some more serious error, which usually should not happen).
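
A minimal sketch of that suggestion against the cenkalti/backoff v4 API; the URL and logging are illustrative, not the PR's final code:

    package main

    import (
        "fmt"
        "log"
        "net/http"

        "github.com/cenkalti/backoff/v4"
    )

    func main() {
        const healthCheckURL = "http://localhost:8888/health" // illustrative

        // The operation fails until the health endpoint returns 200.
        operation := func() error {
            res, err := http.Get(healthCheckURL)
            if err != nil {
                return err
            }
            defer res.Body.Close()
            if res.StatusCode != http.StatusOK {
                return fmt.Errorf("broker not ready, status code: %d", res.StatusCode)
            }
            return nil
        }

        // Retry repeats the operation with exponential backoff until it
        // succeeds or the policy's elapsed-time limit is exceeded.
        if err := backoff.Retry(operation, backoff.NewExponentialBackOff()); err != nil {
            log.Fatal(err)
        }
        log.Println("broker is healthy")
    }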

Collaborator: Simply extracting this to a separate function and calling it here will help.

Contributor: I would like us to try https://github.com/cenkalti/backoff in order to prepare the terrain for eventually changing the other services.

Contributor Author: Switched to cenkalti/backoff.

cmd/mqtt/main.go Outdated
for i := 1; i <= maxHealthCheckRetries; i++ {
res, err := http.Get(cfg.mqttTargetHealthCheck)
if err != nil {
logger.Info(fmt.Sprintf("Broker not ready: %s ", err.Error()))
Contributor: Shouldn't we do a continue here?

@blokovi (Contributor Author), Dec 25, 2020: I think not; without continue, the rest of the loop body is skipped anyway, except for the sleep at the end. With continue, the next iteration would start right away.

Contributor: Yes, this is exactly what I meant - that the rest of the operations in this loop body should be skipped. Because if you cannot do the HTTP GET, do you really need to check the result? Do you really need to read the HTTP body (remember that GET failed)? And do you then need to close the Body (remember, GET failed, and you did not read anything)?



cmd/mqtt/main.go Outdated
}
logger.Info(fmt.Sprintf("Broker not ready, status code: %d, body: %s", res.StatusCode, body))
}
if i == maxHealthCheckRetries {
Contributor: Should probably be moved to the top of the loop.

@blokovi (Contributor Author), Dec 25, 2020:

> Yes, this is exactly what I meant - that the rest of the operations in this loop body should be skipped.

The rest of the code is already skipped - except the sleep. All the operations you mention are inside if res != nil, where res is the result of http.Get. If that fails (which is normal, because the adapter will be up before the broker), it just sleeps and retries.

Contributor Author:

> Should probably be moved to the top of the loop.

Moving it to the top would cause an unnecessary sleep when the check succeeds on the first iteration.

@drasko (Contributor), Dec 25, 2020: My question was: why are you even checking res if you already got an err from http.Get(cfg.mqttTargetHealthCheck)? It's like hoping that res will somehow be valid even though the GET function itself returned an error.


cmd/mqtt/main.go Outdated
@@ -122,6 +130,35 @@ func main() {
log.Fatalf(err.Error())
}

if cfg.mqttTargetHealthCheck != "" {
for i := 1; i <= maxHealthCheckRetries; i++ {
Collaborator: Since you are already checking the condition below, you can omit it here: for i := 1; ; i++ {...}. Also, please consider increasing the sleep period on each iteration by multiplying it by i; in that case maxHealthCheckRetries should probably be less than 10 (5?) to prevent long waits. This block can be extracted into a separate function so that we don't pollute main.
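
A sketch of that shape, with the wait extracted into its own function and the sleep multiplied by the attempt number; all names here are illustrative, not the PR's identifiers:

    package main

    import (
        "fmt"
        "log"
        "net/http"
        "time"
    )

    // waitForBroker polls url, sleeping i*sleep between attempts so the wait
    // grows linearly, and gives up after maxRetries attempts.
    func waitForBroker(url string, maxRetries int, sleep time.Duration) error {
        for i := 1; i <= maxRetries; i++ {
            res, err := http.Get(url)
            if err == nil {
                healthy := res.StatusCode == http.StatusOK
                res.Body.Close()
                if healthy {
                    return nil
                }
            }
            time.Sleep(time.Duration(i) * sleep) // backoff grows with each attempt
        }
        return fmt.Errorf("broker health check failed after %d retries", maxRetries)
    }

    func main() {
        if err := waitForBroker("http://localhost:8888/health", 5, time.Second); err != nil {
            log.Fatal(err)
        }
        log.Println("broker is healthy")
    }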

cmd/mqtt/main.go Outdated
@@ -122,6 +130,35 @@ func main() {
log.Fatalf(err.Error())
}

if cfg.mqttTargetHealthCheck != "" {
for i := 1; i <= maxHealthCheckRetries; i++ {
res, err := http.Get(cfg.mqttTargetHealthCheck)
Contributor: A new res object is created on every iteration, so use defer to close its body. Defers are smart enough to know which object they apply to at the end of the function.

blokovi added 12 commits January 4, 2021 15:53
Signed-off-by: Ivan Milosevic <[email protected]>
use constants

Signed-off-by: Ivan Milosevic <[email protected]>
new version of verne image

Signed-off-by: Ivan Milosevic <[email protected]>
Signed-off-by: Ivan Milosevic <[email protected]>
Signed-off-by: Ivan Milosevic <[email protected]>
Signed-off-by: Ivan Milosevic <[email protected]>
Signed-off-by: Ivan Milosevic <[email protected]>
cmd/mqtt/main.go Outdated

// An operation that may fail.
operation := func() error {
res, err := http.Get(cfg.mqttTargetHealthCheck)
Collaborator: Consider extracting this function outside of main and just using it in the backoff.RetryNotify call.
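
A sketch of that extraction used with backoff.RetryNotify (v4 API); the healthCheck name and the wiring are hypothetical:

    package main

    import (
        "fmt"
        "log"
        "net/http"
        "time"

        "github.com/cenkalti/backoff/v4"
    )

    // healthCheck builds the retryable operation for a given URL. Because each
    // attempt is a separate call, the defer closes the response body every time.
    func healthCheck(url string) func() error {
        return func() error {
            res, err := http.Get(url)
            if err != nil {
                return err
            }
            defer res.Body.Close()
            if res.StatusCode != http.StatusOK {
                return fmt.Errorf("broker not ready, status code: %d", res.StatusCode)
            }
            return nil
        }
    }

    func main() {
        notify := func(err error, next time.Duration) {
            log.Printf("broker not ready: %s, retrying in %s", err, next)
        }
        op := healthCheck("http://localhost:8888/health")
        if err := backoff.RetryNotify(op, backoff.NewExponentialBackOff(), notify); err != nil {
            log.Fatal(err)
        }
        log.Println("broker is healthy")
    }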

Signed-off-by: Ivan Milosevic <[email protected]>
@blokovi (Contributor Author) commented Jan 5, 2021:

A bunch of files were removed from the vendor directory after running go mod vendor.

Signed-off-by: Ivan Milosevic <[email protected]>
dborovcanin previously approved these changes Jan 5, 2021
Signed-off-by: Ivan Milosevic <[email protected]>
@drasko (Contributor) left a comment:
LGTM

@@ -47,7 +47,7 @@ EXPOSE 1883 8883 8080 44053 4369 8888 \

VOLUME ["/vernemq/log", "/vernemq/data", "/vernemq/etc"]

- HEALTHCHECK CMD vernemq ping | grep -q pong
+ HEALTHCHECK CMD curl -s -f http://localhost:8888/health || false
Contributor: Just a comment here: since we changed the VerneMQ Dockerfile, we'll need to build and upload a new image. I am not sure whether the current CI covers VerneMQ - if not, please add it.
