快速,持续,稳定,傻瓜式
支持 MySQL、SQL Server 数据同步

【Oracle 连接 Kafka】

请联系QQ:1793040 索取软件

1、连接kafka

package cn.com.sgcc.jibei.kafka.videoimage;

import cn.com.sgcc.jibei.util.PropertyUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple17;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Properties;


//@Component
public class KafkaToVideoimage {
//@PostConstruct
   public static void main (String [] args){
      System.out.println("================kafka to Videoimage start============");
      Properties pro=new Properties();
      pro.put("bootstrap.servers", PropertyUtil.get("kafka.hosts"));
      pro.put("zookeeper.connect", PropertyUtil.get("kafka.zookper"));
      pro.put("group.id", PropertyUtil.get("kafka.group"));
      StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
      env.getConfig().disableSysoutLogging(); 
      env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
      env.enableCheckpointing(5000);
      DataStream sourceStream=env.addSource(
            new FlinkKafkaConsumer08(PropertyUtil.get("kafka.topic_videoimage"), new SimpleStringSchema(), pro));



      DataStream > sourceStreamTra=sourceStream
          .filter(new FilterFunction() {
               public boolean filter(String value) throws Exception {
                  return StringUtils.isNotBlank(value)&&value.split("\t").length==17;
               }
            }).map(new MapFunction >() {
               private static final long serialVersionUID=1L;

               public Tuple17  map(String value) throws Exception {
                  String[] args=value.trim().split("\t");
                  

                  SimpleDateFormat f=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

                  return new Tuple17(
                        args[3].equals("NULL")  null : args[3],
                        args[4].equals("NULL")  null : new BigDecimal(args[4]),
                        args[5].equals("NULL")  null : args[5],
                        args[6].equals("NULL")  null : args[6],
                        args[7],
                        args[8].equals("NULL")  null : args[8],//blob
                        args[9].equals("NULL")  null : args[9],
                        args[10].equals("NULL")  null : args[10],
                        args[11].equals("NULL")  null : args[11],
                        args[12].equals("NULL")  null : args[12],
                        args[13].equals("NULL")  null : args[13],
                        args[14].equals("NULL")  null : args[14],
                        args[15],
                        args[16].equals("NULL")  null : args[16],
                        args[0].equals("NULL")  null : args[0],
                        args[1],
                        args[2]

                  );
               }
            });
      System.out.println("===============!数据拉取成功标志!===============");
      sourceStreamTra.addSink(new VideoimageSink());
      try {
         env.execute("data to ODS_CMST_AIRPRESSURE start");
      } catch (Exception e) {
         e.printStackTrace();
      }
   }
}

2、自定义Oracle sink

package cn.com.sgcc.jibei.kafka.videoimage;

import cn.com.sgcc.jibei.util.PropertyUtil;
import oracle.sql.BLOB;
import org.apache.flink.api.java.tuple.Tuple17;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.io.OutputStream;
import java.math.BigDecimal;
import java.sql.*;
import java.text.SimpleDateFormat;

public class VideoimageSink extends RichSinkFunction> {
   private static final long serialVersionUID=1L;
   private Connection connection;
   private PreparedStatement preparedStatement;
   private ResultSet rs;
   String username=PropertyUtil.get("oracle.user");
   String password=PropertyUtil.get("oracle.pwd");;
   String drivername=PropertyUtil.get("oracle.driver");
   String dburl=PropertyUtil.get("oracle.url");
   byte[] value1=null;
   @Override
   public void invoke(
         Tuple17 value)
         throws Exception {
   //TODO Auto-generated method stub
      Class.forName(drivername);
      connection=DriverManager.getConnection(dburl, username, password);

   //connection.setAutoCommit(false);

      value1=value.f5.getBytes();

      String sql="insert into JBBDKF.SYNC_SBD_CMST_VIDEOIMAGE (obj_id,obj_dispidx," +
            "obj_caption,devicecode,acquisition_time,imagedata,channelno," +
            "presetposition,istypical,category,ishandled,imageorvideo,resave_time," +
            "checkresult,op_type,op_time,cur_time) " +
            "values(,,,,,,,,,,,,,,,,)";;

      preparedStatement=connection.prepareStatement(sql);

      SimpleDateFormat sDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      preparedStatement.setString(1, value.f0);
      preparedStatement.setBigDecimal(2, value.f1);
      preparedStatement.setString(3, value.f2);
      preparedStatement.setString(4, value.f3);
      preparedStatement.setTimestamp(5, value.f4.equals("NULL")null:new Timestamp(sDateFormat.parse(value.f4).getTime()));
      preparedStatement.setBlob(6, BLOB.empty_lob());//blob
      preparedStatement.setString(7, value.f6);
      preparedStatement.setString(8, value.f7);
      preparedStatement.setString(9, value.f8);
      preparedStatement.setString(10, value.f9);
      preparedStatement.setString(11, value.f10);
      preparedStatement.setString(12, value.f11);
      preparedStatement.setTimestamp(13, value.f12.equals("NULL")null:new Timestamp(sDateFormat.parse(value.f12).getTime()));
      preparedStatement.setString(14, value.f13);
      preparedStatement.setString(15,value.f14);
      preparedStatement.setTimestamp(16,value.f15.equals("NULL")null:new Timestamp(sDateFormat.parse(value.f15).getTime()));
      preparedStatement.setTimestamp(17,value.f16.equals("NULL")null:new Timestamp(sDateFormat.parse(value.f16).getTime()));

      int ii=preparedStatement.executeUpdate();
      System.out.println(ii);

      OutputStream os=null;
   //select获得cursor,并写入数据
      String sql2="select imagedata from JBBDKF.SYNC_SBD_CMST_VIDEOIMAGE where obj_id= for update";
      preparedStatement=connection.prepareStatement(sql2);
      preparedStatement.setString(1, value.f0);
      rs=preparedStatement.executeQuery();
      if (rs.next()) {
         BLOB blob=(BLOB) rs.getBlob("imagedata");
         os=blob.getBinaryOutputStream();
         os.write(value1, 0, value1.length);
      }
      os.flush();
      os.close();
      connection.commit();
      connection.close();

   }
}

相关推荐

咨询软件
 
QQ在线咨询
售前咨询热线
QQ1793040