-
Notifications
You must be signed in to change notification settings - Fork 671
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
NOISSUE - Refactor messaging #1141
Conversation
http/api/transport.go
Outdated
Protocol: protocol, | ||
Channel: chanID, | ||
Subtopic: subtopic, | ||
Payload: payload, | ||
Created: created, | ||
Occured: time.Now().UnixNano(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this revert?
Codecov Report
@@ Coverage Diff @@
## master #1141 +/- ##
==========================================
- Coverage 75.33% 75.32% -0.02%
==========================================
Files 101 101
Lines 6853 6857 +4
==========================================
+ Hits 5163 5165 +2
- Misses 1333 1334 +1
- Partials 357 358 +1
Continue to review full report at Codecov.
|
message.proto
Outdated
@@ -12,5 +11,5 @@ message Message { | |||
string publisher = 3; | |||
string protocol = 4; | |||
bytes payload = 5; | |||
google.protobuf.Timestamp created = 6; | |||
int64 occured = 6; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs a comment - that it represents nano
coap/api/transport.go
Outdated
Channel: chanID, | ||
Subtopic: subtopic, | ||
Publisher: publisher, | ||
Protocol: protocol, | ||
Payload: msg.Payload, | ||
Created: created, | ||
Occured: time.Now().UnixNano(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure about this Occured
... Created
is very usual in these occasions, but it is true that payload could have been created before (on device). However, Creted
here relates to the Mainflux Message{}
entity.
What is the opinion of other @mainflux/maintainers?
cmd/cassandra-writer/main.go
Outdated
@@ -69,19 +70,21 @@ func main() { | |||
log.Fatalf(err.Error()) | |||
} | |||
|
|||
b, err := broker.New(cfg.natsURL) | |||
nc, err := nats.Connect(cfg.natsURL) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main question is - do we want this really? That for every new broker we put this connecting logic in every adapter? Why not solving once and for all in a shared package?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It won't change a lot of even if we do that. If we move connection logic to, for example, pubsub/nats
method NewPub(..)
, we'll need to:
- return an error alongside with the implementation
- enrich returned interface with
Close
method, so that we can call defer(ifc.Close()) - check returned error
if err != nil ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it be hard to have a generic full wrapper? I would prefer that then having a partial wrapping for every broker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, we can do that. It won't help too much, but it will mask the underlying broker specifics (nats.Connect).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Getting there... Starts to look really good!
http/api/transport.go
Outdated
Protocol: protocol, | ||
Channel: chanID, | ||
Subtopic: subtopic, | ||
Payload: payload, | ||
Created: created, | ||
Occurred: occurred, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer Created
. Please change.
message.proto
Outdated
bytes payload = 5; | ||
// Timestamp the Message occured in the system. | ||
// By the default, occured represents Unix nanoseconds timestamp. | ||
int64 occurred = 6; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will not accept timestamp change in this PR. And it should be created
.
messaging/message.proto
Outdated
@@ -12,5 +12,5 @@ message Message { | |||
string publisher = 3; | |||
string protocol = 4; | |||
bytes payload = 5; | |||
google.protobuf.Timestamp created = 6; | |||
google.protobuf.Timestamp occurred = 6; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
created
cmd/coap/main.go
Outdated
thingsapi "github.com/mainflux/mainflux/things/api/auth/grpc" | ||
broker "github.com/nats-io/nats.go" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would remove the alias here
cmd/twins/main.go
Outdated
return | ||
} | ||
|
||
err := ps.Subscribe("channels.>", func(msg messaging.Message) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we keep here SubjectAllChannels
?
cmd/cassandra-writer/main.go
Outdated
"github.com/mainflux/mainflux/logger" | ||
pubsub "github.com/mainflux/mainflux/messaging/nats" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would use another alias. messaging
or messengerNats
cmd/coap/main.go
Outdated
"github.com/mainflux/mainflux/coap" | ||
"github.com/mainflux/mainflux/coap/api" | ||
logger "github.com/mainflux/mainflux/logger" | ||
pubsub "github.com/mainflux/mainflux/messaging/nats" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
cmd/http/main.go
Outdated
adapter "github.com/mainflux/mainflux/http" | ||
"github.com/mainflux/mainflux/http/api" | ||
"github.com/mainflux/mainflux/logger" | ||
pubsub "github.com/mainflux/mainflux/messaging/nats" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
cmd/influxdb-writer/main.go
Outdated
"github.com/mainflux/mainflux/logger" | ||
pubsub "github.com/mainflux/mainflux/messaging/nats" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
cmd/mongodb-writer/main.go
Outdated
"github.com/mainflux/mainflux/logger" | ||
pubsub "github.com/mainflux/mainflux/messaging/nats" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
cmd/mqtt/main.go
Outdated
"github.com/mainflux/mainflux/logger" | ||
"github.com/mainflux/mainflux/messaging" | ||
pubsub "github.com/mainflux/mainflux/messaging/nats" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I renamed to messaging
, but here I need both messaging
and messaging/nats
. Same for Twins
service.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not then calling it nats.NewPublisher
- it will be clearer from the code that this is a specific NATS-based implementation of our interface.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TBH, actually the best would be probably something like messagingNats.NewPublisher
, to distinguish from ordinary NATS package and not to be confused with functions from NATS driver.
I can accept pusub
alias as well, just a proposition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed named import by addressing this one: https://github.com/mainflux/mainflux/pull/1141#discussion_r415905298.
cmd/opcua/main.go
Outdated
"github.com/mainflux/mainflux/logger" | ||
pubsub "github.com/mainflux/mainflux/messaging/nats" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
cmd/postgres-writer/main.go
Outdated
"github.com/mainflux/mainflux/logger" | ||
pubsub "github.com/mainflux/mainflux/messaging/nats" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
cmd/twins/main.go
Outdated
"github.com/mainflux/mainflux/logger" | ||
"github.com/mainflux/mainflux/messaging" | ||
pubsub "github.com/mainflux/mainflux/messaging/nats" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Neatpicking ;)
coap/api/transport.go
Outdated
@@ -225,21 +225,21 @@ func receive(svc coap.Service, msg *gocoap.Message) *gocoap.Message { | |||
return res | |||
} | |||
|
|||
created, err := ptypes.TimestampProto(time.Now()) | |||
occured, err := ptypes.TimestampProto(time.Now()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer calling this var timestamp
or just t
. Mixing "occured" and "created" is confusing. Here you are just taking a simple timestamp recording, so t
would fit nicely.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I renamed back to created
. We call it like that in all the other services.
Nitpicking :) |
This branch needs update as well |
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Package `broker` is conceptually renamed to package `nats`. Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
cmd/mqtt/main.go
Outdated
"github.com/mainflux/mainflux/logger" | ||
"github.com/mainflux/mainflux/messaging" | ||
pubsub "github.com/mainflux/mainflux/messaging/nats" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not then calling it nats.NewPublisher
- it will be clearer from the code that this is a specific NATS-based implementation of our interface.
Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin <[email protected]>
messaging/nats/pubsub.go
Outdated
return | ||
} | ||
if err := h(msg); err != nil { | ||
ps.logger.Warn(fmt.Sprintf("Failed handle Mainflux message: %s", err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Failed to handle
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great now. Approved!
Signed-off-by: Dušan Borovčanin <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Merged! Great PR, thanks a lot @dusanb94! |
* Refactor messaging Signed-off-by: Dušan Borovčanin <[email protected]> * Rename SubscribeHandler to MessageHandler Signed-off-by: Dušan Borovčanin <[email protected]> * Remove `Auth` event logs Signed-off-by: Dušan Borovčanin <[email protected]> * Update message pubsub APi Signed-off-by: Dušan Borovčanin <[email protected]> * Fix topics handling Signed-off-by: Dušan Borovčanin <[email protected]> * Update CoAP adapter Signed-off-by: Dušan Borovčanin <[email protected]> * Update Twins service Signed-off-by: Dušan Borovčanin <[email protected]> * Update LoRa adapter Signed-off-by: Dušan Borovčanin <[email protected]> * Update OPC UA adapter Signed-off-by: Dušan Borovčanin <[email protected]> * Remove broker package Package `broker` is conceptually renamed to package `nats`. Signed-off-by: Dušan Borovčanin <[email protected]> * Update makefile Signed-off-by: Dušan Borovčanin <[email protected]> * Add comment explanation Signed-off-by: Dušan Borovčanin <[email protected]> * Fix MQTT adapter Signed-off-by: Dušan Borovčanin <[email protected]> * Fix typo Signed-off-by: Dušan Borovčanin <[email protected]> * Move NATS pub/sub implementation to pubsub pkg Signed-off-by: Dušan Borovčanin <[email protected]> * Remove an empty line in main methods Signed-off-by: Dušan Borovčanin <[email protected]> * Move messaging-related code to messaging package Signed-off-by: Dušan Borovčanin <[email protected]> * Fix Twins mocks Signed-off-by: Dušan Borovčanin <[email protected]> * Change Occurred back to Created Signed-off-by: Dušan Borovčanin <[email protected]> * Fix tranformer test Signed-off-by: Dušan Borovčanin <[email protected]> * Fix message proto commands Signed-off-by: Dušan Borovčanin <[email protected]> * Replace string literal with constant Signed-off-by: Dušan Borovčanin <[email protected]> * Remove alias from main method Signed-off-by: Dušan Borovčanin <[email protected]> * Change messaging pubsub alias Signed-off-by: Dušan Borovčanin <[email protected]> * Rename occured to created Signed-off-by: Dušan Borovčanin <[email protected]> * Handle NATS connection in the NATS PubSub Signed-off-by: Dušan Borovčanin <[email protected]> * Rename n to pub/pubSub Signed-off-by: Dušan Borovčanin <[email protected]> * Fix typos Signed-off-by: Dušan Borovčanin <[email protected]>
Signed-off-by: Dušan Borovčanin [email protected]
What does this do?
This pull request reorganizes message publishers, subscribers, and the Mainflux Message structure.
Which issue(s) does this PR fix/relate to?
There is no such issue.
List any changes that modify/break current functionality
Writers
Start
method and protocol adapters factory methods signatures are changed.Have you included tests for your changes?
Yes, tests are updated according to code updates.
Did you document any new/modified functionality?
No external API changed, so there was no need to update docs.