JAVA通信编程(四)——UDP通讯

JAVA通信编程(四)——UDP通讯

经过TCP和串口通讯编程的了解,相信大家应该掌握CommBuff的套路了,这里首先展示的是通过UDP编程的方式实现CommBuff接口,之后通过简单工厂模式的应用说明如何屏蔽底层通讯差异。

UdpImpl类如下:

package com.zzh.comm;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Map;

import org.apache.log4j.Logger;

/**
 * {@link CommBuff} implementation that exchanges raw bytes over a UDP
 * {@link DatagramSocket}.
 *
 * <p>The local port, destination port, destination address and receive timeout
 * are read from {@code /udp.properties} when the object is constructed. The
 * socket itself is only created by {@link #open()}; {@link #readBuff()} and
 * {@link #writeBuff(byte[])} throw a {@link RuntimeException} until then.
 */
public class UdpImpl implements CommBuff
{
    // Logged under this class' own name (it used to be registered as java.lang.Object).
    private static final Logger logger = Logger.getLogger(UdpImpl.class.getName());

    private int local_port;
    private int dest_port;
    private String ip;
    private int time_out;

    DatagramSocket client = null;

    private String fileName = "/udp.properties";

    /**
     * Loads the UDP settings ({@code udp_local_port}, {@code udp_dest_port},
     * {@code udp_timeout}, {@code udp_dest_ip}) from the properties file.
     *
     * <p>A missing or malformed value is logged and does not abort construction;
     * fields that were not reached keep their Java default values.
     */
    public UdpImpl()
    {
        Map<String,String> map = new ReadProperties().getPropertiesMap(fileName);
        try
        {
            local_port = Integer.parseInt(map.get("udp_local_port"));
            dest_port = Integer.parseInt(map.get("udp_dest_port"));
            time_out = Integer.parseInt(map.get("udp_timeout"));
            ip = map.get("udp_dest_ip");
        }
        catch (Exception e)
        {
            // e.getMessage() alone is just "null" for a missing key, so add context and the stack trace.
            logger.error("failed to load UDP settings from " + fileName + ": " + e.getMessage(), e);
        }
    }

    /**
     * Receives one datagram (at most 1024 bytes are kept).
     *
     * @return the received payload, or an empty array when the receive fails
     *         with an {@link IOException} (which includes a receive timeout)
     * @throws RuntimeException if {@link #open()} has not created the socket
     */
    @Override
    public byte[] readBuff()
    {
        if(client == null)
        {
            throw new RuntimeException("client is null!");
        }
        byte[] recvBuf = new byte[1024];
        DatagramPacket recvPacket = new DatagramPacket(recvBuf , recvBuf.length);
        try
        {
            client.receive(recvPacket);
        }
        catch (IOException e)
        {
            logger.info(e.getMessage());
            return new byte[0];
        }
        // Copy only the bytes that were actually received, not the whole buffer.
        byte[] ans = new byte[recvPacket.getLength()];
        System.arraycopy(recvPacket.getData(), 0, ans, 0, recvPacket.getLength());
        logger.info("网口接收:"+CommUtil.bytesToHex(ans));
        return ans;
    }

    /**
     * Sends {@code message} as a single datagram to the configured address and port.
     * Send failures are logged and not propagated.
     *
     * @param message the bytes to send
     * @throws RuntimeException if {@link #open()} has not created the socket
     */
    @Override
    public void writeBuff(byte[] message)
    {
        if(client == null)
        {
            throw new RuntimeException("client is null!");
        }

        try
        {
            InetAddress addr = InetAddress.getByName(ip);
            DatagramPacket sendPacket = new DatagramPacket(message,message.length,addr,dest_port);
            client.send(sendPacket);
            logger.info("发送成功: "+CommUtil.bytesToHex(message));
        }
        catch (UnknownHostException e)
        {
            logger.error("cannot resolve destination " + ip + ": " + e.getMessage(), e);
        }
        catch (IOException e)
        {
            logger.error("failed to send to " + ip + ":" + dest_port + ": " + e.getMessage(), e);
        }
    }

    /**
     * Binds a datagram socket to the configured local port and applies the
     * configured receive timeout. A {@link SocketException} is logged and not
     * propagated; in that case {@code client} is left unchanged.
     */
    @Override
    public void open()
    {
        DatagramSocket socket = null;
        try
        {
            socket = new DatagramSocket(local_port);
            socket.setSoTimeout(time_out);
            // Publish the socket only once it is fully configured, so a failed
            // setSoTimeout can never leave a socket without a receive timeout in use.
            client = socket;
            socket = null;
            logger.info("client open succeed!");
        }
        catch (SocketException e)
        {
            logger.error("failed to open UDP socket on port " + local_port + ": " + e.getMessage(), e);
        }
        finally
        {
            // Still non-null only when configuration failed: release the bound port.
            if(socket != null)
            {
                socket.close();
            }
        }
    }

    /**
     * Closes the socket if one was opened. The {@code client} reference is kept.
     */
    @Override
    public void close()
    {
        if(client != null)
        {
            client.close();
        }
    }

    /**
     * @return always {@code null}; this implementation exposes no extra information
     */
    @Override
    public Object getInfo()
    {
        return null;
    }

}

UdpImpl实现了CommBuff接口的各个方法。UDP Socket采用数据包(Datagram)的方式进行通讯,这一点与TCP面向连接的流式通讯方式不同。

下面通过一个简单工厂模式,可以实现底层通讯的便利性。

package com.zzh.comm;

/**
 * Simple factory that maps a configuration keyword to a {@link CommBuff}
 * implementation, so callers do not reference the concrete transport classes.
 */
public class CommFactory
{
    /**
     * Creates the transport selected by {@code properties}.
     *
     * <p>Recognised values: {@code comm_serial}, {@code comm_tcpServer},
     * {@code comm_tcpClient}, {@code comm_udp}.
     *
     * @param properties the keyword naming the transport; may be {@code null}
     * @return a new, not yet opened, transport instance
     * @throws Exception if {@code properties} is {@code null} or not one of the
     *                   recognised keywords
     */
    public CommBuff getCommBuff(String properties) throws Exception
    {
        // A null keyword is reported like any other unknown keyword instead of
        // surfacing as a bare NullPointerException.
        if(properties != null)
        {
            switch(properties)
            {
                case "comm_serial":
                    return new SerialImpl();
                case "comm_tcpServer":
                    return new TcpServerImpl();
                case "comm_tcpClient":
                    return new TcpClientImpl();
                case "comm_udp":
                    return new UdpImpl();
                default:
                    break;
            }
        }
        throw new Exception("Communication para error: no found avaliable communication Object instance.");
    }
}

上面的getCommBuff方法通过参数properties可以初始化不同的通讯接口实现类,这样上层应用只需调用CommBuff接口的方法,而无需与底层通讯的细节相耦合,极大地降低了程序间的耦合性。

本篇就简单地阐述到这里。下面附加一个程序,这个程序通过调用CommFactory的方法生成底层通讯的实例,程序的主要内容是电力行业中某个通讯规约(Modbus)的实现。如果不是从事电力行业的通讯开发,可以不必了解程序中的细节,大概看一下怎么使用即可。

package com.zzh.protocol;

import java.util.Calendar;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.zzh.comm.CommBuff;
import com.zzh.comm.CommFactory;
import com.zzh.comm.CommUtil;
import com.zzh.comm.ReadProperties;
import com.zzh.dao.ModbusDao;
import com.zzh.dao.ModbusDaoImpl;
import com.zzh.dao.pojo.ModbusPojo;

/**
 * Polls one device through a {@link CommBuff} transport and keeps the most
 * recently decoded measurement in a {@link ModbusPojo}.
 *
 * <p>Two background workers are started by the constructor: one appends every
 * byte returned by {@code comm.readBuff()} to an internal queue, the other
 * periodically stores the current {@link ModbusPojo} through a
 * {@link ModbusDao}. The caller drives the request/response cycle by calling
 * {@link #process()} repeatedly.
 */
public class Modbus {
    private CommBuff comm;
    private int comm_timeout;
    private byte devAddr;

    // One complete response as consumed by recvProcess():
    // 1 address byte + 1 function byte + 1 length byte + 30 data bytes + 2 trailing bytes.
    private static final int RECV_SIZE = 35;
    // Number of data bytes in a response (also the expected value of the length byte).
    private static final int RECV_INNER_SIZE = 30;
    private static final int MINUTE = 60000;
    // Set once a response has been decoded; cleared at the start of every recvProcess().
    private volatile boolean refreshFlag = false;

    private ModbusPojo modbusPojo;

    // Bytes received but not yet parsed; filled by readThread, drained by recvProcess().
    private ConcurrentLinkedDeque<Byte> deque = new ConcurrentLinkedDeque<Byte>();
    private String fileName = "/modbus.properties";

    /**
     * Reads {@code modbus_comm_way}, {@code comm_timeout} and {@code devAddr}
     * from {@code /modbus.properties}, opens the selected transport and starts
     * the two background workers.
     *
     * @throws NumberFormatException if {@code comm_timeout} or {@code devAddr}
     *                               is missing or not a valid number
     * @throws RuntimeException      if {@code modbus_comm_way} is missing or
     *                               does not name a known transport
     */
    public Modbus()
    {
        Map<String,String> map = new ReadProperties().getPropertiesMap(fileName);
        String comm_way = map.get("modbus_comm_way");
        String comm_timeouts = map.get("comm_timeout");
        comm_timeout = Integer.parseInt(comm_timeouts);
        String devAddrs = map.get("devAddr");
        devAddr = parseDevAddr(devAddrs);
        if(comm_way == null)
        {
            throw new RuntimeException("没有配置好合适的串口参数");
        }

        modbusPojo = new ModbusPojo();
        try
        {
            comm = new CommFactory().getCommBuff(comm_way);
        }
        catch (Exception e)
        {
            // Fail here with the real cause; continuing would only produce a
            // NullPointerException on comm.open() below.
            throw new RuntimeException("unsupported modbus_comm_way: " + comm_way, e);
        }
        comm.open();

        // The workers must not keep the JVM alive. Marking a Thread object as
        // daemon and then handing it to an executor has no effect (the executor
        // only calls its run() on its own worker threads), so the pool's worker
        // threads themselves are created as daemons.
        ExecutorService pool = Executors.newFixedThreadPool(2, new java.util.concurrent.ThreadFactory()
        {
            @Override
            public Thread newThread(Runnable task)
            {
                Thread worker = new Thread(task);
                worker.setDaemon(true);
                return worker;
            }
        });
        pool.execute(new readThread());
        pool.execute(new dbThread());
    }

    /**
     * Parses the configured device address. Values -128..127 are accepted as
     * before; 128..255 are additionally accepted and stored as the
     * corresponding (negative) signed byte, so they compare equal to the raw
     * byte received from the device.
     */
    private static byte parseDevAddr(String text)
    {
        int value = Integer.parseInt(text);
        if(value < Byte.MIN_VALUE || value > 0xFF)
        {
            throw new NumberFormatException("devAddr out of range (-128..255): " + text);
        }
        return (byte) value;
    }

    /**
     * Worker that moves received bytes from the transport into {@code deque},
     * pausing one second between reads. Stops when its thread is interrupted.
     */
    private class readThread implements Runnable
    {
        @Override
        public void run()
        {
            while(true)
            {
                byte[] recvBuff = comm.readBuff();
                if(recvBuff != null)
                {
                    for(int i=0;i<recvBuff.length;i++)
                    {
                        deque.add(recvBuff[i]);
                    }
                }
                try
                {
                    TimeUnit.MILLISECONDS.sleep(1000);
                }
                catch (InterruptedException e)
                {
                    // Keep the interrupt visible and stop instead of looping on.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Worker that wakes up once a minute and, when fresh data is flagged and
     * the current minute is a multiple of five, range-limits the current values
     * and stores them through {@link ModbusDao#addModbus}. Stops when its
     * thread is interrupted.
     */
    private class dbThread implements Runnable
    {
        @Override
        public void run()
        {
            while(true)
            {
                if(refreshFlag)
                {
                    Calendar now = Calendar.getInstance();
                    if(now.get(Calendar.MINUTE)%5==0)
                    {
                        // Same lock as getModbusPojo(), so a record is never
                        // stored while it is being half-updated.
                        synchronized (modbusPojo)
                        {
                            filterModbusPojo();
                            modbusPojo.setNow(TimeUtil.getDateOfMM(now));
                            ModbusDao md = new ModbusDaoImpl();
                            md.addModbus(modbusPojo);
                        }
                    }
                }
                try
                {
                    TimeUnit.MILLISECONDS.sleep(MINUTE);
                }
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

    }

    /**
     * Limits every checked measurement to the bounds defined in
     * {@code ModbusUtil}. The quality flag is first reset to 0 and then set to
     * 1 as soon as any value had to be replaced by its bound.
     */
    public void filterModbusPojo()
    {
        modbusPojo.setQua(0);
        if(modbusPojo.getEnvTemperature()>ModbusUtil.TEMPERATURE_UP)
        {
            modbusPojo.setEnvTemperature(ModbusUtil.TEMPERATURE_UP);
            System.out.println("getEnvTemperature = "+modbusPojo.getEnvTemperature());
            modbusPojo.setQua(1);
        }
        if(modbusPojo.getEnvTemperature()<ModbusUtil.TEMPERATURE_LOW)
        {
            modbusPojo.setEnvTemperature(ModbusUtil.TEMPERATURE_LOW);
            System.out.println("getEnvTemperature = "+modbusPojo.getEnvTemperature());
            modbusPojo.setQua(1);
        }
        if(modbusPojo.getTemperature()>ModbusUtil.TEMPERATURE_UP)
        {
            modbusPojo.setTemperature(ModbusUtil.TEMPERATURE_UP);
            System.out.println("getTemperature = "+modbusPojo.getTemperature());
            modbusPojo.setQua(1);
        }
        if(modbusPojo.getTemperature()<ModbusUtil.TEMPERATURE_LOW)
        {
            modbusPojo.setTemperature(ModbusUtil.TEMPERATURE_LOW);
            System.out.println("getTemperature = "+modbusPojo.getTemperature());
            modbusPojo.setQua(1);
        }
        if(modbusPojo.getHumidity()>ModbusUtil.HUMIDITY_UP)
        {
            modbusPojo.setHumidity(ModbusUtil.HUMIDITY_UP);
            System.out.println("getHumidity = "+modbusPojo.getHumidity());
            modbusPojo.setQua(1);
        }
        if(modbusPojo.getHumidity()<ModbusUtil.HUMIDITY_LOW)
        {
            modbusPojo.setHumidity(ModbusUtil.HUMIDITY_LOW);
            System.out.println("getHumidity = "+modbusPojo.getHumidity());
            modbusPojo.setQua(1);
        }
        if(modbusPojo.getPressure()>ModbusUtil.PRESSURE_UP)
        {
            modbusPojo.setPressure(ModbusUtil.PRESSURE_UP);
            System.out.println("getPressure = "+modbusPojo.getPressure());
            modbusPojo.setQua(1);
        }
        if(modbusPojo.getPressure()<ModbusUtil.PRESSURE_LOW)
        {
            modbusPojo.setPressure(ModbusUtil.PRESSURE_LOW);
            System.out.println("getPressure = "+modbusPojo.getPressure());
            modbusPojo.setQua(1);
        }
        if(modbusPojo.getIrradiance()>ModbusUtil.IRRADIANCE_UP)
        {
            modbusPojo.setIrradiance(ModbusUtil.IRRADIANCE_UP);
            System.out.println("getIrradiance = "+modbusPojo.getIrradiance());
            modbusPojo.setQua(1);
        }
        if(modbusPojo.getIrradiance()<ModbusUtil.IRRADIANCE_LOW)
        {
            modbusPojo.setIrradiance(ModbusUtil.IRRADIANCE_LOW);
            System.out.println("getIrradiance = "+modbusPojo.getIrradiance());
            modbusPojo.setQua(1);
        }
        if(modbusPojo.getScaIrradiance()>ModbusUtil.IRRADIANCE_UP)
        {
            modbusPojo.setScaIrradiance(ModbusUtil.IRRADIANCE_UP);
            System.out.println("getScaIrradiance = "+modbusPojo.getScaIrradiance());
            modbusPojo.setQua(1);
        }
        if(modbusPojo.getScaIrradiance()<ModbusUtil.IRRADIANCE_LOW)
        {
            modbusPojo.setScaIrradiance(ModbusUtil.IRRADIANCE_LOW);
            System.out.println("getScaIrradiance = "+modbusPojo.getScaIrradiance());
            modbusPojo.setQua(1);
        }
        if(modbusPojo.getDirIrradiance()>ModbusUtil.IRRADIANCE_UP)
        {
            modbusPojo.setDirIrradiance(ModbusUtil.IRRADIANCE_UP);
            System.out.println("getDirIrradiance = "+modbusPojo.getDirIrradiance());
            modbusPojo.setQua(1);
        }
        if(modbusPojo.getDirIrradiance()<ModbusUtil.IRRADIANCE_LOW)
        {
            modbusPojo.setDirIrradiance(ModbusUtil.IRRADIANCE_LOW);
            System.out.println("getDirIrradiance = "+modbusPojo.getDirIrradiance());
            modbusPojo.setQua(1);
        }
        if(modbusPojo.getWindSpeed()>ModbusUtil.UAVG_UP)
        {
            modbusPojo.setWindSpeed(ModbusUtil.UAVG_UP);
            System.out.println("getWindSpeed = "+modbusPojo.getWindSpeed());
            modbusPojo.setQua(1);
        }
        if(modbusPojo.getWindSpeed()<ModbusUtil.UAVG_LOW)
        {
            modbusPojo.setWindSpeed(ModbusUtil.UAVG_LOW);
            System.out.println("getWindSpeed = "+modbusPojo.getWindSpeed());
            modbusPojo.setQua(1);
        }
        if(modbusPojo.getWindDir()>ModbusUtil.VAVG_UP)
        {
            modbusPojo.setWindDir(ModbusUtil.VAVG_UP);
            System.out.println("getWindDir = "+modbusPojo.getWindDir());
            modbusPojo.setQua(1);
        }
        if(modbusPojo.getWindDir()<ModbusUtil.VAVG_LOW)
        {
            modbusPojo.setWindDir(ModbusUtil.VAVG_LOW);
            System.out.println("getWindDir = "+modbusPojo.getWindDir());
            modbusPojo.setQua(1);
        }
    }

    /**
     * Runs one polling cycle: waits {@code comm_timeout} milliseconds, parses
     * whatever has been received so far, then sends the next request.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt
     * status is restored and the cycle is skipped.
     */
    public void process()
    {
        try
        {
            TimeUnit.MILLISECONDS.sleep(comm_timeout);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            return;
        }
        recvProcess();
        sendProcess();
    }

    /**
     * Extracts complete responses from the receive queue.
     *
     * <p>While at least {@code RECV_SIZE} bytes are queued, bytes are consumed
     * looking for the header {@code devAddr, 0x03, RECV_INNER_SIZE}. When it
     * is found, the following {@code RECV_INNER_SIZE} data bytes are handed to
     * {@link #dealRecvBuff(byte[])} and the two bytes after them are dropped.
     * Bytes that do not match the header are discarded.
     */
    public void recvProcess()
    {
        refreshFlag = false;
        byte[] recvBuff = new byte[RECV_INNER_SIZE];
        while(deque.size()>=RECV_SIZE)
        {
            byte first = deque.pollFirst();
            if(first != devAddr)
            {
                continue;
            }
            byte second = deque.pollFirst();
            if(second != 0x03)
            {
                continue;
            }
            byte third = deque.pollFirst();
            if(third != RECV_INNER_SIZE)
            {
                continue;
            }
            for(int i=0;i<RECV_INNER_SIZE;i++)
            {
                recvBuff[i] = deque.pollFirst();
            }
            // NOTE(review): the two trailing bytes (presumably the CRC) are
            // discarded without being checked.
            deque.pollFirst();
            deque.pollFirst();
            dealRecvBuff(recvBuff);
        }
    }

    /**
     * Prints the data bytes as hex, decodes them into the shared
     * {@link ModbusPojo} and then flags the data as refreshed.
     *
     * @param recvBuff the data bytes of one response
     */
    public void dealRecvBuff(byte[] recvBuff)
    {
        System.out.println(CommUtil.bytesToHex(recvBuff));
        getModbusPojo(recvBuff);
        // Raise the flag only after the values are in place, so the database
        // worker cannot see "refreshed" and store the previous values.
        refreshFlag = true;
    }

    /**
     * Decodes consecutive two-byte values from {@code recvBuff} into the shared
     * {@link ModbusPojo}, holding its lock for the whole update.
     *
     * <p>The value starting at each even offset 0..28 is decoded if that offset
     * is below {@code recvBuff.length}. The first two values are read as
     * signed, the rest as unsigned; some are scaled by 0.1 or 0.001 before
     * being stored.
     *
     * @param recvBuff the data bytes of one response
     */
    public void getModbusPojo(byte[] recvBuff)
    {
        int temp;
        synchronized (modbusPojo)
        {
            for(int i=0;i<recvBuff.length;i=i+2)
            {
                switch(i)
                {
                    case 0:
                        temp = ModbusUtil.getSignedAns(recvBuff, 0, 1);
                        double envTemperature = temp*0.1;
                        modbusPojo.setEnvTemperature(envTemperature);
                        break;
                    case 2:
                        temp = ModbusUtil.getSignedAns(recvBuff, 2, 3);
                        double temperature = temp*0.1;
                        modbusPojo.setTemperature(temperature);
                        break;
                    case 4:
                        temp = ModbusUtil.getUnsignedAns(recvBuff, 4, 5);
                        double humidity = temp*0.1;
                        modbusPojo.setHumidity(humidity);
                        break;
                    case 6:
                        temp = ModbusUtil.getUnsignedAns(recvBuff, 6, 7);
                        double pressure = temp*0.1;
                        modbusPojo.setPressure(pressure);
                        break;
                    case 8:
                        temp = ModbusUtil.getUnsignedAns(recvBuff, 8, 9);
                        modbusPojo.setIrradiance(temp);
                        break;
                    case 10:
                        temp = ModbusUtil.getUnsignedAns(recvBuff, 10, 11);
                        modbusPojo.setScaIrradiance(temp);
                        break;
                    case 12:
                        temp = ModbusUtil.getUnsignedAns(recvBuff, 12, 13);
                        modbusPojo.setDirIrradiance(temp);
                        break;
                    case 14:
                        temp = ModbusUtil.getUnsignedAns(recvBuff, 14, 15);
                        modbusPojo.setWindDir(temp);
                        break;
                    case 16:
                        temp = ModbusUtil.getUnsignedAns(recvBuff, 16, 17);
                        double windSpeed = temp*0.1;
                        modbusPojo.setWindSpeed(windSpeed);
                        break;
                    case 18:
                        temp = ModbusUtil.getUnsignedAns(recvBuff, 18, 19);
                        double windSpeedTwo = temp*0.1;
                        modbusPojo.setWindSpeedTwo(windSpeedTwo);
                        break;
                    case 20:
                        temp = ModbusUtil.getUnsignedAns(recvBuff, 20, 21);
                        double windSpeedTen = temp*0.1;
                        modbusPojo.setWindSpeedTen(windSpeedTen);
                        break;
                    case 22:
                        temp = ModbusUtil.getUnsignedAns(recvBuff, 22, 23);
                        modbusPojo.setDailyExposure(temp);
                        break;
                    case 24:
                        temp = ModbusUtil.getUnsignedAns(recvBuff, 24, 25);
                        double totalExposure = temp*0.001;
                        modbusPojo.setTotalExposure(totalExposure);
                        break;
                    case 26:
                        temp = ModbusUtil.getUnsignedAns(recvBuff, 26, 27);
                        double scaExposure = temp*0.001;
                        modbusPojo.setScaExposure(scaExposure);
                        break;
                    case 28:
                        temp = ModbusUtil.getUnsignedAns(recvBuff, 28, 29);
                        double dirExposure = temp*0.001;
                        modbusPojo.setDirExposure(dirExposure);
                        break;
                }
            }
        }
    }

    /**
     * Builds and sends the 8-byte request: device address, {@code 0x03},
     * {@code 0x00 0x00}, {@code 0x00 0x0F}, followed by the two bytes returned
     * by {@code CommUtil.CRC16} over the first six bytes.
     */
    public void sendProcess()
    {
        byte[] message = new byte[8];
        int sendLen = 0;
        message[sendLen++] = devAddr;
        message[sendLen++] = 0x03;
        // NOTE(review): presumably start address 0x0000 and a count of 0x000F
        // (15) two-byte registers, i.e. the 30 data bytes recvProcess() expects.
        message[sendLen++] = 0x00;
        message[sendLen++] = 0x00;
        message[sendLen++] = 0x00;
        message[sendLen++] = 0x0F;
        byte[] crc = CommUtil.CRC16(message,6);
        message[sendLen++] = crc[0];
        message[sendLen++] = crc[1];
        comm.writeBuff(message);
    }

}

欢迎支持笔者的作品《深入理解Kafka: 核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客(ID: hiddenkafka)。
本文作者: 朱小厮

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×