// Copyright 2000-2005 the Contributors, as shown in the revision logs.
// Licensed under the Apache Public Source License 2.0 ("the License").
// You may not use this file except in compliance with the License.

package org.ibex.mail;
import org.ibex.mail.target.*;
import org.ibex.util.*;
import org.ibex.net.*;
import org.ibex.io.*;
import java.net.*;
import java.io.*;
import java.util.*;
import java.text.*;

// FIXME: inbound throttling/ratelimiting
// FIXME: probably need some throttling on outbound mail

// FEATURE: rate-limiting

// "Address enumeration detection" -- notice when it looks like somebody
// is trying a raft of addresses.

// RFC2554: SMTP Service Extension for Authentication
//   - did not implement section 5, though
// RFC4616: SASL PLAIN

// Note: we can't actually use status codes for feedback if we accept
// multiple destination addresses... a failure on one and success on
// the other...

// FIXME: logging: current logging sucks
// FIXME: loop prevention
// FEATURE: infer date if not present

/*
// FEATURE: RFC2821, section 4.5.1: special "postmaster" address
   Any system that includes an SMTP server supporting mail relaying or
   delivery MUST support the reserved mailbox "postmaster" as a case-
   insensitive local name. This postmaster address is not strictly
   necessary if the server always returns 554 on connection opening (as
   described in section 3.1). The requirement to accept mail for
   postmaster implies that RCPT commands which specify a mailbox for
   postmaster at any of the domains for which the SMTP server provides
   mail service, as well as the special case of "RCPT TO:<Postmaster>"
   (with no domain specification), MUST be supported.
*/

// FEATURE: RFC2821, section 5, multiple MX records, preferences, ordering
// FEATURE: RFC2821, end of 4.1.2: backslashes in headers
// FEATURE: batching retries by host (retry multiple in one session, keep retry intervals on a host basis not a message basis)
// FEATURE: first two attempts should be close together (rec'd by 2821)

/*
// FEATURE: RFC2821, section 4.5.4.1: retry strategies
//   per-command, per-attempt timeouts
   Experience suggests that failures are typically transient (the target
   system or its connection has crashed), favoring a policy of two
   connection attempts in the first hour the message is in the queue,
   and then backing off to one every two or three hours.

   The SMTP client can shorten the queuing delay in cooperation with the
   SMTP server. For example, if mail is received from a particular
   address, it is likely that mail queued for that host can now be sent.
   Application of this principle may, in many cases, eliminate the
   requirement for an explicit "send queues now" function such as ETRN
   [9].

   An SMTP client may have a large queue of messages for each
   unavailable destination host. If all of these messages were retried
   in every retry cycle, there would be excessive Internet overhead and
   the sending system would be blocked for a long period. Note that an
   SMTP client can generally determine that a delivery attempt has
   failed only after a timeout of several minutes and even a one-minute
   timeout per connection will result in a very large delay if retries
   are repeated for dozens, or even hundreds, of queued messages to the
   same host.

   When a mail message is to be delivered to multiple recipients, and
   the SMTP server to which a copy of the message is to be sent is the
   same for multiple recipients, then only one copy of the message
   SHOULD be transmitted. That is, the SMTP client SHOULD use the
   command sequence: MAIL, RCPT, RCPT,... RCPT, DATA instead of the
   sequence: MAIL, RCPT, DATA, ..., MAIL, RCPT, DATA.
However, if there are very many addresses, a limit on the number of RCPT commands per MAIL command MAY be imposed. Implementation of this efficiency feature is strongly encouraged. */ public class SMTP { public static final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, d MMM yyyy HH:mm:ss Z"); private static final SqliteMailbox allmail = (SqliteMailbox)FileBasedMailbox .getFileBasedMailbox("/afs/megacz.com/mail/user/megacz/allmail.sqlite", false); public static final int NUM_OUTGOING_THREADS = 5; public static final int GRAYLIST_MINWAIT = 1000 * 60 * 60; // one hour public static final int GRAYLIST_MAXWAIT = 1000 * 60 * 60 * 24 * 5; // five days public static final int MAX_MESSAGE_SIZE = Integer.parseInt(System.getProperty("org.ibex.mail.smtp.maxMessageSize", "-1")); public static final int RETRY_TIME = 1000 * 60 * 30; // 30min recommended by RFC public static final int GIVE_UP_TIME = 1000 * 60 * 24 * 5; // FIXME: actually use this public static final Graylist graylist; public static final Whitelist whitelist; static { try { graylist = new Graylist(Mailbox.STORAGE_ROOT+"/db/graylist.sqlite"); whitelist = new Whitelist(Mailbox.STORAGE_ROOT+"/db/whitelist.sqlite"); } catch (Exception e) { throw new RuntimeException(e); } } private static final Mailbox spool = FileBasedMailbox.getFileBasedMailbox(Mailbox.STORAGE_ROOT,false).slash("spool",true).slash("smtp",true).getMailbox(); public static void enqueue(Message m) throws IOException { if (!m.envelopeTo.isLocal()) Outgoing.enqueue(m); else { try { allmail.accept(m); } catch (Exception e) { // FIXME incredibly gross hack if (e.toString().indexOf("attempt to insert two messages with identical messageid")==-1) Log.error(SMTP.class, e); } Target.root.accept(m); } } public static class SMTPException extends MailException { int code; String message; public SMTPException(String s) { try { code = Integer.parseInt(s.substring(0, s.indexOf(' '))); message = s.substring(s.indexOf(' ')+1); } catch (NumberFormatException 
nfe) { code = -1; message = s; } } public String toString() { return "SMTP " + code + ": " + message; } public String getMessage() { return toString(); } } // Server ////////////////////////////////////////////////////////////////////////////// public static class Server { public void handleRequest(Connection conn) throws IOException { conn.setTimeout(5 * 60 * 1000); conn.setNewline("\r\n"); conn.println("220 " + conn.vhost + " ESMTP " + this.getClass().getName()); Address from = null; Vector to = new Vector(); boolean ehlo = false; String remotehost = null; String authenticatedAs = null; int failedRcptCount = 0; for(String command = conn.readln(); ; command = conn.readln()) try { if (command == null) return; Log.warn("**"+conn.getRemoteAddress()+"**", command); String c = command.toUpperCase(); if (c.startsWith("HELO")) { remotehost = c.substring(5).trim(); conn.println("250 HELO " + conn.vhost); from = null; to = new Vector(); } else if (c.startsWith("EHLO")) { remotehost = c.substring(5).trim(); conn.println("250-"+conn.vhost); //conn.println("250-AUTH"); conn.println("250-AUTH PLAIN"); //conn.println("250-STARTTLS"); conn.println("250 HELP"); ehlo = true; from = null; to = new Vector(); } else if (c.startsWith("RSET")) { conn.println("250 reset ok"); from = null; to = new Vector(); } else if (c.startsWith("HELP")) { conn.println("214 you are beyond help. 
see a trained professional."); } else if (c.startsWith("VRFY")) { conn.println("502 VRFY not supported"); } else if (c.startsWith("EXPN")) { conn.println("502 EXPN not supported"); } else if (c.startsWith("NOOP")) { conn.println("250 OK"); } else if (c.startsWith("QUIT")) { conn.println("221 " + conn.vhost + " closing connection"); return; } else if (c.startsWith("STARTTLS")) { conn.println("220 starting TLS..."); conn.flush(); conn = conn.negotiateSSL(true); from = null; to = new Vector(); } else if (c.startsWith("AUTH")) { if (authenticatedAs != null) { conn.println("503 you are already authenticated; you must reconnect to reauth"); } else { String mechanism = command.substring(4).trim(); String rest = ""; if (mechanism.indexOf(' ')!=-1) { rest = mechanism.substring(mechanism.indexOf(' ')+1).trim(); mechanism = mechanism.substring(0, mechanism.indexOf(' ')); } if (mechanism.equals("PLAIN")) { // 538 Encryption required for requested authentication mechanism? byte[] bytes = Encode.fromBase64(rest); String authenticateUser = null; String authorizeUser = null; String password = null; int start = 0; for(int i=0; i<=bytes.length; i++) { if (i") ? 
null : new Address(command); conn.println("250 " + from + " is syntactically correct"); // Don't perform SAV; discouraged here // http://blog.fastmail.fm/2007/12/05/sending-email-servers-best-practice/ } else if (c.startsWith("RCPT TO:")) { // some clients are broken and put RCPT first; we will tolerate this command = command.substring(8).trim(); if(command.indexOf(' ') != -1) command = command.substring(0, command.indexOf(' ')); Address addr = new Address(command); if (conn.getRemoteAddress().isLoopbackAddress() || (from!=null&&from.toString().indexOf("johnw")!=-1)) { conn.println("250 you are connected locally, so I will let you send"); to.addElement(addr); if (!whitelist.isWhitelisted(addr)) whitelist.addWhitelist(addr); } else if (authenticatedAs!=null) { conn.println("250 you are authenticated as "+authenticatedAs+", so I will let you send"); to.addElement(addr); if (!whitelist.isWhitelisted(addr)) whitelist.addWhitelist(addr); } else if (addr.isLocal()) { if (to.size() > 3) { conn.println("536 sorry, limit on 3 RCPT TO's per DATA"); } else { // FEATURE: should check the address further and give 550 if undeliverable conn.println("250 " + addr + " is on this machine; I will deliver it"); to.addElement(addr); } } else { conn.println("535 sorry, " + addr + " is not on this machine, you are not connected from localhost, and I will not relay without SMTP AUTH"); Log.warn("","535 sorry, " + addr + " is not on this machine, you are not connected from localhost, and I will not relay without SMTP AUTH"); failedRcptCount++; if (failedRcptCount > 3) { conn.close(); return; } } conn.flush(); } else if (c.startsWith("DATA")) { //if (from == null) { conn.println("503 MAIL FROM command must precede DATA"); continue; } if (to == null || to.size()==0) { conn.println("503 RCPT TO command must precede DATA"); continue; } if (!graylist.isWhitelisted(conn.getRemoteAddress()) && !conn.getRemoteAddress().isLoopbackAddress() && authenticatedAs==null) { long when = 
graylist.getGrayListTimestamp(conn.getRemoteAddress(), from+"", to+""); if (when == 0 || System.currentTimeMillis() - when > GRAYLIST_MAXWAIT) { graylist.setGrayListTimestamp(conn.getRemoteAddress(), from+"", to+"", System.currentTimeMillis()); conn.println("451 you are graylisted; please try back in one hour to be whitelisted"); Log.warn(conn.getRemoteAddress().toString(), "451 you are graylisted; please try back in one hour to be whitelisted"); conn.flush(); continue; } else if (System.currentTimeMillis() - when > GRAYLIST_MINWAIT) { graylist.addWhitelist(conn.getRemoteAddress()); conn.println("354 (you have been whitelisted) Enter message, ending with \".\" on a line by itself"); Log.warn(conn.getRemoteAddress().toString(), "has been whitelisted"); } else { conn.println("451 you are still graylisted (since "+new java.util.Date(when)+")"); conn.flush(); Log.warn(conn.getRemoteAddress().toString(), "451 you are still graylisted (since "+new java.util.Date(when)+")"); continue; } } else { conn.println("354 Enter message, ending with \".\" on a line by itself"); } conn.flush(); try { // FIXME: deal with messages larger than memory here? 
StringBuffer buf = new StringBuffer(); buf.append("Received: from " + conn.getRemoteHostname() + " (" + remotehost + ")\r\n"); buf.append(" by "+conn.vhost+" ("+SMTP.class.getName()+") with "+(ehlo?"ESMTP":"SMTP") + "\r\n"); buf.append(" for "); // FIXME: this is leaking BCC addrs // for(int i=0; i MAX_MESSAGE_SIZE && (from+"").indexOf("paperless")==-1) { Log.error("**"+conn.getRemoteAddress()+"**", "sorry, this mail server only accepts messages of less than " + ByteSize.toString(MAX_MESSAGE_SIZE)); throw new MailException.Malformed("sorry, this mail server only accepts messages of less than " + ByteSize.toString(MAX_MESSAGE_SIZE)); } } String message = buf.toString(); Message m = null; for(int i=0; i threads = new HashSet(); private static final HashMap deadHosts = new HashMap(); private static Map nextTry = Collections.synchronizedMap(new HashMap()); private Mailbox.Iterator it; private final String name; public Outgoing(String name) { this.name = name; synchronized(Outgoing.class) { threads.add(this); } } public String toString() { return name; } public static void enqueue(Message m) throws IOException { if (m == null) { Log.warn(Outgoing.class, "attempted to enqueue(null)"); return; } String traces = m.headers.get("Received"); if (traces!=null) { int lines = 0; for(int i=0; i 100) { // required by rfc Log.warn(SMTP.Outgoing.class, "Message with " + lines + " trace hops; dropping\n" + m.summary()); return; } } synchronized(Outgoing.class) { spool.insert(m, Mailbox.Flag.defaultFlags); Outgoing.class.notifyAll(); } } public static boolean attempt(Message m) throws IOException { return attempt(m, false); } public static boolean attempt(Message m, boolean noBounces) throws IOException { if (m.envelopeTo == null) { Log.warn(SMTP.Outgoing.class, "aieeee, null envelopeTo: " + m.summary()); return false; } InetAddress[] mx = DNSUtil.getMailExchangerIPs(m.envelopeTo.host); if (mx.length == 0) { if (!noBounces) { enqueue(m.bounce("could not resolve " + 
m.envelopeTo.host)); return true; } else { Log.warn(SMTP.Outgoing.class, "could not resolve " + m.envelopeTo.host); return false; } } if (new Date().getTime() - m.arrival.getTime() > 1000 * 60 * 60 * 24 * 5) { if (!noBounces) { enqueue(m.bounce("could not send for 5 days")); return true; } else { Log.warn(SMTP.Outgoing.class, "could not send for 5 days: " + m.summary()); return false; } } for(int i=0; i 3 && s.charAt(3) == '-') s = conn.readln(); //if (s.startsWith("4")||s.startsWith("5")) throw new SMTPException(s); if (!s.startsWith("2")&&!s.startsWith("3")) throw new SMTPException(s); } private static boolean attempt(final Message m, final InetAddress mx) { boolean accepted = false; Connection conn = null; try { conn = new Connection(new Socket(mx, 25), InetAddress.getLocalHost().getHostName()); InetAddress localAddress = conn.getSocket().getLocalAddress(); String reverse = DNSUtil.reverseLookup(localAddress); Log.info(SMTP.Outgoing.class, "outbound connection to " + mx + " uses " + localAddress + " [reverse: " + reverse + "]"); InetAddress relookup = InetAddress.getByName(reverse); if (!relookup.equals(localAddress)) Log.error(SMTP.Outgoing.class, "Warning: local machine fails forward-confirmed-reverse; " + reverse + " resolves to " + localAddress); conn.setNewline("\r\n"); conn.setTimeout(60 * 1000); check(conn.readln(), conn); // banner try { conn.println("EHLO " + reverse); check(conn.readln(), conn); } catch (SMTPException smtpe) { conn.println("HELO " + reverse); check(conn.readln(), conn); } String envelopeFrom = m.envelopeFrom==null ? 
"" : m.envelopeFrom.toString(); conn.println("MAIL FROM:<" + envelopeFrom +">"); check(conn.readln(), conn); conn.println("RCPT TO:<" + m.envelopeTo.toString()+">"); check(conn.readln(), conn); conn.println("DATA"); check(conn.readln(), conn); Headers head = new Headers(m.headers, new String[] { "return-path", null, "bcc", null }); Stream stream = head.getStream(); for(String s = stream.readln(); s!=null; s=stream.readln()) { if (s.startsWith(".")) conn.print("."); conn.println(s); } conn.println(""); stream = m.getBody().getStream(); for(String s = stream.readln(); s!=null; s=stream.readln()) { if (s.startsWith(".")) conn.print("."); conn.println(s); } conn.println("."); String resp = conn.readln(); if (resp == null) throw new SMTPException("server " + mx + " closed connection without accepting message"); check(resp, conn); Log.warn(SMTP.Outgoing.class, "success: " + mx + " accepted " + m.summary() + "\n["+resp+"]"); accepted = true; conn.close(); } catch (SMTPException e) { if (accepted) return true; Log.warn(SMTP.Outgoing.class, " unable to send; error=" + e); Log.warn(SMTP.Outgoing.class, " message: " + m.summary()); Log.warn(SMTP.Outgoing.class, e); /* // FIXME: we should not be bouncing here! 
if (e.code >= 500 && e.code <= 599) { try { attempt(m.bounce("unable to deliver: " + e), true); } catch (Exception ex) { Log.error(SMTP.Outgoing.class, "exception while trying to deliver bounce; giving up completely"); Log.error(SMTP.Outgoing.class, ex); } return true; } */ return false; } catch (Exception e) { if (accepted) return true; Log.warn(SMTP.Outgoing.class, " unable to send; error=" + e); Log.warn(SMTP.Outgoing.class, " message: " + m.summary()); Log.warn(SMTP.Outgoing.class, e); //if (conn != null) Log.warn(SMTP.Outgoing.class, conn.dumpLog()); return false; } finally { if (conn != null) conn.close(); } return accepted; } public void wake() { int count = spool.count(Query.all()); Log.info(SMTP.Outgoing.class, "outgoing thread "+name+" woke up; " + count + " messages to send"); try { while(true) { boolean good = false; synchronized(Outgoing.class) { it = spool.iterator(); OUTER: for(; it.next(); ) { for(Outgoing o : threads) if (o!=this && o.it != null && o.it.uid()==it.uid()) continue OUTER; good = true; break; } } if (!good) break; try { String messageid = it.cur().messageid; if (nextTry.get(messageid) == null || System.currentTimeMillis() > nextTry.get(messageid)) { boolean ok = attempt(it.cur()); if (ok) it.delete(); else nextTry.put(messageid, System.currentTimeMillis() + RETRY_TIME); } } catch (Exception e) { Log.error(SMTP.Outgoing.class, e); } Log.info(this, "sleeping for 3s..."); Thread.sleep(3000); } } catch (Exception e) { //if (e instanceof InterruptedException) throw e; Log.error(SMTP.Outgoing.class, e); } Log.info(SMTP.Outgoing.class, "outgoing thread #"+name+" going back to sleep"); it = null; } public void run() { try { while(true) { Log.setThreadAnnotation("[outgoing #"+name+"] "); wake(); Thread.sleep(1000); synchronized(Outgoing.class) { Outgoing.class.wait(5 * 60 * 1000); } } } catch (InterruptedException e) { Log.warn(this, e); } } } }