Send data in multiple ways depending on how you want to send it

Question

I have a bunch of keys and values that I want to send to our messaging queue by packing them into one byte array. I build a single byte array from all the keys and values, which must always be smaller than 50K, and then send it to our messaging queue.

Packet class:

public final class Packet implements Closeable {
  private static final int MAX_SIZE = 50000;
  private static final int HEADER_SIZE = 36;

  private final byte dataCenter;
  private final byte recordVersion;
  private final long address;
  private final long addressFrom;
  private final long addressOrigin;
  private final byte recordsPartition;
  private final byte replicated;
  private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
  private int pendingItems = 0;

  public Packet(final RecordPartition recordPartition) {
    this.recordsPartition = (byte) recordPartition.getPartition();
    this.dataCenter = Utils.LOCATION.get().datacenter();
    this.recordVersion = 1;
    this.replicated = 0;
    final long packedAddress = new Data().packAddress();
    this.address = packedAddress;
    this.addressFrom = 0L;
    this.addressOrigin = packedAddress;
  }

  private void addHeader(final ByteBuffer buffer, final int items) {
    buffer.put(dataCenter).put(recordVersion).putInt(items).putInt(buffer.capacity())
        .putLong(address).putLong(addressFrom).putLong(addressOrigin).put(recordsPartition)
        .put(replicated);
  }

  private void sendData() {
    if (itemBuffer.position() == 0) {
      // no data to be sent
      return;
    }
    final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
    addHeader(buffer, pendingItems);
    // flip to read mode so that only the bytes written so far are copied
    itemBuffer.flip();
    buffer.put(itemBuffer);
    SendRecord.getInstance().sendToQueueAsync(address, buffer.array());
    // the two other send variants that should also be selectable:
    // SendRecord.getInstance().sendToQueueSync(address, buffer.array());
    // SendRecord.getInstance().sendToQueueSync(address, buffer.array(), socket);
    itemBuffer.clear();
    pendingItems = 0;
  }

  public void addAndSendJunked(final byte[] key, final byte[] data) {
    if (key.length > 255) {
      return;
    }
    // keep the lengths as ints; casting them to byte up front overflows above 127
    final int keyLength = key.length;
    final int dataLength = data.length;

    // marker (1) + key length (1) + key + timestamp (8) + data length (2) + data
    final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
    if (additionalSize > (MAX_SIZE - HEADER_SIZE)) {
      throw new AppConfigurationException("Size of single item exceeds maximum size");
    }
    final int newSize = itemBuffer.position() + additionalSize;
    if (newSize >= (MAX_SIZE - HEADER_SIZE)) {
      sendData();
    }

    final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
    final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
    // data layout: marker, key length, key, timestamp, data length, data
    itemBuffer.put((byte) 0).put((byte) keyLength).put(key).putLong(timestamp)
        .putShort((short) dataLength).put(data);
    pendingItems++;
  }

  @Override
  public void close() {
    if (pendingItems > 0) {
      sendData();
    }
  }
}

Below is how I send the data. As of now, my design only allows data to be sent asynchronously, by calling the sendToQueueAsync method in the sendData() method above.

  private void validateAndSend(final RecordPartition partition) {
    final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);

    // try-with-resources calls packet.close(), which flushes any pending items
    try (Packet packet = new Packet(partition)) {
      DataHolder dataHolder;
      while ((dataHolder = dataHolders.poll()) != null) {
        packet.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8),
            dataHolder.getProcessBytes());
      }
    }
  }

Now I need to extend my design so that data can be sent in three different ways. It is up to the user to decide which way to send the data, synchronously ("sync") or asynchronously ("async"):

  • I need to send data asynchronously by calling the sender.sendToQueueAsync method.
  • Or I need to send data synchronously by calling the sender.sendToQueueSync method.
  • Or I need to send data synchronously, but on a particular socket, by calling the sender.sendToQueueSync method. In this case I need to pass the socket variable somehow so that sendData knows about it.

SendRecord class:

public class SendRecord {
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
  private final Cache<Long, PendingMessage> cache = CacheBuilder.newBuilder().maximumSize(1000000)
      .concurrencyLevel(100).build();

  private static class Holder {
    private static final SendRecord INSTANCE = new SendRecord();
  }

  public static SendRecord getInstance() {
    return Holder.INSTANCE;
  }

  private SendRecord() {
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        handleRetry();
      }
    }, 0, 1, TimeUnit.SECONDS);
  }

  private void handleRetry() {
    List<PendingMessage> messages = new ArrayList<>(cache.asMap().values());
    for (PendingMessage message : messages) {
      if (message.hasExpired()) {
        if (message.shouldRetry()) {
          message.markResent();
          doSendAsync(message);
        } else {
          cache.invalidate(message.getAddress());
        }
      }
    }
  }

  // called by multiple threads concurrently
  public boolean sendToQueueAsync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, true);
    cache.put(address, m);
    return doSendAsync(m);
  }

  // called by above method and also by handleRetry method
  private boolean doSendAsync(final PendingMessage pendingMessage) {
    Optional<SocketHolder> liveSocket = SocketManager.getInstance().getNextSocket();
    if (!liveSocket.isPresent()) {
      // no live socket is available, so nothing can be sent right now
      return false;
    }
    // reuse the socket-specific overload below instead of duplicating the send logic
    return doSendAsync(pendingMessage, liveSocket.get().getSocket());
  }

  // called by send method below
  private boolean doSendAsync(final PendingMessage pendingMessage, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(pendingMessage.getEncodedRecords());
    try {
      // this returns instantly
      return msg.send(socket);
    } finally {
      msg.destroy();
    }
  }

  // called by multiple threads to send data synchronously without passing socket
  public boolean sendToQueueSync(final long address, final byte[] encodedRecords) {
    PendingMessage m = new PendingMessage(address, encodedRecords, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      cache.invalidate(address);
    }
  }

  // called by a thread to send data synchronously, with the socket passed as a parameter
  public boolean sendToQueueSync(final long address, final byte[] encodedRecords, final Socket socket) {
    PendingMessage m = new PendingMessage(address, encodedRecords, false);
    cache.put(address, m);
    try {
      if (doSendAsync(m, socket)) {
        return m.waitForAck();
      }
      return false;
    } finally {
      cache.invalidate(address);
    }
  }

  public void handleAckReceived(final long address) {
    PendingMessage record = cache.getIfPresent(address);
    if (record != null) {
      record.ackReceived();
      cache.invalidate(address);
    }
  }
}

Callers will call only one of the following three methods:

  • sendToQueueAsync, passing two parameters
  • sendToQueueSync, passing two parameters
  • sendToQueueSync, passing three parameters

How should I design my Packet and SendRecord classes so that I can tell the Packet class which of the three ways above to use when sending its data to my messaging queue? It is up to the user to decide which way to send data to the messaging queue. As of now, the way my Packet class is structured, it can send data in only one way.

Recommended answer

I think your best option is the Strategy pattern (https://en.wikipedia.org/wiki/Strategy_pattern).

Using this pattern, you can encapsulate the behaviour of each type of "send" in its own class, for example an AsynchronousSend class, a SynchronousSend class and an AsynchronousSocketSend class. (You could probably come up with better names.) The Packet class can then decide, based on some logic, which class to use to send the data to the queue.
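A minimal sketch of how that could look, under a few assumptions: QueueSender is a hypothetical interface standing in for the three public send methods of SendRecord, the socket type is left generic so the example compiles without ZeroMQ, and StrategyPacket is a reduced stand-in for Packet that shows only where the strategy plugs in. The third strategy is named SyncSocketSend here because that variant is synchronous. RecordingSender is a fake used only to show which method each strategy ends up calling.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the three public send methods of SendRecord. */
interface QueueSender<S> {
  boolean sendToQueueAsync(long address, byte[] encodedRecords);
  boolean sendToQueueSync(long address, byte[] encodedRecords);
  boolean sendToQueueSync(long address, byte[] encodedRecords, S socket);
}

/** The strategy: one way of handing an encoded packet to the queue. */
interface SendStrategy {
  boolean send(long address, byte[] encodedRecords);
}

final class AsyncSend<S> implements SendStrategy {
  private final QueueSender<S> sender;
  AsyncSend(QueueSender<S> sender) { this.sender = sender; }
  @Override public boolean send(long address, byte[] encodedRecords) {
    return sender.sendToQueueAsync(address, encodedRecords);
  }
}

final class SyncSend<S> implements SendStrategy {
  private final QueueSender<S> sender;
  SyncSend(QueueSender<S> sender) { this.sender = sender; }
  @Override public boolean send(long address, byte[] encodedRecords) {
    return sender.sendToQueueSync(address, encodedRecords);
  }
}

final class SyncSocketSend<S> implements SendStrategy {
  private final QueueSender<S> sender;
  private final S socket; // the strategy carries the socket, so the packet never sees it
  SyncSocketSend(QueueSender<S> sender, S socket) {
    this.sender = sender;
    this.socket = socket;
  }
  @Override public boolean send(long address, byte[] encodedRecords) {
    return sender.sendToQueueSync(address, encodedRecords, socket);
  }
}

/** Reduced stand-in for Packet: only the part that uses the strategy is shown. */
final class StrategyPacket {
  private final long address;
  private final SendStrategy strategy;
  StrategyPacket(long address, SendStrategy strategy) {
    this.address = address;
    this.strategy = strategy;
  }
  boolean sendData(byte[] encodedRecords) {
    // the packet does not know or care which of the three ways is used
    return strategy.send(address, encodedRecords);
  }
}

/** Fake sender that only records which method was invoked. */
final class RecordingSender implements QueueSender<String> {
  final List<String> calls = new ArrayList<>();
  @Override public boolean sendToQueueAsync(long address, byte[] records) {
    calls.add("async");
    return true;
  }
  @Override public boolean sendToQueueSync(long address, byte[] records) {
    calls.add("sync");
    return true;
  }
  @Override public boolean sendToQueueSync(long address, byte[] records, String socket) {
    calls.add("sync@" + socket);
    return true;
  }
}

final class StrategyDemo {
  public static void main(String[] args) {
    RecordingSender sender = new RecordingSender();
    byte[] payload = {1, 2, 3};
    new StrategyPacket(1L, new AsyncSend<>(sender)).sendData(payload);
    new StrategyPacket(2L, new SyncSend<>(sender)).sendData(payload);
    new StrategyPacket(3L, new SyncSocketSend<>(sender, "socket-A")).sendData(payload);
    System.out.println(sender.calls);
  }
}
```

With this shape, the caller picks the strategy once when constructing the packet, and the socket only has to be known to the one strategy that needs it. In the real code, the Packet constructor would presumably take the strategy as an extra argument next to the RecordPartition, and sendData() would call strategy.send(address, buffer.array()) instead of calling SendRecord directly.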
