Skip to content

Commit

Permalink
feat(voyager): refactor message to be generic over contained data (#1085
Browse files Browse the repository at this point in the history
)

this is in preparation for implementing block polling in the queue.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
	- Added a search functionality to the app.
- **Refactor**
- Removed traits, macro definitions, and commented out type aliases and
associated trait implementations in `data.rs`.
- Altered declarations of exported entities, reorganized imports,
updated function signatures, and refactored event handling logic in
`event.rs`.
- Modified `additionalSrcFilter` in `voyager.nix` to affect the logic
for filtering additional source files.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
benluelo authored Jan 3, 2024
2 parents 14b941d + 0947110 commit 2808249
Show file tree
Hide file tree
Showing 25 changed files with 1,848 additions and 1,972 deletions.
4 changes: 4 additions & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,10 @@
nativeBuildInputs = [ config.treefmt.build.wrapper ]
++ lib.attrsets.attrValues config.treefmt.build.programs;
GOPRIVATE = "github.com/unionlabs/*";

shellHook = ''
alias voy-send-msg='curl localhost:65534/msg -H "content-type: application/json" -d'
'';
};

treefmt =
Expand Down
2 changes: 1 addition & 1 deletion lib/chain-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl<T: Clone> Pool<T> {
const RETRY_SECONDS: u64 = 3;

tracing::warn!(
"high traffic in queue of {}, ran out of items! trying again in {RETRY_SECONDS} seconds",
"high traffic in pool of {}, ran out of items! trying again in {RETRY_SECONDS} seconds",
std::any::type_name::<T>()
);

Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 9 additions & 15 deletions lib/pg-queue/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,25 +125,19 @@ impl<T: DeserializeOwned + Serialize + Unpin + Send + Sync> Queue<T> {
.await?;
tx.commit().await?;
}
ProcessFlow::Success(new_msgs) => {
if !new_msgs.is_empty() {
let new_ids = query!(
ProcessFlow::Success(maybe_new_msg) => {
if let Some(new_msg) = maybe_new_msg {
let new_row = query!(
"INSERT INTO queue (item)
SELECT * FROM UNNEST($1::JSONB[])
VALUES ($1::JSONB)
RETURNING id",
&*new_msgs
.into_iter()
.map(|t| serde_json::to_value(t).expect(
"queue message should have infallible serialization"
))
.collect::<Vec<_>>(),
serde_json::to_value(new_msg)
.expect("queue message should have infallible serialization")
)
.fetch_all(tx.as_mut())
.fetch_one(tx.as_mut())
.await?;

for row in new_ids {
tracing::debug!(id = row.id, "inserted new messages");
}
tracing::debug!(id = new_row.id, "inserted new message");
}

tx.commit().await?;
Expand All @@ -164,7 +158,7 @@ impl<T: DeserializeOwned + Serialize + Unpin + Send + Sync> Queue<T> {
}

pub enum ProcessFlow<T> {
Success(Vec<T>),
Success(Option<T>),
Requeue,
Fail(String),
}
6 changes: 3 additions & 3 deletions tools/sqlx-cli/sqlx-cli.nix
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@
version = "0.7.1";
nativeBuildInputs = [ pkgs.pkg-config ];

buildPhase = "cargo build --release --locked --offline -p ${name}";
buildPhase = "cargo build --release --locked --offline -p ${name} --bin cargo-sqlx";
installPhase = ''
mkdir -p $out/bin
mv target/release/sqlx $out/bin/sqlx
mv target/release/cargo-sqlx $out/bin/cargo-sqlx
'';

buildInputs = [ rust.toolchains.nightly pkgs.openssl ];

src = srcWithVendoredSources { inherit name; originalSrc = "${sqlx}"; };

meta.mainProgram = "sqlx";
meta.mainProgram = "cargo-sqlx";
};
};
}
3 changes: 1 addition & 2 deletions voyager/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![recursion_limit = "256"]
#![feature(trait_alias, extract_if)]
#![feature(trait_alias)]
// #![warn(clippy::pedantic)]
#![allow(
// required due to return_position_impl_trait_in_trait false positives
Expand Down
Loading

0 comments on commit 2808249

Please sign in to comment.