WSTransceiver.java
// Copyright (c) ZeroC, Inc.
package com.zeroc.Ice;
import java.nio.ByteOrder;
import java.nio.channels.SelectableChannel;
import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Random;
final class WSTransceiver implements Transceiver {
@Override
public SelectableChannel fd() {
return _delegate.fd();
}
@Override
public void setReadyCallback(ReadyCallback callback) {
_readyCallback = callback;
_delegate.setReadyCallback(callback);
}
@Override
public int initialize(Buffer readBuffer, Buffer writeBuffer) {
//
// Delegate logs exceptions that occur during initialize(), so there's no need to trap them
// here.
//
if (_state == StateInitializeDelegate) {
int op = _delegate.initialize(readBuffer, writeBuffer);
if (op != 0) {
return op;
}
_state = StateConnected;
}
try {
if (_state == StateConnected) {
//
// We don't know how much we'll need to read.
//
_readBuffer.resize(1024, true);
_readBuffer.position(0);
_readBufferPos = 0;
//
// The server waits for the client's upgrade request, the
// client sends the upgrade request.
//
_state = StateUpgradeRequestPending;
if (!_incoming) {
//
// Compose the upgrade request.
//
StringBuffer out = new StringBuffer();
out.append("GET " + _resource + " HTTP/1.1\r\n");
out.append("Host: " + _host);
out.append("\r\n");
out.append("Upgrade: websocket\r\n");
out.append("Connection: Upgrade\r\n");
out.append("Sec-WebSocket-Protocol: " + _iceProtocol + "\r\n");
out.append("Sec-WebSocket-Version: 13\r\n");
out.append("Sec-WebSocket-Key: ");
//
// The value for Sec-WebSocket-Key is a 16-byte random number, encoded with
// Base64.
//
byte[] key = new byte[16];
_rand.nextBytes(key);
_key = Base64.encode(key);
out.append(_key + "\r\n\r\n"); // EOM
_writeBuffer.resize(out.length(), false);
_writeBuffer.position(0);
_writeBuffer.b.put(out.toString().getBytes(_ascii));
_writeBuffer.flip();
}
}
//
// Try to write the client's upgrade request.
//
if (_state == StateUpgradeRequestPending && !_incoming) {
if (_writeBuffer.b.hasRemaining()) {
int s = _delegate.write(_writeBuffer);
if (s != 0) {
return s;
}
}
assert (!_writeBuffer.b.hasRemaining());
_state = StateUpgradeResponsePending;
if (_instance.traceLevel() >= 1) {
_instance
.logger()
.trace(
_instance.traceCategory(),
"sent "
+ protocol()
+ " connection HTTP upgrade request\n"
+ toString());
}
}
while (true) {
if (_readBuffer.b.hasRemaining()) {
int s = _delegate.read(_readBuffer);
if (s == SocketOperation.Write || _readBuffer.b.position() == 0) {
return s;
}
}
//
// Try to read the client's upgrade request or the server's response.
//
if ((_state == StateUpgradeRequestPending && _incoming)
|| (_state == StateUpgradeResponsePending && !_incoming)) {
//
// Check if we have enough data for a complete message.
//
int p = _parser.isCompleteMessage(_readBuffer.b, 0, _readBuffer.b.position());
if (p == -1) {
if (_readBuffer.b.hasRemaining()) {
return SocketOperation.Read;
}
//
// Enlarge the buffer and try to read more.
//
final int oldSize = _readBuffer.b.position();
if (oldSize + 1024 > _instance.messageSizeMax()) {
Ex.throwMemoryLimitException(
oldSize + 1024, _instance.messageSizeMax());
}
_readBuffer.resize(oldSize + 1024, true);
_readBuffer.position(oldSize);
continue; // Try again to read the response/request
}
//
// Set _readBufferPos at the end of the response/request message.
//
_readBufferPos = p;
}
//
// We're done, the client's upgrade request or server's response is read.
//
break;
}
try {
//
// Parse the client's upgrade request.
//
if (_state == StateUpgradeRequestPending && _incoming) {
if (_parser.parse(_readBuffer.b, 0, _readBufferPos)) {
handleRequest(_writeBuffer);
_state = StateUpgradeResponsePending;
} else {
throw new ProtocolException("incomplete request message");
}
}
if (_state == StateUpgradeResponsePending) {
if (_incoming) {
if (_writeBuffer.b.hasRemaining()) {
int s = _delegate.write(_writeBuffer);
if (s != 0) {
return s;
}
}
} else {
//
// Parse the server's response
//
if (_parser.parse(_readBuffer.b, 0, _readBufferPos)) {
handleResponse();
} else {
throw new ProtocolException("incomplete response message");
}
}
}
} catch (WebSocketException ex) {
throw new ProtocolException(ex.getMessage(), ex);
}
_state = StateOpened;
_nextState = StateOpened;
if (_readBufferPos < _readBuffer.b.position()) {
_readyCallback.ready(SocketOperation.Read, true);
}
} catch (LocalException ex) {
if (_instance.traceLevel() >= 2) {
_instance
.logger()
.trace(
_instance.traceCategory(),
protocol()
+ " connection HTTP upgrade request failed\n"
+ toString()
+ "\n"
+ ex);
}
throw ex;
}
if (_instance.traceLevel() >= 1) {
if (_incoming) {
_instance
.logger()
.trace(
_instance.traceCategory(),
"accepted "
+ protocol()
+ " connection HTTP upgrade request\n"
+ toString());
} else {
_instance
.logger()
.trace(
_instance.traceCategory(),
protocol()
+ " connection HTTP upgrade request accepted\n"
+ toString());
}
}
return SocketOperation.None;
}
@Override
public int closing(boolean initiator, LocalException reason) {
if (_instance.traceLevel() >= 1) {
_instance
.logger()
.trace(
_instance.traceCategory(),
"gracefully closing " + protocol() + " connection\n" + toString());
}
int s = _nextState == StateOpened ? _state : _nextState;
if (s == StateClosingRequestPending && _closingInitiator) {
//
// If we initiated a close connection but also received a
// close connection, we assume we didn't initiated the
// connection and we send the close frame now. This is to
// ensure that if both peers close the connection at the same
// time we don't hang having both peer waiting for the close
// frame of the other.
//
assert (!initiator);
_closingInitiator = false;
return SocketOperation.Write;
} else if (s >= StateClosingRequestPending) {
return SocketOperation.None;
}
_closingInitiator = initiator;
if (reason instanceof CloseConnectionException) {
_closingReason = CLOSURE_NORMAL;
} else if (reason instanceof ObjectAdapterDeactivatedException
|| reason instanceof ObjectAdapterDestroyedException
|| reason instanceof CommunicatorDestroyedException) {
_closingReason = CLOSURE_SHUTDOWN;
} else if (reason instanceof ProtocolException) {
_closingReason = CLOSURE_PROTOCOL_ERROR;
}
if (_state == StateOpened) {
_state = StateClosingRequestPending;
return initiator ? SocketOperation.Read : SocketOperation.Write;
} else {
_nextState = StateClosingRequestPending;
return SocketOperation.None;
}
}
@Override
public void close() {
_delegate.close();
_state = StateClosed;
//
// Clear the buffers now instead of waiting for destruction.
//
_writeBuffer.clear();
_readBuffer.clear();
}
@Override
public EndpointI bind() {
assert false;
return null;
}
@Override
public int write(Buffer buf) {
if (_state < StateOpened) {
if (_state < StateConnected) {
return _delegate.write(buf);
} else {
return _delegate.write(_writeBuffer);
}
}
int s = SocketOperation.None;
do {
if (preWrite(buf)) {
if (_writeState == WriteStateFlush) {
//
// Invoke write() even though there's nothing to write.
//
assert (!buf.b.hasRemaining());
s = _delegate.write(buf);
}
if (s == SocketOperation.None && _writeBuffer.b.hasRemaining()) {
s = _delegate.write(_writeBuffer);
} else if (s == SocketOperation.None
&& _incoming
&& !buf.empty()
&& _writeState == WriteStatePayload) {
s = _delegate.write(buf);
}
}
} while (postWrite(buf, s));
if (s != SocketOperation.None) {
return s;
}
if (_state == StateClosingResponsePending && !_closingInitiator) {
return SocketOperation.Read;
}
return SocketOperation.None;
}
@Override
public int read(Buffer buf) {
if (_state < StateOpened) {
if (_state < StateConnected) {
return _delegate.read(buf);
} else {
if (_delegate.read(_readBuffer) == SocketOperation.Write) {
return SocketOperation.Write;
} else {
return SocketOperation.None;
}
}
}
if (!buf.b.hasRemaining()) {
if (_readBufferPos < _readBuffer.b.position()) {
_readyCallback.ready(SocketOperation.Read, true);
}
return SocketOperation.None;
}
int s = SocketOperation.None;
do {
if (preRead(buf)) {
if (_readState == ReadStatePayload) {
//
// If the payload length is smaller than what remains to be read, we read
// no more than the payload length. The remaining of the buffer will be
// sent over in another frame.
//
int readSz = _readPayloadLength - (buf.b.position() - _readStart);
if (buf.b.remaining() > readSz) {
int size = buf.size();
buf.resize(buf.b.position() + readSz, true);
s = _delegate.read(buf);
buf.resize(size, true);
} else {
s = _delegate.read(buf);
}
} else {
s = _delegate.read(_readBuffer);
}
if (s == SocketOperation.Write) {
postRead(buf);
return s;
}
}
} while (postRead(buf));
if (!buf.b.hasRemaining()) {
if (_readBufferPos < _readBuffer.b.position()) {
_readyCallback.ready(SocketOperation.Read, true);
}
s = SocketOperation.None;
} else {
_readyCallback.ready(SocketOperation.Read, false);
s = SocketOperation.Read;
}
if (((_state == StateClosingRequestPending && !_closingInitiator)
|| (_state == StateClosingResponsePending && _closingInitiator)
|| _state == StatePingPending
|| _state == StatePongPending)
&& _writeState == WriteStateHeader) {
// We have things to write, ask to be notified when writes are ready.
s |= SocketOperation.Write;
}
return s;
}
@Override
public String protocol() {
return _instance.protocol();
}
@Override
public String toString() {
return _delegate.toString();
}
@Override
public String toDetailedString() {
return _delegate.toDetailedString();
}
@Override
public ConnectionInfo getInfo(boolean incoming, String adapterName, String connectionId) {
return new WSConnectionInfo(
_delegate.getInfo(incoming, adapterName, connectionId), _parser.getHeaders());
}
@Override
public void checkSendSize(Buffer buf) {
_delegate.checkSendSize(buf);
}
@Override
public void setBufferSize(int rcvSize, int sndSize) {
_delegate.setBufferSize(rcvSize, sndSize);
}
WSTransceiver(ProtocolInstance instance, Transceiver del, String host, String resource) {
init(instance, del);
_host = host;
_resource = resource;
_incoming = false;
//
// Use a 16KB write buffer size. We use 16KB for the write
// buffer size because all the data needs to be copied to the
// write buffer for the purpose of masking. A 16KB buffer
// appears to be a good compromise to reduce the number of
// socket write calls and not consume too much memory.
//
_writeBufferSize = 16 * 1024;
//
// Write and read buffer size must be large enough to hold the frame header!
//
assert (_writeBufferSize > 256);
assert (_readBufferSize > 256);
}
WSTransceiver(ProtocolInstance instance, Transceiver del) {
init(instance, del);
_host = "";
_resource = "";
_incoming = true;
//
// Write and read buffer size must be large enough to hold the frame header!
//
assert (_writeBufferSize > 256);
assert (_readBufferSize > 256);
}
private void init(ProtocolInstance instance, Transceiver del) {
_instance = instance;
_delegate = del;
_state = StateInitializeDelegate;
_parser = new HttpParser();
_readState = ReadStateOpcode;
_readBuffer = new Buffer(false, ByteOrder.BIG_ENDIAN); // Use network byte order.
_readBufferSize = 1024;
_readLastFrame = true;
_readOpCode = 0;
_readHeaderLength = 0;
_readPayloadLength = 0;
_readMask = new byte[4];
_writeState = WriteStateHeader;
_writeBuffer = new Buffer(false, ByteOrder.BIG_ENDIAN); // Use network byte order.
_writeBufferSize = 1024;
_readMask = new byte[4];
_writeMask = new byte[4];
_key = "";
_pingPayload = new byte[0];
_rand = new Random();
}
private void handleRequest(Buffer responseBuffer) {
//
// HTTP/1.1
//
if (_parser.versionMajor() != 1 || _parser.versionMinor() != 1) {
throw new WebSocketException("unsupported HTTP version");
}
//
// "An |Upgrade| header field containing the value 'websocket',
// treated as an ASCII case-insensitive value."
//
String val = _parser.getHeader("Upgrade", true);
if (val == null) {
throw new WebSocketException("missing value for Upgrade field");
} else if (!"websocket".equals(val)) {
throw new WebSocketException("invalid value `" + val + "' for Upgrade field");
}
//
// "A |Connection| header field that includes the token 'Upgrade',
// treated as an ASCII case-insensitive value.
//
val = _parser.getHeader("Connection", true);
if (val == null) {
throw new WebSocketException("missing value for Connection field");
} else if (val.indexOf("upgrade") == -1) {
throw new WebSocketException("invalid value `" + val + "' for Connection field");
}
//
// "A |Sec-WebSocket-Version| header field, with a value of 13."
//
val = _parser.getHeader("Sec-WebSocket-Version", false);
if (val == null) {
throw new WebSocketException("missing value for WebSocket version");
} else if (!"13".equals(val)) {
throw new WebSocketException("unsupported WebSocket version `" + val + "'");
}
//
// "Optionally, a |Sec-WebSocket-Protocol| header field, with a list
// of values indicating which protocols the client would like to
// speak, ordered by preference."
//
boolean addProtocol = false;
val = _parser.getHeader("Sec-WebSocket-Protocol", true);
if (val != null) {
String[] protocols = StringUtil.splitString(val, ",");
if (protocols == null) {
throw new WebSocketException("invalid value `" + val + "' for WebSocket protocol");
}
for (String p : protocols) {
if (!_iceProtocol.equals(p.trim())) {
throw new WebSocketException(
"unknown value `" + p + "' for WebSocket protocol");
}
addProtocol = true;
}
}
//
// "A |Sec-WebSocket-Key| header field with a base64-encoded
// value that, when decoded, is 16 bytes in length."
//
String key = _parser.getHeader("Sec-WebSocket-Key", false);
if (key == null) {
throw new WebSocketException("missing value for WebSocket key");
}
try {
byte[] decodedKey = Base64.decode(key);
if (decodedKey.length != 16) {
throw new WebSocketException("WebSocket key `" + key + "' has invalid length");
}
} catch (IllegalArgumentException ex) {
throw new WebSocketException("invalid base64 value `" + key + "' for WebSocket key");
}
//
// Retain the target resource.
//
_resource = _parser.uri();
//
// Compose the response.
//
StringBuffer out = new StringBuffer();
out.append("HTTP/1.1 101 Switching Protocols\r\n");
out.append("Upgrade: websocket\r\n");
out.append("Connection: Upgrade\r\n");
if (addProtocol) {
out.append("Sec-WebSocket-Protocol: " + _iceProtocol + "\r\n");
}
//
// The response includes:
//
// "A |Sec-WebSocket-Accept| header field. The value of this
// header field is constructed by concatenating /key/, defined
// above in step 4 in Section 4.2.2, with the string "258EAFA5-
// E914-47DA-95CA-C5AB0DC85B11", taking the SHA-1 hash of this
// concatenated value to obtain a 20-byte value and base64-
// encoding (see Section 4 of [RFC4648]) this 20-byte hash.
//
out.append("Sec-WebSocket-Accept: ");
final String input = key + _wsUUID;
try {
final MessageDigest sha1 = MessageDigest.getInstance("SHA1");
sha1.update(input.getBytes(_ascii));
final byte[] hash = sha1.digest();
out.append(Base64.encode(hash) + "\r\n" + "\r\n"); // EOM
} catch (NoSuchAlgorithmException ex) {
throw new WebSocketException(ex);
}
final byte[] bytes = out.toString().getBytes(_ascii);
assert (bytes.length == out.length());
responseBuffer.resize(bytes.length, false);
responseBuffer.position(0);
responseBuffer.b.put(bytes);
responseBuffer.flip();
}
private void handleResponse() {
String val;
//
// HTTP/1.1
//
if (_parser.versionMajor() != 1 || _parser.versionMinor() != 1) {
throw new WebSocketException("unsupported HTTP version");
}
//
// "If the status code received from the server is not 101, the
// client handles the response per HTTP [RFC2616] procedures. In
// particular, the client might perform authentication if it
// receives a 401 status code; the server might redirect the client
// using a 3xx status code (but clients are not required to follow
// them), etc."
//
if (_parser.status() != 101) {
StringBuffer out = new StringBuffer("unexpected status value " + _parser.status());
if (_parser.reason().length() > 0) {
out.append(":\n" + _parser.reason());
}
throw new WebSocketException(out.toString());
}
//
// "If the response lacks an |Upgrade| header field or the |Upgrade|
// header field contains a value that is not an ASCII case-
// insensitive match for the value "websocket", the client MUST
// _Fail the WebSocket Connection_."
//
val = _parser.getHeader("Upgrade", true);
if (val == null) {
throw new WebSocketException("missing value for Upgrade field");
} else if (!"websocket".equals(val)) {
throw new WebSocketException("invalid value `" + val + "' for Upgrade field");
}
//
// "If the response lacks a |Connection| header field or the
// |Connection| header field doesn't contain a token that is an
// ASCII case-insensitive match for the value "Upgrade", the client
// MUST _Fail the WebSocket Connection_."
//
val = _parser.getHeader("Connection", true);
if (val == null) {
throw new WebSocketException("missing value for Connection field");
} else if (val.indexOf("upgrade") == -1) {
throw new WebSocketException("invalid value `" + val + "' for Connection field");
}
//
// "If the response includes a |Sec-WebSocket-Protocol| header field
// and this header field indicates the use of a subprotocol that was
// not present in the client's handshake (the server has indicated a
// subprotocol not requested by the client), the client MUST _Fail
// the WebSocket Connection_."
//
val = _parser.getHeader("Sec-WebSocket-Protocol", true);
if (val != null && !_iceProtocol.equals(val)) {
throw new WebSocketException("invalid value `" + val + "' for WebSocket protocol");
}
//
// "If the response lacks a |Sec-WebSocket-Accept| header field or
// the |Sec-WebSocket-Accept| contains a value other than the
// base64-encoded SHA-1 of the concatenation of the |Sec-WebSocket-
// Key| (as a string, not base64-decoded) with the string "258EAFA5-
// E914-47DA-95CA-C5AB0DC85B11" but ignoring any leading and
// trailing whitespace, the client MUST _Fail the WebSocket
// Connection_."
//
val = _parser.getHeader("Sec-WebSocket-Accept", false);
if (val == null) {
throw new WebSocketException("missing value for Sec-WebSocket-Accept");
}
try {
final String input = _key + _wsUUID;
final MessageDigest sha1 = MessageDigest.getInstance("SHA1");
sha1.update(input.getBytes(_ascii));
if (!val.equals(Base64.encode(sha1.digest()))) {
throw new WebSocketException("invalid value `" + val + "' for Sec-WebSocket-Accept");
}
} catch (NoSuchAlgorithmException ex) {
throw new WebSocketException(ex);
}
}
private boolean preRead(Buffer buf) {
while (true) {
if (_readState == ReadStateOpcode) {
//
// Is there enough data available to read the opcode?
//
if (!readBuffered(2)) {
return true;
}
//
// Most-significant bit indicates whether this is the
// last frame. Least-significant four bits hold the
// opcode.
//
int ch = _readBuffer.b.get(_readBufferPos++);
if (ch < 0) {
ch += 256;
}
_readOpCode = ch & 0xf;
//
// Remember if last frame if we're going to read a data or
// continuation frame, this is only for protocol
// correctness checking purpose.
//
if (_readOpCode == OP_DATA) {
if (!_readLastFrame) {
throw new ProtocolException("invalid data frame, no FIN on previous frame");
}
_readLastFrame = (ch & FLAG_FINAL) == FLAG_FINAL;
} else if (_readOpCode == OP_CONT) {
if (_readLastFrame) {
throw new ProtocolException(
"invalid continuation frame, previous frame FIN set");
}
_readLastFrame = (ch & FLAG_FINAL) == FLAG_FINAL;
}
ch = _readBuffer.b.get(_readBufferPos++);
if (ch < 0) {
ch += 256;
}
//
// Check the MASK bit. Messages sent by a client must be masked;
// messages sent by a server must not be masked.
//
final boolean masked = (ch & FLAG_MASKED) == FLAG_MASKED;
if (masked != _incoming) {
throw new ProtocolException("invalid masking");
}
//
// Extract the payload length, which can have the following values:
//
// 0-125: The payload length
// 126: The subsequent two bytes contain the payload length
// 127: The subsequent eight bytes contain the payload length
//
_readPayloadLength = ch & 0x7f;
if (_readPayloadLength < 126) {
_readHeaderLength = 0;
} else if (_readPayloadLength == 126) {
_readHeaderLength = 2; // Need to read a 16-bit payload length.
} else {
_readHeaderLength = 8; // Need to read a 64-bit payload length.
}
if (masked) {
_readHeaderLength += 4; // Need to read a 32-bit mask.
}
_readState = ReadStateHeader;
}
if (_readState == ReadStateHeader) {
// Is there enough data available to read the header?
if (_readHeaderLength > 0 && !readBuffered(_readHeaderLength)) {
return true;
}
if (_readPayloadLength == 126) {
_readPayloadLength = _readBuffer.b.getShort(_readBufferPos); // Uses network byte order.
if (_readPayloadLength < 0) {
_readPayloadLength += 65536;
}
_readBufferPos += 2;
} else if (_readPayloadLength == 127) {
long l = _readBuffer.b.getLong(_readBufferPos); // Uses network byte order.
_readBufferPos += 8;
if (l < 0 || l > Integer.MAX_VALUE) {
throw new ProtocolException("invalid WebSocket payload length: " + l);
}
_readPayloadLength = (int) l;
}
// Read the mask if this is an incoming connection.
if (_incoming) {
assert (_readBuffer.b.position() - _readBufferPos >= 4); // We must have needed to read the mask.
for (int i = 0; i < 4; i++) {
_readMask[i] = _readBuffer.b.get(_readBufferPos++); // Copy the mask.
}
}
switch (_readOpCode) {
case OP_TEXT /* Text frame */ -> {
throw new ProtocolException("text frames not supported");
}
case OP_CONT /* Continuation frame */, OP_DATA /* Data frame */ -> {
if (_instance.traceLevel() >= 2) {
_instance
.logger()
.trace(
_instance.traceCategory(),
"received "
+ protocol()
+ (_readOpCode == OP_DATA ? " data" : " continuation")
+ " frame with payload length of "
+ _readPayloadLength
+ " bytes\n"
+ toString());
}
if (_readPayloadLength <= 0) {
throw new ProtocolException("payload length is 0");
}
_readState = ReadStatePayload;
assert (buf.b.hasRemaining());
_readFrameStart = buf.b.position();
}
case OP_CLOSE /* Connection close */ -> {
if (_instance.traceLevel() >= 2) {
_instance
.logger()
.trace(
_instance.traceCategory(),
"received " + protocol() + " connection close frame\n" + toString());
}
int s = _nextState == StateOpened ? _state : _nextState;
if (s == StateClosingRequestPending) {
// If we receive a close frame while we were actually waiting to send one,
// change the role and send a close frame response.
if (!_closingInitiator) {
_closingInitiator = true;
}
if (_state == StateClosingRequestPending) {
_state = StateClosingResponsePending;
} else {
_nextState = StateClosingResponsePending;
}
return false; // No longer interested in reading
} else {
throw new ConnectionLostException();
}
}
case OP_PING -> {
if (_instance.traceLevel() >= 2) {
_instance
.logger()
.trace(
_instance.traceCategory(),
"received "
+ protocol()
+ " connection ping frame\n"
+ toString());
}
_readState = ReadStateControlFrame;
}
case OP_PONG -> {
if (_instance.traceLevel() >= 2) {
_instance
.logger()
.trace(
_instance.traceCategory(),
"received "
+ protocol()
+ " connection pong frame\n"
+ toString());
}
_readState = ReadStateControlFrame;
}
default -> throw new ProtocolException("unsupported opcode: " + _readOpCode);
}
}
if (_readState == ReadStateControlFrame) {
if (_readPayloadLength > 0 && !readBuffered(_readPayloadLength)) {
return true;
}
if (_readPayloadLength > 0 && _readOpCode == OP_PING) {
_pingPayload = new byte[_readPayloadLength];
if (_readBuffer.b.hasArray()) {
System.arraycopy(
_readBuffer.b.array(),
_readBuffer.b.arrayOffset() + _readBufferPos,
_pingPayload,
0,
_readPayloadLength);
} else {
for (int i = 0; i < _readPayloadLength; i++) {
_pingPayload[i] = _readBuffer.b.get(_readBufferPos + i);
}
}
}
_readBufferPos += _readPayloadLength;
_readPayloadLength = 0;
if (_readOpCode == OP_PING) {
if (_state == StateOpened) {
_state = StatePongPending; // Send pong frame now
} else if (_nextState < StatePongPending) {
_nextState = StatePongPending; // Send pong frame next
}
}
//
// We've read the payload of the PING/PONG frame, we're ready
// to read a new frame.
//
_readState = ReadStateOpcode;
}
if (_readState == ReadStatePayload) {
//
// This must be assigned before the check for the buffer. If the buffer is empty
// or already read, postRead will return false.
//
_readStart = buf.b.position();
if (buf.empty() || !buf.b.hasRemaining()) {
return false;
}
int n = Math.min(_readBuffer.b.position() - _readBufferPos, buf.b.remaining());
if (n > _readPayloadLength) {
n = _readPayloadLength;
}
if (n > 0) {
if (buf.b.hasArray() && _readBuffer.b.hasArray()) {
System.arraycopy(
_readBuffer.b.array(),
_readBuffer.b.arrayOffset() + _readBufferPos,
buf.b.array(),
buf.b.arrayOffset() + buf.b.position(),
n);
buf.position(buf.b.position() + n);
} else {
for (int i = 0; i < n; i++) {
buf.b.put(_readBuffer.b.get(_readBufferPos + i));
}
}
_readBufferPos += n;
}
//
// Continue reading if we didn't read the full message or there's more payload data
// to read.
//
return buf.b.hasRemaining() && n < _readPayloadLength;
}
}
}
private boolean postRead(Buffer buf) {
if (_readState != ReadStatePayload) {
return _readStart < _readBuffer.b.position(); // Returns true if data was read.
}
if (_readStart == buf.b.position()) {
return false; // Nothing was read or nothing to read.
}
assert (_readStart < buf.b.position());
if (_incoming) {
//
// Unmask the data we just read.
//
final int pos = buf.b.position();
if (buf.b.hasArray()) {
byte[] arr = buf.b.array();
int offset = buf.b.arrayOffset();
for (int n = _readStart; n < pos; n++) {
arr[n + offset] =
(byte) (arr[n + offset] ^ _readMask[(n - _readFrameStart) % 4]);
}
} else {
for (int n = _readStart; n < pos; n++) {
final byte b = (byte) (buf.b.get(n) ^ _readMask[(n - _readFrameStart) % 4]);
buf.b.put(n, b);
}
}
}
_readPayloadLength -= buf.b.position() - _readStart;
_readStart = buf.b.position();
if (_readPayloadLength == 0) {
//
// We've read the complete payload, we're ready to read a new frame.
//
_readState = ReadStateOpcode;
}
return buf.b.hasRemaining();
}
private boolean preWrite(Buffer buf) {
if (_writeState == WriteStateHeader) {
if (_state == StateOpened) {
if (buf.empty() || !buf.b.hasRemaining()) {
return false;
}
assert (buf.b.position() == 0);
prepareWriteHeader((byte) OP_DATA, buf.size());
_writeState = WriteStatePayload;
} else if (_state == StatePingPending) {
prepareWriteHeader((byte) OP_PING, 0); // Don't send any payload
_writeState = WriteStateControlFrame;
_writeBuffer.flip();
} else if (_state == StatePongPending) {
prepareWriteHeader((byte) OP_PONG, _pingPayload.length);
if (_pingPayload.length > _writeBuffer.b.remaining()) {
final int pos = _writeBuffer.b.position();
_writeBuffer.resize(pos + _pingPayload.length, false);
_writeBuffer.position(pos);
}
_writeBuffer.b.put(_pingPayload);
_pingPayload = new byte[0];
_writeState = WriteStateControlFrame;
_writeBuffer.flip();
} else if ((_state == StateClosingRequestPending && !_closingInitiator)
|| (_state == StateClosingResponsePending && _closingInitiator)) {
prepareWriteHeader((byte) OP_CLOSE, 2);
// Write closing reason
_writeBuffer.b.putShort((short) _closingReason);
if (!_incoming) {
byte b;
int pos = _writeBuffer.b.position() - 2;
b = (byte) (_writeBuffer.b.get(pos) ^ _writeMask[0]);
_writeBuffer.b.put(pos, b);
pos++;
b = (byte) (_writeBuffer.b.get(pos) ^ _writeMask[1]);
_writeBuffer.b.put(pos, b);
}
_writeState = WriteStateControlFrame;
_writeBuffer.flip();
} else {
assert (_state != StateClosed);
return false; // Nothing to write in this state
}
_writePayloadLength = 0;
}
if (_writeState == WriteStatePayload) {
//
// For an outgoing connection, each message must be masked with a random
// 32-bit value, so we copy the entire message into the internal buffer
// for writing. For incoming connections, we just copy the start of the
// message in the internal buffer after the hedaer. If the message is
// larger, the reminder is sent directly from the message buffer to avoid
// copying.
//
if (!_incoming && (_writePayloadLength == 0 || !_writeBuffer.b.hasRemaining())) {
if (!_writeBuffer.b.hasRemaining()) {
_writeBuffer.position(0);
}
int n = buf.b.position();
final int sz = buf.size();
if (buf.b.hasArray() && _writeBuffer.b.hasArray()) {
int pos = _writeBuffer.b.position();
final int count = Math.min(sz - n, _writeBuffer.b.remaining());
final byte[] src = buf.b.array();
final int srcOff = buf.b.arrayOffset();
final byte[] dest = _writeBuffer.b.array();
final int destOff = _writeBuffer.b.arrayOffset();
for (int i = 0; i < count; i++, n++, pos++) {
dest[destOff + pos] = (byte) (src[srcOff + n] ^ _writeMask[n % 4]);
}
_writeBuffer.position(pos);
} else {
for (; n < sz && _writeBuffer.b.hasRemaining(); n++) {
final byte b = (byte) (buf.b.get(n) ^ _writeMask[n % 4]);
_writeBuffer.b.put(b);
}
}
_writePayloadLength = n;
_writeBuffer.flip();
} else if (_writePayloadLength == 0) {
assert _incoming;
if (_writeBuffer.b.hasRemaining()) {
assert (buf.b.position() == 0);
int n = _writeBuffer.b.remaining();
if (buf.b.remaining() > n) {
int limit = buf.b.limit();
buf.limit(n);
_writeBuffer.b.put(buf.b);
buf.limit(limit);
_writePayloadLength = n;
} else {
_writePayloadLength = buf.b.remaining();
_writeBuffer.b.put(buf.b);
}
buf.position(0);
}
_writeBuffer.flip();
}
return true;
} else if (_writeState == WriteStateControlFrame) {
return _writeBuffer.b.hasRemaining();
} else {
assert (_writeState == WriteStateFlush);
return true;
}
}
private boolean postWrite(Buffer buf, int status) {
if (_state > StateOpened && _writeState == WriteStateControlFrame) {
if (!_writeBuffer.b.hasRemaining()) {
if (_state == StatePingPending) {
if (_instance.traceLevel() >= 2) {
_instance
.logger()
.trace(
_instance.traceCategory(),
"sent "
+ protocol()
+ " connection ping frame\n"
+ toString());
}
} else if (_state == StatePongPending) {
if (_instance.traceLevel() >= 2) {
_instance
.logger()
.trace(
_instance.traceCategory(),
"sent "
+ protocol()
+ " connection pong frame\n"
+ toString());
}
} else if ((_state == StateClosingRequestPending && !_closingInitiator)
|| (_state == StateClosingResponsePending && _closingInitiator)) {
if (_instance.traceLevel() >= 2) {
_instance
.logger()
.trace(
_instance.traceCategory(),
"sent "
+ protocol()
+ " connection close frame\n"
+ toString());
}
if (_state == StateClosingRequestPending && !_closingInitiator) {
_writeState = WriteStateHeader;
_state = StateClosingResponsePending;
return false;
} else {
throw new ConnectionLostException();
}
} else if (_state == StateClosed) {
return false;
}
_state = _nextState;
_nextState = StateOpened;
_writeState = WriteStateHeader;
} else {
return status == SocketOperation.None;
}
}
if ((!_incoming || buf.b.position() == 0) && _writePayloadLength > 0) {
if (!_writeBuffer.b.hasRemaining()) {
buf.position(_writePayloadLength);
}
}
if (status == SocketOperation.Write
&& !buf.b.hasRemaining()
&& !_writeBuffer.b.hasRemaining()) {
//
// Our buffers are empty but the delegate needs another call to write().
//
_writeState = WriteStateFlush;
return false;
} else if (!buf.b.hasRemaining()) {
_writeState = WriteStateHeader;
if (_state == StatePingPending
|| _state == StatePongPending
|| (_state == StateClosingRequestPending && !_closingInitiator)
|| (_state == StateClosingResponsePending && _closingInitiator)) {
return true;
}
} else if (_state == StateOpened) {
return status == SocketOperation.None;
}
return false;
}
private boolean readBuffered(int sz) {
if (_readBufferPos == _readBuffer.b.position()) {
_readBuffer.resize(_readBufferSize, true);
_readBufferPos = 0;
_readBuffer.position(0);
} else {
final int available = _readBuffer.b.position() - _readBufferPos;
if (available < sz) {
if (_readBufferPos > 0) {
_readBuffer.limit(_readBuffer.b.position());
_readBuffer.position(_readBufferPos);
_readBuffer.b.compact();
assert (_readBuffer.b.position() == available);
}
_readBuffer.resize(Math.max(_readBufferSize, sz), true);
_readBufferPos = 0;
_readBuffer.position(available);
}
}
_readStart = _readBuffer.b.position();
if (_readBufferPos + sz > _readBuffer.b.position()) {
return false; // Not enough read.
}
assert (_readBuffer.b.position() > _readBufferPos);
return true;
}
private void prepareWriteHeader(byte opCode, int payloadLength) {
//
// We need to prepare the frame header.
//
_writeBuffer.resize(_writeBufferSize, false);
_writeBuffer.limit(_writeBufferSize);
_writeBuffer.position(0);
//
// Set the opcode - this is the one and only data frame.
//
_writeBuffer.b.put((byte) (opCode | FLAG_FINAL));
//
// Set the payload length.
//
if (payloadLength <= 125) {
_writeBuffer.b.put((byte) payloadLength);
} else if (payloadLength > 125 && payloadLength <= 65535) {
//
// Use an extra 16 bits to encode the payload length.
//
_writeBuffer.b.put((byte) 126);
_writeBuffer.b.putShort((short) payloadLength);
} else if (payloadLength > 65535) {
//
// Use an extra 64 bits to encode the payload length.
//
_writeBuffer.b.put((byte) 127);
_writeBuffer.b.putLong(payloadLength);
}
if (!_incoming) {
//
// Add a random 32-bit mask to every outgoing frame, copy the payload data,
// and apply the mask.
//
_writeBuffer.b.put(1, (byte) (_writeBuffer.b.get(1) | FLAG_MASKED));
_rand.nextBytes(_writeMask);
_writeBuffer.b.put(_writeMask);
}
}
private ProtocolInstance _instance;
private Transceiver _delegate;
private String _host;
private String _resource;
private boolean _incoming;
private ReadyCallback _readyCallback;
private static final int StateInitializeDelegate = 0;
private static final int StateConnected = 1;
private static final int StateUpgradeRequestPending = 2;
private static final int StateUpgradeResponsePending = 3;
private static final int StateOpened = 4;
private static final int StatePingPending = 5;
private static final int StatePongPending = 6;
private static final int StateClosingRequestPending = 7;
private static final int StateClosingResponsePending = 8;
private static final int StateClosed = 9;
private int _state;
private int _nextState;
private HttpParser _parser;
private String _key;
private static final int ReadStateOpcode = 0;
private static final int ReadStateHeader = 1;
private static final int ReadStateControlFrame = 2;
private static final int ReadStatePayload = 3;
private int _readState;
private Buffer _readBuffer;
private int _readBufferPos;
private int _readBufferSize;
private boolean _readLastFrame;
private int _readOpCode;
private int _readHeaderLength;
private int _readPayloadLength;
private int _readStart;
private int _readFrameStart;
private byte[] _readMask;
private static final int WriteStateHeader = 0;
private static final int WriteStatePayload = 1;
private static final int WriteStateControlFrame = 2;
private static final int WriteStateFlush = 3;
private int _writeState;
private Buffer _writeBuffer;
private int _writeBufferSize;
private byte[] _writeMask;
private int _writePayloadLength;
private boolean _closingInitiator;
private int _closingReason;
private byte[] _pingPayload;
private Random _rand;
//
// WebSocket opcodes
//
private static final int OP_CONT = 0x0; // Continuation frame
private static final int OP_TEXT = 0x1; // Text frame
private static final int OP_DATA = 0x2; // Data frame
@SuppressWarnings("unused")
private static final int OP_RES_0x3 = 0x3; // Reserved
@SuppressWarnings("unused")
private static final int OP_RES_0x4 = 0x4; // Reserved
@SuppressWarnings("unused")
private static final int OP_RES_0x5 = 0x5; // Reserved
@SuppressWarnings("unused")
private static final int OP_RES_0x6 = 0x6; // Reserved
@SuppressWarnings("unused")
private static final int OP_RES_0x7 = 0x7; // Reserved
private static final int OP_CLOSE = 0x8; // Connection close
private static final int OP_PING = 0x9; // Ping
private static final int OP_PONG = 0xA; // Pong
@SuppressWarnings("unused")
private static final int OP_RES_0xB = 0xB; // Reserved
@SuppressWarnings("unused")
private static final int OP_RES_0xC = 0xC; // Reserved
@SuppressWarnings("unused")
private static final int OP_RES_0xD = 0xD; // Reserved
@SuppressWarnings("unused")
private static final int OP_RES_0xE = 0xE; // Reserved
@SuppressWarnings("unused")
private static final int OP_RES_0xF = 0xF; // Reserved
private static final int FLAG_FINAL = 0x80; // Last frame
private static final int FLAG_MASKED = 0x80; // Payload is masked
private static final int CLOSURE_NORMAL = 1000;
private static final int CLOSURE_SHUTDOWN = 1001;
private static final int CLOSURE_PROTOCOL_ERROR = 1002;
private static final String _iceProtocol = "ice.zeroc.com";
private static final String _wsUUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
static final Charset _ascii = Charset.forName("US-ASCII");
}