1
0
mirror of https://github.com/bitwarden/help synced 2025-12-22 11:13:14 +00:00

Promote to Master (#748)

* initial commit

* adding quotes for the array error

* Create Gemfile

* Create Gemfile.lock

* add .nvmrc and .node-version

* removed /article from URL

* update links to work with netlify

* more fixed links

* link fixes

* update bad links

* Update netlify.toml

toml test for redirects

* article redirect

* link fixes

* Update index.html

* Update netlify.toml

* Update _config.yml

* Update netlify.toml

* Update netlify.toml

* Update netlify.toml

* Update netlify.toml

* Update netlify.toml

* add article back into URL for launch

* Update netlify.toml

* Update netlify.toml

* add order to categories front matter

* Update netlify.toml

* update

* sidemenu update

* Revert "sidemenu update"

This reverts commit 5441c3d35c.

* update order prop

* Navbar updates per Gary and compiler warnings

* font/style tweaks

* Update sidebar.html

* Stage Release Documentation (#739)

* initial drafts

* rewrite Custom Fields article to prioritize new context-menu option & better organize ancillary information

* edit

* edit

* Custom Field Context Menu & CAPTCHA item in release notes

* SSO relink event

* update rn

* small edits

* improve release notes titles

* fix side menu

* Edits courtest of mportune!

* update order

* link fixes

* link cleanup

* image updates and a link

* fix trailing slash

Co-authored-by: DanHillesheim <79476558+DanHillesheim@users.noreply.github.com>
This commit is contained in:
fred_the_tech_writer
2021-09-21 13:21:11 -04:00
committed by GitHub
parent 63f78e8979
commit 906e2ca0dd
3304 changed files with 386714 additions and 8864 deletions

View File

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<classpath>
<classpathentry kind="src" path="src"/>
<classpathentry excluding="src/" kind="src" path=""/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="con" path="org.eclipse.jdt.junit.JUNIT_CONTAINER/4"/>
<classpathentry kind="output" path="src"/>
</classpath>

View File

@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>em_reactor</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>

View File

@@ -0,0 +1,613 @@
/**
* $Id$
*
* Author:: Francis Cianfrocca (gmail: blackhedd)
* Homepage:: http://rubyeventmachine.com
* Date:: 15 Jul 2007
*
* See EventMachine and EventMachine::Connection for documentation and
* usage examples.
*
*
*----------------------------------------------------------------------------
*
* Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
* Gmail: blackhedd
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of either: 1) the GNU General Public License
* as published by the Free Software Foundation; either version 2 of the
* License, or (at your option) any later version; or 2) Ruby's License.
*
* See the file COPYING for complete licensing information.
*
*---------------------------------------------------------------------------
*
*
*/
package com.rubyeventmachine;
import java.io.*;
import java.nio.channels.*;
import java.util.*;
import java.nio.*;
import java.net.*;
import java.util.concurrent.atomic.*;
import java.security.*;
public class EmReactor {
public final int EM_TIMER_FIRED = 100;
public final int EM_CONNECTION_READ = 101;
public final int EM_CONNECTION_UNBOUND = 102;
public final int EM_CONNECTION_ACCEPTED = 103;
public final int EM_CONNECTION_COMPLETED = 104;
public final int EM_LOOPBREAK_SIGNAL = 105;
public final int EM_CONNECTION_NOTIFY_READABLE = 106;
public final int EM_CONNECTION_NOTIFY_WRITABLE = 107;
public final int EM_SSL_HANDSHAKE_COMPLETED = 108;
public final int EM_SSL_VERIFY = 109;
public final int EM_PROXY_TARGET_UNBOUND = 110;
public final int EM_PROXY_COMPLETED = 111;
public final int EM_PROTO_SSLv2 = 2;
public final int EM_PROTO_SSLv3 = 4;
public final int EM_PROTO_TLSv1 = 8;
public final int EM_PROTO_TLSv1_1 = 16;
public final int EM_PROTO_TLSv1_2 = 32;
private Selector mySelector;
private TreeMap<Long, ArrayList<Long>> Timers;
private HashMap<Long, EventableChannel> Connections;
private HashMap<Long, ServerSocketChannel> Acceptors;
private ArrayList<Long> NewConnections;
private ArrayList<Long> UnboundConnections;
private ArrayList<EventableSocketChannel> DetachedConnections;
private boolean bRunReactor;
private long BindingIndex;
private AtomicBoolean loopBreaker;
private ByteBuffer myReadBuffer;
private int timerQuantum;
public EmReactor() {
Timers = new TreeMap<Long, ArrayList<Long>>();
Connections = new HashMap<Long, EventableChannel>();
Acceptors = new HashMap<Long, ServerSocketChannel>();
NewConnections = new ArrayList<Long>();
UnboundConnections = new ArrayList<Long>();
DetachedConnections = new ArrayList<EventableSocketChannel>();
BindingIndex = 0;
loopBreaker = new AtomicBoolean();
loopBreaker.set(false);
myReadBuffer = ByteBuffer.allocate(32*1024); // don't use a direct buffer. Ruby doesn't seem to like them.
timerQuantum = 98;
}
/**
* This is a no-op stub, intended to be overridden in user code.
*/
public void eventCallback (long sig, int eventType, ByteBuffer data, long data2) {
System.out.println ("Default callback: "+sig+" "+eventType+" "+data+" "+data2);
}
public void eventCallback (long sig, int eventType, ByteBuffer data) {
eventCallback (sig, eventType, data, 0);
}
public void run() {
try {
mySelector = Selector.open();
bRunReactor = true;
} catch (IOException e) {
throw new RuntimeException ("Could not open selector", e);
}
while (bRunReactor) {
runLoopbreaks();
if (!bRunReactor) break;
runTimers();
if (!bRunReactor) break;
removeUnboundConnections();
checkIO();
addNewConnections();
processIO();
}
close();
}
void addNewConnections() {
ListIterator<EventableSocketChannel> iter = DetachedConnections.listIterator(0);
while (iter.hasNext()) {
EventableSocketChannel ec = iter.next();
ec.cleanup();
}
DetachedConnections.clear();
ListIterator<Long> iter2 = NewConnections.listIterator(0);
while (iter2.hasNext()) {
long b = iter2.next();
EventableChannel ec = Connections.get(b);
if (ec != null) {
try {
ec.register();
} catch (ClosedChannelException e) {
UnboundConnections.add (ec.getBinding());
}
}
}
NewConnections.clear();
}
void removeUnboundConnections() {
ListIterator<Long> iter = UnboundConnections.listIterator(0);
while (iter.hasNext()) {
long b = iter.next();
EventableChannel ec = Connections.remove(b);
if (ec != null) {
eventCallback (b, EM_CONNECTION_UNBOUND, null);
ec.close();
EventableSocketChannel sc = (EventableSocketChannel) ec;
if (sc != null && sc.isAttached())
DetachedConnections.add (sc);
}
}
UnboundConnections.clear();
}
void checkIO() {
long timeout;
if (NewConnections.size() > 0) {
timeout = -1;
} else if (!Timers.isEmpty()) {
long now = new Date().getTime();
long k = Timers.firstKey();
long diff = k-now;
if (diff <= 0)
timeout = -1; // don't wait, just poll once
else
timeout = diff;
} else {
timeout = 0; // wait indefinitely
}
try {
if (timeout == -1)
mySelector.selectNow();
else
mySelector.select(timeout);
} catch (IOException e) {
e.printStackTrace();
}
}
void processIO() {
Iterator<SelectionKey> it = mySelector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey k = it.next();
it.remove();
if (k.isConnectable())
isConnectable(k);
else if (k.isAcceptable())
isAcceptable(k);
else {
if (k.isWritable())
isWritable(k);
if (k.isReadable())
isReadable(k);
}
}
}
void isAcceptable (SelectionKey k) {
ServerSocketChannel ss = (ServerSocketChannel) k.channel();
SocketChannel sn;
long b;
for (int n = 0; n < 10; n++) {
try {
sn = ss.accept();
if (sn == null)
break;
} catch (IOException e) {
e.printStackTrace();
k.cancel();
ServerSocketChannel server = Acceptors.remove(k.attachment());
if (server != null)
try{ server.close(); } catch (IOException ex) {};
break;
}
try {
sn.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
continue;
}
b = createBinding();
EventableSocketChannel ec = new EventableSocketChannel (sn, b, mySelector);
Connections.put (b, ec);
NewConnections.add (b);
eventCallback (((Long)k.attachment()).longValue(), EM_CONNECTION_ACCEPTED, null, b);
}
}
void isReadable (SelectionKey k) {
EventableChannel ec = (EventableChannel) k.attachment();
long b = ec.getBinding();
if (ec.isWatchOnly()) {
if (ec.isNotifyReadable())
eventCallback (b, EM_CONNECTION_NOTIFY_READABLE, null);
} else {
myReadBuffer.clear();
try {
ec.readInboundData (myReadBuffer);
myReadBuffer.flip();
if (myReadBuffer.limit() > 0)
eventCallback (b, EM_CONNECTION_READ, myReadBuffer);
} catch (IOException e) {
UnboundConnections.add (b);
}
}
}
void isWritable (SelectionKey k) {
EventableChannel ec = (EventableChannel) k.attachment();
long b = ec.getBinding();
if (ec.isWatchOnly()) {
if (ec.isNotifyWritable())
eventCallback (b, EM_CONNECTION_NOTIFY_WRITABLE, null);
}
else {
try {
if (!ec.writeOutboundData())
UnboundConnections.add (b);
} catch (IOException e) {
UnboundConnections.add (b);
}
}
}
void isConnectable (SelectionKey k) {
EventableSocketChannel ec = (EventableSocketChannel) k.attachment();
long b = ec.getBinding();
try {
if (ec.finishConnecting())
eventCallback (b, EM_CONNECTION_COMPLETED, null);
else
UnboundConnections.add (b);
} catch (IOException e) {
UnboundConnections.add (b);
}
}
void close() {
try {
if (mySelector != null)
mySelector.close();
} catch (IOException e) {}
mySelector = null;
// run down open connections and sockets.
Iterator<ServerSocketChannel> i = Acceptors.values().iterator();
while (i.hasNext()) {
try {
i.next().close();
} catch (IOException e) {}
}
// 29Sep09: We create an ArrayList of the existing connections, then iterate over
// that to call unbind on them. This is because an unbind can trigger a reconnect,
// which will add to the Connections HashMap, causing a ConcurrentModificationException.
// XXX: The correct behavior here would be to latch the various reactor methods to return
// immediately if the reactor is shutting down.
ArrayList<EventableChannel> conns = new ArrayList<EventableChannel>();
Iterator<EventableChannel> i2 = Connections.values().iterator();
while (i2.hasNext()) {
EventableChannel ec = i2.next();
if (ec != null) {
conns.add (ec);
}
}
Connections.clear();
ListIterator<EventableChannel> i3 = conns.listIterator(0);
while (i3.hasNext()) {
EventableChannel ec = i3.next();
eventCallback (ec.getBinding(), EM_CONNECTION_UNBOUND, null);
ec.close();
EventableSocketChannel sc = (EventableSocketChannel) ec;
if (sc != null && sc.isAttached())
DetachedConnections.add (sc);
}
ListIterator<EventableSocketChannel> i4 = DetachedConnections.listIterator(0);
while (i4.hasNext()) {
EventableSocketChannel ec = i4.next();
ec.cleanup();
}
DetachedConnections.clear();
}
void runLoopbreaks() {
if (loopBreaker.getAndSet(false)) {
eventCallback (0, EM_LOOPBREAK_SIGNAL, null);
}
}
public void stop() {
bRunReactor = false;
signalLoopbreak();
}
void runTimers() {
long now = new Date().getTime();
while (!Timers.isEmpty()) {
long k = Timers.firstKey();
if (k > now)
break;
ArrayList<Long> callbacks = Timers.get(k);
Timers.remove(k);
// Fire all timers at this timestamp
ListIterator<Long> iter = callbacks.listIterator(0);
while (iter.hasNext()) {
eventCallback (0, EM_TIMER_FIRED, null, iter.next().longValue());
}
}
}
public long installOneshotTimer (long milliseconds) {
long s = createBinding();
long deadline = new Date().getTime() + milliseconds;
if (Timers.containsKey(deadline)) {
Timers.get(deadline).add(s);
} else {
ArrayList<Long> callbacks = new ArrayList<Long>();
callbacks.add(s);
Timers.put(deadline, callbacks);
}
return s;
}
public long startTcpServer (SocketAddress sa) throws EmReactorException {
try {
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
server.socket().bind (sa);
long s = createBinding();
Acceptors.put(s, server);
server.register(mySelector, SelectionKey.OP_ACCEPT, s);
return s;
} catch (IOException e) {
throw new EmReactorException ("unable to open socket acceptor: " + e.toString());
}
}
public long startTcpServer (String address, int port) throws EmReactorException {
return startTcpServer (new InetSocketAddress (address, port));
}
public void stopTcpServer (long signature) throws IOException {
ServerSocketChannel server = Acceptors.remove(signature);
if (server != null)
server.close();
else
throw new RuntimeException ("failed to close unknown acceptor");
}
public long openUdpSocket (InetSocketAddress address) throws IOException {
// TODO, don't throw an exception out of here.
DatagramChannel dg = DatagramChannel.open();
dg.configureBlocking(false);
dg.socket().bind(address);
long b = createBinding();
EventableChannel ec = new EventableDatagramChannel (dg, b, mySelector);
dg.register(mySelector, SelectionKey.OP_READ, ec);
Connections.put(b, ec);
return b;
}
public long openUdpSocket (String address, int port) throws IOException {
return openUdpSocket (new InetSocketAddress (address, port));
}
public void sendData (long sig, ByteBuffer bb) throws IOException {
Connections.get(sig).scheduleOutboundData( bb );
}
public void sendData (long sig, byte[] data) throws IOException {
sendData (sig, ByteBuffer.wrap(data));
}
public void setCommInactivityTimeout (long sig, long mills) {
Connections.get(sig).setCommInactivityTimeout (mills);
}
public void sendDatagram (long sig, byte[] data, int length, String recipAddress, int recipPort) {
sendDatagram (sig, ByteBuffer.wrap(data), recipAddress, recipPort);
}
public void sendDatagram (long sig, ByteBuffer bb, String recipAddress, int recipPort) {
(Connections.get(sig)).scheduleOutboundDatagram( bb, recipAddress, recipPort);
}
public long connectTcpServer (String address, int port) {
return connectTcpServer(null, 0, address, port);
}
public long connectTcpServer (String bindAddr, int bindPort, String address, int port) {
long b = createBinding();
try {
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
if (bindAddr != null)
sc.socket().bind(new InetSocketAddress (bindAddr, bindPort));
EventableSocketChannel ec = new EventableSocketChannel (sc, b, mySelector);
if (sc.connect (new InetSocketAddress (address, port))) {
// Connection returned immediately. Can happen with localhost connections.
// WARNING, this code is untested due to lack of available test conditions.
// Ought to be be able to come here from a localhost connection, but that
// doesn't happen on Linux. (Maybe on FreeBSD?)
// The reason for not handling this until we can test it is that we
// really need to return from this function WITHOUT triggering any EM events.
// That's because until the user code has seen the signature we generated here,
// it won't be able to properly dispatch them. The C++ EM deals with this
// by setting pending mode as a flag in ALL eventable descriptors and making
// the descriptor select for writable. Then, it can send UNBOUND and
// CONNECTION_COMPLETED on the next pass through the loop, because writable will
// fire.
throw new RuntimeException ("immediate-connect unimplemented");
}
else {
ec.setConnectPending();
Connections.put (b, ec);
NewConnections.add (b);
}
} catch (IOException e) {
// Can theoretically come here if a connect failure can be determined immediately.
// I don't know how to make that happen for testing purposes.
throw new RuntimeException ("immediate-connect unimplemented: " + e.toString());
}
return b;
}
public void closeConnection (long sig, boolean afterWriting) {
EventableChannel ec = Connections.get(sig);
if (ec != null)
if (ec.scheduleClose (afterWriting))
UnboundConnections.add (sig);
}
long createBinding() {
return ++BindingIndex;
}
public void signalLoopbreak() {
loopBreaker.set(true);
if (mySelector != null)
mySelector.wakeup();
}
public void startTls (long sig) throws NoSuchAlgorithmException, KeyManagementException {
Connections.get(sig).startTls();
}
public void setTimerQuantum (int mills) {
if (mills < 5 || mills > 2500)
throw new RuntimeException ("attempt to set invalid timer-quantum value: "+mills);
timerQuantum = mills;
}
public Object[] getPeerName (long sig) {
EventableChannel channel = Connections.get(sig);
if (channel != null) {
return Connections.get(sig).getPeerName();
}
else {
ServerSocketChannel acceptor = Acceptors.get(sig);
return new Object[] { acceptor.socket().getLocalPort(),
acceptor.socket().getInetAddress().getHostAddress() };
}
}
public Object[] getSockName (long sig) {
EventableChannel channel = Connections.get(sig);
if (channel != null) {
return Connections.get(sig).getSockName();
}
else {
ServerSocketChannel acceptor = Acceptors.get(sig);
return new Object[] { acceptor.socket().getLocalPort(),
acceptor.socket().getInetAddress().getHostAddress() };
}
}
public long attachChannel (SocketChannel sc, boolean watch_mode) {
long b = createBinding();
EventableSocketChannel ec = new EventableSocketChannel (sc, b, mySelector);
ec.setAttached();
if (watch_mode)
ec.setWatchOnly();
Connections.put (b, ec);
NewConnections.add (b);
return b;
}
public SocketChannel detachChannel (long sig) {
EventableSocketChannel ec = (EventableSocketChannel) Connections.get (sig);
if (ec != null) {
UnboundConnections.add (sig);
return ec.getChannel();
} else {
return null;
}
}
public void setNotifyReadable (long sig, boolean mode) {
((EventableSocketChannel) Connections.get(sig)).setNotifyReadable(mode);
}
public void setNotifyWritable (long sig, boolean mode) {
((EventableSocketChannel) Connections.get(sig)).setNotifyWritable(mode);
}
public boolean isNotifyReadable (long sig) {
return Connections.get(sig).isNotifyReadable();
}
public boolean isNotifyWritable (long sig) {
return Connections.get(sig).isNotifyWritable();
}
public boolean pauseConnection (long sig) {
return ((EventableSocketChannel) Connections.get(sig)).pause();
}
public boolean resumeConnection (long sig) {
return ((EventableSocketChannel) Connections.get(sig)).resume();
}
public boolean isConnectionPaused (long sig) {
return ((EventableSocketChannel) Connections.get(sig)).isPaused();
}
public long getOutboundDataSize (long sig) {
return Connections.get(sig).getOutboundDataSize();
}
public int getConnectionCount() {
return Connections.size() + Acceptors.size();
}
}

View File

@@ -0,0 +1,40 @@
/**
* $Id$
*
* Author:: Francis Cianfrocca (gmail: blackhedd)
* Homepage:: http://rubyeventmachine.com
* Date:: 15 Jul 2007
*
* See EventMachine and EventMachine::Connection for documentation and
* usage examples.
*
*
*----------------------------------------------------------------------------
*
* Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
* Gmail: blackhedd
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of either: 1) the GNU General Public License
* as published by the Free Software Foundation; either version 2 of the
* License, or (at your option) any later version; or 2) Ruby's License.
*
* See the file COPYING for complete licensing information.
*
*---------------------------------------------------------------------------
*
*
*/
package com.rubyeventmachine;
/**
* @author francis
*
*/
public class EmReactorException extends Exception {
static final long serialVersionUID = 0;
public EmReactorException (String msg) {
super (msg);
}
}

View File

@@ -0,0 +1,72 @@
/**
* $Id$
*
* Author:: Francis Cianfrocca (gmail: blackhedd)
* Homepage:: http://rubyeventmachine.com
* Date:: 15 Jul 2007
*
* See EventMachine and EventMachine::Connection for documentation and
* usage examples.
*
*
*----------------------------------------------------------------------------
*
* Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
* Gmail: blackhedd
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of either: 1) the GNU General Public License
* as published by the Free Software Foundation; either version 2 of the
* License, or (at your option) any later version; or 2) Ruby's License.
*
* See the file COPYING for complete licensing information.
*
*---------------------------------------------------------------------------
*
*
*/
package com.rubyeventmachine;
import java.nio.ByteBuffer;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
public interface EventableChannel {
public void scheduleOutboundData (ByteBuffer bb);
public void scheduleOutboundDatagram (ByteBuffer bb, String recipAddress, int recipPort);
public boolean scheduleClose (boolean afterWriting);
public void startTls();
public long getBinding();
public void readInboundData (ByteBuffer dst) throws IOException;
public void register() throws ClosedChannelException;
/**
* This is called by the reactor after it finishes running.
* The idea is to free network resources.
*/
public void close();
public boolean writeOutboundData() throws IOException;
public long getOutboundDataSize();
public void setCommInactivityTimeout (long seconds);
public Object[] getPeerName();
public Object[] getSockName();
public boolean isWatchOnly();
public boolean isNotifyReadable();
public boolean isNotifyWritable();
}

View File

@@ -0,0 +1,201 @@
/**
* $Id$
*
* Author:: Francis Cianfrocca (gmail: blackhedd)
* Homepage:: http://rubyeventmachine.com
* Date:: 15 Jul 2007
*
* See EventMachine and EventMachine::Connection for documentation and
* usage examples.
*
*
*----------------------------------------------------------------------------
*
* Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
* Gmail: blackhedd
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of either: 1) the GNU General Public License
* as published by the Free Software Foundation; either version 2 of the
* License, or (at your option) any later version; or 2) Ruby's License.
*
* See the file COPYING for complete licensing information.
*
*---------------------------------------------------------------------------
*
*
*/
package com.rubyeventmachine;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.DatagramChannel;
import java.util.LinkedList;
import java.io.*;
import java.net.*;
public class EventableDatagramChannel implements EventableChannel {
class Packet {
public ByteBuffer bb;
public SocketAddress recipient;
public Packet (ByteBuffer _bb, SocketAddress _recipient) {
bb = _bb;
recipient = _recipient;
}
}
DatagramChannel channel;
long binding;
Selector selector;
boolean bCloseScheduled;
LinkedList<Packet> outboundQ;
long outboundS;
SocketAddress returnAddress;
public EventableDatagramChannel (DatagramChannel dc, long _binding, Selector sel) throws ClosedChannelException {
channel = dc;
binding = _binding;
selector = sel;
bCloseScheduled = false;
outboundQ = new LinkedList<Packet>();
outboundS = 0;
dc.register(selector, SelectionKey.OP_READ, this);
}
public void scheduleOutboundData (ByteBuffer bb) {
try {
if ((!bCloseScheduled) && (bb.remaining() > 0)) {
outboundQ.addLast(new Packet(bb, returnAddress));
outboundS += bb.remaining();
channel.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, this);
}
} catch (ClosedChannelException e) {
throw new RuntimeException ("no outbound data");
}
}
public void scheduleOutboundDatagram (ByteBuffer bb, String recipAddress, int recipPort) {
try {
if ((!bCloseScheduled) && (bb.remaining() > 0)) {
outboundQ.addLast(new Packet (bb, new InetSocketAddress (recipAddress, recipPort)));
outboundS += bb.remaining();
channel.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, this);
}
} catch (ClosedChannelException e) {
throw new RuntimeException ("no outbound data");
}
}
public boolean scheduleClose (boolean afterWriting) {
System.out.println ("NOT SCHEDULING CLOSE ON DATAGRAM");
return false;
}
public void startTls() {
throw new RuntimeException ("TLS is unimplemented on this Channel");
}
public long getBinding() {
return binding;
}
public void register() throws ClosedChannelException {
// TODO
}
/**
* Terminate with extreme prejudice. Don't assume there will be another pass through
* the reactor core.
*/
public void close() {
try {
channel.close();
} catch (IOException e) {
}
}
public void readInboundData (ByteBuffer dst) {
returnAddress = null;
try {
// If there is no datagram available (we're nonblocking after all),
// then channel.receive returns null.
returnAddress = channel.receive(dst);
} catch (IOException e) {
// probably a no-op. The caller will see the empty (or even partial) buffer
// and presumably do the right thing.
}
}
public boolean writeOutboundData() {
while (!outboundQ.isEmpty()) {
Packet p = outboundQ.getFirst();
int written = 0;
try {
// With a datagram socket, it's ok to send an empty buffer.
written = channel.send(p.bb, p.recipient);
outboundS -= written;
}
catch (IOException e) {
return false;
}
/* Did we consume the whole outbound buffer? If yes, pop it off and
* keep looping. If no, the outbound network buffers are full, so break
* out of here. There's a flaw that affects outbound buffers that are intentionally
* empty. We can tell whether they got sent or not. So we assume they were.
* TODO: As implemented, this ALWAYS discards packets if they were at least
* partially written. This matches the behavior of the C++ EM. My judgment
* is that this is less surprising than fragmenting the data and sending multiple
* packets would be. I could be wrong, so this is subject to change.
*/
if ((written > 0) || (p.bb.remaining() == 0))
outboundQ.removeFirst();
else
break;
}
if (outboundQ.isEmpty()) {
try {
channel.register(selector, SelectionKey.OP_READ, this);
} catch (ClosedChannelException e) {}
}
// ALWAYS drain the outbound queue before triggering a connection close.
// If anyone wants to close immediately, they're responsible for clearing
// the outbound queue.
return (bCloseScheduled && outboundQ.isEmpty()) ? false : true;
}
public void setCommInactivityTimeout (long seconds) {
// TODO
System.out.println ("DATAGRAM: SET COMM INACTIVITY UNIMPLEMENTED " + seconds);
}
public Object[] getPeerName () {
if (returnAddress != null) {
InetSocketAddress inetAddr = (InetSocketAddress) returnAddress;
return new Object[]{ inetAddr.getPort(), inetAddr.getHostName() };
} else {
return null;
}
}
public Object[] getSockName () {
DatagramSocket socket = channel.socket();
return new Object[]{ socket.getLocalPort(),
socket.getLocalAddress().getHostAddress() };
}
public boolean isWatchOnly() { return false; }
public boolean isNotifyReadable() { return false; }
public boolean isNotifyWritable() { return false; }
public long getOutboundDataSize() { return outboundS; }
}

View File

@@ -0,0 +1,415 @@
/**
* $Id$
*
* Author:: Francis Cianfrocca (gmail: blackhedd)
* Homepage:: http://rubyeventmachine.com
* Date:: 15 Jul 2007
*
* See EventMachine and EventMachine::Connection for documentation and
* usage examples.
*
*
*----------------------------------------------------------------------------
*
* Copyright (C) 2006-07 by Francis Cianfrocca. All Rights Reserved.
* Gmail: blackhedd
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of either: 1) the GNU General Public License
* as published by the Free Software Foundation; either version 2 of the
* License, or (at your option) any later version; or 2) Ruby's License.
*
* See the file COPYING for complete licensing information.
*
*---------------------------------------------------------------------------
*
*
*/
/**
*
*/
package com.rubyeventmachine;
/**
* @author francis
*
*/
import java.nio.channels.*;
import java.nio.*;
import java.util.*;
import java.io.*;
import java.net.Socket;
import javax.net.ssl.*;
import javax.net.ssl.SSLEngineResult.*;
import java.lang.reflect.Field;
import java.security.*;
public class EventableSocketChannel implements EventableChannel {
Selector selector;
SelectionKey channelKey;
SocketChannel channel;
long binding;
LinkedList<ByteBuffer> outboundQ;
long outboundS;
boolean bCloseScheduled;
boolean bConnectPending;
boolean bWatchOnly;
boolean bAttached;
boolean bNotifyReadable;
boolean bNotifyWritable;
boolean bPaused;
SSLEngine sslEngine;
SSLContext sslContext;
public EventableSocketChannel (SocketChannel sc, long _binding, Selector sel) {
channel = sc;
binding = _binding;
selector = sel;
bCloseScheduled = false;
bConnectPending = false;
bWatchOnly = false;
bAttached = false;
bNotifyReadable = false;
bNotifyWritable = false;
outboundQ = new LinkedList<ByteBuffer>();
outboundS = 0;
}
public long getBinding() {
return binding;
}
public SocketChannel getChannel() {
return channel;
}
public void register() throws ClosedChannelException {
if (channelKey == null) {
int events = currentEvents();
channelKey = channel.register(selector, events, this);
}
}
/**
* Terminate with extreme prejudice. Don't assume there will be another pass through
* the reactor core.
*/
public void close() {
if (channelKey != null) {
channelKey.cancel();
channelKey = null;
}
if (bAttached) {
// attached channels are copies, so reset the file descriptor to prevent java from close()ing it
Field f;
FileDescriptor fd;
try {
/* do _NOT_ clobber fdVal here, it will break epoll/kqueue on jdk6!
* channelKey.cancel() above does not occur until the next call to select
* and if fdVal is gone, we will continue to get events for this fd.
*
* instead, remove fdVal in cleanup(), which is processed via DetachedConnections,
* after UnboundConnections but before NewConnections.
*/
f = channel.getClass().getDeclaredField("fd");
f.setAccessible(true);
fd = (FileDescriptor) f.get(channel);
f = fd.getClass().getDeclaredField("fd");
f.setAccessible(true);
f.set(fd, -1);
} catch (java.lang.NoSuchFieldException e) {
e.printStackTrace();
} catch (java.lang.IllegalAccessException e) {
e.printStackTrace();
}
return;
}
try {
channel.close();
} catch (IOException e) {
}
}
public void cleanup() {
if (bAttached) {
Field f;
try {
f = channel.getClass().getDeclaredField("fdVal");
f.setAccessible(true);
f.set(channel, -1);
} catch (java.lang.NoSuchFieldException e) {
e.printStackTrace();
} catch (java.lang.IllegalAccessException e) {
e.printStackTrace();
}
}
channel = null;
}
public void scheduleOutboundData (ByteBuffer bb) {
if (!bCloseScheduled && bb.remaining() > 0) {
if (sslEngine != null) {
try {
ByteBuffer b = ByteBuffer.allocate(32*1024); // TODO, preallocate this buffer.
sslEngine.wrap(bb, b);
b.flip();
outboundQ.addLast(b);
outboundS += b.remaining();
} catch (SSLException e) {
throw new RuntimeException ("ssl error");
}
}
else {
outboundQ.addLast(bb);
outboundS += bb.remaining();
}
updateEvents();
}
}
public void scheduleOutboundDatagram (ByteBuffer bb, String recipAddress, int recipPort) {
throw new RuntimeException ("datagram sends not supported on this channel");
}
/**
* Called by the reactor when we have selected readable.
*/
public void readInboundData (ByteBuffer bb) throws IOException {
if (channel.read(bb) == -1)
throw new IOException ("eof");
}
public long getOutboundDataSize() { return outboundS; }
/**
* Called by the reactor when we have selected writable.
* Return false to indicate an error that should cause the connection to close.
* TODO, VERY IMPORTANT: we're here because we selected writable, but it's always
* possible to become unwritable between the poll and when we get here. The way
* this code is written, we're depending on a nonblocking write NOT TO CONSUME
* the whole outbound buffer in this case, rather than firing an exception.
* We should somehow verify that this is indeed Java's defined behavior.
* @return
*/
public boolean writeOutboundData() throws IOException {
ByteBuffer[] bufs = new ByteBuffer[64];
int i;
long written, toWrite;
while (!outboundQ.isEmpty()) {
i = 0;
toWrite = 0;
written = 0;
while (i < 64 && !outboundQ.isEmpty()) {
bufs[i] = outboundQ.removeFirst();
toWrite += bufs[i].remaining();
i++;
}
if (toWrite > 0)
written = channel.write(bufs, 0, i);
outboundS -= written;
// Did we consume the whole outbound buffer? If yes,
// pop it off and keep looping. If no, the outbound network
// buffers are full, so break out of here.
if (written < toWrite) {
while (i > 0 && bufs[i-1].remaining() > 0) {
outboundQ.addFirst(bufs[i-1]);
i--;
}
break;
}
}
if (outboundQ.isEmpty() && !bCloseScheduled) {
updateEvents();
}
// ALWAYS drain the outbound queue before triggering a connection close.
// If anyone wants to close immediately, they're responsible for clearing
// the outbound queue.
return (bCloseScheduled && outboundQ.isEmpty()) ? false : true;
}
public void setConnectPending() {
bConnectPending = true;
updateEvents();
}
/**
* Called by the reactor when we have selected connectable.
* Return false to indicate an error that should cause the connection to close.
*/
public boolean finishConnecting() throws IOException {
channel.finishConnect();
bConnectPending = false;
updateEvents();
return true;
}
public boolean scheduleClose (boolean afterWriting) {
// TODO: What the hell happens here if bConnectPending is set?
if (!afterWriting) {
outboundQ.clear();
outboundS = 0;
}
if (outboundQ.isEmpty())
return true;
else {
updateEvents();
bCloseScheduled = true;
return false;
}
}
public void startTls() {
if (sslEngine == null) {
try {
sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, null, null); // TODO, fill in the parameters.
sslEngine = sslContext.createSSLEngine(); // TODO, should use the parameterized version, to get Kerb stuff and session re-use.
sslEngine.setUseClientMode(false);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException ("unable to start TLS"); // TODO, get rid of this.
} catch (KeyManagementException e) {
throw new RuntimeException ("unable to start TLS"); // TODO, get rid of this.
}
}
System.out.println ("Starting TLS");
}
public ByteBuffer dispatchInboundData (ByteBuffer bb) throws SSLException {
if (sslEngine != null) {
if (true) throw new RuntimeException ("TLS currently unimplemented");
System.setProperty("javax.net.debug", "all");
ByteBuffer w = ByteBuffer.allocate(32*1024); // TODO, WRONG, preallocate this buffer.
SSLEngineResult res = sslEngine.unwrap(bb, w);
if (res.getHandshakeStatus() == HandshakeStatus.NEED_TASK) {
Runnable r;
while ((r = sslEngine.getDelegatedTask()) != null) {
r.run();
}
}
System.out.println (bb);
w.flip();
return w;
}
else
return bb;
}
public void setCommInactivityTimeout (long seconds) {
// TODO
System.out.println ("SOCKET: SET COMM INACTIVITY UNIMPLEMENTED " + seconds);
}
public Object[] getPeerName () {
Socket sock = channel.socket();
return new Object[]{ sock.getPort(), sock.getInetAddress().getHostAddress() };
}
public Object[] getSockName () {
Socket sock = channel.socket();
return new Object[]{ sock.getLocalPort(),
sock.getLocalAddress().getHostAddress() };
}
public void setWatchOnly() {
bWatchOnly = true;
updateEvents();
}
public boolean isWatchOnly() { return bWatchOnly; }
public void setAttached() {
bAttached = true;
}
public boolean isAttached() { return bAttached; }
public void setNotifyReadable (boolean mode) {
bNotifyReadable = mode;
updateEvents();
}
public boolean isNotifyReadable() { return bNotifyReadable; }
public void setNotifyWritable (boolean mode) {
bNotifyWritable = mode;
updateEvents();
}
public boolean isNotifyWritable() { return bNotifyWritable; }
public boolean pause() {
if (bWatchOnly) {
throw new RuntimeException ("cannot pause/resume 'watch only' connections, set notify readable/writable instead");
}
boolean old = bPaused;
bPaused = true;
updateEvents();
return !old;
}
public boolean resume() {
if (bWatchOnly) {
throw new RuntimeException ("cannot pause/resume 'watch only' connections, set notify readable/writable instead");
}
boolean old = bPaused;
bPaused = false;
updateEvents();
return old;
}
public boolean isPaused() {
return bPaused;
}
private void updateEvents() {
if (channelKey == null)
return;
int events = currentEvents();
if (channelKey.interestOps() != events) {
channelKey.interestOps(events);
}
}
private int currentEvents() {
int events = 0;
if (bWatchOnly)
{
if (bNotifyReadable)
events |= SelectionKey.OP_READ;
if (bNotifyWritable)
events |= SelectionKey.OP_WRITE;
}
else if (!bPaused)
{
if (bConnectPending)
events |= SelectionKey.OP_CONNECT;
else {
events |= SelectionKey.OP_READ;
if (!outboundQ.isEmpty())
events |= SelectionKey.OP_WRITE;
}
}
return events;
}
}