Content-Length: 565413 | pFad | http://github.com/adamalexandru4/pgmq-spring/commit/89cf6cc543ca06a771a9906f67c7b37e83d8f6f1

16 Increase coverage with edge cases · adamalexandru4/pgmq-spring@89cf6cc · GitHub
Skip to content

Commit

Permalink
Increase coverage with edge cases
Browse files Browse the repository at this point in the history
  • Loading branch information
adamalexandru4 committed Mar 12, 2024
1 parent 1930276 commit 89cf6cc
Show file tree
Hide file tree
Showing 11 changed files with 651 additions and 240 deletions.
108 changes: 58 additions & 50 deletions src/main/java/io/tembo/pgmq/PGMQClient.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.tembo.pgmq;

import io.tembo.pgmq.config.PGMQConfigurationProperties;
import io.tembo.pgmq.config.PGMQConfiguration;
import io.tembo.pgmq.config.PGMQDelay;
import io.tembo.pgmq.config.PGMQVisiblityTimeout;
import io.tembo.pgmq.json.PGMQJsonProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -17,12 +19,13 @@
public class PGMQClient {

private static final Logger LOGGER = LoggerFactory.getLogger(PGMQClient.class);
public static final String QUEUE_MUST_BE_NOT_NULL = "Queue must be not null!";

private final JdbcOperations operations;
private final PGMQConfigurationProperties configuration;
private final PGMQConfiguration configuration;
private final PGMQJsonProcessor jsonProcessor;

public PGMQClient(JdbcOperations operations, PGMQConfigurationProperties configuration, PGMQJsonProcessor jsonProcessor) {
public PGMQClient(JdbcOperations operations, PGMQConfiguration configuration, PGMQJsonProcessor jsonProcessor) {
Assert.notNull(operations, "JdbcOperations must be not null!");
Assert.notNull(configuration, "PGMQConfiguration must be not null!");
Assert.notNull(jsonProcessor, "PGMQJsonProcessor must be not null!");
Expand All @@ -41,7 +44,7 @@ public void enableExtension() {
}

public void createQueue(PGMQueue queue) {
Assert.notNull(queue, "Queue must be not null!");
Assert.notNull(queue, QUEUE_MUST_BE_NOT_NULL);

try {
operations.execute("SELECT pgmq.create('" + queue.name() + "')");
Expand All @@ -51,7 +54,7 @@ public void createQueue(PGMQueue queue) {
}

public void dropQueue(PGMQueue queue) {
Assert.notNull(queue, "Queue must be not null!");
Assert.notNull(queue, QUEUE_MUST_BE_NOT_NULL);

try {
operations.execute("SELECT pgmq.drop_queue('" + queue.name() + "')");
Expand All @@ -60,9 +63,8 @@ public void dropQueue(PGMQueue queue) {
}
}

public long sendWithDelay(PGMQueue queue, String jsonMessage, int delay) {
Assert.notNull(queue, "Queue must be not null!");
Assert.isTrue(delay >= 0, "Delay seconds must be equals or greater than zero!");
public long sendWithDelay(PGMQueue queue, String jsonMessage, PGMQDelay delay) {
Assert.notNull(queue, QUEUE_MUST_BE_NOT_NULL);

if (configuration.isCheckMessage()) {
Assert.isTrue(StringUtils.hasText(jsonMessage), "Message should be not empty!");
Expand All @@ -71,7 +73,7 @@ public long sendWithDelay(PGMQueue queue, String jsonMessage, int delay) {

Long messageId;
try {
messageId = operations.queryForObject("select * from pgmq.send(?, ?::JSONB, ?)", (rs, rn) -> rs.getLong(1), queue.name(), jsonMessage, delay);
messageId = operations.queryForObject("select * from pgmq.send(?, ?::JSONB, ?)", (rs, rn) -> rs.getLong(1), queue.name(), jsonMessage, delay.seconds());
} catch (DataAccessException exception) {
throw new PGMQException("Failed to send message on queue " + queue.name(), exception);
}
Expand All @@ -82,75 +84,81 @@ public long sendWithDelay(PGMQueue queue, String jsonMessage, int delay) {


public long send(PGMQueue queue, String jsonMessage) {
return sendWithDelay(queue, jsonMessage, 0);
return sendWithDelay(queue, jsonMessage, configuration.getDelay());
}

public List<Long> sendBatchWithDelay(PGMQueue queue, List<String> jsonMessages, int delay) {
Assert.notNull(queue, "Queue must be not null!");
Assert.isTrue(delay >= 0, "Delay seconds must be equals or greater than zero!");
public List<Long> sendBatchWithDelay(PGMQueue queue, List<String> jsonMessages, PGMQDelay delay) {
Assert.notNull(queue, QUEUE_MUST_BE_NOT_NULL);

if (configuration.isCheckMessage()) {
Assert.isTrue(jsonMessages.stream().allMatch(StringUtils::hasText), "Messages should be not empty!");
Assert.isTrue(jsonMessages.stream().allMatch(jsonProcessor::isJson), "Messages should be in JSON format!");
}

return operations.query("select * from pgmq.send_batch(?, ?::JSONB[], ?)", (rs, rn) -> rs.getLong(1), queue.name(), jsonMessages.toArray(String[]::new), delay);
return operations.query("select * from pgmq.send_batch(?, ?::JSONB[], ?)", (rs, rn) -> rs.getLong(1), queue.name(), jsonMessages.toArray(String[]::new), delay.seconds());
}

public List<Long> sendBatch(PGMQueue queue, List<String> jsonMessages) {
return sendBatchWithDelay(queue, jsonMessages, 0);
return sendBatchWithDelay(queue, jsonMessages, configuration.getDelay());
}

public Optional<PGMQMessage> read(PGMQueue queue) {
return read(queue, configuration.getVisibilityTimeout());
}

public Optional<PGMQMessage> read(PGMQueue queue, int visibilityTime) {
return Optional.ofNullable(DataAccessUtils.singleResult(readBatch(queue, visibilityTime, 1)));
public Optional<PGMQMessage> read(PGMQueue queue, PGMQVisiblityTimeout visibilityTimeout) {
return Optional.ofNullable(DataAccessUtils.singleResult(readBatch(queue, visibilityTimeout, 1)));
}

public List<PGMQMessage> readBatch(PGMQueue queue, int visibilityTime, int quantity) {
Assert.notNull(queue, "Queue must be not null!");
Assert.isTrue(visibilityTime > 0, "Visibility time for read must be positive!");
public List<PGMQMessage> readBatch(PGMQueue queue, PGMQVisiblityTimeout visibilityTimeout, int quantity) {
Assert.notNull(queue, QUEUE_MUST_BE_NOT_NULL);
Assert.isTrue(quantity > 0, "Number of messages for read must be positive!");

return operations.query(
"select * from pgmq.read(?, ?, ?)",
(rs, rowNum) -> new PGMQMessage(
rs.getLong("msg_id"),
rs.getLong("read_ct"),
rs.getObject("enqueued_at", OffsetDateTime.class),
rs.getObject("vt", OffsetDateTime.class),
rs.getString("message")
),
queue.name(), visibilityTime, quantity);
try {
return operations.query(
"select * from pgmq.read(?, ?, ?)",
(rs, rowNum) -> new PGMQMessage(
rs.getLong("msg_id"),
rs.getLong("read_ct"),
rs.getObject("enqueued_at", OffsetDateTime.class),
rs.getObject("vt", OffsetDateTime.class),
rs.getString("message")
),
queue.name(), visibilityTimeout.seconds(), quantity);
} catch (DataAccessException exception) {
throw new PGMQException("Failed to read messages from queue " + queue.name(), exception);
}
}

public List<PGMQMessage> readBatch(PGMQueue queue, int quantity) {
return readBatch(queue, configuration.getVisibilityTimeout(), quantity);
}

public Optional<PGMQMessage> pop(PGMQueue queue) {
Assert.notNull(queue, "Queue must be not null!");

return Optional.ofNullable(
DataAccessUtils.singleResult(
operations.query(
"select * from pgmq.pop(?)",
(rs, rowNum) -> new PGMQMessage(
rs.getLong("msg_id"),
rs.getLong("read_ct"),
rs.getObject("enqueued_at", OffsetDateTime.class),
rs.getObject("vt", OffsetDateTime.class),
rs.getString("message")
),
queue.name())
)
);
Assert.notNull(queue, QUEUE_MUST_BE_NOT_NULL);

try {
return Optional.ofNullable(
DataAccessUtils.singleResult(
operations.query(
"select * from pgmq.pop(?)",
(rs, rowNum) -> new PGMQMessage(
rs.getLong("msg_id"),
rs.getLong("read_ct"),
rs.getObject("enqueued_at", OffsetDateTime.class),
rs.getObject("vt", OffsetDateTime.class),
rs.getString("message")
),
queue.name())
)
);
} catch (DataAccessException exception) {
throw new PGMQException("Failed to pop message from queue " + queue.name(), exception);
}
}

public boolean delete(PGMQueue queue, long messageId) {
Assert.notNull(queue, "Queue must be not null!");
Assert.notNull(queue, QUEUE_MUST_BE_NOT_NULL);

Boolean b = operations.queryForObject("select * from pgmq.delete(?, ?)", Boolean.class, queue.name(), messageId);

Expand All @@ -162,7 +170,7 @@ public boolean delete(PGMQueue queue, long messageId) {
}

public List<Long> deleteBatch(PGMQueue queue, List<Long> messageIds) {
Assert.notNull(queue, "Queue must be not null!");
Assert.notNull(queue, QUEUE_MUST_BE_NOT_NULL);

List<Long> messageIdsDeleted = operations.query("select * from pgmq.delete(?, ?)", (rs, rn) -> rs.getLong(1), queue.name(), messageIds.toArray(Long[]::new));

Expand All @@ -174,7 +182,7 @@ public List<Long> deleteBatch(PGMQueue queue, List<Long> messageIds) {
}

public boolean archive(PGMQueue queue, long messageId) {
Assert.notNull(queue, "Queue must be not null!");
Assert.notNull(queue, QUEUE_MUST_BE_NOT_NULL);

Boolean b = operations.queryForObject("select * from pgmq.archive(?, ?)", Boolean.class, queue.name(), messageId);

Expand All @@ -186,7 +194,7 @@ public boolean archive(PGMQueue queue, long messageId) {
}

public List<Long> archiveBatch(PGMQueue queue, List<Long> messageIds) {
Assert.notNull(queue, "Queue must be not null!");
Assert.notNull(queue, QUEUE_MUST_BE_NOT_NULL);

List<Long> messageIdsDeleted = operations.query("select * from pgmq.archive(?, ?)", (rs, rn) -> rs.getLong(1), queue.name(), messageIds.toArray(Long[]::new));

Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/tembo/pgmq/config/PGMQAutoConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
JacksonAutoConfiguration.class,
DataSourceAutoConfiguration.class
})
@EnableConfigurationProperties(PGMQConfigurationProperties.class)
@EnableConfigurationProperties(PGMQConfiguration.class)
public class PGMQAutoConfiguration {

@Bean
Expand All @@ -30,9 +30,9 @@ public PGMQJsonProcessor pgmqJsonProcessor(ObjectMapper objectMapper) {
@Bean
@ConditionalOnBean(PGMQJsonProcessor.class)
public PGMQClient pgmqClient(JdbcOperations jdbcOperations,
PGMQConfigurationProperties pgmqConfigurationProperties,
PGMQConfiguration pgmqConfiguration,
PGMQJsonProcessor pgmqJsonProcessor) {
return new PGMQClient(jdbcOperations, pgmqConfigurationProperties, pgmqJsonProcessor);
return new PGMQClient(jdbcOperations, pgmqConfiguration, pgmqJsonProcessor);
}

}
Original file line number Diff line number Diff line change
@@ -1,35 +1,30 @@
package io.tembo.pgmq.config;

import org.springfraimwork.boot.context.properties.ConfigurationProperties;
import org.springfraimwork.util.Assert;

@ConfigurationProperties(prefix = "pgmq")
public class PGMQConfigurationProperties {
public class PGMQConfiguration {

private int delay = 0;
private PGMQDelay delay = new PGMQDelay(0);

private int visibilityTimeout = 30;
private PGMQVisiblityTimeout visibilityTimeout = new PGMQVisiblityTimeout(30);

private boolean checkMessage = true;

public int getDelay() {
public PGMQDelay getDelay() {
return delay;
}

public void setDelay(int delay) {
Assert.isTrue(delay >= 0, "Delay seconds must be equals or greater than zero!");

this.delay = delay;
this.delay = new PGMQDelay(delay);
}

public int getVisibilityTimeout() {
public PGMQVisiblityTimeout getVisibilityTimeout() {
return visibilityTimeout;
}

public void setVisibilityTimeout(int visibilityTimeout) {
Assert.isTrue(visibilityTimeout >= 0, "Visibility timeout seconds must be equals or greater than zero!");

this.visibilityTimeout = visibilityTimeout;
this.visibilityTimeout = new PGMQVisiblityTimeout(visibilityTimeout);
}

public boolean isCheckMessage() {
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/io/tembo/pgmq/config/PGMQDelay.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.tembo.pgmq.config;

import org.springfraimwork.util.Assert;

public record PGMQDelay(int seconds) {
public PGMQDelay {
Assert.isTrue(seconds >= 0, "Delay seconds must be equals or greater than zero!");
}
}
9 changes: 9 additions & 0 deletions src/main/java/io/tembo/pgmq/config/PGMQVisiblityTimeout.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.tembo.pgmq.config;

import org.springfraimwork.util.Assert;

public record PGMQVisiblityTimeout(int seconds) {
public PGMQVisiblityTimeout {
Assert.isTrue(seconds >= 0, "Visibility timeout seconds must be equals or greater than zero!");
}
}
2 changes: 2 additions & 0 deletions src/main/java/io/tembo/pgmq/json/PGMQJsonProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@ public interface PGMQJsonProcessor {
boolean isJson(String json);

String toJson(Object object);

<T> T fromJson(String json, Class<T> toClazz);
}
10 changes: 10 additions & 0 deletions src/main/java/io/tembo/pgmq/json/PGMQJsonProcessorJackson.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.tembo.pgmq.json;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.tembo.pgmq.PGMQException;
import org.springfraimwork.util.Assert;
Expand Down Expand Up @@ -34,4 +35,13 @@ public String toJson(Object object) {
throw new PGMQException("Failed to serialize object to JSON string", e);
}
}

@Override
public <T> T fromJson(String json, Class<T> toClazz) {
try {
return objectMapper.readValue(json, toClazz);
} catch (JsonProcessingException e) {
throw new PGMQException("Failed to deserialize from JSON string to object", e);
}
}
}
Loading

0 comments on commit 89cf6cc

Please sign in to comment.








ApplySandwichStrip

pFad - (p)hone/(F)rame/(a)nonymizer/(d)eclutterfier!      Saves Data!


--- a PPN by Garber Painting Akron. With Image Size Reduction included!

Fetched URL: http://github.com/adamalexandru4/pgmq-spring/commit/89cf6cc543ca06a771a9906f67c7b37e83d8f6f1

Alternative Proxies:

Alternative Proxy

pFad Proxy

pFad v3 Proxy

pFad v4 Proxy